## Add additional requirements locally

In [None]:
# !pip install --upgrade dask-labextension pandas numpy distributed dask black[jupyter] uproot3 pyarrow astropy toml numba

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.colors
from matplotlib.colors import LogNorm
import pandas

# import warnings
# warnings.simplefilter(action='ignore', category=FutureWarning)
# import cdms
import sys
import scipy
import random
import scipy.signal
import numpy as np

# from importlib import reload

In [None]:
import dask.dataframe as dd
import dask.array as da

## Autoreload `cdat`

In [None]:
from cdat.root import read_root_file
from cdat.daskutils import dask_histogram, dask_histogram2d

In [None]:
# !ls '/cvmfs/data/CDMS/RQanalysis_testing/RQroot/'

In [None]:
# Finding address of the files
data_dir = "/cvmfs/data/CDMS/RQanalysis_testing/RQroot/"
series_numbers = ["25220213_171932", "25220214_092356", "25220215_073923"]
files = [f"{data_dir}OFResults_{i}.root" for i in series_numbers]

In [None]:
# this loads the first file to inspect structure of the dataframe
# dask_dataframe = dd.from_map(read_root_file, files,branches=["*OFL*","*trig*",'*Integral*','*file*','*Midas*'])

In [None]:
# dask_dataframe.head(100000)

In [None]:
%%time

KeV_bin = {"NFC1": 22.9, "NFH": 7.1, "NFE": 5.7, "NFC2": 17.1}
live_time = 44.85  # h
det_mass = 0.96  # g

data = dd.from_map(read_root_file, files, branches=["*trig_ch*", "*Integral*_total"])
trigger_ch = {"NFC1": 1, "NFH": 2, "NFE": 3, "NFC2": 4}
# For different detectors, building histogram domains separately.
xedges = {}
chunk_hist = {}
centers = {}
bin_sizes = {}
for det, code in trigger_ch.items():
    if det in ["NFC1", "NFC2"]:
        start, end = 0, 8000
        step = (end - start) / 1000
        xedges[det] = np.arange(start, end, step)
    if det == "NFH":
        start, end = 0, 2500
        step = (end - start) / 1000
        xedges[det] = np.arange(start, end, step)
    if det == "NFE":
        start, end = 0, 2000
        step = (end - start) / 1000
        xedges[det] = np.arange(start, end, step)
    # Empty arrays for each channel corresponding to xedges.
    # We fill the empty array as we iterate through the data.
    centers[det] = (xedges[det][:-1] + xedges[det][1:]) / 2
    bin_sizes[det] = xedges[det][1:] - xedges[det][:-1]

# iterating over dets
histograms = []
for det, code in trigger_ch.items():
    # Applying some inline cuts
    subchunk = data.query(f"trig_ch == {code}")
    # Filling numpy arrays for histograms.
    histograms.append(
        dask_histogram(
            subchunk, f"Integral_{det}_total", bins=xedges[det], bins_range=None
        )[1]
    )

histograms = da.compute(*histograms, num_workers=4)
# turn list into dict {ch:hist[ch]}
chunk_hist = dict(zip(centers.keys(), histograms))
# plotting histograms.
fig, ax = plt.subplots(2, 2, figsize=(16, 16))
for det, code in trigger_ch.items():
    i, j = (code - 1) // 2, code % 2
    ax[i, j].step(
        centers[det], chunk_hist[det] / (det_mass * live_time * bin_sizes[det])
    )
    ax[i, j].set_xlabel(f"Integral_{det}_total (uA)", fontsize=20)
    ax[i, j].set_ylabel(r"Count/($gram.hour.bin$)", fontsize=20)
    ax[i, j].set_yscale("log")
    ax[i, j].tick_params(axis="both", which="both", labelsize=15)

In [None]:
%%time

data = dd.from_map(
    read_root_file, files, branches=["*OFL*", "*trig_ch*", "*Integral*_total"]
)

trigger_ch = {"NFC1": 1, "NFH": 2, "NFE": 3, "NFC2": 4}
KeV_bin = {"NFC1": 22.9, "NFH": 7.1, "NFE": 5.7, "NFC2": 17.1}
auto_proc_high_energy = {"NFE": 0.103, "NFH": 0.049, "NFC1": 0.129, "NFC2": 0.0796}
live_time = 44.85  # h
det_mass = 0.96  # g


# For different detectors, building histogram domains separately.

xedges = {}
yedges = {}

chunk_hist = {}

for det, code in trigger_ch.items():
    if det in ["NFC1", "NFC2"]:
        xstart, xend = 0, 900
        xstep = (xend - xstart) / 100
        xedges[det] = np.arange(xstart, xend, xstep)

        ystart, yend = 0, 80
        ystep = (yend - ystart) / 100
        yedges[det] = np.arange(ystart, yend, ystep)

    if det == "NFH":
        xstart, xend = 0, 900
        xstep = (xend - xstart) / 100
        xedges[det] = np.arange(xstart, xend, xstep)
        ystart, yend = 0, 80
        ystep = (yend - ystart) / 100
        yedges[det] = np.arange(ystart, yend, ystep)
    if det == "NFE":
        xstart, xend = 0, 900
        xstep = (xend - xstart) / 100
        xedges[det] = np.arange(xstart, xend, xstep)
        ystart, yend = 0, 80
        ystep = (yend - ystart) / 100
        yedges[det] = np.arange(ystart, yend, ystep)

    chunk_hist[det] = np.zeros(shape=(len(xedges[det]) - 1, len(yedges[det]) - 1))
