In [None]:
## STANDARD IMPORTS
import os
import time
import h5py
import numpy as np
import numpy.ma as ma
import pandas as pd
import json

# analysis
import scipy.stats
from scipy.optimize import curve_fit

# data handling
!pip3 install ../../../h5flow
import h5flow
from h5flow.data import dereference

## 3D PLOTTING
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import cm, colors
import matplotlib.patches as mpatches
from matplotlib.colors import BoundaryNorm

In [None]:
dirname = 'mc_processed_MiniRun5_1E19_RHC.flow.0000000.FLOW_nfiles_6_TrapType_evts_all'

# print configuration and hit configuration
config_filename = dirname+'/config.json'
with open(config_filename) as json_file:
    config = json.load(json_file)
    print(json.dumps(config, indent=4))


# loop over nfiles from config.json
nfiles = config['nfiles']

spes_filenames = []
noise_filenames = []
hits_filenames = []
hits_config_filenames = []
true_hits_filenames = []

for i in range(nfiles):
    spes_filenames.append(dirname + f'/spes_evt_{i}.npz')
    noise_filenames.append(dirname + f'/noise_evt_{i}.npz')
    hits_filenames.append(dirname + f'/hits_evt_{i}.npz')
    hits_config_filenames.append(dirname + f'/hits_config_{i}.json')
    true_hits_filenames.append(dirname + f'/true_hits_{i}.csv')

    with open(hits_config_filenames[i]) as json_file:
        hits_config = json.load(json_file)
        print(f'Config for file {i}:')
        print(json.dumps(hits_config, indent=4))

    # check if true hits file exists
    if os.path.exists(true_hits_filenames[i]):
        true_hits = pd.read_csv(true_hits_filenames[i])
        print(f'True hits for file {i}:')
        print(true_hits.head())



In [288]:
import scipy.stats

def clopper_pearson_interval(k, n, alpha=0.6827):
    alpha = 1 - alpha
    lo = scipy.stats.beta.ppf(alpha / 2, k, n - k + 1) if k > 0 else 0.0
    hi = scipy.stats.beta.ppf(1 - alpha / 2, k + 1, n - k) if k < n else 1.0
    return lo, hi


# gaussian fit func
def gaussian(x, A, mu, sig):
    return A * np.exp(-np.power(x - mu, 2.) / (2 * np.power(sig, 2.)))

In [None]:
tolerance = 6

# info by trap type (ArClight)
acl_rec_true_hits_tot = 0
acl_true_hits_tot = 0
acl_rec_hits_tot = 0
acl_delta_t = []
# info by trap type (LCM)
lcm_rec_true_hits_tot = 0
lcm_true_hits_tot = 0
lcm_rec_hits_tot = 0
lcm_delta_t = []

# info by trap type (ArClight) binned by pileup
acl_rec_true_hits_pu = []
acl_true_hits_pu = []
acl_rec_hits_pu = []
acl_delta_t_pu = []
# info by trap type (LCM) binned by pileup
lcm_rec_true_hits_pu = []
lcm_true_hits_pu = []
lcm_rec_hits_pu = []
lcm_delta_t_pu = []

