# Xarray and parallel computing with dask

xarray integrates with dask to support parallel computations and streaming computation on datasets that don’t fit into memory.

### Outline
- reading and writing data
- automatic parallelization
- dask schedulers and deployments

### Tutorial Duriation
10 minutes

### Going Further

- Xarray's Documentation on using dask: http://xarray.pydata.org/en/latest/dask.html
- Dask's Documentation: http://dask.pydata.org/en/latest/

## Dask

Dask is a flexible parallel computing library for analytic computing.

Dask is composed of two components:

- Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
- “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

![](images/collections-schedulers.png)


Dask works by constructing "task graphs"

![](images/transpose.svg)


## Reading and Writing data

The usual way to create a dataset filled with dask arrays is to load the data from a netCDF file or files. You can do this by supplying a chunks argument to open_dataset() or using the open_mfdataset() function.

In [None]:
%matplotlib inline

import xarray as xr
import numpy as np

np.warnings.filterwarnings('ignore')


In [None]:
# use dask's distributed scheduler for now
# (see the last section of this notebook for more details)
from distributed import Client, progress
client = Client()
client

In [None]:
ds = xr.open_dataset('./data/rasm.nc', chunks={'time': 10})  # this actually loads data using xr.open_dataset
ds

In [None]:
# xarray is now using dask arrays under the hood
ds['Tair'].data

In [None]:
## Xarray operations using dask are now lazy
t_range = (ds['Tair'].resample(time='AS').max('time') -
           ds['Tair'].resample(time='AS').min('time')).mean('time')

In [None]:
t_range.data.visualize()

In [None]:
results = t_range.persist()
progress(results)

In [None]:
results

## Automatic parallelization

Almost all of xarray’s built-in operations work on dask arrays. If you want to use a function that isn’t wrapped by xarray, one option is to extract dask arrays from xarray objects (.data) and use dask directly.

Another option is to use xarray’s apply_ufunc(), which can automate embarrassingly parallel “map” type operations where a functions written for processing NumPy arrays should be repeatedly applied to xarray objects containing dask arrays. It works similarly to `dask.array.map_blocks()` and `dask.array.atop()`, but without requiring an intermediate layer of abstraction.

In [None]:
import bottleneck

def covariance_gufunc(x, y):
    return ((x - x.mean(axis=-1, keepdims=True))
            * (y - y.mean(axis=-1, keepdims=True))).mean(axis=-1)

def pearson_correlation_gufunc(x, y):
    return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))

def spearman_correlation_gufunc(x, y):
    x_ranks = bottleneck.rankdata(x, axis=-1)
    y_ranks = bottleneck.rankdata(y, axis=-1)
    return pearson_correlation_gufunc(x_ranks, y_ranks)

def spearman_correlation(x, y, dim):
    return xr.apply_ufunc(
        spearman_correlation_gufunc, x, y,
        input_core_dims=[[dim], [dim]],
        dask='parallelized',
        output_dtypes=[float])

In [None]:
ds['Tair'].roll(time=1)

In [None]:
ds = ds.chunk({'x': 25, 'y': 25, 'time': -1})  # rechunk this dataset

# create some toy data using our dataset
x = ds['Tair']
y = ds['Tair'].roll(time=1)  # lag the air temp data by one month
y['time'] = x['time']  # reset the index 


# calculate the spearman correlation
corr = spearman_correlation(x, y, 'time')
corr = corr.where(corr < 1)  # make sure nans are masked properly
corr

In [None]:
corr.plot()

## dask schedulers and deployments

Dask provides 4 different mechanisms for executing dask task graphs.

- `dask.threaded.get`: a scheduler backed by a thread pool
- `dask.multiprocessing.get`: a scheduler backed by a process pool
- `dask.get`: a synchronous scheduler, good for debugging
- `distributed.Client.get`: a distributed scheduler for executing graphs

In [None]:
import dask.multiprocessing
dask.config.set(scheduler='processes')  # overwrite default with multiprocessing scheduler

# or

from distributed import Client
client = Client()

## Lots more! 

- Setting up dask: http://dask.pydata.org/en/latest/setup.html