# Continuing on the Dask Data structures

Dask provides several Pythonic data structures designed to handle and manipulate data that exceeds our local memory capacity:

- `dask.bag`: A distributed generic Python list of objects, analogous to a PySpark RDD.
- `dask.array`: Distributed NumPy arrays.
- `dask.dataframe`: Distributed pandas dataframes, offering functionality similar to pandas but capable of handling larger-than-memory datasets.

All these high-level data structure APIs are optimized to leverage the Directed Acyclic Graph (DAG) optimization features of the Dask scheduler. Consequently, they rely on lazy computation, allowing for efficient execution of operations on large datasets.

## Start the Dask cluster

In [None]:
from dask.distributed import Client

# use the provided master
client = Client('dask-scheduler:8786')
    
# print the status of the client        
client

The `dask.dataframe` API provides a parallel DataFrame object that closely resembles the Pandas DataFrame interface. 

A Dask DataFrame consists of multiple Pandas DataFrames or Series stored in memory across all machines in the cluster. The table of a Dask DataFrame is partitioned along the index axis **\***.

When performing operations on a Dask DataFrame, it triggers corresponding operations on the constituent Pandas DataFrames, taking into account potential parallelism and memory limitations.

**\*** _Quick question: In this context, what type of data partitioning does "partitioned along the index" refer to: Vertical or Horizontal?_



### (On Data Partitioning)

Partitioning is a crucial aspect of data management and processing in distributed systems, such as frameworks like Spark and Dask.

Distributing our dataset into numerous small partitions allows for parallel processing on each node. However, it's essential to acknowledge that every call the central scheduler makes to access data on a remote partition (on a worker) incurs some time, typically in the order of a few hundred milliseconds.

As users of distributed processing systems, it is our responsibility (not Dask's or Spark's) to determine and optimize the number of partitions. While the distributed framework may make an initial choice or guess, optimizing performance relies on us making the appropriate choice based on the task at hand.

For instance, when initially loading data from a set of files, the number of partitions might align with the number of CSV files we are importing. Nonetheless, as we alter the size of our DataFrames through filtering or joining, it may be prudent to reassess the number of partitions to optimize the Dask scheduler's overhead.

_There is always a cost associated with having too many or too few partitions, and unfortunately, there is no single rule to determine the "right" number of partitions._

As a _rule of thumb_, partitions should comfortably fit in memory. However, they should not be excessively numerous, as the overhead of the central scheduler can impact computation performance.

This suggests that the number of partitions should fall within a "reasonable range," and their number and size should be selected by the user to optimize execution.


## Basic operations on DataFrames

Let's start by reading a set of structured files (comma-separated) into a DataFrame.

In pure Pandas, we would have to loop over all the files, open each one of them as a DataFrame, and concatenate all the DataFrames into a large DataFrame.

The resulting DataFrame is a single entity stored in memory, allowing for single-threaded data access.

In [None]:
! ls datasets/accounts_csv

In [None]:
! head -10 datasets/accounts_csv/accounts.0.csv

In [None]:
# import the necessary libraries
from glob import iglob
import pandas as pd
import os

# define the path to the CSV files
path = os.path.join('datasets', 'accounts_csv', 'accounts.*.csv')

# use glob to find all matching file paths
all_files = iglob(path, recursive=True)

# create a generator to read each CSV file as a Pandas DataFrame
dataframes = (pd.read_csv(f) for f in all_files)

# concatenate all DataFrames into a single large DataFrame
large_dataframe = pd.concat(dataframes, ignore_index=True)

In [None]:
# print the pandas DataFrame
large_dataframe

In Dask, DataFrames can be created from a `glob` pattern using the wildcard `*`, which reads all files in the specified path matching that pattern into the same Dask DataFrame.

It's important to remember that Dask DataFrames are a collection of Pandas DataFrames scattered across the workers.