# loop over files
for i_file in range(nfiles):

    print(f'File {i_file}:')
    #print(spes_filenames[i_file])
    #print(noise_filenames[i_file])
    #print(hits_filenames[i_file])
    #print(true_hits_filenames[i_file])

    acl_rec_true_hits = 0
    acl_true_hits = 0
    acl_rec_hits = 0

    lcm_rec_true_hits = 0
    lcm_true_hits = 0
    lcm_rec_hits = 0

    # load true hits
    true_hits = pd.read_csv(true_hits_filenames[i_file])
    true_hit_idxs = true_hits['start_time_idx'].values

    # load ticks histogram of all hits
    hits_file = np.load(hits_filenames[i_file])
    hits_arr = hits_file['arr_0']
    flat_hits = np.sum(hits_arr, axis=(0,1))

    # loop over events
    for i_evt_lrs in range(hits_arr.shape[0]):

        # loop over traps
        for i_trap in range(hits_arr.shape[1]):
            is_acl = i_trap % 2 == 0
            i_tpc = i_trap // 2

            hits = np.where(hits_arr[i_evt_lrs, i_trap])[0]
            true_hit_idxs_tpc = true_hits[(true_hits['event_id'] == i_evt_lrs) & (true_hits['tpc_num'] == i_tpc)]['start_time_idx'].values

            # ACLs
            if is_acl:

                # add true hits
                acl_true_hits += len(true_hit_idxs_tpc)
                # fill pileup histogram with length-of-true-hits length-of-true-hits times
                for i in range(len(true_hit_idxs_tpc)):
                    acl_true_hits_pu.append(len(true_hit_idxs_tpc))

                # add rec hits
                for i_hit in hits:
                    acl_rec_hits += 1
                    acl_rec_hits_pu.append(len(true_hit_idxs_tpc))

                    # check if true hit is within tolerance in this specific tpc

                    # get true hit indices within tolerance of this hit
                    delta_ts = (i_hit - true_hit_idxs_tpc)
                    delta_ts = delta_ts[(delta_ts < tolerance) & (delta_ts > 0)]
                    if delta_ts.size > 0:
                        delta_ts = min(delta_ts)
                        acl_delta_t.append(delta_ts)

                        # add to true rec hits
                        acl_rec_true_hits += 1
                        acl_rec_true_hits_pu.append(len(true_hit_idxs_tpc))

            else:

                # add true hits
                lcm_true_hits += len(true_hit_idxs_tpc)
                # fill pileup histogram with length-of-true-hits length-of-true-hits times
                for i in range(len(true_hit_idxs_tpc)):
                    lcm_true_hits_pu.append(len(true_hit_idxs_tpc))

                # add rec hits
                for i_hit in hits:
                    lcm_rec_hits += 1
                    lcm_rec_hits_pu.append(len(true_hit_idxs_tpc))

                    # get true hit indices within tolerance of this hit
                    delta_ts = (i_hit - true_hit_idxs_tpc)
                    delta_ts = delta_ts[(delta_ts < tolerance) & (delta_ts > 0)]
                    if delta_ts.size > 0:
                        delta_ts = min(delta_ts)
                        lcm_delta_t.append(delta_ts)

                        # add to true rec hits
                        lcm_rec_true_hits += 1
                        lcm_rec_true_hits_pu.append(len(true_hit_idxs_tpc))

    # caluclate efficiency +/- clopper pearson
    '''
    print(f'ACL True Hits: {acl_true_hits}')
    print(f'ACL Rec Hits: {acl_rec_hits}')
    print(f'ACL Rec True Hits: {acl_rec_true_hits}')

    print(f'LCM True Hits: {lcm_true_hits}')
    print(f'LCM Rec Hits: {lcm_rec_hits}')
    print(f'LCM Rec True Hits: {lcm_rec_true_hits}')
    '''

    # ACL efficiency
    acl_eff = acl_rec_true_hits / acl_true_hits
    acl_eff_err = clopper_pearson_interval(acl_rec_true_hits, acl_true_hits)
    #print(f'ACL Efficiency: {acl_eff:.2f} + {acl_eff_err[1] - acl_eff:.2f} - {acl_eff - acl_eff_err[0]:.2f}')
    # ACL fake rate
    acl_fake_rate = 1 - (acl_rec_true_hits / acl_rec_hits)
    acl_fake_rate_err = clopper_pearson_interval(acl_rec_true_hits, acl_rec_hits)
    #print(f'ACL Fake Rate: {acl_fake_rate:.2f} + {1 - acl_fake_rate_err[0] - acl_fake_rate:.2f} - {1 - acl_fake_rate_err[1] - acl_fake_rate:.2f}')

    # LCM efficiency
    lcm_eff = lcm_rec_true_hits / lcm_true_hits
    lcm_eff_err = clopper_pearson_interval(lcm_rec_true_hits, lcm_true_hits)
    #print(f'LCM Efficiency: {lcm_eff:.2f} + {lcm_eff_err[1] - lcm_eff:.2f} - {lcm_eff - lcm_eff_err[0]:.2f}')

    # LCM fake rate
    lcm_fake_rate = 1 - (lcm_rec_true_hits / lcm_rec_hits)
    lcm_fake_rate_err = clopper_pearson_interval(lcm_rec_true_hits, lcm_rec_hits)
    #print(f'LCM Fake Rate: {lcm_fake_rate:.2f} + {1 - lcm_fake_rate_err[0] - lcm_fake_rate:.2f} - {1 - lcm_fake_rate_err[1] - lcm_fake_rate:.2f}')

    # add to totals
    acl_rec_true_hits_tot += acl_rec_true_hits
    acl_true_hits_tot += acl_true_hits
    acl_rec_hits_tot += acl_rec_hits

    lcm_rec_true_hits_tot += lcm_rec_true_hits
    lcm_true_hits_tot += lcm_true_hits
    lcm_rec_hits_tot += lcm_rec_hits


