# Distributed RDataFrame

An `RDataFrame` analysis written in Python can be executed both *locally* (possibly in parallel on the cores of the machine) and in a *distributed* fashion, by offloading computations to external resources, which include:

- [Spark](https://spark.apache.org/) clusters
- [Dask](https://dask.org/) clusters

- This feature is enabled by the architecture depicted below.

- It shows that RDataFrame computation graphs can be mapped to different kinds of resources via backends.

- In this notebook we will exercise the Dask backend, which divides an `RDataFrame` input dataset into logical ranges and submits computations for each of those ranges to Dask resources.

<img src="../../images/DistRDF_architecture.png" alt="Distributed RDataFrame">
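
To make the idea of logical ranges more concrete, here is a minimal sketch in plain Python of how a dataset of `nentries` entries could be divided into contiguous ranges. The function name `split_into_ranges` is hypothetical and is not part of ROOT; the actual backend may choose range boundaries differently (for example, by taking file boundaries into account).

```python
from typing import List, Tuple


def split_into_ranges(nentries: int, npartitions: int) -> List[Tuple[int, int]]:
    """Split [0, nentries) into contiguous half-open ranges (hypothetical helper)."""
    if nentries < 0 or npartitions < 1:
        raise ValueError("nentries must be >= 0 and npartitions must be >= 1")
    # Never create more ranges than there are entries (but always at least one)
    npartitions = min(npartitions, nentries) or 1
    base, remainder = divmod(nentries, npartitions)
    ranges = []
    start = 0
    for i in range(npartitions):
        # The first `remainder` ranges receive one extra entry
        end = start + base + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges


ranges = split_into_ranges(10, 4)
print(ranges)
```

Each `(start, end)` pair would then correspond to one task submitted to the cluster.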

## Create a Dask client (in a dummy `LocalCluster` created inside the notebook)

- In order to work with a Dask cluster we need a `Client` object.
- It represents the connection to that cluster and allows us to configure execution-related parameters (e.g. number of cores, memory).
- The client object is just the intermediary between our client session and the cluster resources.
- Dask supports many different resource managers.
- We will follow the [Dask documentation](https://distributed.dask.org/en/stable/client.html) regarding the creation of a `Client`.

In [None]:
from distributed import Client, LocalCluster
# A LocalCluster creates a test Cluster, segmenting the resources available under your notebook
# It is meant for prototyping purposes and will not give full performance
cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=True, memory_limit="2GiB")
client = Client(cluster)

## Create a ROOT dataframe

We now create a distributed RDataFrame with Dask. Compared to a local RDataFrame, it accepts two more keyword arguments:
- the number of partitions to apply to the dataset (`npartitions`).
- the `Client` object (`daskclient`).

Besides these details, a Dask RDataFrame is not different from a local RDataFrame: the analysis presented in this notebook would not change if we wanted to execute it locally.
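
To give an intuition for what `npartitions` implies, the following toy sketch computes a mean in a map-reduce style: each partition produces a partial result, and the partial results are merged at the end. It uses only the Python standard library, with a thread pool standing in for the Dask workers. The helper names (`partial_sum`, `merge`, `partitioned_mean`) are hypothetical, and this is not meant to describe how ROOT implements `Mean` internally.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import List, Sequence, Tuple

Partial = Tuple[float, int]  # (sum of values, number of values)


def partial_sum(values: Sequence[float]) -> Partial:
    """'Map' step: summarise a single partition."""
    return (float(sum(values)), len(values))


def merge(a: Partial, b: Partial) -> Partial:
    """'Reduce' step: combine two partial results."""
    return (a[0] + b[0], a[1] + b[1])


def partitioned_mean(values: Sequence[float], npartitions: int) -> float:
    if not values or npartitions < 1:
        raise ValueError("need at least one value and one partition")
    size = -(-len(values) // npartitions)  # ceiling division
    chunks: List[Sequence[float]] = [
        values[i:i + size] for i in range(0, len(values), size)
    ]
    # The thread pool is only a stand-in for remote workers
    with ThreadPoolExecutor(max_workers=npartitions) as pool:
        partials = list(pool.map(partial_sum, chunks))
    total, count = reduce(merge, partials, (0.0, 0))
    return total / count


data = [float(v) for v in range(1, 11)]
print(partitioned_mean(data, npartitions=4))
```

The merged result is the same as the mean computed over the whole dataset at once, which is why the choice of `npartitions` affects performance but not the analysis code.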

In [None]:
import ROOT

# Use a Dask RDataFrame
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

df = RDataFrame("h42",
                "https://root.cern/files/h1big.root",
                npartitions=4,
                daskclient=client)

## Run your analysis unchanged

- From now on, the rest of your application can be written **exactly** as we have seen with local RDataFrame. 

- The goal of the distributed RDataFrame module is to support all the traditional RDataFrame operations (those that make sense in a distributed context at least). 

- Currently, only a subset of those operations is available; it is listed in the corresponding [section of the documentation](https://root.cern/doc/master/classROOT_1_1RDataFrame.html#distrdf).

In [None]:
%%time
df1 = df.Filter("nevent > 1")
df2 = df1.Define("mpt","sqrt(xpt*xpt + ypt*ypt)")
# Booked on `df` (not `df2`): this counts all entries in the dataset, before the Filter
c = df.Count()
m = df2.Mean("mpt")
x = df2.Mean("xpt")
y = df2.Mean("ypt")
E = df2.Mean("Ept")
dept = df2.Mean("dept")
dxpt = df2.Mean("dxpt")
dypt = df2.Mean("dypt")
de33 = df2.Mean("de33")
print(f"Number of events after processing: {c.GetValue()}")
print(f"Mean of column 'mpt': {m.GetValue()}")
print(f"Mean of column 'xpt': {x.GetValue()}")
print(f"Mean of column 'ypt': {y.GetValue()}")
print(f"Mean of column 'Ept': {E.GetValue()}")
print(f"Mean of column 'dept': {dept.GetValue()}")
print(f"Mean of column 'dxpt': {dxpt.GetValue()}")
print(f"Mean of column 'dypt': {dypt.GetValue()}")
print(f"Mean of column 'de33': {de33.GetValue()}")

## Create a Dask client (in the `KubeCluster` available in the Analysis Facility!) 

Now we are ready to unleash the power of the Analysis Facility.
Instead of the dummy `LocalCluster`, which simply uses the resources underneath the JupyterLab instance, we are going to use the cluster of the high-rate platform, which offers considerably more computing power.

To create a `KubeCluster`:
- Click on the Dask icon in the left bar of the JupyterLab instance; it should look like this: <img src="../../images/dask.png" alt="Dask icon" width="20">;
- Click on the <img src="../../images/new.png" alt="new" width="45"> button, in the bottom part of the column;
- Wait for the cluster to deploy (a few seconds);
- When a dashboard (with several buttons) appears, scroll down until you see a blue box like:
  <img src="../../images/kubecluster.png" alt="Kube cluster" width="500">
- Now, to be able to create a `Client` object, as shown in the previous part, just click on the <img src="../../images/client.png" alt="client" width="20"> icon. A new notebook cell will appear in the Jupyter notebook with all the details of the new Cluster!

Before you run the computation on the `KubeCluster`, <ins>you need to scale it with some *Workers*</ins>. To do so, click on the <span style="color:green">**SCALE**</span> button in the blue box. In the window that appears, enter the number of workers you need (every worker has a single core with 2 GB of RAM).

**NOTE**: For this workshop, we are all sharing the same resources! Therefore, please don't request too many workers. For these "light" examples, <ins>2-3 workers are enough</ins>!

In [None]:
# SOMETHING LIKE THIS SHOULD APPEAR IN YOUR NOTEBOOK. This example was generated for a previous cluster,
# so replace it with your own Client information.

#from dask.distributed import Client

#client = Client("tcp://dask-diotalevi-gfibr-scheduler.jhub:8786")
#client

In [None]:
# Use a Dask RDataFrame
RDataFrame = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame

df = RDataFrame("h42",
                "https://root.cern/files/h1big.root",
                npartitions=4,
                daskclient=client)

In [None]:
%%time
df1 = df.Filter("nevent > 1")
df2 = df1.Define("mpt","sqrt(xpt*xpt + ypt*ypt)")
# Booked on `df` (not `df2`): this counts all entries in the dataset, before the Filter
c = df.Count()
m = df2.Mean("mpt")
x = df2.Mean("xpt")
y = df2.Mean("ypt")
E = df2.Mean("Ept")
dept = df2.Mean("dept")
dxpt = df2.Mean("dxpt")
dypt = df2.Mean("dypt")
de33 = df2.Mean("de33")
print(f"Number of events after processing: {c.GetValue()}")
print(f"Mean of column 'mpt': {m.GetValue()}")
print(f"Mean of column 'xpt': {x.GetValue()}")
print(f"Mean of column 'ypt': {y.GetValue()}")
print(f"Mean of column 'Ept': {E.GetValue()}")
print(f"Mean of column 'dept': {dept.GetValue()}")
print(f"Mean of column 'dxpt': {dxpt.GetValue()}")
print(f"Mean of column 'dypt': {dypt.GetValue()}")
print(f"Mean of column 'de33': {de33.GetValue()}")

## Upload a custom header and declare the code on the workers

When custom C++ code used in the analysis lives in a separate header file, we need to make sure that all the workers have that header file (or any other ancillary file) in their filesystem. This is currently achievable in two steps:

1. Register the files for upload via the `distribute_unique_paths` method.
2. Register the function that uses those files via the `initialize` function.

```python
import ROOT
from pathlib import Path
from distributed import get_worker

# Just an example, imagine this is a real distributed RDataFrame
df_dask = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(...)

# We need to register the header file for upload to the Dask workers.
# For now the interface is still WIP, will be made smoother in the next ROOT release
df_dask._headnode.backend.distribute_unique_paths(
    ["mycustomheader.h"]
)

def my_initialization_function():
    """Load C++ helper functions. Works for both local and distributed execution."""
    try:
        # when using distributed RDataFrame 'mycustomheader.h' is copied to the local_directory
        # of every worker (via `distribute_unique_paths`)
        localdir = get_worker().local_directory
        cpp_header = Path(localdir) / "mycustomheader.h"
    except ValueError:
        # must be local execution
        cpp_header = "mycustomheader.h"

    ROOT.gInterpreter.Declare(f'#include "{cpp_header}"')

ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)
```
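
For illustration, the sketch below writes a possible `mycustomheader.h` to the current directory using plain Python. The header content, the C++ function name `compute_mpt`, and the Python names `HEADER_CODE` and `write_header` are all assumptions made up for this example; your real header will contain your own helper functions.

```python
from pathlib import Path

HEADER_NAME = "mycustomheader.h"

# Hypothetical header content: `compute_mpt` is an illustrative name, not part of ROOT
HEADER_CODE = """\
#ifndef MYCUSTOMHEADER_H
#define MYCUSTOMHEADER_H

#include <cmath>

// Transverse momentum from its x and y components
inline double compute_mpt(double xpt, double ypt)
{
    return std::sqrt(xpt * xpt + ypt * ypt);
}

#endif // MYCUSTOMHEADER_H
"""


def write_header(directory: Path = Path(".")) -> Path:
    """Write the example header into `directory` and return its path."""
    path = Path(directory) / HEADER_NAME
    path.write_text(HEADER_CODE)
    return path


header_path = write_header()
print(f"Wrote {header_path}")
```

Once such a header has been uploaded and declared on the workers as described above, the function could be used in a string expression, for example `df_dask.Define("mpt", "compute_mpt(xpt, ypt)")`, assuming `xpt` and `ypt` are scalar floating-point columns. The include guard keeps the header safe to include more than once.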