# Continuing on the Dask Data structures

Dask offers several pythonic data structures to handle and operate with larger-than-memory data in a distributed system.
- `dask.bag`: distributed generic python list. The Dask equivalent to a PySpark RDD
- `dask.array`: distributed numpy arrays
- `dask.dataframe`: distributed pandas dataframes

All the high-level data structure APIs are optimized to exploit the DAG optimization features of the Dask scheduler, and thus rely on lazy computation.

## Start the Dask cluster

In [None]:
# set this variable with one of the following values

# -> 'local'
# -> 'docker_container'
# -> 'docker_cluster'

CLUSTER_TYPE ='docker_cluster'

In [None]:
%env CLUSTER_TYPE $CLUSTER_TYPE

In [None]:
%%script bash --bg --out script_out

if [[ "$CLUSTER_TYPE" != "docker_cluster" ]]; then
    echo "Launching scheduler and worker"
    
    HOSTIP=`hostname -I | xargs`
    
    echo "dask-scheduler --host $HOSTIP --dashboard-address $HOSTIP:8787"
    
    # dask scheduler 
    dask-scheduler --host $HOSTIP --dashboard-address $HOSTIP:8787 &

    # dask worker
    dask-worker $HOSTIP:8786 --memory-limit 2GB --nworkers 2 &

fi

In [None]:
host_ip = !hostname -I | xargs
host_ip = host_ip[0]

In [None]:
from dask.distributed import Client

if CLUSTER_TYPE == 'local':
    
    client = Client()

elif CLUSTER_TYPE == 'docker_container':
    
    client = Client('{}:8786'.format(host_ip))
    
elif CLUSTER_TYPE == 'docker_cluster':
    
    # use the provided master
    client = Client('dask-scheduler:8786')
    
client

## Dask DataFrame

The dask.dataframe API implements a blocked parallel DataFrame object that mimics a large subset of the Pandas DataFrame interfaces. 

One Dask DataFrame is comprised of many in-memory pandas DataFrames or Series hosted in all machines of the cluster.
A Dask DataFrame have its table separated (partitioned) along the index **\*** 

One operation on a Dask DataFrame triggers many Pandas operations on the constituent Pandas DataFrames in a way that is mindful of potential parallelism and memory constraints.

**\*** _Quick question. What kind of data partitioning are we talking about in this case? Vertical or Horizontal?_

### On the Data Partitioning

Partitioning is one of the key aspects of data management and processing in distributed systems (both in Spark, Dask, but also other frameworks as well).

Distributing our dataset into a large number of small partitions enables parallel processing in each node.
However, we should always remember that every single call the central scheduler performs to access the data on a remote partition (on a worker) requires time, ideally in the order of a few hundreds milliseconds.

As the user of the distributed processing system, it is **our** (not Dask's or Spark's) job to decide and optimize the number of partitions.
The distributed framework will first operate a choice or a guess, but the optimization of the performances depend on us making the right choice depending on the task.

For example, loading up data from a set of files, initially the number of partitions might be the exactly the number of the CSV files from which we are importing data. 
However, over time, as we reduce or increase the size of our DataFrames by filtering or joining, it may be wise to reconsider how many partitions we need to optimize Dask scheduler overhead. 


_There is always a cost associated in having too many or having too few partitions, but unfortunately, at the same time we don't have a single rule to decide which is the single "right" number of partitions._


As a rule of thumb, we can say that partitions should fit comfortably in memory.
But at the same time, they must not be too many, such as the central scheduler overhead becomes impacting the performances of our computation.

This means that the number of partition should be in a "reasonable region", and their number and size must be chosend by the user when starting optimizing the execution.

Let's start by reading a set of structured files (comma separated) in a DataFrame.

In pure Pandas, we have to loop over all files, open each one of them as a DataFrame, and concatenate all dataframes into a large DF.

The resulting DF is a single entity, stored in memory for single-threaded data access.

In [None]:
! ls datasets/accounts_csv

In [None]:
! head -10 datasets/accounts_csv/accounts.0.csv

In [None]:
from glob import iglob
import pandas as pd
import os