When reading data into a Dask DataFrame, Dask automatically creates parallel jobs to read the data in parallel. By default, it creates one (parallel) job per chunk, which is typically the number of input files. However, it is recommended to consider how we want to partition our dataset to make the best use of our workers.

The Dask DataFrame API don't optimize the query execution for you like Spark or a SQL DBMS would. 

This means that users waste a lot of computation not profiting from the number of optimization techniques normally available in structured data processing tools.

Starting only from the most recent version (`2024.4`), Dask is now transitioning to set a `query-planning` optimizer by default. 

We should make however sure that the `query-planning` is activated, by issuing:

In [None]:
import dask
dask.config.set({'dataframe.query-planning':True})

In [None]:
# import the necessary libraries
import dask.dataframe as dd

# define the path to the CSV files
path = os.path.join('datasets', 'accounts_csv', 'accounts.*.csv')

# read the CSV files into a Dask DataFrame
df = dd.read_csv(path)


As always, reading data is a lazy operation, meaning it is postponed until we have something to compute.

Let's compute the length of the entire DataFrame.

In [None]:
# compute the length of the Dask DataFrame
len(df)

At this stage, each file was loaded into a separate Pandas DataFrame and scattered across the nodes.

Computing `len()` means that `len` was applied to each individual Pandas DataFrame, and then the sub-results were aggregated and combined to provide the overall total length of the DataFrame.

In [None]:
# "print" the Dask DataFrame
df

We can reassign the number of partitions to the Dask DataFrame object using the `repartition` method.

In [None]:
# change the number of partitions to 8
df = df.repartition(npartitions=8)

In [None]:
# "print" the Dask DataFrame
df

In [None]:
# compute the length of the Dask DataFrame
len(df)

In [None]:
# compute the number of partitions of the Dask DataFrame
df.npartitions

It is worth mentioning that unlike Pandas, Dask only reads a sample from the beginning of the file (or the list of files) to start inferring the data types.

These inferred data types are then enforced when reading all partitions, which can potentially lead to incorrect data type assignments.

For example, consider the case where we start reading a CSV file in which the first $n$ entries of a column are all integers, but the column is actually supposed to be of type _string_.

In [None]:
df = dd.read_csv(os.path.join('datasets', 'nycflights', '*.csv'))

In [None]:
df.head()

In [None]:
# read CSV files - explicitely parse dates 
df = dd.read_csv(os.path.join('datasets', 'nycflights', '*.csv'))

# re-format the date 
df['Date'] = dd.to_datetime(
    df.rename(
        columns = {'Year':'year', 'Month':'month', 'DayofMonth':'day'}
    )[['year', 'month', 'day']],
    meta=pd.Series(dtype="datetime64[ns]")
)

In [None]:
# the head of the *distributed* Dask DataFrame
df.head()

In [None]:
# the tail of the *distributed* Dask DataFrame
df.tail()

In this case, the datatypes inferred for `CRSElapsedTime` and `TailNum` are in fact incorrect. 

However, we can force Dask to interpret the data types as we want by using the `dtype` assignment. This is usually the most viable and robust solution, but not the only one.

