## Diving into parallelization


Many times a dataset will simply be too large to fit into memory or it may be considerably faster to run it on multiple processors. Python has often struggled with parallelization and may times tasks aren't well suited for it. But many scientific computing tasks, especially those dealing with larger rasters and massive netcdf, are quite conducive to parallel computing. Some such as raster algebra might even be called [embarasslingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel). This is where `dask` comes in. "Dask is a flexible library for parallel computing in Python." And most importantly for us it was designed specifically with N-Dimensional arrays and integrates seamlessly with `numpy` and `xarray`.

<img src="../data/dask-overview.svg" alt="drawing" width="550"/>


First let's pull in our normal assortment of python packages

In [None]:
%matplotlib inline
import netCDF4
import matplotlib.pyplot as plt
# numpy 
import numpy as np
import dask
# xarray (very handy)
import xarray as xr
import rasterio
# http://geo.holoviews.org/index.html
import holoviews as hv
import geoviews as gv
import geoviews.feature as gf

import warnings
import sys
gv.extension('bokeh')

In [None]:
imerg_xds = xr.open_mfdataset('../data/IMERG/3B-DAY.MS.MRG.3IMERG.2018*.nc4',combine='by_coords')
imerg_xds

First let's see how many CPUs are available

In [3]:
import multiprocessing

multiprocessing.cpu_count()

16

In [4]:
from psutil import virtual_memory

mem = virtual_memory()
print("Total memory in MB:", mem.total / (1024 * 1024)) # divide by 1024*1024 to get MB
print("Current memory available in MB:", mem.available / (1024 * 1024))

Total memory in MB: 31846.578125
Current memory available in MB: 21388.04296875


You want the number of processes times the number of threads to equal the number of cores. So for example you might do something like the following for a 36 core machine:

Four processes with nine threads each
Twelve processes with three threads each
One process with thirty-six threads
Typically one decides between these choices based on the workload. The difference here is due to Python's Global Interpreter Lock, which limits parallelism for some kinds of data. If you are working mostly with Numpy, Pandas, Scikit-Learn, or other numerical programming libraries in Python then you don't need to worry about the GIL, and you probably want to prefer few processes with many threads each. This helps because it allows data to move freely between your cores because it all lives in the same process. However, if you're doing mostly Pure Python programming, like dealing with text data, dictionaries/lists/sets, and doing most of your computation in tight Python for loops then you'll want to prefer having many processes with few threads each. This incurs extra communication costs, but lets you bypass the GIL.

In short, if you're using mostly numpy/pandas-style data, try to get at least eight threads or so in a process. Otherwise, maybe go for only two threads in a process.


### Dask for Parallel Python

https://docs.dask.org/en/latest/best-practices.html

In [26]:
from dask.distributed import Client

client = Client(processes=False, n_workers=2, threads_per_worker=10, memory_limit='8GB')
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


0,1
Client  Scheduler: inproc://172.17.0.2/2275/46  Dashboard: http://172.17.0.2/2275/46:45651/status,Cluster  Workers: 2  Cores: 20  Memory: 16.00 GB


distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/envs/earthml/lib/python3.6/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/usr/local/envs/earthml/lib/python3.6/site-packages/distributed/client.py", line 1290, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/usr/local/envs/earthml/lib/python3.6/site-packages/distributed/utils.py", line 662, in log_errors
    yield
  File "/usr/local/envs/earthml/lib/python3.6/site-packages/distributed/client.py", line 1019, in _reconnect
    await self._close()
  File "/usr/local/envs/earthml/lib/python3.6/site-packages/distributed/client.py", line 1290, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._bas

In [25]:
client.shutdown()

In [17]:
imerg_xds.HQprecipitation

<xarray.DataArray 'HQprecipitation' (time: 303, lon: 550, lat: 501)>
dask.array<concatenate, shape=(303, 550, 501), dtype=float32, chunksize=(1, 550, 501), chunktype=numpy.ndarray>
Coordinates:
  * lon      (lon) float32 -84.95 -84.85 -84.74999 ... -30.149998 -30.049992
  * lat      (lat) float32 -35.05 -34.95 -34.85 ... 14.850003 14.950002
  * time     (time) datetime64[ns] 2018-02-01 2018-02-02 ... 2018-11-30
Attributes:
    units:         mm
    long_name:     Daily accumulated High Quality precipitation from all avai...
    origname:      HQprecipitation
    fullnamepath:  /HQprecipitation