# caluclate efficiency +/- clopper pearson
print('All files: ')
print(f'ACL True Hits: {acl_true_hits_tot}')
print(f'ACL Rec Hits: {acl_rec_hits_tot}')
print(f'ACL Rec True Hits: {acl_rec_true_hits_tot}')

print(f'LCM True Hits: {lcm_true_hits_tot}')
print(f'LCM Rec Hits: {lcm_rec_hits_tot}')
print(f'LCM Rec True Hits: {lcm_rec_true_hits_tot}')

# ACL efficiency
acl_eff = acl_rec_true_hits_tot / acl_true_hits_tot
acl_eff_err = clopper_pearson_interval(acl_rec_true_hits_tot, acl_true_hits_tot)
print(f'ACL Efficiency: {acl_eff:.2f} + {acl_eff_err[1] - acl_eff:.2f} - {acl_eff - acl_eff_err[0]:.2f}')
# ACL fake rate
acl_fake_rate = 1 - (acl_rec_true_hits_tot / acl_rec_hits_tot)
acl_fake_rate_err = clopper_pearson_interval(acl_rec_true_hits, acl_rec_hits)
print(f'ACL Fake Rate: {acl_fake_rate:.2f} + {1 - acl_fake_rate_err[0] - acl_fake_rate:.2f} - {1 - acl_fake_rate_err[1] - acl_fake_rate:.2f}')

# LCM efficiency
lcm_eff = lcm_rec_true_hits_tot / lcm_true_hits_tot
lcm_eff_err = clopper_pearson_interval(lcm_rec_true_hits_tot, lcm_true_hits_tot)
print(f'LCM Efficiency: {lcm_eff:.2f} + {lcm_eff_err[1] - lcm_eff:.2f} - {lcm_eff - lcm_eff_err[0]:.2f}')

# LCM fake rate
lcm_fake_rate = 1 - (lcm_rec_true_hits_tot / lcm_rec_hits_tot)
lcm_fake_rate_err = clopper_pearson_interval(lcm_rec_true_hits_tot, lcm_rec_hits_tot)
print(f'LCM Fake Rate: {lcm_fake_rate:.2f} + {1 - lcm_fake_rate_err[0] - lcm_fake_rate:.2f} - {1 - lcm_fake_rate_err[1] - lcm_fake_rate:.2f}')


In [None]:
def double_sided_crystal_ball(x, beta1, m1, beta2, m2, loc, scale, amplitude):
    """
    A smooth Double-Sided Crystal Ball function.

    Parameters:
    - x: Data points.
    - beta1, m1: Left tail parameters (beta1 = tail exponent, m1 = curvature).
    - beta2, m2: Right tail parameters.
    - loc: Center of the distribution (mean).
    - scale: Core width (sigma).
    - amplitude: Scaling factor.

    Returns:
    - Smooth DSCB function values.
    """

    # Normalized distance from mean
    t = (x - loc) / scale

    # Transition points
    left_cutoff = -beta1
    right_cutoff = beta2

    y = np.zeros_like(t)

    # Left tail (x < loc - beta1 * scale)
    left_mask = t < left_cutoff
    if np.any(left_mask):
        A1 = (m1 / beta1) ** m1 * np.exp(-0.5 * beta1 ** 2)
        B1 = m1 / beta1 - beta1
        y[left_mask] = A1 * (B1 - t[left_mask]) ** -m1

    # Core Gaussian (-beta1 < t < beta2)
    core_mask = (t >= left_cutoff) & (t <= right_cutoff)
    if np.any(core_mask):
        y[core_mask] = np.exp(-0.5 * t[core_mask] ** 2)

    # Right tail (x > loc + beta2 * scale)
    right_mask = t > right_cutoff
    if np.any(right_mask):
        A2 = (m2 / beta2) ** m2 * np.exp(-0.5 * beta2 ** 2)
        B2 = m2 / beta2 - beta2
        y[right_mask] = A2 * (B2 + t[right_mask]) ** -m2

    return amplitude * y


