# USID and Dask.distributed
***
Authors: **Emily Costa, Suhas Somnath**

Created on: 7/30/19

## Introduction 

Certain data processing routines are very time consuming because of the sheer size of the data and/or the computational complexity of the data processing routine. Often, such computations are ``embarrasingly parallel`` meaning that the processing of one portion (e.g. pixel) of data is independent from  the processing of all other portions of data.

The `pyUSID.parallel_compute()` function can effectively distribute the computation over all available cores in a CPU and reduce the computational time. However, `pyUSID.parallel_compute()` only distributes computations within a single CPU in a single personal computer. As a consequence, it may not be feasible to run large / lengthy computations on personal computers. In such cases and when available, it is recommended that such computations be run on a university / national lab compute cluster for timely processing of the data.

This document will demonstrate how `dask.distributed` can be used with USID files managed with pyUSID. `dask.distributed` is a python library for distributed computing. It is a task scheduler that is compatible with other Dask libraries and is meant to install and run intuitively. It uses familiar APIs and is built in Python.

During this tutorial, we will be using `find_all_peaks` as an example for computing on USID main datasets using different libraries and techniques. `find_all_peaks` is a simple function that takes a one-dimensional array and finds all local maxima by simple comparison of neighbouring values.

We will compare the performace of serial computation, dask.distributed, and parallel_compute by plotting the performance benchmarks and cores used while computing the "peaks" of all the arrays in a USID main dataset.


### Prepare the modules:

In [None]:
from __future__ import division, print_function, absolute_import, unicode_literals

# The package for accessing files in directories, etc.:
import os

# Warning package in case something goes wrong
from warnings import warn
import subprocess
#from dask.diagnostics import Profiler, ResourceProfiler, visualize
import dask.array as da


def install(package):
    subprocess.call([sys.executable, "-m", "pip", "install", package])
# Package for downloading online files:
try:
    # This package is not part of anaconda and may need to be installed.
    import wget
except ImportError:
    warn('wget not found.  Will install with pip.')
    import pip
    install(wget)
    import wget

# The mathematical computation package:
import numpy as np
import dask
from dask.distributed import Client, LocalCluster
import dask.array as da

# The package used for creating and manipulating HDF5 files:
import h5py

# Packages for plotting:
import matplotlib.pyplot as plt

# Parallel computation library:
try:
    import joblib
except ImportError:
    warn('joblib not found.  Will install with pip.')
    import pip
    install('joblib')
    import joblib

# Timing
import time

# A handy python utility that allows us to preconfigure parts of a function
from functools import partial

# Finally import pyUSID:
try:
    import pyUSID as usid
except ImportError:
    warn('pyUSID not found.  Will install with pip.')
    import pip
    install('pyUSID')
    import pyUSID as usid
    

# import the scientific function:
import sys
sys.path.append('./supporting_docs/')
from peak_finding import find_all_peaks

### Find main dataset:

In [None]:
url = 'https://raw.githubusercontent.com/pycroscopy/pyUSID/master/data/BELine_0004.h5'
h5_path = 'temp.h5'
if os.path.exists(h5_path):
    os.remove(h5_path)
_ = wget.download(url, h5_path, bar=None)
    # Open the file in read-only mode
h5_file = h5py.File(h5_path, mode='r')
    # Get handle to the the raw data
h5_meas_grp = h5_file['Measurement_000']
    # Accessing the dataset of interest:
h5_main = usid.USIDataset(h5_meas_grp['Channel_000/Raw_Data'])
num_rows, num_cols = h5_main.pos_dim_sizes

### Test find_all_peaks

In order to test the function, we will run it on one of the arrays in the main dataset of our sample data. 

Test the `find_all_peaks` function:

In [None]:
# choose an arbitrary pixel_ind to extract an array from the data
row_ind, col_ind = 110, 25
pixel_ind = col_ind + row_ind * num_cols
spectra = h5_main[pixel_ind]

# run function on the array and return peaks
peak_inds = find_all_peaks(spectra, [20, 60], num_steps=30)

# visualize 
fig, axis = plt.subplots()
axis.scatter(np.arange(len(spectra)), np.abs(spectra), c='black')
axis.axvline(peak_inds[0], color='r', linewidth=2)
axis.set_ylim([0, 1.1 * np.max(np.abs(spectra))]);
axis.set_title('find_all_peaks found peaks at index: {}'.format(peak_inds), fontsize=16)

### Compute Serially
The following cell demonstrates the ineffectiveness of serial computation in this example.

In [None]:
start = time.time()
raw_data = h5_main[()]
serial_results = list()
for vector in raw_data:
    serial_results.append(find_all_peaks(vector, [20, 60], num_steps=30))
time_serial = time.time()-start
core_serial = 1

### Compute using pyUSID parallel_compute() function
Now that we have completed the tedious serial computation, we will run parallel computation using pyUSID's `parallel_compute()` function.

In [None]:
# time for benchmarking
start = time.time()
# create an array with all the data
raw_data = h5_main[()]
# width_bounds and num_steps parameters
args = [[20, 60]]
kwargs = {'num_steps': 30}
# specify cores
cores = 3
# run parallel computation
parallel_results = usid.parallel_compute(raw_data, find_all_peaks, cores=cores, 
                                         func_args=args, func_kwargs=kwargs)
# for benchmarking
time_parallel = time.time() - start
cores_parallel = cores

### Compute using Dask:
This will automate the scaling of cores and parallelization of the code. `dask.Client` automates the delegation of work to the workers. For this example, we need to map all the arrays in the dataset to run through the `find_all_peaks` function then collect them after computation. Before mapping the function, we should move the data from the local client process into the workers of the distributed scheduler. Moving the data will avoid time-consuming effort of the workers retrieving data from the local client.

For more information on `dask.Client` and its API, visit [the documentation](https://distributed.dask.org/en/latest/).

In [None]:
start = time.time()
raw_data = h5_main[()]
args = [20, 60]
kwargs = {'num_steps': 30}
# Instantiate the client
client = Client(processes=False)
# distribute the data to the workers, this should speed up the work.
raw_data = client.scatter(raw_data)
# map each array to the function
L = client.map(find_all_peaks, raw_data_list, args, kwargs)
# gather will compute on the mapping
dask_results = client.gather(L)
# benchmark
cores_dask = client.ncores()
# make sure to close the client
client.close()
# benchmark
time_dask = time.time() - start

### Plot benchmarks:

In [None]:
fig, axis = plt.subplots(figsize=(3.5, 3.5))
axis.scatter(core_serial, time_serial, c='blue')
axis.scatter(cores_parallel, time_parallel, c='green')
axis.scatter(cores_dask, time_dask, c='red')
axis.set_xlabel('CPU cores', fontsize=14)
axis.set_ylabel('Compute time (sec)', fontsize=14)
plt.legend()
fig.tight_layout()
plt.savefig('{}.png'.format(png_name))