# Reading LH5 Data at Scale

Reading very large quantities of data can incur performance penalties if not done carefully, due to large memory usage and/or a large number of files to be read. To assist in reading large quantities of data in a scalable way, use the [LH5Iterator](https://legend-pydataobj.readthedocs.io/en/stable/api/lgdo.lh5.html#module-lgdo.lh5.iterator). This will break your data into smaller chunks so that you can process each chunk individually; in addition, special tools exist for parallelizing your processing across the chunks (parallel processing must be independent across threads!)

## LH5Iterator

Let's start by downloading a small test LH5 file with the [pylegendtestdata](https://pypi.org/project/pylegendtestdata/) package (note: by design, this test data is small; while this does not showcase the performance benefits of the LH5Iterator, the construction and operation of the iterator is identical, and can be scaled to much larger datasets)

In [None]:
from __future__ import annotations

import numpy as np
import pandas as pd
from legendtestdata import LegendTestData

from lgdo.lh5 import LH5Iterator

ldata = LegendTestData()

# directories and channels we will be reading from...
lh5_hit_dir = ldata.get_path("lh5/prod-ref-l200/generated/tier/hit/cal/p03/r001/")
lh5_dsp_dir = ldata.get_path("lh5/prod-ref-l200/generated/tier/dsp/cal/p03/r001/")
channels = ["ch1084803", "ch1084804", "ch1121600"]

Now, create an LH5 iterator and loop over its contents:

In [None]:
lh5_it = LH5Iterator(
    lh5_files=f"{lh5_hit_dir}/*.lh5",  # all files in directory
    groups=[f"{ch}/hit" for ch in channels],  # hit table for all channels
    field_mask=["is_valid_cal", "cuspEmax_ctc_cal"],  # read only these fields
    buffer_len=20,  # read 20 entries at a time; default reads 100 MB worth of entries
)

# Loop over each chunk, and show the tables
pd.set_option("display.max_rows", 6)
for tb in lh5_it:
    display(tb.view_as("pd"))
pd.reset_option("display.max_rows")

To really take advantage of the iterator, you can perform operations for each iteration, typically with the aim of performing some data reduction. We will cut entries in our tables with less than 500 keV of energy, and that are invalid.

In addition, the iterator will return certain special values listed in the documentation; we will get the file name using ``current_files`` and the group name using ``current_groups``.

In [None]:
outputs = []
for tb in lh5_it:
    df = tb.view_as("pd")
    df["file"] = lh5_it.current_files
    df["group"] = lh5_it.current_groups
    df = df.query("is_valid_cal and cuspEmax_ctc_cal>500")
    outputs += [df]
df = pd.concat(outputs)
display(df)

## Joining multiple data sources
Sometimes we want to pull data from multiple sources. This can be tables that we want to join together, or metadata about the data in each lh5 group.

#### Friends:
If you want to combine multiple datasets from different tiers of processing, you can create two iterators and "friend" them together so that as we iterate through the data, we join data from the tables in each one together.

In [None]:
# Construct two iterators, with the second one a "friend" of the first
lh5_it = LH5Iterator(
    lh5_files=f"{lh5_hit_dir}/*.lh5",  # all files in hit tier directory
    groups=[f"{ch}/hit" for ch in channels],  # hit table for all channels
    field_mask=["is_valid_cal", "cuspEmax_ctc_cal"],  # read only these fields
    buffer_len=20,  # read 20 entries at a time; default reads 100 MB worth of entries
    friend=LH5Iterator(
        lh5_files=f"{lh5_dsp_dir}/*.lh5",  # all files in dsp tier directory
        groups=[f"{ch}/dsp" for ch in channels],  # dsp table for all channels
        field_mask=["tp_0_est", "tp_90"],
    ),
)

# Display events from both data tiers, with long drift times
outputs = []
for tb in lh5_it:
    df = tb.view_as("pd")
    df = df.query("is_valid_cal and (tp_90 - tp_0_est)>1000")
    outputs += [df]
df = pd.concat(outputs)
display(df)

#### Group data:
We can also insert data about the groups that we are reading, which will be repeated for all data from the same group. This is useful for grabbing metadata about different channels and attaching it to events for easier analysis.

In [None]:
lh5_hit_dir = ldata.get_path("lh5/prod-ref-l200/generated/tier/hit/cal/p03/r001/")
channels = ["ch1084803", "ch1084804", "ch1121600"]
chan_data = {
    "chanid": [1084803, 1084804, 1121600],
    "detname": ["huey", "dewey", "louie"],
}

# set group_data with our channel metadata
lh5_it = LH5Iterator(
    lh5_files=f"{lh5_hit_dir}/*.lh5",  # all files in hit tier directory
    groups=[f"{ch}/hit" for ch in channels],  # hit table for all channels
    field_mask=["is_valid_cal", "cuspEmax_ctc_cal"],  # read only these fields
    group_data=chan_data,  # attach our channel metadata
    buffer_len=20,  # read 20 entries at a time; default reads 100 MB worth of entries
)

# We're going to be fancy with our loop this time...
outputs = []
for tb in lh5_it:
    df = tb.view_as("pd")
    df = df.query("is_valid_cal and cuspEmax_ctc_cal>500")
    outputs += [df]
df = pd.concat(outputs)
display(df)

## Querying data
In the above examples, we have used the iterator to loop through many datasets, and select entries to put into a smaller dataset. Because this is one of the most common use-cases for `LH5Iterator`, specialized functions for speeding this process up exist.

#### query
The [query](https://legend-pydataobj.readthedocs.io/en/stable/api/lgdo.lh5.html#lgdo.lh5.iterator.LH5Iterator.query) function can be used to grab a subset of data based on some selection. The section uses [Table.eval](https://legend-pydataobj.readthedocs.io/en/stable/api/lgdo.types.html#lgdo.types.table.Table.eval) to evaluate the selection, and can access commands from `numexpr` and `awkward` packages.

In [None]:
lh5_it = LH5Iterator(
    lh5_files=f"{lh5_hit_dir}/*.lh5",  # all files in directory
    groups=[f"{ch}/hit" for ch in channels],  # hit table for all channels
    field_mask=["is_valid_cal", "cuspEmax_ctc_cal"],  # read only these fields
    buffer_len=20,  # read 20 entries at a time; default reads 100 MB worth of entries
)
df = lh5_it.query("is_valid_cal & (cuspEmax_ctc_cal>500)", library="pd")
display(df)

For large datasets, you can also apply multi-processing to query using the ``processes`` argument:

In [None]:
df = lh5_it.query("is_valid_cal & (cuspEmax_ctc_cal>500)", processes=3, library="pd")
display(df)

In addition, it is possible to query data and fill a [Hist](https://hist.readthedocs.io/en/latest/) by using the [hist](https://legend-pydataobj.readthedocs.io/en/stable/api/lgdo.lh5.html#lgdo.lh5.iterator.LH5Iterator.hist) command:

In [None]:
from hist import axis

h = lh5_it.hist(
    axis.Regular(30, 0, 3000, label="Energy (keV)"),
    where="is_valid_cal",
    keys="cuspEmax_ctc_cal",
)
display(h)

Both hist and query are also able to take a python function rather than a string for `where`. The function should take a table and iterator as inputs, and return a table, awkward array or pandas dataframe; this allows for more complex selection logic and for producing outputs that aren't directly in the datasets. When writing this function, don't forget to use collective operations that operate on full columns!

In [None]:
# Select valid events and return energy and channel name
def selector(lh5_tb, lh5_it):
    df = lh5_tb.view_as("pd")
    mask = df.is_valid_cal
    E_col = df.cuspEmax_ctc_cal[mask]
    chan_col = lh5_it.current_groups[mask]
    chan_col = np.strings.replace(chan_col, "/hit", "")

    return E_col, chan_col


# 2D histogram with a categorical axis
h = lh5_it.hist(
    [
        axis.Regular(30, 0, 3000, label="Energy (keV)"),
        axis.StrCategory([], growth=True, label="Channel"),
    ],
    where=selector,
)
h.plot1d();

### Parallel processing
To boost the speed of accessing a large amount of data via the iterator, you can access and process datasets in parallel by calling [map](https://legend-pydataobj.readthedocs.io/en/stable/api/lgdo.lh5.html#lgdo.lh5.iterator.LH5Iterator.map). `map` works by splitting the datasets in each file and group among different threads, and iterating over them in parallel. This is built on the python standard library's [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) `Executor` framework; the default is to use multi-processing. Both query and hist are capable of running in parallel by taking advantage of this function; `map` is intended to give advanced users more flexibility.

Map will call a single function on each sub-table read while iterating through datasets. The output of the function will by default be collected into a python list; however, you can also provide an aggregator function that will combine the outputs from different tables in a customizable way (see the `aggregate` and `init` arguments). In addition, it is possible to define a function call to run before beginning and upon termination of the loop, which can be used to initialize more complex processes (see the `begin` and `terminate` arguments).

Map will asynchronously run each loop, and the results (either a list or the result of aggregation) will be returned as an asynchronous iterator over these objects; this will preserve the order of the data as input.

In [None]:
# Count events in each channel with >400 keV of energy and store the results in a dict
def fill(lh5_tab, lh5_it):
    cts = {}
    for chan in np.unique(lh5_it.current_groups):
        key = chan.replace("/hit", "")
        cts[key] = np.sum(
            lh5_tab.is_valid_cal.nda
            & (lh5_tab.cuspEmax_ctc_cal.nda > 400)
            & (lh5_it.current_groups == chan)
        )
    return cts


# add results for each channel
def aggregate(cts_sum, cts_in):
    for key, cts in cts_in.items():
        if key in cts_sum:
            cts_sum[key] += cts
        else:
            cts_sum[key] = cts
    return cts_sum


# call map with 3 processes
results = lh5_it.map(
    fill,  # process chunks of data
    aggregate=aggregate,  # function for combining results
    init={},  # initial value of aggregation
    processes=3,
)

# Now we want to aggregate the results from each thread
total_cts = {}
for cts_sum in results:
    aggregate(total_cts, cts_sum)

display(total_cts)