def plot_histogram_and_fit(ax_main, ax_resid, delta_t, color, label, nbins=10):
    # Histogram
    ax_main.hist(delta_t, bins=nbins, range=(0, tolerance), histtype='step', color=color, label=label)

    # Bin calculations
    hist, bin_edges = np.histogram(delta_t, bins=nbins, range=(0, tolerance))
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    sqrtN = np.sqrt(hist)
    ax_main.errorbar(bin_centers, hist, yerr=sqrtN, fmt='none', color=color, alpha=0.5)

    # Fit Double-sided Crystal Ball
    param_bounds = ([0.1, 0.1, 0.1, 0.1, -np.inf, 1e-6, 1e-6], [10, 10, 10, 10, np.inf, np.inf, np.inf])
    p0 = [1.5, 2.0, 1.5, 2.0, np.mean(delta_t), np.std(delta_t), max(hist)]
    popt, _ = curve_fit(double_sided_crystal_ball, bin_centers, hist, p0=p0, bounds=param_bounds, maxfev=10000)

    # Plot fit
    ax_main.plot(interp_bin_centres, double_sided_crystal_ball(interp_bin_centres, *popt), color='k', linestyle='--', label=f'{label} fit')

    # Labels
    ax_main.set_ylabel('Counts')
    ax_main.legend()

    # Residuals
    resids = (hist - double_sided_crystal_ball(bin_centers, *popt))
    ax_resid.step(bin_centers, resids/sqrtN, color=color)
    ax_resid.fill_between(bin_centers, (resids-sqrtN)/sqrtN, (resids+sqrtN)/sqrtN, color=color, alpha=0.5, step='pre')
    ax_resid.axhline(0, color='k', linestyle='--')
    ax_resid.set_xlabel('Delta T (ticks)')
    ax_resid.set_ylabel(r'($\sigma$)')

    # Print fit parameters on the plot
    ax_main.text(0.5, 0.9, f'Beta1: {popt[0]:.2f}', transform=ax_main.transAxes)
    ax_main.text(
        0.5, 0.85, f'M1: {popt[1]:.2f}', transform=ax_main.transAxes)
    ax_main.text(
        0.5, 0.80, f'Beta2: {popt[2]:.2f}', transform=ax_main.transAxes)
    ax_main.text(
        0.5, 0.75, f'M2: {popt[3]:.2f}', transform=ax_main.transAxes)
    ax_main.text(
        0.5, 0.70, f'Mean (ns): {popt[4]*16:.2f}', transform=ax_main.transAxes)
    ax_main.text(
        0.5, 0.65, f'Sigma (ns): {popt[5]*16:.2f}', transform=ax_main.transAxes)
    # sigma from fwhm
    max_val = np.max(double_sided_crystal_ball(interp_bin_centres, *popt))
    fwhm = interp_bin_centres[np.where(double_sided_crystal_ball(interp_bin_centres, *popt) > max_val/2)]
    fwhm = fwhm[-1] - fwhm[0]
    ax_main.text(0.5, 0.60, f'Sigma_fwhm (ns): {0.425*fwhm*16:.2f}', transform=ax_main.transAxes)

# Set up figure with GridSpec
fig, ax = plt.subplots(2, 2, figsize=(8, 5), gridspec_kw={'height_ratios': [3, 1]})
interp_bin_centres = np.linspace(0, tolerance, 1000)

# Plot ACL
plot_histogram_and_fit(ax[0, 0], ax[1, 0], acl_delta_t, 'b', 'ACL')

# Plot LCM
plot_histogram_and_fit(ax[0, 1], ax[1, 1], lcm_delta_t, 'r', 'LCM')

# Adjust layout
plt.tight_layout()
plt.show()

In [None]:
# vectorised version of clopper_pearson_interval
import numpy as np
import scipy.stats as st

def clopper_pearson_interval_numpy(k, n, alpha=0.05):
    """
    Vectorized Clopper-Pearson confidence interval for binomial proportions.

    Parameters:
    k : array-like
        Number of successes (can be an array).
    n : array-like
        Number of trials (can be an array).
    alpha : float, optional
        Significance level, default is 0.05 for a 95% confidence interval.

    Returns:
    ci_lower : ndarray
        Lower bound of the confidence interval.
    ci_upper : ndarray
        Upper bound of the confidence interval.
    """
    k = np.asarray(k, dtype=np.int64)
    n = np.asarray(n, dtype=np.int64)

    # Ensure valid values
    if np.any(k > n):
        raise ValueError("Number of successes k cannot be greater than number of trials n")

    alpha_2 = alpha / 2

    # Compute lower bound
    ci_lower = np.where(
        k > 0,
        st.beta.ppf(alpha_2, k, n - k + 1),
        0.0  # If k == 0, lower bound is 0
    )
    #ci_lower -= k/n

    # Compute upper bound
    ci_upper = np.where(
        k < n,
        st.beta.ppf(1 - alpha_2, k + 1, n - k),
        1.0  # If k == n, upper bound is 1
    )
    #ci_upper -= k/n

    return ci_lower, ci_upper