#     centers[det] = (xedges[det][:-1]+xedges[det][1:])/2
#     bin_sizes = xedges[det][1:] - xedges[det][:-1]

histograms = []

# iterating over dets
for det, code in trigger_ch.items():
    # Applying some inline cuts
    subchunk = data.query(f"trig_ch == {code}")
    subchunk[f"OFL_{det}_total_scaled"] = (
        subchunk.loc[:, f"OFL_{det}_total"] * 100 / auto_proc_high_energy[det]
    )
    # Filling numpy arrays for histograms.
    ## ** (Maybe using weights inside the histogram.)

    histograms.append(
        dask_histogram2d(
            subchunk,
            f"OFL_{det}_total_scaled",
            f"OFL_chi2_{det}_total",
            bins=(xedges[det], yedges[det]),
            bins_range=None,
        )
    )

histograms = da.compute(*histograms)
chunk_hist = dict(zip(xedges.keys(), histograms))

real_min_c = {}
real_max_c = {}
min_c = {}
max_c = {}
for det in trigger_ch.keys():
    real_min_c[det] = np.min(chunk_hist[det])
    real_max_c[det] = np.max(chunk_hist[det])
    chunk_hist[det] = scipy.signal.convolve2d(
        chunk_hist[det], np.ones((2, 2)), mode="same"
    )
    chunk_hist[det] = np.log(chunk_hist[det] + 1)
    min_c[det] = np.min(chunk_hist[det])
    max_c[det] = np.max(chunk_hist[det])
fig, ax = plt.subplots(2, 2, figsize=(16, 16))
cmap = plt.get_cmap("viridis")
dets = trigger_ch.keys()
for count in range(data.npartitions):
    chunk = data.get_partition(count)[
        ["trig_ch"]
        + [f"OFL_{det}_total" for det in dets]
        + [f"OFL_chi2_{det}_total" for det in dets]
    ].compute()
    for det, code in trigger_ch.items():
        # For each det, we use a copy of the chunk to keep chunkes unchanged.
        subchunk = chunk.query(f"trig_ch == {code}")
        # Adjusting the copy
        subchunk.loc[:, f"OFL_{det}_total"] *= 100 / auto_proc_high_energy[det]
        i, j = (code - 1) // 2, code % 2
        xidx = np.clip(
            np.digitize(subchunk[f"OFL_{det}_total"], xedges[det]),
            0,
            chunk_hist[det].shape[0] - 1,
        )
        yidx = np.clip(
            np.digitize(subchunk[f"OFL_chi2_{det}_total"], yedges[det]),
            0,
            chunk_hist[det].shape[1] - 1,
        )
        norm = plt.Normalize(min_c[det], max_c[det])
        s = norm(chunk_hist[det][xidx, yidx])
        c = cmap(s)
        s = 4 / (s + 0.1)
        # Scatter plots
        ax[i][j] = subchunk.plot.scatter(
            f"OFL_{det}_total", f"OFL_chi2_{det}_total", ax=ax[i, j], c=c, s=s
        )

for det, code in trigger_ch.items():
    i, j = (code - 1) // 2, code % 2

    norm = matplotlib.colors.LogNorm(real_min_c[det] + 1, real_max_c[det] + 1)
    fig.colorbar(plt.cm.ScalarMappable(norm=norm, cmap=cmap), ax=ax[i][j])
    ax[i, j].set_xlabel(f"OFL_{det}_total (uA)", fontsize=20)
    ax[i, j].set_ylabel(f"OFL_chi2_{det}_total (uA)", fontsize=20)
    ax[i, j].tick_params(axis="both", which="both", labelsize=15)
    ax[i][j].set_ylim([0, 80])
    ax[i][j].set_xlim([0, 900])

In [None]:
# ***** Produce the heatmap too! ******

In [None]:
from numba import njit

In [None]:
@njit
def chi2_template(x, a, b, exp):
    return a * x**exp + b

In [None]:
@njit
def compute_cut(energy, chi2, trigger_ch, first_eh_peak, a, b, exp):
    cut = np.zeros(energy[1].shape, dtype=np.bool_)
    for i in range(len(cut)):
        ch = trigger_ch[i]
        if ch == 0:
            cut[i] = 0
        else:
            energy_normalized = energy[ch][i] * 100 / first_eh_peak[ch]
            cut[i] = (energy_normalized > 600) | (
                chi2[ch][i] < chi2_template(energy_normalized, a[ch], b[ch], exp[ch])
            )
    return cut

In [None]:
from numba.typed import List

In [None]:
energy = List()
energy.append(np.array([1, 100]))
energy.append(np.array([1, 100]))

