# Parallelization Testing

In this notebook, I will learn how to use dask within xarray to parallelize code and speed up parts of the Argo analysis. I'll start by running a simple test case (which I hope to find) in xarray's documentation. If this works, I will then move on to running the depth-to-density interpolation function to see whether it gets faster too.

In [1]:
import xarray as xr
import matplotlib.pyplot as plt
import matplotlib as mpl
from matplotlib.path import Path
import seaborn as sns
import seaborn
import pandas as pd
import numpy as np
from importlib import reload
import cartopy.crs as ccrs
import cmocean.cm as cmo
import gsw
import dask.array as da
import dask

In [3]:
import os
os.chdir('/home.ufs/amf2288/argo-intern/funcs')
import density_funcs as df
import EV_funcs as ef
import filt_funcs as ff
import plot_funcs as pf
import processing_funcs as prf

In [4]:
reload(df)
reload(ef)
reload(ff)
reload(prf)

<module 'processing_funcs' from '/home/amf2288/argo-intern/funcs/processing_funcs.py'>

# Reproducible Test

The goal here is to make a really big array and then compare loading with dask against loading without dask. I'm following the rough steps Stephan Hoyer outlines in this blog post (https://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/), including creating a dataset with the same dimensions:

Dimensions: (latitude: 256, longitude: 512, time: 52596)

In [24]:
#from dask.distributed import Client

In [25]:
#client = Client()
#client

Connection method: Cluster object; Cluster type: distributed.LocalCluster; Dashboard: http://127.0.0.1:8787/status

Workers: 9; Total threads: 72; Total memory: 0.98 TiB; Status: running; Using processes: True

Comm: tcp://127.0.0.1:42894; Dashboard: http://127.0.0.1:8787/status; Started: Just now

Each of the 9 workers has Total threads: 8 and Memory: 111.95 GiB, with a local directory under /tmp/dask-scratch-space-2594/:

- Comm: tcp://127.0.0.1:37910, Dashboard: http://127.0.0.1:33734/status, Nanny: tcp://127.0.0.1:33144, Local directory: worker-ghbrom8i
- Comm: tcp://127.0.0.1:37934, Dashboard: http://127.0.0.1:36003/status, Nanny: tcp://127.0.0.1:34076, Local directory: worker-6wn1czp3
- Comm: tcp://127.0.0.1:33475, Dashboard: http://127.0.0.1:39961/status, Nanny: tcp://127.0.0.1:39551, Local directory: worker-npb_621n
- Comm: tcp://127.0.0.1:35628, Dashboard: http://127.0.0.1:36023/status, Nanny: tcp://127.0.0.1:36683, Local directory: worker-fkp7zhg3
- Comm: tcp://127.0.0.1:37911, Dashboard: http://127.0.0.1:45272/status, Nanny: tcp://127.0.0.1:40838, Local directory: worker-ldig5236
- Comm: tcp://127.0.0.1:34845, Dashboard: http://127.0.0.1:37663/status, Nanny: tcp://127.0.0.1:45841, Local directory: worker-wp64wabo
- Comm: tcp://127.0.0.1:40937, Dashboard: http://127.0.0.1:32816/status, Nanny: tcp://127.0.0.1:40822, Local directory: worker-cn854oek
- Comm: tcp://127.0.0.1:40479, Dashboard: http://127.0.0.1:46663/status, Nanny: tcp://127.0.0.1:44291, Local directory: worker-6dfjz5yx
- Comm: tcp://127.0.0.1:40724, Dashboard: http://127.0.0.1:40469/status, Nanny: tcp://127.0.0.1:39545, Local directory: worker-3sdj8n49

Can also use Dask Gateway, then call `adapt` with a minimum and maximum to set how many cores are used in the calculation.

In [77]:
factor = 10

lat, lon, time = 256, 512, 52596*factor

In [78]:
#data = np.random.rand(lat,lon,time)
#data

In [79]:
data = da.random.random((time,lat,lon),chunks=(100,256,512))

In [80]:
ds = xr.Dataset(
    {
        "data": (["time", "latitude", "longitude"], data)
    },
    coords={
        "time": np.arange(time),
        "latitude": np.linspace(-90, 90, lat),
        "longitude": np.linspace(-180, 180, lon)
    }
)

In [81]:
ds

| | Array | Chunk |
|---|---|---|
| Bytes | 513.63 GiB | 100.00 MiB |
| Shape | (525960, 256, 512) | (100, 256, 512) |
| Dask graph | 5260 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


In [82]:
%time result = ds.mean('time').compute()

CPU times: user 44min 53s, sys: 10min 21s, total: 55min 14s
Wall time: 1min 4s


Okay, this is a sizable dataset, with 525960*256*512 data points. The calculation was parallelized, running on ~60 cores for its ~1 min duration. Great! So we know dask + xarray is working here.
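As a rough check on these numbers, the array size and the average number of busy cores can be worked out from the shape and the `%time` output. This is only a back-of-the-envelope sketch: the variable names are mine, and the CPU-to-wall-time ratio is an average over the whole run, so it can sit below the peak seen in a CPU monitor.

```python
# Back-of-the-envelope check of the test above.
n_time, n_lat, n_lon = 52596 * 10, 256, 512
bytes_per_value = 8  # float64

n_points = n_time * n_lat * n_lon
size_gib = n_points * bytes_per_value / 2**30

# Values copied from the %time output: total CPU time vs. wall time.
cpu_seconds = 55 * 60 + 14
wall_seconds = 1 * 60 + 4
avg_busy_cores = cpu_seconds / wall_seconds

print(f"{n_points:,} points, {size_gib:.2f} GiB")
print(f"average busy cores: {avg_busy_cores:.1f}")
```

The size comes out at 513.63 GiB, matching the dataset repr, and the average is about 52 busy cores, which is in the same range as the ~60 cores I saw.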

## Shape of Argo Data

In [None]:
#(depth*prof)/(lat*lon*time)

In [5]:
depth, prof = 1000, 1000000000

In [6]:
data = da.random.random((depth,prof), chunks=(1000,10000))

In [7]:
ds = xr.Dataset({'data':(['depth','prof'],data)},
               coords={'depth':np.arange(depth),
                      'prof':np.arange(prof)})
ds

| | Array | Chunk |
|---|---|---|
| Bytes | 7.28 TiB | 76.29 MiB |
| Shape | (1000, 1000000000) | (1000, 10000) |
| Dask graph | 100000 chunks in 1 graph layer | |
| Data type | float64 numpy.ndarray | |


In [21]:
mean_prof = ds.data.mean('prof')
ds_anom = ds.data - mean_prof

In [24]:
mean_prof.compute()

KeyboardInterrupt: 

Calculating mean_prof alone ran completely and in parallel, but computing ds_anom only ran on one CPU.
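One thing worth checking here (a guess on my part, not something confirmed by the runs above): `ds_anom` has the same shape as the full array, so calling `.compute()` on it asks for the whole 7.28 TiB result in memory, whereas `mean_prof` is only 1000 values. The sketch below uses a small stand-in array (the sizes and the name `ds_small` are made up) to show that the anomaly stays lazy and is as large as its input, and that reducing it before computing only brings back a small result.

```python
import dask.array as da
import numpy as np
import xarray as xr

# Small stand-in for the (depth, prof) array above; sizes are illustrative.
depth, prof = 10, 4000
data = da.random.random((depth, prof), chunks=(depth, 1000))
ds_small = xr.Dataset({"data": (["depth", "prof"], data)})

mean_prof = ds_small["data"].mean("prof")
anom = ds_small["data"] - mean_prof  # lazy: nothing has been computed yet

# The anomaly is dask-backed and as large as the input array.
anom_is_lazy = anom.chunks is not None
anom_nbytes = anom.nbytes

# Reduce before computing, so only `depth` values come back to memory.
anom_std = anom.std("prof").compute()

print(anom_is_lazy, anom_nbytes, anom_std.shape)
```

For the full-size array, the alternatives to `.compute()` would be to reduce the anomaly first, as here, or to write it to disk chunk by chunk (for example with `to_zarr`).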

In [14]:
def get_anom(ds, dim1='prof', variable='data'):
    mean_prof = ds[variable].mean(dim=dim1)
    return ds[variable] - mean_prof

In [17]:
def get_anom(ds):
    dim1='prof'
    variable='data'
    mean_prof = ds[variable].mean(dim=dim1)
    return ds[variable] - mean_prof

In [15]:
ds_anom = get_anom(ds)
ds_anom.compute()


KeyboardInterrupt



This started running, but not on multiple CPUs, so I interrupted the run. I'm really confused, because this ran on multiple CPUs in the old environment. Maybe the new version of dask changed how it treats self-written functions?

In [19]:
ds_anom = xr.apply_ufunc(get_anom, ds, vectorize=True, dask='parallelized', output_dtypes='float64')#, input_core_dims=[['depth']], output_core_dims=[['depth']])

In [20]:
ds_anom.compute()

IndexError: invalid index to scalar variable.

I wanted to see whether it would run in parallel if I put only the .mean() method in a function. Yes, it did. It ran on pretty much the same number of cores (50) as calling the method directly, without using a function.

Additionally, I added the anomaly portion, to have multiple operations inside the same function. This still ran in parallel.

HOWEVER, when I tried running the groupby_bins() method, this only ran on one core. My new theory: __it seems like there are some functions that automatically run in parallel and some that don't.__

So the next steps should be:
- determine which functions automatically run in parallel
- see if there are ways to parallelize functions that default to running on one core

Notes on `apply_ufunc`:

It worked (very briefly)! This is exciting: it was just about ready and running on 16 CPUs, but then an IndexError was raised: ``IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices``

So I think once I resolve that error, I'll be able to use dask to run functions I've written in parallel :)
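A likely cause of both `IndexError`s (my reading, not verified against the runs above): `xr.apply_ufunc` hands the wrapped function bare NumPy data rather than an xarray object, so `ds[variable]` inside `get_anom` ends up indexing a NumPy scalar or array with a string. The sketch below assumes a hypothetical NumPy-only function `anom_np` and a small stand-in array. It declares `prof` as a core dimension, which `apply_ufunc` moves to the last axis, and by default `dask='parallelized'` needs that dimension in a single chunk.

```python
import dask.array as da
import numpy as np
import xarray as xr

def anom_np(arr):
    """Anomaly from the mean along the last axis; `arr` is a plain NumPy array."""
    return arr - arr.mean(axis=-1, keepdims=True)

# Small stand-in array (sizes are made up). 'prof' is kept in one chunk
# because it is a core dimension; the chunking is over 'depth' instead.
depth, prof = 10, 500
data = da.random.random((depth, prof), chunks=(5, prof))
arr = xr.DataArray(data, dims=["depth", "prof"], name="data")

anom = xr.apply_ufunc(
    anom_np,
    arr,
    input_core_dims=[["prof"]],   # apply_ufunc moves 'prof' to the last axis
    output_core_dims=[["prof"]],
    dask="parallelized",
    output_dtypes=[float],        # a list with one dtype per output
)

# Same calculation with built-in xarray operations, for comparison.
expected = arr - arr.mean("prof")
print(np.allclose(anom.compute().values, expected.compute().values))
```

For this particular calculation the built-in expression is already dask-aware, so `apply_ufunc` adds nothing. Keeping `prof` in one chunk would also not scale to 1,000,000,000 profiles. The pattern is more useful for per-profile functions where the core dimension is `depth`.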

In [12]:
def get_MLD(
    ds, threshold=0.03, variable="SIG0", dim1="N_PROF", dim2="PRES_INTERPOLATED"
):
    """Takes an xarray and returns a new coordinate "MLD" or mixed layer depth for each profile, defined using the density threshold from the surface.
    ds: xarray with profile and pressure dimensions
    threshold: density value that defines the boundary of the mixed layer, default=0.03
    variable: density coordinate, default='SIG0'
    dim1: profile dimension, default='N_PROF'
    dim2: pressure dimension, default='PRES_INTERPOLATED'
    """

    MLD_li = []

    for n in range(0, len(ds[dim1])):
        SIG0_surface = ds.isel({dim1: n})[variable].isel({dim2: 0})
        SIG0_diff = SIG0_surface + threshold
        MLD_ds = SIG0_surface.where(ds.isel({dim1: n})[variable] < SIG0_diff)
        MLD = MLD_ds.dropna(dim2).isel({dim2: -1})[dim2]
        MLD_li.append(MLD)

    return ds.assign_coords(MLD=(dim1, MLD_li))

In [13]:
#get_MLD = dask.delayed(get_MLD)

In [None]:
%time ds_mld = get_MLD(ds, variable='data',dim1='prof',dim2='depth').compute()

The CPU graph was all over the place for this one. It had very large dips and swings, from 65 CPUs down to almost none and then back again. Normally the CPU load stays relatively constant (within a few CPUs more or less) the entire time. Additionally, the usage dropped very low about halfway through the run, barely even taking up one core.

I ran this again and it ran on 65 CPUs for about a minute and then dropped down to nothing. I think the kernel disconnected? I'm wondering whether this is connected to the .values call (trying to load something into memory that's too large? I don't know).

The next step is to try the apply_ufunc() function to see if it can be used to parallelize an existing function I've written, such as the get_MLD function.
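Here is a minimal sketch of what that could look like, under several assumptions. `mld_np` is a hypothetical NumPy rewrite of the criterion in `get_MLD` (the deepest level whose density is still below surface density plus the threshold), not the function in `processing_funcs`. The data are a tiny made-up array with dimensions `depth` and `prof`, whereas the real Argo data would use `SIG0`, `PRES_INTERPOLATED` and `N_PROF`. The idea is to replace the Python loop over profiles with one array operation per chunk of profiles.

```python
import numpy as np
import xarray as xr

def mld_np(sig, depth, threshold=0.03):
    """Deepest level whose density is below surface density + threshold.

    sig:   NumPy array with depth as the last axis
    depth: 1-D NumPy array of depth (or pressure) levels
    """
    within = sig < sig[..., :1] + threshold
    n_levels = sig.shape[-1]
    # Position of the last True along the depth axis.
    last = n_levels - 1 - np.argmax(within[..., ::-1], axis=-1)
    # Profiles with no valid level (e.g. NaN at the surface) get NaN.
    return np.where(within.any(axis=-1), depth[last], np.nan)

# Tiny made-up example: 3 profiles on 5 levels.
levels = np.array([0.0, 10.0, 20.0, 30.0, 40.0])
sig = np.array([
    [25.00, 25.01, 25.02, 25.10, 25.20],  # mixed layer reaches 20
    [25.00, 25.50, 26.00, 26.50, 27.00],  # only the surface level qualifies
    [25.00, 25.00, 25.00, 25.00, 25.00],  # uniform: reaches the deepest level
]).T  # shape (depth, prof)

da_sig = xr.DataArray(
    sig, dims=["depth", "prof"], coords={"depth": levels}, name="data"
).chunk({"prof": 2})  # chunk over profiles only; depth stays in one chunk

mld = xr.apply_ufunc(
    mld_np,
    da_sig,
    da_sig["depth"],
    kwargs={"threshold": 0.03},
    input_core_dims=[["depth"], ["depth"]],  # depth is consumed by the function
    dask="parallelized",
    output_dtypes=[float],
)
print(mld.compute().values)
```

The result has one value per profile and could be attached with `assign_coords(MLD=...)` as in the original function. Chunking along the profile dimension only, as in `chunks=(1000, 10000)` above, already satisfies the single-chunk requirement on `depth`.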

#### For prof = 1,000,000:

- .mean(): Interestingly, there was a short spike to 20 cores for initial set-up with this ~Argo-sized dataset, but the rest of the calculation was seemingly done on 1-2. I wonder if this dataset is small enough that dask determines parallelization isn't necessary. I will now try a calculation that takes more time to see if that triggers using more cores.

- get_MLD(): I ran a dataset of this size through the function to add MLD, and this also only ran on 1-2 cores. That is strange, because it took a long time to run (TIME HERE). I'm wondering if this is related to how I've written the function. For example, I use .values within the function, and I wonder if that's going to be very slow because it loads things into memory. Does this play into anything?

- MAJOR UPDATE: it appears dask won't parallelize anything within a function automatically. So in order to run a function in parallel, I need to explicitly tell dask to do so using dask.delayed. Here's the documentation on this: https://examples.dask.org/delayed.html
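For reference, the basic `dask.delayed` pattern looks roughly like this. It is a generic sketch with a made-up function and made-up blocks, not a parallel version of `get_MLD`. Wrapping a call only helps if the work is first split into independent pieces that can each become a task.

```python
import dask
import numpy as np

def block_mean(block):
    """Ordinary Python function; nothing here knows about dask."""
    return float(block.mean())

# Four made-up blocks standing in for groups of profiles.
blocks = [np.full((3, 4), fill) for fill in (1.0, 2.0, 3.0, 4.0)]

# Wrapping the call with dask.delayed builds a task instead of running it.
tasks = [dask.delayed(block_mean)(b) for b in blocks]

# dask.compute runs the tasks, in parallel where the scheduler allows.
results = dask.compute(*tasks)
print(results)
```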

#### For prof = 1,000,000,000

- .mean(): Okay, increasing prof to 1,000,000,000 triggers multiple cores to be used, in this case ~45. It took about 
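One possible explanation for the difference between the two sizes, offered as a hypothesis rather than a finding: as far as I understand, dask does not judge whether parallelization is "necessary". The number of chunks sets how many tasks are available to spread over the cores. The sketch below counts the chunks for both values of `prof` with the chunk shape used above. It uses lazy `da.zeros` arrays, so nothing is allocated.

```python
import dask.array as da

chunks = (1000, 10_000)

# da.zeros is lazy, so no memory is allocated for these arrays.
small = da.zeros((1000, 1_000_000), chunks=chunks)
large = da.zeros((1000, 1_000_000_000), chunks=chunks)

print(small.npartitions, large.npartitions)
print(f"{small.nbytes / 2**30:.2f} GiB, {large.nbytes / 2**40:.2f} TiB")
```

With only 100 chunks, each core gets one or two short tasks, so any parallel phase may be over too quickly to show up in the CPU graph. With 100,000 chunks the cores stay busy for much longer.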


# Argo Interpolation Test

In [10]:
atl = xr.open_dataset('/swot/SUM05/amf2288/sync-boxes/lon:(-25,-23)_lat:(-70,70)_ds_z.nc',chunks={'N_PROF':2000})

In [8]:
atl = prf.get_MLD(atl)

Calling this function only used one core. But at this point, it's hard to tell if that's because the dataset isn't big enough for dask to deem multiple cores necessary, or because there's something deeper going on. I'm almost wondering if I should define an Argo-style dataset that's big enough to trigger multiple cores, then pass it to a function to see if that uses multiple cores.

In [None]:
print('max: {}, min: {}'.format(atl.SIG0.max().values, atl.SIG0.min().values))

In [None]:
atl_grid = np.linspace(21,28,1000)

In [None]:
number = np.arange(0, len(atl.N_PROF))
# sortby returns a new object, so the result has to be assigned back
atl = atl.sortby('LATITUDE')
atl.coords['N_PROF_NEW'] = xr.DataArray(number, dims=atl.N_PROF.dims)

In [None]:
%time rho_atl= df.interpolate2density_prof(atl, atl_grid)

# Time Test

The goal here is to make sure that using dask makes processes quicker than not using dask. For this test, I will load a section first without dask and record how long it takes to perform a calculation. Then I will load the same section with dask and see how long the same calculation takes. Important consideration: I'll need to make sure the task/dataset is big enough to trigger using dask for the section loaded in chunks. I really hope this works; I'm going to be a bit discouraged if there are still more things to figure out :(
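A timing harness for this comparison could look like the sketch below. It uses made-up in-memory data and a hypothetical helper `timed`. The real test would instead open the same section twice, once with `chunks=` and once without.

```python
import time

import dask.array as da
import numpy as np

def timed(func):
    """Return (result, elapsed seconds) for a zero-argument callable."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start

# Made-up stand-in data; a real test would load the same section twice.
values = np.random.default_rng(0).random((200, 50_000))
chunked = da.from_array(values, chunks=(200, 5_000))

mean_numpy, t_numpy = timed(lambda: values.mean(axis=1))
mean_dask, t_dask = timed(lambda: chunked.mean(axis=1).compute())

print(f"numpy: {t_numpy:.3f} s, dask: {t_dask:.3f} s")
print(np.allclose(mean_numpy, mean_dask))
```

On an array this small, dask's scheduling overhead can easily make it the slower of the two, so the comparison only becomes meaningful once the data are large enough to keep many cores busy.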