# calculate efficiency and fake rate as a function of pileup
pu_bins = np.linspace(-1, 7, 9)
dt_bins = np.linspace(0, tolerance, tolerance+1)
# acl
acl_delta_t_puhist = np.histogram2d(acl_rec_true_hits_pu, acl_delta_t, bins=[pu_bins, dt_bins])[0]
acl_rec_true_hits_puhist = np.histogram(acl_rec_true_hits_pu, bins=pu_bins)[0]
acl_true_hits_puhist = np.histogram(acl_true_hits_pu, bins=pu_bins)[0]
acl_rec_hits_puhist = np.histogram(acl_rec_hits_pu, bins=pu_bins)[0]
# lcm
lcm_delta_t_puhist = np.histogram2d(lcm_rec_true_hits_pu, lcm_delta_t, bins=[pu_bins, dt_bins])[0]
lcm_rec_true_hits_puhist = np.histogram(lcm_rec_true_hits_pu, bins=pu_bins)[0]
lcm_true_hits_puhist = np.histogram(lcm_true_hits_pu, bins=pu_bins)[0]
lcm_rec_hits_puhist = np.histogram(lcm_rec_hits_pu, bins=pu_bins)[0]

# acl efficiency in bins of pu
acl_eff_pu = acl_rec_true_hits_puhist / acl_true_hits_puhist
acl_eff_pu_err = clopper_pearson_interval_numpy(acl_rec_true_hits_puhist, acl_true_hits_puhist)

# acl fake rate in bins of pu
acl_fake_rate_pu = 1 - (acl_rec_true_hits_puhist / acl_rec_hits_puhist)
acl_fake_rate_pu_err = clopper_pearson_interval_numpy(acl_rec_true_hits_puhist, acl_rec_hits_puhist)

# lcm efficiency in bins of pu
lcm_eff_pu = lcm_rec_true_hits_puhist / lcm_true_hits_puhist
lcm_eff_pu_err = clopper_pearson_interval_numpy(lcm_rec_true_hits_puhist, lcm_true_hits_puhist)

# lcm fake rate in bins of pu
lcm_fake_rate_pu = 1 - (lcm_rec_true_hits_puhist / lcm_rec_hits_puhist)
lcm_fake_rate_pu_err = clopper_pearson_interval_numpy(lcm_rec_true_hits_puhist, lcm_rec_hits_puhist)

# plot efficiency and fake rate as a function of pileup
fig, ax = plt.subplots(2, figsize=(8, 5))

# plot acl efficiency
ax[0].step(pu_bins[:-1], acl_eff_pu, label='ACL')
ax[0].fill_between(pu_bins[:-1], acl_eff_pu_err[0], acl_eff_pu_err[1], step='pre', alpha=0.2)
# plot lcm efficiency
ax[0].step(pu_bins[:-1], lcm_eff_pu, label='LCM')
ax[0].fill_between(pu_bins[:-1], lcm_eff_pu_err[0], lcm_eff_pu_err[1], step='pre', alpha=0.2)
# formatting
ax[0].set_ylabel('Efficiency')
ax[0].set_xlabel('Pileup')
ax[0].set_xlim(0, 6)
ax[0].legend()

# plot acl fake rate
ax[1].step(pu_bins[:-1], acl_fake_rate_pu, label='ACL')
ax[1].fill_between(pu_bins[:-1], 1-acl_fake_rate_pu_err[0], 1-acl_fake_rate_pu_err[1], step='pre', alpha=0.2)
# plot lcm fake rate
ax[1].step(pu_bins[:-1], lcm_fake_rate_pu, label='LCM')
ax[1].fill_between(pu_bins[:-1], 1-lcm_fake_rate_pu_err[0], 1-lcm_fake_rate_pu_err[1], step='pre', alpha=0.2)
# formatting
ax[1].set_ylabel('Fake Rate')
ax[1].set_xlabel('Pileup')
ax[1].set_xlim(0, 6)
ax[1].legend()

plt.tight_layout()
plt.show()




