In [None]:
!pip install --upgrade pandas numpy dask black[jupyter] uproot3 pyarrow astropy toml

In [None]:
import uproot3

import dask.dataframe as dd
import pandas as pd
import numpy as np
from astropy import units as u
import matplotlib.pyplot as plt
from dataclasses import dataclass

%matplotlib inline

In [None]:
!ls -lah *.root

In [None]:
import tdat

In [None]:
input_files = !ls data*.root

In [None]:
input_files

In [None]:
dask_dataframe = dd.from_map(tdat.read_root_file, input_files)

In [None]:
dask_dataframe.head()

In [None]:
dask_dataframe.value1.mean().compute()

In [None]:
len(dask_dataframe.value1)

In [None]:
type(_)

In [None]:
# dask_dataframe.to_parquet

In [None]:
tdat.dask_memory_usage_GB(dask_dataframe)

In [None]:
centroids, histogram = tdat.dask_histogram(
    dask_dataframe, "value1", bins=20, bins_range=[-5, 5]
)

In [None]:
plt.bar(centroids, histogram.compute())
plt.grid();

In [None]:
from tdat.filters import ComputeColumn, QueryFilter

In [None]:
ComputeColumn??

In [None]:
# Callable that adds a computed column derived from "value1" and "value2".
# NOTE(review): presumably the new column is `computed_col`, which the query
# below relies on -- confirm in tdat.filters.
func = ComputeColumn("value1", "value2")

In [None]:
# Apply the callable to every partition. This is lazy: no partition data is
# processed until a compute is triggered.
dask_dataframe_added_column = dask_dataframe.map_partitions(func)

In [None]:
# Row filter defined by a query string (presumably evaluated like
# pandas DataFrame.query -- confirm in tdat.filters).
filt = QueryFilter("computed_col < 1")

In [None]:
QueryFilter??

In [None]:
dask_dataframe_added_column_filtered = dask_dataframe_added_column.map_partitions(filt)

In [None]:
# Materialize the filtered result as a pandas DataFrame.
# NOTE(review): this loads every surviving row into memory.
output_pandas_dataframe = dask_dataframe_added_column_filtered.compute()

In [None]:
# filters also run on pandas dataframes
# (here re-applied to already-filtered data, so presumably every row is kept)

filt(output_pandas_dataframe).head()

In [None]:
# Number of rows in the filtered result, in millions.
len(output_pandas_dataframe) / 1e6

## Execute multiple computations at the same time

No computation is executed when calling `dask_histogram`:
the computation is only triggered by `dd.compute`, and the input files are read only once.

Also, the parts of the computation graph shared by several results are executed only once.

In [None]:
centroids, histogram_squaredsum = tdat.dask_histogram(
    dask_dataframe_added_column, "value1", bins=20, bins_range=[-5, 5]
)

centroids, histogram_filtered = tdat.dask_histogram(
    dask_dataframe_added_column_filtered, "value1", bins=20, bins_range=[-5, 5]
)

In [None]:
h1,h2,h3 = dd.compute(histogram, histogram_squaredsum, histogram_filtered)

In [None]:
for i, h in enumerate([h1, h2, h3]):
    plt.subplot(1, 3, i+1)
    plt.bar(centroids, h)
    plt.grid();

## Use the configuration files

In [None]:
%%file filters/datasetname/compute_squaredsum.toml

class = "ComputeColumn"
column_1 = "value1"
column_2 = "value2"

In [None]:
%%file filters/datasetname/filter_computedcol.toml

class = "QueryFilter"
query = "computed_col < 1"

In [None]:
compute_squaredsum = tdat.load_filter("filters/datasetname/compute_squaredsum.toml")

In [None]:
dask_dataframe.map_partitions(compute_squaredsum).head()

In [None]:
tdat.load_filter("filters/datasetname/filter_computedcol.toml")

## Use Pipeline

In [None]:
%%file filters/datasetname/dataset_pipeline.toml

class = "Pipeline"
steps = ["filters/datasetname/compute_squaredsum.toml", "filters/datasetname/filter_computedcol.toml"]

In [None]:
# Build the pipeline from its TOML description; each entry of `steps` is the
# path of another filter TOML file.
pipeline = tdat.load_filter("filters/datasetname/dataset_pipeline.toml")

In [None]:
# The loaded step objects (presumably in the order listed in `steps` --
# confirm in tdat).
pipeline.filters

In [None]:
# Query string of the second step (presumably the QueryFilter loaded from
# filter_computedcol.toml).
pipeline.filters[1].query

In [None]:
# Apply all pipeline steps to the dask dataframe.
# NOTE(review): presumably lazy, like map_partitions -- confirm in tdat.
dask_dataframe_cleaned = pipeline.apply(dask_dataframe)

In [None]:
dask_dataframe_cleaned.head()