path = os.path.join('datasets','accounts_csv','accounts.*.csv')

all_files = iglob(path, recursive=True)     

dataframes = (pd.read_csv(f) for f in all_files)

large_dataframe = pd.concat(dataframes, ignore_index=True)

In [None]:
large_dataframe

In Dask, DataFrames can be created from a glob pattern using the wildcard `*`, so all files in the path matching that pattern will be read into the same Dask DataFrame.

We should remember that Dask DataFrames are a collection of Pandas dataframes, scattered across the workers.

When reading data into a DataFrame Dask will automatically create a number of jobs to read out the data in parallel.
It does this by creating one parallel job per chunk, which by default is the number of input files.
We can (and generally speaking should) however always think about how we want to partition our dataset, to exploits at best our workers.

In [None]:
import dask.dataframe as dd

path = os.path.join('datasets','accounts_csv','accounts.*.csv')

df = dd.read_csv(path)

As always, reading data is a lazy operation, posponed until we have something to compute.

Let's compute the lenght of the total DataFrame.

In [None]:
len(df)

Only at this stage, each file was loaded into a Pandas DataFrame and scattered on the nodes.

Computing `len()` meant that each Pandas DataFrame had `len` applied to it, and then the sub-result were aggregated and combined to provide the overall total length of the DF.

In [None]:
df

We can re-assign the number of partitions to the Dask DataFrame object using the `repartition` method

In [None]:
df = df.repartition(npartitions=8)

In [None]:
df

In [None]:
len(df)

In [None]:
df.npartitions

It is worth mentioning that differently from Pandas, Dask only reads in a sample from the beginning of the file (or the list of files) to start inferring the Data Types.

These inferred datatypes are then enforced when reading all partitions, which may lead to *incorrect* datatype assignment!

Think for instance in the case we start reading a csv file for which the first $n$ entries of a column are all integers, but the column is actually supposed to be a _string_ type.

In [None]:
df = dd.read_csv(os.path.join('datasets', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]})

In [None]:
df.dtypes

In [None]:
df.head()

In [None]:
df.tail()

In this case, the datatypes inferred for `CRSElapsedTime` and `TailNum` are in fact incorrect. 

We can however force Dask to interpret the datatypes as we want them to be using the `dtype` assignment.
This is usually the most viable and robust solution, but not the only one.

We could also:
- Increase the size of the sample used to infer the datatypes
- Use `assume_missing` to make Dask assume that columns inferred to be `int` (which don't allow missing values) are actually `floats` (which do allow missing values)

However, in general the best solution is still to hard-code the datatypes.

In [None]:
df = dd.read_csv(os.path.join('datasets', 'nycflights', '*.csv'),
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float})

In [None]:
df = df.repartition(8)

In [None]:
df.tail()

While on the topic of reading out data from files, it is worth mentioning that Dask offers a vast range of APIs for importing structured data from a multitude of file formats into a DataFrame object, including:
- csv
- json
- avro
- parquet

