In [1]:
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.path import Path
import numpy as np
import gsw
import warnings
import cmocean
from collections import OrderedDict
import os
import glob
from tqdm import tqdm_notebook as bar
from tqdm import tqdm
from dask import delayed
import dask
from shapely.geometry import Polygon, Point, MultiPoint
from numpy.random import choice
from dask.diagnostics import ProgressBar
warnings.filterwarnings(action='once')

In [2]:
# Internal functions for checking if a point is inside a grid box 
def _insideTest(box, x, y):
    """
    Params:
    -------
    box: Current Box of particle
    x: x coord (lon)
    y: y coord (lat)
    
    
    
    Returns:
    -------
    if inside box: TRUE
    if outside box: FALSE
    if neither: rude message
    
    """
    
    loncheck = np.logical_and(box[0] < x, box[2] > x)
    
    latcheck = np.logical_and(box[1] < y, box[3] > y)
    inside = np.logical_and(loncheck, latcheck)
    if np.sum(inside) == 0:
        return False
    elif np.sum(inside) == 1:
        return True
    else:
        print('fuck up')

# Finds which box contains a point by running the inside test against every box
# (a linear scan over all boxes per call).

def _test_inside_wrap(boxes, point):
    """
    Return the index of the single box in `boxes` that contains `point`.

    Params:
    -------
    boxes: list of boxes, each in the form accepted by `_insideTest`
    point: current point as (x, y), i.e. (lon, lat)

    Returns:
    -------
    int: index into `boxes` of the box containing the point

    Raises:
    -------
    ValueError: if the point lies in no box or in more than one box.
    Converting the raw `np.where` result straight to int would instead fail
    with an opaque TypeError, and is deprecated in NumPy >= 1.25 even when
    it succeeds.
    """
    x = point[0]
    y = point[1]

    # bool() keeps the mask well-typed even if the inside test returns None
    hits = np.flatnonzero([bool(_insideTest(box, x, y)) for box in boxes])

    if hits.size != 1:
        raise ValueError(
            f"point ({x}, {y}) matched {hits.size} boxes; expected exactly one"
        )
    return int(hits[0])


# center of box
def _box_center(box):
    return [np.nanmean([box[0], box[2]]),np.nanmean([box[1], box[3]]) ]


# Computes a single super trajectory by walking a Markov chain over the grid boxes.
# It's an internal function for the ensemble runs.
def _super_trajectory_internal(P, T,
                               boxes,
                               duration,
                               initial_location,
                               bad_boxes,
                               n_steps,
                              ):
    """
    Params:
    -------
    P: Transit Matrix. Row P[i, :] is used as the probability distribution
       over the next box while the particle is in box i.
    T: fixed timestep (days). Not used here; the caller converts it into n_steps.
    boxes: sequence of boxes. len(boxes) must equal the row length of P,
       because numpy's `choice` requires `a` and `p` to have the same size.
    duration: (days). Not used here; kept for interface compatibility.
    initial location: start point (x, y). It is snapped to the centre of the
       box that contains it.
    bad_boxes: indices of boxes where no movement will occur. Once the
       particle enters one, it stays there for the rest of the trajectory.
    n_steps: number of transitions to take.

    Returns:
    -------
    points: The full trajectory as an array of box centres, shape (n_steps + 1, 2).
       The first row is the centre of the initial box.
    """
    n_boxes = len(boxes)

    # Hoisted out of the loop: set membership is O(1), whereas `cb in ndarray`
    # scans the whole bad-box array on every step.
    frozen = set(bad_boxes)

    # cb = current box index
    cb = _test_inside_wrap(boxes, initial_location)
    points = [_box_center(boxes[cb])]

    for _ in range(n_steps):
        if cb not in frozen:
            # draw the next box from the current box's row of probabilities
            cb = choice(n_boxes, p=P[cb, :])
        # in a bad box cb is unchanged, so the same centre is repeated
        points.append(_box_center(boxes[cb]))

    return np.array(points)