In [10]:
overall_bbox = np.array([
    [-74.13671875,-15.3724475816178],
    [-60.984375,-15.372447581617823],
    [-60.984375,-10.685765151760275],
    [-74.13671875,-10.6857651517602]])
max_lon_lat = np.max(overall_bbox, axis=0)
min_lon_lat = np.min(overall_bbox, axis=0)

gv.tile_sources.EsriImagery.options(width=350) * gv.Polygons(overall_bbox).options(alpha=0.25)

In [27]:
subset_imerg_xds = imerg_xds.sel(lon=slice(min_lon_lat[0], max_lon_lat[0]), lat=slice(min_lon_lat[1], max_lon_lat[1]))  
subset_imerg_xds.dims

Frozen(SortedKeysDict({'lon': 131, 'nv': 2, 'lat': 47, 'time': 303}))

In [28]:
rainfall_stacked = subset_imerg_xds.HQprecipitation.stack(z=('lat', 'lon'))
rainfall_stacked

<xarray.DataArray 'HQprecipitation' (time: 303, z: 6157)>
dask.array<reshape, shape=(303, 6157), dtype=float32, chunksize=(1, 6157), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2018-02-01 2018-02-02 ... 2018-11-30
  * z        (z) MultiIndex
  - lat      (z) float64 -15.35 -15.35 -15.35 -15.35 ... -10.75 -10.75 -10.75
  - lon      (z) float64 -74.05 -73.95 -73.85 -73.75 ... -61.25 -61.15 -61.05
Attributes:
    units:         mm
    long_name:     Daily accumulated High Quality precipitation from all avai...
    origname:      HQprecipitation
    fullnamepath:  /HQprecipitation

In [48]:
X = client.persist(rainfall_stacked)

# shape needs to be (n_samples, n_features)
X = X.transpose()
X

<xarray.DataArray 'HQprecipitation' (z: 6157, time: 303)>
dask.array<transpose, shape=(6157, 303), dtype=float32, chunksize=(6157, 1), chunktype=numpy.ndarray>
Coordinates:
  * time     (time) datetime64[ns] 2018-02-01 2018-02-02 ... 2018-11-30
  * z        (z) MultiIndex
  - lat      (z) float64 -15.35 -15.35 -15.35 -15.35 ... -10.75 -10.75 -10.75
  - lon      (z) float64 -74.05 -73.95 -73.85 -73.75 ... -61.25 -61.15 -61.05
Attributes:
    units:         mm
    long_name:     Daily accumulated High Quality precipitation from all avai...
    origname:      HQprecipitation
    fullnamepath:  /HQprecipitation

In [55]:
import sklearn.cluster
import dask_ml.cluster

clf = dask_ml.cluster.SpectralClustering(n_clusters=8, random_state=0, gamma=None,
                         kmeans_params={'init_max_iter': 5},
                         persist_embedding=True, n_jobs=-1)

In [56]:
%time clf.fit(X)

CPU times: user 33.8 s, sys: 5.81 s, total: 39.6 s
Wall time: 34.1 s


SpectralClustering(affinity='rbf', assign_labels='kmeans', coef0=1, degree=3,
                   eigen_solver=None, eigen_tol=0.0, gamma=None,
                   kernel_params=None, kmeans_params={'init_max_iter': 5},
                   n_clusters=8, n_components=100, n_init=10, n_jobs=-1,
                   n_neighbors=10, persist_embedding=True, random_state=0)

In [78]:
%time sklearn.cluster.SpectralClustering(n_clusters=8, gamma=None).fit(X)

CPU times: user 1min 19s, sys: 1min 46s, total: 3min 5s
Wall time: 14.2 s


SpectralClustering(affinity='rbf', assign_labels='kmeans', coef0=1, degree=3,
                   eigen_solver=None, eigen_tol=0.0, gamma=None,
                   kernel_params=None, n_clusters=8, n_init=10, n_jobs=None,
                   n_neighbors=10, random_state=None)

In [54]:
labels = clf.assign_labels_.labels_.compute()
labels.shape

(6157,)

In [60]:
template = X[:, 0]
output_array = template.copy(data=labels)

unstacked = output_array.unstack()
unstacked