For a more extensive list, check the official documentation at the [link](https://docs.dask.org/en/stable/dataframe-create.html).

### Computations with Dask DataFrames

Dask DataFrame almost 1-to-1 copy the Pandas DataFrame API.
Effectively, Dask provides a wrapper around the Pandas API in order to manage the same calls on the distributed collection of Pandas DataFrames, which in our case are the partitions or our dataset.

Clearly, using Dask DataFrame is useful only up to the measure where Pandas shows its limits, either due to data size (larger than memory datasets) or computation speed (running a task that could be parallelized efficiently).

Whenever working with small datasets and not-really parallelizable tasks, Pandas will always be the best choice.

For this very reason, the usual computing pattern when dealing with Dask dataframe is simply the following:
1. Use `Dask Bag` to ingest data from semi/unstructured sources and pre-process it into Dask DataFrames
1. Use `Dask DataFrame` for parallel computations and data reduction to produce a resulting slimmed DataFrame that can be fit in memory, then convert it into a single Pandas DataFrame
1. Use `Pandas` for simple (yet fast) DataFrames operations for non-parallelizable tasks


We can always retrieve data from all partitions and create a Pandas DataFrame from a Dask DataFrame using the `compute` method on the DataFrame itself.

At this point this should not come as a surprise, but this operation should be done very carefully, and only at the right time, when we have reduced our very large (possibly TeraBytes-large) Dask DataFrame to something meaningful to be stored in our computers' memory.

In [None]:
pandas_df = df.compute()
pandas_df

Let's work with basic DataFrame operations and inspect their impact in Dask vs Pandas.

In Dask, all operations on columns and individual items are easily parallelized, and thus very fast.

In [None]:
# retrieve the max of the DepDelay column
df.DepDelay.max().compute()

In [None]:
# visualize the graph for this operation
df.DepDelay.max().visualize()

In [None]:
# find the average arrival delay (ArrDelay) of all flights with carrier UnitedAirlines (UA)
df[df.UniqueCarrier=='UA'].ArrDelay.mean().compute()

In [None]:
# visualize the last operation
df[df.UniqueCarrier=='UA'].ArrDelay.mean().visualize()

Dask DataFrames also offer a smart implementation of most operations involving a minimal amount of shuffling, such as group based aggregations and merge operations.

This means that operations such as `groupby`, `resample`, `rolling` and other similar operations are still reasonably fast, even though they are produced with a large under-the-hood optimization.

In [None]:
# find the max airtime by combinations of origin and destination
df.groupby(['Origin','Dest']).AirTime.max().compute()

In [None]:
# visualize the last operation
df.groupby(['Origin','Dest']).AirTime.max().visualize()

As we can see from the graph, the `groupby` method returns a single object from the computation, stored in a single partition.
This is usually a fair choice, as it is normally considered that the result of a groupby aggregation is small enough to be stored into a single worker memory, thus not requiring the result to be split into multiple partitions.


However, it may happen that the retuned object is very large, depending on the input datasets, and we can optimize the number of output partitions by using the `split_out` argument.


In [None]:
# find the max airtime by combinations of origin and destination
# split the result of the groupby operation in 4 partitions
df.groupby(['Origin','Dest']).AirTime.max(split_out=4).visualize()

Note: Dask also supports the same aggregate syntax as Pandas to run sevearal aggregations at the same time on the same group.

In [None]:
df.groupby(['Origin','Dest']).AirTime.aggregate(['mean','std']).compute()

There are clearly operations for which the mapping with the standard Pandas APIs cannot hold.

For instance, the slicing and feature-based indexing based on `loc` does work as expected, but the potition-based indexing operator `iloc` does not really work as one would expect in Dask vs Pandas.

In [None]:
df.loc[df['Dest']=='DEN',['UniqueCarrier']].compute()

In [None]:
df.loc[df['Dest']=='DEN',['UniqueCarrier']].visualize()

`iloc` will raise instead an exception.

This happens because Dask DataFrame does not take into account the length of partitions and the row ordering inside the partition.
After all, what is the real odering of rows when the records are sharded across multiple nodes...? 

We can still use `iloc` to select the index of a some column, but for all rows in all partitions, e.g.: `iloc[:,0:10]`

This makes `iloc` extremely inefficient in Dask.

In [None]:
df.iloc[0:10]

Other very ineffienct operations in Dask are all the actions that require a re-indexing of the entire DataFrame, or sorting or merging of the entries based on a column that is not the index of the DataFrame itself.

This is due to the optimization Dask runs under the hood, where the range of the index column field can (although this might not always be the case) be used to subdivide the data into partitions.

Setting an index on a DataFrame will thus require to sort the entire dataset by the specified column, which is an extremely expensive process.
While the sorting process can be very slow, it can be extremely usefult to do it (although _very_ unfrequently) to speed up further computations.

Dask will in fact also use the index in merge/join operations. Performing a join on a column that is not the index of the DataFrame is also a very expensive operations.

After an unavoidable and expensive operation such a full reshuffling of your data is done (out of necessity), one can always persist the new DataFrame, to speed up subsequent computations.

In [None]:
df_reindexed = df[(df.UniqueCarrier=='UA') & (df['Dest']=='DEN')].set_index('Date')

In [None]:
df_reindexed.ArrDelay.rolling('5D').mean().compute()

In [None]:
df_reindexed.ArrDelay.rolling('5D').mean().visualize()

### Shared, reapeated and intermediate computations

When computing all of the above, we sometimes have to repeat the same operation more than once. 

For most operations, Dask DataFrame _hashes_ the arguments allowing duplicate computations to be shared, and only computed once.

For example, lets compute the mean and standard deviation for departure delay of all non-canceled flights. 
Since dask operations are lazy, those values aren't the final results yet. 
They're just the recipe require to get the result.

If we want to change the approach, we can compute them with two individual calls to compute. 
In this latter case, there is no sharing of intermediate computations and an overall speedup of the computation.

In [None]:
df = df.persist()

In [None]:
df.head()

In [None]:
import dask

#create non-cancelled dataframe
non_cancelled = df[df.Cancelled==0]

# create mean DepDelay series
mean_delay    = non_cancelled.DepDelay.mean()

# create std DepDelay series
std_delay     = non_cancelled.DepDelay.std()

In [None]:
%%time

# compute the time to run both functions
mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

In [None]:
%%time

# compute the time to combining both functions into a single compute call
mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Using dask.compute takes ~ 1/2 of the time of the other call. 

This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations to only be done once instead of twice. 
In particular, using dask.compute only does the following once:

+ the calls to the `read_csv` function
+ the filter on the cancelled flights
+ some of the necessary reductions (sum, count)

Let's see the computation graph:

In [None]:
dask.visualize(mean_delay, std_delay)

What we can see is that a lot of operation have been merged and used one single time, thus resulting in a optimization of the computing time.

## Re-run the same example from the Spark notebook using Dask

To apply all we discussed about Dask high- and low-level APIs, we can re-run the same task we used as an example analysis pipeline in pySpark for the DiMuon invariant mass of the LHC collision data.

Starting from the JSON files already used in the previous pySpark example we will need to:
1. load all files using the Dask Bag API
2. parse the json event collections into python dictionaries
3. use a foldby to count the number of events per sample (mc and data)
4. plot the distribution of the number of muons in the mc and data sample from the Dask.Bag object
5. select only events with exactly 2 muons with opposite charge and fill a Dask.DataFrame with the normalized results
6. create the tranverse momemta $p_{T,1}$ and $p_{T,2}$ features and create a 2d plot of these features
$$p_T = \sqrt{p_x^2 + p_y^2}$$
7. retain only the dimuon pairs where both muons' $p_T$ is greater than 15 (GeV)
8. create the components of the dimuon 4-momentum
$$(E,P_x,P_y,P_z) = (E_1+E_2,p_{x,1}+p_{x,2},p_{y,1}+p_{y,2},p_{z,1}+p_{z,2})$$
9. create and plot the invariant mass spectrum
$$M = \sqrt{E^2 - (P_x^2 + P_y^2 + P_z^2) }$$

Before starting, from a terminal (external to the docker container you are working in), copy the `MAPD-B/spark/datasets/lecture2/dimuon` folder into the `MAPD-B/dask/notebooks/datasets/.` path

In [None]:
client.restart()

In [None]:
# load the dimuon json files in a bag, each containing the text lines
import dask.bag as db
from pprint import pprint

db_lines = db.read_text(os.path.join('datasets','dimuon','*.json'),
                 files_per_partition=4)
pprint(db_lines.take(1))

In [None]:
# extract the json structure from the lines
import json
db_js = db_lines.map(json.loads).flatten()
pprint(db_js.take(1))

In [None]:
# count the number of events per each sample
from operator import add

def incr(tot, _):
    return tot+1

db_js.foldby(key='sample', 
             binop=incr, 
             initial=0, 
             combine=add, 
             combine_initial=0).compute()

In [None]:
# plot the distribution of the number of muons in the mc and data sample
import matplotlib.pyplot as plt
import numpy as np

plt.figure(figsize=(8,6))
mc_counts = db_js.filter(lambda record: record['sample']=='mc')\
                     .pluck('nMuon').compute()
plt.hist(
    mc_counts,
    label = 'MC simulation', bins=range(16), alpha=0.6
)

data_counts = db_js.filter(lambda record: record['sample']=='data')\
                     .pluck('nMuon').compute()
data_counts, data_edges = np.histogram(data_counts,range(16))
data_centers = 0.5*(data_edges[1:] + data_edges[:-1]) 

plt.errorbar(
    data_centers,
    data_counts,
    yerr = np.sqrt(data_counts),
    label = 'Data', color='black', fmt='o'
)

plt.xlim(-0.5,16.5)
plt.xlabel("Number of muons")
plt.ylabel("Counts")
plt.legend()
plt.semilogy()
plt.show()


In [None]:
# select only events with exactly 2 muons with opposite charge and fill a dask dataframe with the results
def flatten_records(record):
    return {
        'sample': record['sample'],
        'E1':  record['Muons'][0]['E'],
        'px1': record['Muons'][0]['px'],
        'py1': record['Muons'][0]['py'],
        'pz1': record['Muons'][0]['pz'],
        'E2':  record['Muons'][1]['E'],
        'px2': record['Muons'][1]['px'],
        'py2': record['Muons'][1]['py'],
        'pz2': record['Muons'][1]['pz'],
    }

df = db_js.filter(lambda record: record['nMuon']==2)\
          .filter(lambda record: record['Muons'][0]['charge']==-record['Muons'][1]['charge'])\
          .map(flatten_records)\
          .to_dataframe()

In [None]:
# create the new pT1 and pT2 features 
# pT = sqrt(px^2 + py^2)
df['pT1'] = np.sqrt(df['px1']**2 + df['py1']**2) 
df['pT2'] = np.sqrt(df['px2']**2 + df['py2']**2) 

In [None]:
# plot the 2 dimensional distribution of the dimuon pair pTs in the mc sample
import matplotlib.colors as clt

plt.figure(figsize=(8,6))

plt.hist2d(
    df.loc[:,'pT1'],
    df.loc[:,'pT2'],
    label = 'MC simulation', 
    bins=(range(0,100,2),range(0,100,2)), 
    norm=clt.LogNorm()
)


plt.xlabel("$p_{T,1}$ (Gev)")
plt.ylabel("$p_{T,2}$ (Gev)")
plt.colorbar()
plt.show()


In [None]:
# retain only those dimuon pairs where both muons' pT is greater than 15 
df = df.loc[(df['pT1']>15) & (df['pT2']>15),:]

In [None]:
# create the 4-momentum features 
df['E']  = df['E1']  + df['E2']
df['Px'] = df['px1'] + df['px2']
df['Py'] = df['py1'] + df['py2']
df['Pz'] = df['pz1'] + df['pz2']

# and compute the invariant mass of the dimuon pair
# M = sqrt(E*E - (Px*Px + Py*Py + Pz*Pz))
df['M'] = np.sqrt(df['E']**2 - (df['Px']**2 + df['Py']**2 + df['Pz']**2))

In [None]:
# extract the slimmed down dataframe containing only the sample, Mass, and pT of the objects
# and store it in a Pandas DataFrame
pandas_df = df.loc[:,['sample','M','pT1','pT2']].compute()

In [None]:
# plot the distribution of the dimuon invariant mass in the mc and data sample
plt.figure(figsize=(8,6))

plt.hist(
    pandas_df.loc[pandas_df['sample']=='mc','M'],
    label = 'MC simulation', bins=range(120), alpha=0.6
)

data_counts, data_edges = np.histogram(pandas_df.loc[pandas_df['sample']=='data','M'],range(120))
data_centers = 0.5*(data_edges[1:] + data_edges[:-1]) 

plt.errorbar(
    data_centers,
    data_counts,
    yerr = np.sqrt(data_counts),
    label = 'Data', color='black', fmt='.'
)

plt.xlim(-0.5,120.5)
plt.xlabel("Number of muons")
plt.ylabel("Counts")
plt.legend()
plt.semilogy()
plt.show()


### Memory management

In [None]:
# to reclaim some memory from the workers
import ctypes

def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)

In [None]:
# to restart the client
client.restart()