def ensemble_trajectory(P, T, boxes, duration, initial_location, n_iter=1000):
    """
    Runs all the iterations at a single initial location.
    This wrapper function is meant to be passed to dask so you can run each
    set of initial locations in parallel.

    Params:
    -------
    P: transit matrix. Rows that sum to zero are treated as "bad" boxes, in
       which particles stop moving.
    T: fixed timestep (days)
    boxes: sequence of grid boxes, indexed consistently with P
    duration: length of each trajectory (days); the step count is int(duration / T)
    initial_location: start point (x, y)
    n_iter: number of ensemble members. Coerced to int, so float counts such
       as 1e3 are accepted (range() rejects floats).

    Returns:
    -------
    dask array of shape (n_iter, int(duration / T) + 1, 2) holding the box
    centre of every member at every step, chunked by 100 along each axis.

    Raises:
    -------
    ValueError: if n_iter < 1 (there would be nothing to stack).
    """
    # `import dask` alone does not guarantee that the array submodule is loaded
    import dask.array as da

    n_iter = int(n_iter)
    if n_iter < 1:
        raise ValueError(f"n_iter must be at least 1, got {n_iter}")

    n_steps = int(duration / T)
    # boxes with no outgoing probability mass: particles there never move
    bad_boxes = np.where(np.sum(P, axis=1) == 0)[0]

    # one independent super trajectory per iteration
    results = [
        _super_trajectory_internal(P, T,
                                   boxes,
                                   duration,
                                   initial_location,
                                   bad_boxes,
                                   n_steps)
        for _ in bar(range(n_iter))
    ]

    F = np.stack(results)
    return da.from_array(F, chunks=100)

def ensemble_experiments(P, T, B, duration, initial_locations,  n_iter=100):
    """
    Build, but do not run, one dask-delayed ensemble per initial location.

    Each returned task wraps `ensemble_trajectory` for a single initial
    location, so the locations can be evaluated in parallel with each other.
    No calculation happens here: pass the returned list to `dask.compute`
    to actually run it.

    Params:
    -------
    P: transit matrix
    T: fixed timestep (days)
    B: sequence of grid boxes
    duration: trajectory length (days)
    initial_locations: iterable of start points (x, y)
    n_iter: ensemble members per initial location

    Returns:
    -------
    list of dask Delayed objects, one per initial location, in input order
    """
    return [
        delayed(ensemble_trajectory)(P, T, B, duration, start, n_iter=n_iter)
        for start in bar(initial_locations)
    ]
    


  and should_run_async(code)


In [3]:
# Load the precomputed transit matrix P and the grid boxes B.
# NOTE(review): the "0.25_res" suffix presumably means a 0.25-degree grid; confirm.
# NOTE(review): allow_pickle=True lets np.load unpickle objects, which can execute
# arbitrary code from a tampered file. Only load .npy files you produced or trust.
# Presumably B needs it because it is stored as an object array; confirm.
P = np.load('P_tester_0.25_res.npy')
B = np.load('boxes_0.25_res.npy', allow_pickle=True)

In [4]:
# --- experiment configuration ---
duration = 365 * 1  # duration in days
T = 2  # timestep in days (one Markov transition per T days)

# Release points as [lon, lat]. 360 - 30.011 presumably expresses 30.011 W in a
# 0-360 longitude convention; confirm against the grid in B.
# Additional release latitudes (currently disabled):
#   53.01, 52.51, 52.01, 51.51, 51.01, 50.51, 50.01
initial_locations = [[360 - 30.011, lat] for lat in (54.01, 53.51)]

# placeholders; all_results is reassigned once the ensembles have been computed
all_results = []
delayed_lists = []
n_iter = 10  # ensemble members per release point

In [5]:
# Build the delayed graph (one task per release point), then execute it.
pre_results = ensemble_experiments(
    P, T, B,
    duration, initial_locations,
    n_iter=n_iter,
)

with ProgressBar():
    # dask.compute returns a tuple with one entry per argument passed to it
    results = dask.compute(pre_results)

# results[0] is the list of per-release-point arrays; stacking them adds a
# new leading release-point axis.
all_results = dask.array.stack(results[0])

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for iloc in bar(initial_locations):


HBox(children=(FloatProgress(value=0.0, max=2.0), HTML(value='')))


[                                        ] | 0% Completed |  3.7s

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=10.0), HTML(value='')))

[                                        ] | 0% Completed | 12.2s

[########################################] | 100% Completed | 12.3s


In [6]:
# dims of `trajectories`: (release_point, niter, step, [lon, lat])
trajectories = all_results.compute()  # puts it in a numpy array

# Each Markov step advances T days. Labelling the time axis with the bare step
# index would undercount elapsed days by a factor of T, so scale it.
days = np.arange(trajectories.shape[2]) * T
niter = np.arange(0, trajectories.shape[1])
release_point = np.arange(1, trajectories.shape[0] + 1)

# dimension names, in the same order as the first three axes of `trajectories`
coords = ['release_point', 'niter', 'days']

ds = xr.Dataset(
    {
        'lon': (coords, trajectories[:, :, :, 0]),
        'lat': (coords, trajectories[:, :, :, 1]),
    },
    coords={
        'release_point': release_point,
        'niter': niter,
        'days': days,
    }
)

# ds.to_netcdf('tm_cross_nac_10000iter_P25_larger_grid_1day_surf.nc')

In [7]:
# Display the assembled trajectory dataset (dims: release_point, niter, days)
ds

  and should_run_async(code)