<xarray.DataArray 'HQprecipitation' (lat: 47, lon: 131)>
array([[1, 1, 1, ..., 1, 1, 1],
       [1, 1, 1, ..., 1, 1, 1],
       [1, 1, 1, ..., 1, 1, 1],
       ...,
       [4, 2, 2, ..., 1, 1, 3],
       [4, 2, 2, ..., 1, 3, 3],
       [2, 2, 2, ..., 3, 3, 3]], dtype=int32)
Coordinates:
    time     datetime64[ns] 2018-02-01
  * lat      (lat) float64 -15.35 -15.25 -15.15 -15.05 ... -10.95 -10.85 -10.75
  * lon      (lon) float64 -74.05 -73.95 -73.85 -73.75 ... -61.25 -61.15 -61.05
Attributes:
    units:         mm
    long_name:     Daily accumulated High Quality precipitation from all avai...
    origname:      HQprecipitation
    fullnamepath:  /HQprecipitation

In [72]:
import cartopy.crs as ccrs
import geoviews.feature as gf

# the * operator allows you to add multiple holoviews plots to the same final plot
gf.ocean * \
gf.land * \
unstacked.hvplot(x="lon", y="lat", width=500, height=300, cmap='Category10',projection=ccrs.Orthographic(-55, -10),
    global_extent=True, clim=(0,8)) * \
gf.coastline * \
gf.borders

In [77]:
gf.ocean * \
gf.land * \
subset_imerg_xds.HQprecipitation.mean("time").hvplot(x="lon", y="lat", width=500, height=300, cmap='jet',projection=ccrs.Orthographic(-55, -10),
    global_extent=True, clim=(0,3)) * \
gf.coastline * \
gf.borders

In [38]:
%time imerg_xds.HQprecipitation.resample(time='1w').mean('time').std('time').load()

CPU times: user 22.3 s, sys: 2.06 s, total: 24.3 s
Wall time: 17.8 s


<xarray.DataArray 'HQprecipitation' (lon: 550, lat: 501)>
array([[1.123427  , 0.9878055 , 1.0199522 , ..., 1.4309781 , 1.4646748 ,
        1.7291449 ],
       [1.087107  , 1.0363683 , 1.007186  , ..., 1.1993551 , 1.664919  ,
        1.7911828 ],
       [1.1282666 , 1.1375161 , 0.9836078 , ..., 1.359191  , 1.4338212 ,
        1.8037225 ],
       ...,
       [1.6832757 , 1.6288465 , 1.6509258 , ..., 0.8540528 , 0.8450356 ,
        1.0815192 ],
       [1.726851  , 1.6118715 , 1.6496263 , ..., 0.7935742 , 0.8663897 ,
        0.9522377 ],
       [1.7353486 , 1.7259917 , 1.7620523 , ..., 0.73096097, 0.73425245,
        1.2999285 ]], dtype=float32)
Coordinates:
  * lon      (lon) float32 -84.95 -84.85 -84.74999 ... -30.149998 -30.049992
  * lat      (lat) float32 -35.05 -34.95 -34.85 ... 14.850003 14.950002

In [39]:
%time imerg_xds.HQprecipitation.load().resample(time='1w').mean('time').std('time')

CPU times: user 16.2 s, sys: 1.09 s, total: 17.3 s
Wall time: 14.2 s


<xarray.DataArray 'HQprecipitation' (lon: 550, lat: 501)>
array([[1.1234272 , 0.98780537, 1.0199522 , ..., 1.430978  , 1.4646748 ,
        1.7291448 ],
       [1.0871071 , 1.0363684 , 1.007186  , ..., 1.1993552 , 1.6649191 ,
        1.7911829 ],
       [1.1282666 , 1.1375163 , 0.9836079 , ..., 1.3591908 , 1.4338212 ,
        1.8037223 ],
       ...,
       [1.6832757 , 1.6288466 , 1.6509256 , ..., 0.85405266, 0.84503555,
        1.0815194 ],
       [1.7268511 , 1.6118715 , 1.6496264 , ..., 0.7935742 , 0.86638963,
        0.9522377 ],
       [1.7353486 , 1.7259917 , 1.7620524 , ..., 0.73096097, 0.7342524 ,
        1.2999284 ]], dtype=float32)
Coordinates:
  * lon      (lon) float32 -84.95 -84.85 -84.74999 ... -30.149998 -30.049992
  * lat      (lat) float32 -35.05 -34.95 -34.85 ... 14.850003 14.950002