Alternatively, we could:
- Increase the size of the sample used to infer the data types.
- Use `assume_missing` to make Dask assume that columns inferred to be `int` (which don't allow missing values) are actually `floats` (which do allow missing values).

However, in general, the best solution is still to hard-code the data types.

In [None]:
# read CSV files - explicitely parse dates 
df = dd.read_csv(os.path.join('datasets', 'nycflights', '*.csv'),
                 dtype={'TailNum': str,
                        'CRSElapsedTime': float})

# re-format the date 
df['Date'] = dd.to_datetime(
    df.rename(
        columns = {'Year':'year', 'Month':'month', 'DayofMonth':'day'}
    )[['year', 'month', 'day']],
    meta=pd.Series(dtype="datetime64[ns]")
)

In [None]:
# repartition the dataframe
df = df.repartition(npartitions=8)

In [None]:
# print the tail of the dataframe
df.tail()

While on the topic of reading data from files, it is worth mentioning that Dask offers a wide range of APIs for importing structured data from various file formats into a DataFrame object, including:
- CSV
- JSON
- Avro
- Parquet

For a more extensive list, please refer to the official documentation at the following [link](https://docs.dask.org/en/stable/dataframe-create.html).

## Computations with Dask DataFrames

Dask DataFrames are designed to closely mimic the Pandas DataFrame API, with nearly a 1-to-1 correspondence. Dask provides a wrapper around the Pandas API to manage the same calls on the distributed collection of Pandas DataFrames, which in our case are the partitions of our dataset.

Using Dask DataFrames is beneficial when Pandas reaches its limits, such as when dealing with datasets larger than memory or tasks that can be efficiently parallelized.

However, when working with small datasets or tasks that are not easily parallelizable, Pandas will always be the better choice.

For this reason, the typical computing pattern when working with Dask DataFrames is as follows:
1. Use `Dask Bag` to ingest data from semi/unstructured sources and preprocess it into Dask DataFrames.
2. Use `Dask DataFrame` for parallel computations and data reduction to produce a slimmed-down DataFrame that can fit in memory. Then, convert it into a single Pandas DataFrame.
3. Use `Pandas` for simple (yet fast) DataFrame operations for non-parallelizable tasks.

We can always retrieve data from all partitions and create a Pandas DataFrame from a Dask DataFrame using the `compute` method on the DataFrame itself.

However, it's important to note that this operation should be done carefully and only at the appropriate time, especially when dealing with very large (possibly terabytes-sized) Dask DataFrames. It should only be performed when we have reduced the DataFrame to a manageable size that can fit into our computer's memory.

In [None]:
# convert Dask DataFrame to Pandas DataFrame
pandas_df = df.compute()

# display the Pandas DataFrame
pandas_df

Let's work with basic DataFrame operations and compare their impact in Dask vs Pandas.

In Dask, operations on columns and individual items are easily parallelized, making them very fast.

In [None]:
# retrieve the max of the DepDelay column
df.DepDelay.max().compute()

In [None]:
# visualize the execution plan for this operation
df.DepDelay.max().visualize()

In [None]:
# visualize the task graph for this operation
df.DepDelay.max().visualize(tasks=True)

In [None]:
# find the average arrival delay (ArrDelay) 
# of all flights with carrier UnitedAirlines (UA)



In [None]:
# visualize the graph for this last operation




Dask DataFrames also provide efficient implementations of operations that involve minimal shuffling, such as group-based aggregations and merge operations.

This means that operations like `groupby`, `resample`, `rolling`, and similar operations are still _reasonably_ fast, thanks to extensive under-the-hood optimizations.

In [None]:
# find the max airtime by combinations of origin and destination




In [None]:
# visualize the graph for this last operation




As we can see from the graph, the `groupby` method returns a single object from the computation, stored in a single partition. This is generally a reasonable choice, as the result of a groupby aggregation is typically small enough to fit into a single worker's memory, eliminating the need to split the result into multiple partitions.

However, there may be cases where the returned object is very large, depending on the input datasets. In such situations, we can optimize the number of output partitions by using the `split_out` argument.

In [None]:
# find the max airtime by combinations of origin and destination
# split the result of the groupby operation in 4 partitions
df.groupby(['Origin','Dest']).AirTime.max(split_out=4).visualize(True)

Note: Dask also supports the same aggregate syntax as Pandas, allowing us to run several aggregations simultaneously on the same group.

In [None]:
# group the DataFrame by 'Origin' AND 'Dest', and calculate the mean AND standard deviation of 'AirTime'
df.groupby(['Origin', 'Dest'])['AirTime'].aggregate(['mean', 'std']).compute()

There are **certain operations** for which the **mapping with the standard Pandas APIs does not hold** in Dask.

For example, slicing and feature-based indexing using `loc` work as expected in Dask, but the position-based indexing operator `iloc` does not behave the same way as it does in Pandas.

In [None]:
# select rows where 'Dest' column equals 'DEN' and retrieve the 'UniqueCarrier' column
df.loc[df['Dest'] == 'DEN', ['UniqueCarrier']].compute()

In [None]:
# visualize the graph for this last operation
df.loc[df['Dest']=='DEN',['UniqueCarrier']].visualize(True)

In Dask, using `iloc` for position-based indexing will raise an exception instead of behaving as it does in Pandas.

In [None]:
# use iloc to locate the first i-th rows of the *distributed* DataFrame
df.iloc[0:10]

This behavior occurs because Dask DataFrame does not consider the length of partitions or the row ordering within each partition. When records are sharded across multiple nodes, determining the true ordering of rows becomes ambiguous.

While you can still use `iloc` to select the index of a specific column for all rows in all partitions, such as `iloc[:, 0:10]`, it is important to note that `iloc` can be extremely inefficient in Dask due to the underlying data distribution and lack of explicit row ordering.

Other operations that are **inefficient** in Dask include actions that require _re-indexing_ of the entire DataFrame, _sorting_, or _merging based on a column that is not the DataFrame's index_.

Dask optimizes the data distribution by utilizing the range of the index column to subdivide the data into partitions. However, setting an index on a DataFrame requires sorting the entire dataset by the specified column, which can be an expensive process. While sorting can be slow, it can be beneficial to perform it (although infrequently) to accelerate subsequent computations.

Dask also leverages the index in merge/join operations. Performing a join on a column that is not the DataFrame's index is also an expensive operation.

After performing an unavoidable and expensive operation such as reshuffling the data, you can persist the new DataFrame to speed up subsequent computations.

In [None]:
# select rows where 'UniqueCarrier' is 'UA' and 'Dest' is 'DEN', and set the index to 'Date'
df_reindexed = df[(df.UniqueCarrier=='UA') & (df['Dest']=='DEN')].set_index('Date')

In [None]:
# compute a 5-days rolling average of the related ArrivalDelay
df_reindexed.ArrDelay.rolling('5D').mean().compute()

In [None]:
# visualize the graph for this last operation
df_reindexed.ArrDelay.rolling('5D').mean().visualize(True)

## Shared, reapeated and intermediate computations

When performing the computations mentioned above, there may be cases where the same operation needs to be repeated multiple times. 

In Dask DataFrame, for most operations, the arguments are hashed, allowing for duplicate computations to be shared and computed only once.

For example, let's consider computing the mean and standard deviation of departure delay for all non-canceled flights. Since Dask operations are lazy, the computed values are not the final results yet. They represent the recipe required to obtain the final result.

If we change the approach and compute these values with two individual calls to `compute`, there will be no sharing of intermediate computations, resulting in an overall speedup of the computation.

In [None]:
# cache the distributed DataFrame in the memory of the workers
df = df.persist()

In [None]:
# show the head of the DataFrame
df.head()

In [None]:
import dask

# create non-cancelled dataframe (lazy)
non_cancelled = df[df.Cancelled==0]

# create mean DepDelay series (lazy)
mean_delay    = non_cancelled.DepDelay.mean()

# create std DepDelay series (lazy)
std_delay     = non_cancelled.DepDelay.std()

In [None]:
%%time

# compute the time to run both functions
mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

In [None]:
%%time

# compute the time to combining both functions into a single compute call
mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Using `dask.compute` takes approximately half the time compared to the other approach. This is because when calling `dask.compute`, the task graphs for both results are merged, allowing shared operations to be performed only once instead of twice. Specifically, `dask.compute` performs the following operations once:

- Calls to the `read_csv` function
- Filter on cancelled flights
- Some necessary reductions (such as sum and count)

Let's take a look at the computation graph:

In [None]:
# visualize the graph and spot the two outputs
dask.visualize(mean_delay, std_delay)

What we can observe from the computation graph is that many operations have been merged and used only once, resulting in an optimization of the overall computation time.

## Re-run the same exercise from the Spark notebook using Dask

To apply all we discussed about Dask high- and low-level APIs, we can re-run the same task we used as an example analysis pipeline in PySpark for the DiMuon invariant mass of the LHC collision data.

Starting from the JSON files already used in the previous PySpark example, we will need to:
1. Load all files using the Dask Bag API.
2. Parse the JSON event collections into Python dictionaries.
3. Use a `foldby` operation to count the number of events per sample (mc and data).
4. Plot the distribution of the number of muons in the MC and data samples from the Dask Bag object.
5. Select only events with exactly 2 muons with opposite charge and fill a Dask DataFrame with the normalized results.
6. Create the transverse momenta $p_{T,1}$ and $p_{T,2}$ features and create a 2D plot of these features.
$$p_T = \sqrt{p_x^2 + p_y^2}$$
7. Retain only the dimuon pairs where both muons' $p_T$ is greater than 15 (GeV).
8. Create the components of the dimuon 4-momentum
$$(E,P_x,P_y,P_z) = (E_1+E_2,p_{x,1}+p_{x,2},p_{y,1}+p_{y,2},p_{z,1}+p_{z,2})$$
9. create and plot the invariant mass spectrum
$$M = \sqrt{E^2 - (P_x^2 + P_y^2 + P_z^2) }$$


Before starting, from a terminal (external to the docker container you are working in), copy the `MAPD-B/spark/datasets/lecture2/dimuon` folder into the `MAPD-B/dask/notebooks/datasets/.` path, or create a symbolic link to that path.

In [None]:
# restart the client to free up the workers' memory
client.restart()

#### 1. Load all files using the Dask Bag API.

In [None]:
# load the dimuon json files in a bag, each containing the text lines


#### 2. Parse the JSON event collections into Python dictionaries.

In [None]:
# extract the json structure from the lines


#### 3. Use a `foldby` operation to count the number of events per sample (mc and data).

In [None]:
# count the number of events per each sample


#### 4. Plot the distribution of the number of muons in the MC and data samples from the Dask Bag object.

In [None]:
# plot the distribution of the number of muons in the mc and data sample


#### 5. Select only events with exactly 2 muons with opposite charge and fill a Dask DataFrame with the normalized results.

In [None]:
# select only events with exactly 2 muons with opposite charge and fill a dask dataframe with the results


#### 6. Create the transverse momenta $p_{T,1}$ and $p_{T,2}$ features and create a 2D plot of these features.

In [None]:
# create the new pT1 and pT2 features 
# pT = sqrt(px^2 + py^2)


In [None]:
# plot the 2 dimensional distribution of the dimuon pair pTs in the mc sample


#### 7. Retain only the dimuon pairs where both muons' $p_T$ is greater than 15 (GeV).

In [None]:
# retain only those dimuon pairs where both muons' pT is greater than 15 


#### 8. Create the components of the dimuon 4-momentum

In [None]:
# create the 4-momentum features 


#### 9. create and plot the invariant mass spectrum

In [None]:
# compute the invariant mass of the dimuon pair
# M = sqrt(E*E - (Px*Px + Py*Py + Pz*Pz))



In [None]:
# extract the slimmed down dataframe containing only the sample, Mass, and pT of the objects
# and store it in a Pandas DataFrame


In [None]:
# plot the distribution of the dimuon invariant mass in the mc and data sample


### (in the case of memory management issues)

In [None]:
# to reclaim some memory from the workers
import ctypes

# invoke the C library libc.so.6 to deallocate memory from the worker
def trim_memory() -> int:
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# run the `trim_memory` function on all Dask workers to reclaim memory
client.run(trim_memory)

In [None]:
# restart the client (this will reset all operations done so far)
client.restart()

## Stop client

In [None]:
client.close()

Finally, use `docker compose down` to stop and clear all running containers.