# Using **dask**

[dask](https://dask.org/) is a Python package built upon the scientific stack to enable scaling of Python from interactive sessions through to multi-core and multi-node environments.

Of particular relevance to **SEGY-SAK** is that `xarray.Dataset` loads naturally into `dask`.

This example uses the F3 3D volume available here:
https://s3.us-east-2.amazonaws.com/seismic.euclidity.com/F3/f3_seismic_full.sgy

In [None]:
# when you first download it you will have to convert it to seisnc (NETCDF4)
from segysak.segy import segy_converter
segy_converter('f3_seismic_full.sgy', 'f3_seismic_full.seisnc', iline=189, xline=193, cdpx=181, cdpy=185)

## Imports and Setup

Here we import `numpy`, `xarray`, `open_seisnc` and the plotting tools. The following section sets up the `dask` `Client`, which will automatically start a `LocalCluster`. Printing the client returns details about the dashboard link and resources.

In [None]:
import numpy as np
from segysak import open_seisnc
import xarray as xr

import matplotlib.pyplot as plt
%matplotlib inline

## Starting the dask cluster and client

This starts a local cluster on your machine.

In [None]:
from dask.distributed import Client

client = Client()
client

We can also scale the cluster to be a bit smaller.

In [None]:
client.cluster.scale(2, memory='500mb')
import time
time.sleep(4)
client

## Lazy loading from SEISNC using chunking

By specifying the `chunks` argument to the `open_seisnc` command we can ask dask to fetch the data in chunks of size *n*. In this example the `iline` and `xline` dimensions will each be chunked in groups of 200. The valid arguments to `chunks` depend on the dataset, but any dimension can be used.

Even though the size of the dataset is `2.14GB` it hasn't yet been loaded into memory, nor will `dask` load it entirely unless the operation demands it.

In [None]:
seisnc = open_seisnc('data/f3_seismic_full.SEISNC', chunks={'iline':200, 'xline':200})
print(seisnc.seis.humanbytes)
print(seisnc.chunks)
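To get a feel for how chunk sizes divide a volume, here is a minimal sketch on a synthetic `dask` array rather than the real dataset. The cube shape and the name `cube` are assumptions made purely for illustration; the axes stand in for `iline`, `xline` and `twt`.

```python
import dask.array as da

# Synthetic stand-in for a seismic cube with axes (iline, xline, twt).
# A chunk size of -1 means "do not split this dimension".
cube = da.zeros((450, 300, 100), chunks=(200, 200, -1), dtype="float32")

print(cube.chunks)     # chunk sizes along each dimension
print(cube.numblocks)  # number of chunks along each dimension
print(cube.nbytes)     # size in bytes if the whole array were loaded
```

Dimensions that do not divide evenly simply get a smaller final chunk, and nothing is allocated until a computation is requested.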

Let's see what our dataset looks like. Notice that the variables are `dask.array` objects. This means they are references to the on-disk data. The dimensions must be loaded so `dask` knows how to manage your dataset.

In [None]:
seisnc

## Operations on SEISNC using `dask`

In this simple example we calculate the mean of the entire cube. If you check the dashboard (when running this example yourself), you can see the task graph and task stream execution.

In [None]:
mean = seisnc.data.mean()
mean

Whoa-oh, the mean is what? Yeah, `dask` won't calculate anything until you ask it to. This means you can string computations together into a task graph for lazy evaluation. You can visualise the graph using `dask.visualize`.

In [None]:
from dask import visualize

In [None]:
# This requires graphviz to be installed.
#visualize(mean, filename='graph', format='png')

Finally, to get the mean, try this:

In [None]:
mean.compute().values
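The same lazy behaviour can be reproduced on a small synthetic array, which may help when experimenting without the full volume. This is only a sketch; the array and the names `samples`, `lazy_mean` and `value` are illustrative assumptions and not part of the dataset above.

```python
import dask.array as da

# 100 integers split into four chunks of 25.
samples = da.arange(100, chunks=25)

# Building the expression only records tasks in a graph.
lazy_mean = samples.mean()
print(type(lazy_mean))  # still a dask array, not a number

# compute() executes the graph and returns the concrete result.
value = float(lazy_mean.compute())
print(value)
```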

## Plotting with `dask`

The lazy loading of data means we can plot what we want using `xarray` style slicing and `dask` will fetch only the data we need.

In [None]:
import timeit

In [None]:
std = seisnc.data.std().compute().values

In [None]:
fig, axs = plt.subplots(nrows=2, ncols=3, figsize=(20, 10))

elapsed1 = list()
start_time = timeit.default_timer()

iline = seisnc.sel(iline = 400).transpose('twt', 'xline').data
xline = seisnc.sel(xline = 400).transpose('twt', 'iline').data
zslice = seisnc.sel(twt = 1250, method='nearest').transpose('iline', 'xline').data

# time taken to build the lazy selections; no trace data has been read yet
elapsed1.append(timeit.default_timer() - start_time)

iline.plot(robust=True, ax=axs[0, 0], yincrease=False)
xline.plot(robust=True, ax=axs[0, 1], yincrease=False)
zslice.plot(robust=True, ax=axs[0, 2])

imshow_kwargs = dict(
    cmap='seismic', aspect='auto', vmin=-std*3, vmax=std*3, interpolation='bicubic'
)

elapsed1.append(timeit.default_timer() - start_time)

axs[1, 0].imshow(iline.values, **imshow_kwargs)
axs[1, 0].set_title('iline')
axs[1, 1].imshow(xline.values, **imshow_kwargs)
axs[1, 1].set_title('xline')
axs[1, 2].imshow(zslice.values, origin='lower', **imshow_kwargs)
axs[1, 2].set_title('twt')

elapsed1.append(timeit.default_timer() - start_time)
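As a rough illustration of why slicing a chunked volume is cheap, the sketch below selects one inline from a synthetic `dask` array and compares the number of chunks involved. The shapes and names (`cube`, `inline`) are assumptions for illustration only.

```python
import dask.array as da

# Synthetic cube with axes (iline, xline, twt), chunked 200 x 200 x 100.
cube = da.zeros((400, 400, 100), chunks=(200, 200, 100))

# Selecting a single inline keeps only the chunks that contain it.
inline = cube[150]

print(cube.npartitions)    # chunks in the full cube
print(inline.npartitions)  # chunks needed for the selected inline
print(inline.shape)
```

Only the chunks that intersect the selection need to be read when the slice is eventually computed or plotted.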

## Streaming efficiently through a process and back to disk

You can stream back to disk by specifying an output at the end of the process. Don't mix dask collections like our `seisnc` with the `dask.delayed` decorator.

In [None]:
seisnc_std = (seisnc.data - seisnc.data.mean())/seisnc.data.std()
seisnc_std = seisnc_std*10.0
#print(seisnc_std.std().compute())
seisnc_std.to_netcdf('test_dask5.seisnc', compute=True)
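The standardisation itself can be sanity-checked on a small in-memory array before committing to a full write. The following is a minimal sketch using random synthetic data; the names `values`, `cube` and `scaled` are hypothetical and the file output step is deliberately left out.

```python
import numpy as np
import dask
import dask.array as da

# Random synthetic data standing in for seismic amplitudes.
rng = np.random.default_rng(0)
values = rng.normal(loc=5.0, scale=3.0, size=(40, 30, 20))
cube = da.from_array(values, chunks=(20, 15, 20))

# Same arithmetic as above: zero mean, then scale to a std of 10.
scaled = (cube - cube.mean()) / cube.std() * 10.0

# Evaluate both statistics in a single pass over the task graph.
mean, std = dask.compute(scaled.mean(), scaled.std())
print(float(mean), float(std))
```

The whole expression stays lazy until `dask.compute` is called, which is what lets the real workflow stream chunk by chunk into the output file.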

In [None]:
# This is bad, very bad: when you already have a lazy (delayed) object like an xarray dataset, don't use the delayed decorator.

from dask import delayed

@delayed(pure=True)
def standardise_to_10(data):
    return (data.data - data.data.mean())/data.data.std()

#print(standardise_to_10(seisnc.data))

s = standardise_to_10(seisnc)
s = s.to_netcdf('test_dask.seisnc', compute=False)
s.result().compute()

## Applying a function to all traces individually

In [None]:
# we need to remove attrs which contain nan for output.
seisnc.attrs = {}

In [None]:
import dask.array as da

def linear_gain(x, twt, gain_per_second):
    x = x*twt*gain_per_second
    return x
    
seisnc = seisnc.transpose('twt', 'iline', 'xline')
    
with_gain = da.apply_along_axis(
    linear_gain, 0, seisnc.data, seisnc.twt.values, 0.1)

`with_gain` is now a dask array: just a placeholder with some information that dask can propagate forward into other processes.

In [None]:
with_gain
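To check that applying a function along the trace axis does what we expect, the sketch below runs the same `linear_gain` idea on a tiny synthetic cube and compares it with plain NumPy broadcasting. The cube shape, the `twt` values and the names `cube`, `gained` and `expected` are assumptions for illustration.

```python
import numpy as np
import dask.array as da

def linear_gain(trace, twt, gain_per_second):
    # Scale each sample of a single trace by its time and the gain factor.
    return trace * twt * gain_per_second

# Five time samples and a small (twt, iline, xline) cube of ones.
twt = np.arange(0.0, 20.0, 4.0)
cube = da.ones((5, 4, 3), chunks=(5, 2, 3))

# Apply the function to every trace, i.e. along axis 0 (twt).
gained = da.apply_along_axis(linear_gain, 0, cube, twt, 0.1)

# Reference result using NumPy broadcasting over the twt axis.
expected = np.ones((5, 4, 3)) * twt[:, None, None] * 0.1
print(np.allclose(gained.compute(), expected))
```

For a simple multiplication like this one, broadcasting would give the same answer; applying along an axis is mainly useful when the function genuinely needs a whole trace at once.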

To assign the data back to the dataset we must specify the dimensions, as for normal assignments in `xarray`.

In [None]:
seisnc['data'] = (('twt', 'iline', 'xline'), with_gain)

In [None]:
# At this point the data still exists only in the task graph and no computation has yet been done.
seisnc

Now we can output the data to disk and complete the computation at the same time.

In [None]:
seisnc.to_netcdf('gained.seisnc')

Finally, let's create a new lazy reference to the data and plot it to see if the linear gain was applied.

In [None]:
fig, axs = plt.subplots(nrows=2, ncols=3, figsize=(20, 10))

gained = open_seisnc('gained.seisnc', chunks={'iline':100, 'xline':100})

iline = gained.sel(iline = 400).transpose('twt', 'xline').data
xline = gained.sel(xline = 400).transpose('twt', 'iline').data
zslice = gained.sel(twt = 1250, method='nearest').transpose('iline', 'xline').data

# plot the lazy selections with xarray (top row)
iline.plot(robust=True, ax=axs[0, 0], yincrease=False)
xline.plot(robust=True, ax=axs[0, 1], yincrease=False)
zslice.plot(robust=True, ax=axs[0, 2])

std = 400_000

imshow_kwargs = dict(
    cmap='seismic', aspect='auto', vmin=-std*3, vmax=std*3, interpolation='bicubic'
)

axs[1, 0].imshow(iline.values, **imshow_kwargs)
axs[1, 0].set_title('iline')
axs[1, 1].imshow(xline.values, **imshow_kwargs)
axs[1, 1].set_title('xline')
axs[1, 2].imshow(zslice.values, origin='lower', **imshow_kwargs)
axs[1, 2].set_title('twt')