chi2 = List()
chi2.append(np.array([1e6, 1e6]))
chi2.append(np.array([1e6, 1e6]))

In [None]:
compute_cut(
    energy=energy,
    chi2=chi2,
    trigger_ch=np.array([0, 1]),
    first_eh_peak=np.array([3, 4]),
    a=np.ones(2),
    b=np.zeros(2),
    exp=np.ones(2),
)

In [None]:
from dataclasses import dataclass
import pandas as pd


@dataclass
class Chi2Cut:
    a: dict
    b: dict
    exp: dict
    trigger_ch_mapping: dict
    first_eh_peak: dict
    energy_limit: float = 600
    column: str = "OFL_{det_name}_total"
    column_chi2: str = "OFL_chi2_{det_name}_total"

    def __post_init__(self):
        self.max_channel_id = max(self.trigger_ch_mapping.keys())
        for var_name in ["a", "b", "exp", "first_eh_peak"]:
            out = np.zeros(self.max_channel_id + 1)
            var = getattr(self, var_name)
            for i in range(self.max_channel_id + 1):
                if i in self.trigger_ch_mapping:
                    out[i] = var.get(self.trigger_ch_mapping[i], var.get("default"))
            setattr(self, var_name + "_array", out)

    def __call__(self, df):
        df = df[df.trig_ch != 0]
        energy = List()
        chi2 = List()
        for i in range(self.max_channel_id + 1):
            if i in self.trigger_ch_mapping:
                det_name = self.trigger_ch_mapping[i]
                energy.append(df[self.column.format(det_name=det_name)].to_numpy())
                chi2.append(df[self.column_chi2.format(det_name=det_name)].to_numpy())
            else:
                energy.append(np.zeros(1, dtype=np.float32))
                chi2.append(np.zeros(1, dtype=np.float32))

        cut = compute_cut(
            energy=energy,
            chi2=chi2,
            trigger_ch=df.trig_ch.to_numpy(),
            first_eh_peak=self.first_eh_peak_array,
            a=self.a_array,
            b=self.b_array,
            exp=self.exp_array,
        )
        df = df.assign(chi2_cut=cut)
        return df

In [None]:
chi2cut = Chi2Cut(
    a=dict(default=0.0005, NFE=0.00015),
    b=dict(default=1.25),
    exp=dict(default=2),
    first_eh_peak={"NFC1": 0.129, "NFC2": 0.0796, "NFH": 0.049, "NFE": 0.103},
    trigger_ch_mapping={1: "NFC1", 2: "NFH", 3: "NFE", 4: "NFC2"},
)

In [None]:
%%time

data = dd.from_map(read_root_file, files, branches=["*OFL*", "*trig*"])
trigger_ch = {"NFC1": 1, "NFH": 2, "NFE": 3, "NFC2": 4}
# Imran's work the same as Valentina's except NFH = 0.0488:
auto_proc_high_energy = {"NFE": 0.103, "NFH": 0.049, "NFC1": 0.129, "NFC2": 0.0796}

fig, ax = plt.subplots(2, 2, figsize=(16, 16))

data_chi2cut = data.map_partitions(chi2cut)
# Iterating the data

for det, code in trigger_ch.items():
    # Working with subchunks to keep the main chunk unchanged.
    subchunk = data_chi2cut.query(f"trig_ch == {code}").compute()
    # define colors for passing and rejected events.
    color = np.array(["r", "b"])[subchunk[f"chi2_cut"].array.astype(np.int8)]
    # adjusting the subchunk
    subchunk.loc[:, f"OFL_{det}_total"] *= 100 / auto_proc_high_energy[det]
    i, j = (code - 1) // 2, code % 2
    # scatter plots for each chunk
    ax[i][j] = subchunk.plot.scatter(
        f"OFL_{det}_total", f"OFL_chi2_{det}_total", c=color, ax=ax[i, j]
    )
# events passing        ax[i][j] = subchunk.plot.scatter(f'OFL_{det}_total', f'OFL_chi2_{det}_total',c ="b", ax = ax[i,j])
# events not passing    ax[i][j] = subchunk.plot.scatter(f'OFL_{det}_total', f'OFL_chi2_{det}_total',c="r", ax = ax[i,j])

# Making plots beautiful.
for det, code in trigger_ch.items():
    energies = np.linspace(0, 900, 1000)
    i, j = (code - 1) // 2, code % 2
    ax[i, j].axvline(x=600, linewidth=2, ls="-.", color="k", label="OFL chi2 bound")
    ax[i][j].set_title(
        f"E = OFL_{det}_total*(100/{auto_proc_high_energy[det]})", fontsize=15
    )
    ax[i][j].set_xlabel(f"Energy (ev)", fontsize=20)
    ax[i][j].set_ylabel(f"OFL_chi2_{det}_total (uA)", fontsize=20)
    ax[i][j].tick_params(axis="both", which="both", labelsize=15)
    ax[i, j].legend(loc="upper right", fontsize=16)

In [None]:
# data_chi2cut.chi2_cut.sum().compute()/len(data_chi2cut)