In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, hypot, least, greatest, sum, when

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm, LinearSegmentedColormap

from sparkhistogram import computeWeightedHistogram

In [None]:
# start Spark session

bLocal = False  # enable only if running locally

if bLocal:
    # assemble the local session one option at a time
    builder = SparkSession.builder.master("local[*]")
    builder = builder.appName("hike_sensitivity_boolean_pyspark_lambda")
    builder = builder.config("spark.ui.showConsoleProgress", "false")
    spark = builder.getOrCreate()

# NOTE(review): with bLocal = False nothing in this notebook creates `spark`, so it
# is presumably provided by the hosting environment before this cell runs — confirm.
spark

---

# open data

In [None]:
# data folder(s) (shall have "/" at the end):
# non-local runs prefix the master path with the root://eosuser.cern.ch/ endpoint
eos_prefix = "" if bLocal else "root://eosuser.cern.ch/"
masterpath = eos_prefix + "/DATA_MASTER_PATH/"

datasets = []
# whole folder with the mass-production background data
datasets.append(masterpath + "23_hike_pinunu-background/2306_zoptical-zanalyze_lambda_mass_prod/")

In [None]:
# dataset schema
# The schema is a comma-separated "name type" string. The per-cluster part is the
# same 20-column pattern for cluster 0 and cluster 1, so it is generated instead
# of being spelled out twice.

def _xyz_floats(prefix):
    """Return the float column specs <prefix>_Z, <prefix>_X, <prefix>_Y, in that order."""
    return ["%s_%s float" % (prefix, axis) for axis in ("Z", "X", "Y")]


def _cluster_columns(k):
    """Return the column specs of cluster number k (0 or 1), in file order."""
    return (
        ["Cluster%d_nHits int" % k]
        + _xyz_floats("Cluster%d_xRec" % k)
        + _xyz_floats("Vertex_pRec%d" % k)
        + ["Cluster%d_ERec float" % k, "Cluster%d_PosRes float" % k, "Cluster%d_ERes float" % k]
        + _xyz_floats("Cluster%d_xPre1" % k)
        + _xyz_floats("Cluster%d_xPre2" % k)
        + _xyz_floats("Vertex_pRecPre%d" % k)
        + ["Cluster%d_PreRes float" % k]
    )


schema = ",".join(
    ["class string", "iEv int", "W float", "iPair int", "Vertex_nConverted float"]
    + _xyz_floats("Vertex_xRec")
    + _xyz_floats("Vertex_xRecPre")
    + _cluster_columns(0)
    + _cluster_columns(1)
)

In [None]:
# open dataset
# the CSV files are read with header handling enabled and the explicit schema above
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv(datasets, schema=schema)

In [None]:
%%time

# there are faulty vertex (as reconstructed by the calorimeter) data --> throwing them away

# NOTE: despite its name, the "Vertex_Rec_Broken" flag shown below is True for rows
# whose Vertex_xRec_Z is present; the rows to discard are the ones counted as false.
vertex_is_present = col("Vertex_xRec_Z").isNotNull()

(
    df
    .select(vertex_is_present.alias("Vertex_Rec_Broken"))
    .groupBy("Vertex_Rec_Broken")
    .count()
    .show()
)

print("--> removing all these broken events ('false' above)...")
df = df.filter(vertex_is_present)

print("---")

---

# condition datasets

In [None]:
%%time

# format class variable

# set all possible labels here:
classlabel = {
    "k-2pi" : 0,
    "k-pinunu" : 1,
    "lambda-pin" : 5,
}

# Inverse lookup (class code as a string -> label). It is built once here rather
# than inside the UDF, where it used to be rebuilt for every single row.
# An unknown class code still raises KeyError, so unexpected classes cannot slip
# through silently.
classlabel_by_code = {str(code): label for label, code in classlabel.items()}
classlabel_udf = udf(lambda s : classlabel_by_code[s])

df = df.withColumn("class_lab", classlabel_udf(col("class")))

# sanity check: one row per (class code, label) pair with its event count
df.groupBy("class", "class_lab").count().show()
print("---")

In [None]:
# variable conditioning - select best vertex quantities

# For each vertex quantity, take the "RecPre" value when Vertex_nConverted is
# non-null and non-zero, and the plain "Rec" value otherwise. This is the same
# truthiness rule as the former Python lambda (`x_pre if b_pre else x_rec`),
# expressed as a native Spark expression: no per-row Python call and no
# float -> string -> double round trip through an untyped UDF.
use_pre = col("Vertex_nConverted").isNotNull() & (col("Vertex_nConverted") != 0)

for rec_stem in ("Vertex_xRec", "Vertex_pRec0", "Vertex_pRec1"):
    for axis in ("Z", "X", "Y"):
        rec_name = "%s_%s" % (rec_stem, axis)             # e.g. Vertex_pRec0_Z
        pre_name = rec_name.replace("Rec", "RecPre")      # e.g. Vertex_pRecPre0_Z
        best_name = rec_name.replace("Rec", "Best")       # e.g. Vertex_pBest0_Z
        df = df.withColumn(
            best_name,
            when(use_pre, col(pre_name)).otherwise(col(rec_name)).cast("double")
        )

In [None]:
# variable conditioning - compute all transverse quantities from cartesian components

# every <stem>_T column is hypot(<stem>_X, <stem>_Y); the stems are listed in the
# order in which the columns are appended to the dataframe
transverse_stems = (
    # reconstructed quantities
    "Vertex_xRec", "Vertex_xRecPre",
    "Cluster0_xRec", "Cluster1_xRec",
    "Vertex_pRec0", "Vertex_pRec1",
    "Cluster0_xPre1", "Cluster0_xPre2",
    "Cluster1_xPre1", "Cluster1_xPre2",
    "Vertex_pRecPre0", "Vertex_pRecPre1",
    # "best" quantities
    "Vertex_xBest", "Vertex_pBest0", "Vertex_pBest1",
)

for stem in transverse_stems:
    transverse = hypot(col(stem + "_X"), col(stem + "_Y"))
    df = df.withColumn(stem + "_T", transverse.cast("double"))

In [None]:
# variable conditioning - compute new 2-cluster position and momentum-related variables

def _x_of_lower_energy(x0, x1, e0, e1):
    """Return x0 when e0 < e1, otherwise x1 (so equal energies give x1)."""
    if e0 < e1:
        return x0
    return x1


def _x_of_higher_energy(x0, x1, e0, e1):
    """Return x1 when e0 < e1, otherwise x0 (so equal energies give x0)."""
    if e0 < e1:
        return x1
    return x0


xminene_udf = udf(_x_of_lower_energy)
xmaxene_udf = udf(_x_of_higher_energy)

# (output stem, first input column, second input column)
pair_families = (
    ("Clusters_xRec", "Cluster0_xRec_T", "Cluster1_xRec_T"),
    ("Clusters_xPre1", "Cluster0_xPre1_T", "Cluster1_xPre1_T"),
    ("Clusters_xPre2", "Cluster0_xPre2_T", "Cluster1_xPre2_T"),
    ("Vertex_pRec", "Vertex_pRec0_T", "Vertex_pRec1_T"),
    ("Vertex_pRecPre", "Vertex_pRecPre0_T", "Vertex_pRecPre1_T"),
)
pair_stems = [family[0] for family in pair_families]

# one pass per derived quantity, so that the columns are appended in the order
# TMin (all families), TMax, TSum, TDif, TAsym
for stem, first, second in pair_families:
    df = df.withColumn(stem + "_TMin", least(col(first), col(second)).cast("double"))
for stem, first, second in pair_families:
    df = df.withColumn(stem + "_TMax", greatest(col(first), col(second)).cast("double"))
for stem in pair_stems:
    df = df.withColumn(stem + "_TSum", col(stem + "_TMax") + col(stem + "_TMin"))
for stem in pair_stems:
    df = df.withColumn(stem + "_TDif", col(stem + "_TMax") - col(stem + "_TMin"))
for stem in pair_stems:
    df = df.withColumn(stem + "_TAsym", col(stem + "_TDif") / col(stem + "_TSum"))

# same five quantities for the "best" vertex momenta
df = (
    df
    .withColumn("Vertex_pBest_TMin", least(col("Vertex_pBest0_T"), col("Vertex_pBest1_T")).cast("double"))
    .withColumn("Vertex_pBest_TMax", greatest(col("Vertex_pBest0_T"), col("Vertex_pBest1_T")).cast("double"))
    .withColumn("Vertex_pBest_TSum", col("Vertex_pBest_TMax") + col("Vertex_pBest_TMin"))
    .withColumn("Vertex_pBest_TDif", col("Vertex_pBest_TMax") - col("Vertex_pBest_TMin"))
    .withColumn("Vertex_pBest_TAsym", col("Vertex_pBest_TDif") / col("Vertex_pBest_TSum"))
)

# distance between the two clusters in the X-Y plane
cluster_dx = col("Cluster1_xRec_X") - col("Cluster0_xRec_X")
cluster_dy = col("Cluster1_xRec_Y") - col("Cluster0_xRec_Y")
df = df.withColumn("Clusters_xDist", hypot(cluster_dx, cluster_dy))

In [None]:
# variable conditioning - compute new 2-cluster energy-related variables

ene0 = col("Cluster0_ERec")
ene1 = col("Cluster1_ERec")

df = df.withColumn("Clusters_EMin", least(ene0, ene1).cast("double"))
df = df.withColumn("Clusters_EMax", greatest(ene0, ene1).cast("double"))
df = df.withColumn("Clusters_ESum", col("Clusters_EMax") + col("Clusters_EMin"))
df = df.withColumn("Clusters_EDif", col("Clusters_EMax") - col("Clusters_EMin"))
df = df.withColumn("Clusters_EAsym", col("Clusters_EDif") / col("Clusters_ESum"))

# Transverse position of the lower- (TEMin) and higher-energy (TEMax) cluster.
# Native when/otherwise replaces the former Python UDFs: same rule (cluster 0 is
# the lower-energy one only if its energy is strictly smaller; ties give
# TEMin = cluster 1 and TEMax = cluster 0), but evaluated inside Spark and without
# raising on null energies (a null comparison falls into the `otherwise` branch).
# NOTE(review): Spark orders NaN above every number, so a NaN Cluster1_ERec with a
# finite Cluster0_ERec now selects cluster 0 as the lower-energy one (the Python
# comparison was False in that case) — irrelevant unless the data contain NaN.
cluster0_has_lower_energy = ene0 < ene1
df = df.withColumn(
    "Clusters_xRec_TEMin",
    when(cluster0_has_lower_energy, col("Cluster0_xRec_T")).otherwise(col("Cluster1_xRec_T")).cast("double")
)
df = df.withColumn(
    "Clusters_xRec_TEMax",
    when(cluster0_has_lower_energy, col("Cluster1_xRec_T")).otherwise(col("Cluster0_xRec_T")).cast("double")
)

# energy-weighted centre of the two clusters: |sum_i E_i * (x_i, y_i)| / (E_0 + E_1)
df = df.withColumn(
    "Clusters_ECOG",
    hypot(
        ene0*col("Cluster0_xRec_X")+ene1*col("Cluster1_xRec_X"),
        ene0*col("Cluster0_xRec_Y")+ene1*col("Cluster1_xRec_Y")
    ).cast("double") / col("Clusters_ESum").cast("double")
)

In [None]:
# variable conditioning - compute pion kinematics from two photons (pion energy is simply Clusters_ESum)

# The same chain is applied to each momentum flavour <tag> in Rec, RecPre, Best:
#   Vertex_p<tag>Pi_{Z,X,Y} = photon 0 + photon 1, component by component
#   Vertex_p<tag>Pi_T       = hypot(X, Y)
#   Vertex_p<tag>Pi_mod     = hypot(T, Z)
#   Vertex_p<tag>Pi_sTh     = T / mod
# Rec and RecPre are processed stage by stage together and Best afterwards, so the
# columns are appended in the same order as before.
for tag_group in (("Rec", "RecPre"), ("Best",)):

    for tag in tag_group:
        for axis in ("Z", "X", "Y"):
            photon0 = col("Vertex_p%s0_%s" % (tag, axis))
            photon1 = col("Vertex_p%s1_%s" % (tag, axis))
            df = df.withColumn("Vertex_p%sPi_%s" % (tag, axis), photon0 + photon1)

    for tag in tag_group:
        pion = "Vertex_p%sPi" % tag
        df = df.withColumn(pion + "_T", hypot(col(pion + "_X"), col(pion + "_Y")).cast("double"))

    for tag in tag_group:
        pion = "Vertex_p%sPi" % tag
        df = df.withColumn(pion + "_mod", hypot(col(pion + "_T"), col(pion + "_Z")).cast("double"))

    for tag in tag_group:
        pion = "Vertex_p%sPi" % tag
        df = df.withColumn(pion + "_sTh", col(pion + "_T") / col(pion + "_mod"))

---

# boolean analysis

In [None]:
shift_fv = 150  # FV frame shift to match the data reference system

# --- elementary selections ------------------------------------------------------
# These are native Column expressions instead of Python UDFs: the comparisons are
# evaluated inside Spark (no per-row Python call, no bool -> string -> boolean
# round trip through an untyped UDF), and a null input gives a null flag — which
# filter() and the weighted sums treat as "not selected" — instead of raising.
# NOTE(review): Spark orders NaN above every number, so a NaN input now passes a
# ">" cut, whereas the Python comparison returned False. This only matters if the
# input files contain NaN values — confirm.
base_flags = [
    ("bool_sig", col("class_lab") == "k-pinunu"),
    ("bool_bkg", col("class_lab") != "k-pinunu"),
    ("bool_2mec", col("class_lab") != ""),
    ("bool_fv", (col("Vertex_xRec_Z") > (280-shift_fv)) & (col("Vertex_xRec_Z") < (350-shift_fv))),
    ("bool_rmin", col("Clusters_xRec_TMin") > 35e-2),
    ("bool_emin", col("Clusters_EMin") > (2 / col("Clusters_xRec_TEMin"))),
    ("bool_pt", col("Vertex_pRecPi_T") > 0.140),
    ("bool_psv",
        (col("Vertex_nConverted") > 0)
        & (col("Vertex_xRecPre_Z") < (350-shift_fv))
        & (col("Vertex_pRecPrePi_T") > 0.140)),
    ("bool_even", col("iPair") == 1),
    ("bool_odd", col("iPair") == 0),
    ("bool_nofused", (col("Cluster0_nHits") == 1) & (col("Cluster1_nHits") == 1)),
    ("bool_fused", (col("Cluster0_nHits") != 1) | (col("Cluster1_nHits") != 1)),
]
for flag_name, flag_expr in base_flags:
    df = df.withColumn(flag_name, flag_expr)

# --- event categories -----------------------------------------------------------
# NOTE(review): the "odd" background category is defined as NOT bool_even rather
# than bool_odd; the two differ if iPair ever takes a value other than 0 or 1.
category_selections = [
    ("sig", col("bool_sig")),
    ("bkg", col("bool_bkg")),
    ("bkgEv", col("bool_nofused") & col("bool_even") & col("bool_bkg")),
    ("bkgOd", col("bool_nofused") & ~col("bool_even") & col("bool_bkg")),
    ("bkgFs", ~col("bool_nofused") & col("bool_bkg")),
]
category_names = [name for name, selection in category_selections]

# booleans: with 2 photons in MEC (i.e. the total dataset for each class)
for name, selection in category_selections:
    df = df.withColumn("bool_%s_2mec" % name, selection & col("bool_2mec"))

# --- cumulative cut flow --------------------------------------------------------
# Each stage ANDs one more elementary cut onto the previous stage, giving
#   bool_<category>_2mec_fv, ..._fv_rmin, ..._rmin_emin, ..._emin_pt, ..._pt_psv
# in order:
#   fv   : reconstructed in FV with MEC only
#   rmin : cut on minimum-radius cluster (>35 cm)
#   emin : cut minimum-energy cluster (>2 GeV divided by corresponding cluster radius)
#   pt   : cut on pion transverse momentum (>0.140 GeV) computed with MEC only
#   psv  : adding the PSV data
applied_cuts = "2mec"
for cut in ("fv", "rmin", "emin", "pt", "psv"):
    for name in category_names:
        previous_stage = "bool_%s_%s" % (name, applied_cuts)
        df = df.withColumn(previous_stage + "_" + cut, col(previous_stage) & col("bool_" + cut))
    applied_cuts = applied_cuts + "_" + cut

In [None]:
%%time

# now count events...

def get_n(s):
    """Return the sum of the weights W over the rows of df where boolean column `s` is true.

    The result is None when no row passes the selection. Each call runs its own
    Spark job, so this is kept for ad-hoc checks only.
    """
    return df.filter(df[s]).select(sum(df["W"])).collect()[0][0]


# categories to count; any of "sig", "bkg", "bkgEv", "bkgOd", "bkgFs" can be added
count_categories = ["bkg"]

# cumulative selections, in cut-flow order:
cut_stages = [
    "2mec",                      # with 2 photons in MEC (i.e. the total dataset for each class)
    "2mec_fv",                   # reconstructed in FV with MEC only
    "2mec_fv_rmin",              # cut on minimum-radius cluster (>35 cm)
    "2mec_fv_rmin_emin",         # cut minimum-energy cluster (>2 GeV divided by corresponding cluster radius)
    "2mec_fv_rmin_emin_pt",      # cut on pion transverse momentum (>0.140 GeV) computed with MEC only
    "2mec_fv_rmin_emin_pt_psv",  # adding the PSV data
]
count_flags = ["bool_%s_%s" % (cat, cuts) for cuts in cut_stages for cat in count_categories]

# All weighted sums are computed in ONE pass over the data: when() without
# otherwise() gives null for the rows failing a flag, and sum() skips nulls, so
# each aggregate equals the former filter-then-sum (None if nothing passes).
print("counting events in subsets %s..." % ", ".join(cut_stages))
weighted_sums = df.agg(*[sum(when(col(flag), col("W"))).alias(flag) for flag in count_flags]).collect()[0]

# results keyed by "<category>_<cuts>", e.g. n_events["bkg_2mec_fv"]
n_events = {flag[len("bool_"):]: weighted_sums[flag] for flag in count_flags}

# flat names used by the following cells
n_bkg_2mec = n_events["bkg_2mec"]
n_bkg_2mec_fv = n_events["bkg_2mec_fv"]
n_bkg_2mec_fv_rmin = n_events["bkg_2mec_fv_rmin"]
n_bkg_2mec_fv_rmin_emin = n_events["bkg_2mec_fv_rmin_emin"]
n_bkg_2mec_fv_rmin_emin_pt = n_events["bkg_2mec_fv_rmin_emin_pt"]
n_bkg_2mec_fv_rmin_emin_pt_psv = n_events["bkg_2mec_fv_rmin_emin_pt_psv"]

print("---")

In [None]:
# total events (with 2 photons in MEC) in the LoI:
normSig = 302
normBkg = 2.29e5  #9.45e7
#normSig = 37800/70e6 * n_sig_2mec  
#normBkg = 1.089e12/(500*500e6 + 7e9) * n_bkg_2mec  


def print_cutflow(title, norm, n_total, n_selected):
    """Print the normalised number of events surviving each stage of a cut flow.

    title      : header line printed first
    norm       : number of events the total sample is normalised to
    n_total    : weighted event count of the reference (total) sample
    n_selected : sequence of (label, weighted count) pairs, one per stage; a count
                 of None (the weighted sum over an empty selection) is taken as 0

    Raises ValueError if n_total is None or zero, since no normalisation is
    possible in that case.
    """
    if not n_total:
        raise ValueError("cannot normalise '%s': the reference sample is empty" % title)
    print(title)
    for cuts_label, n_pass in n_selected:
        print("  %-26s %s" % (cuts_label, norm * (n_pass or 0.0) / n_total))


# Every stage that was counted is reported, including the minimum-radius stage
# (2mec_fv_rmin), which used to be computed but never printed.
# Other categories (signal, even/odd/fused background) can be reported with the
# same helper once their counts have been computed; the background
# sub-categories are normalised to n_bkg_2mec as well.
print_cutflow(
    "counts for background (total):", normBkg, n_bkg_2mec,
    [
        ("2mec", n_bkg_2mec),
        ("2mec_fv", n_bkg_2mec_fv),
        ("2mec_fv_rmin", n_bkg_2mec_fv_rmin),
        ("2mec_fv_rmin_emin", n_bkg_2mec_fv_rmin_emin),
        ("2mec_fv_rmin_emin_pt", n_bkg_2mec_fv_rmin_emin_pt),
        ("2mec_fv_rmin_emin_pt_psv", n_bkg_2mec_fv_rmin_emin_pt_psv),
    ]
)

In [None]:
# plots - define function to generate the 2-dimensional histograms
bPlots = True

def custom_hist2d(df, x, y, bins):
    """Build a W-weighted 2-dimensional histogram of columns `x` and `y` of `df`.

    df   : Spark dataframe holding columns `x`, `y` and the weight column "W"
    x, y : names of the columns binned along the first / second axis
    bins : pair (x edges, y edges). The x edges are used one by one, each slice
           being edges[i] <= x < edges[i+1]. For y only the minimum, the maximum
           and the number of intervals are passed on to computeWeightedHistogram,
           so the y edges are presumably expected to be equally spaced — confirm.

    Returns [x edges, y edges, slices], where slices holds one entry per x bin:
    the "weighted_sum" column (a pandas Series) returned by
    computeWeightedHistogram for that slice.

    One Spark job is run per x bin.
    """
    x_edges, y_edges = bins[0], bins[1]
    y_min, y_max = float(np.min(y_edges)), float(np.max(y_edges))
    n_y = len(y_edges) - 1

    hist2d = []
    for x_lo, x_hi in zip(x_edges[:-1], x_edges[1:]):
        # native range filter: no Python UDF call per row, and rows with a null
        # `x` are simply left out instead of raising inside a lambda
        df_slice = df.filter((col(x) >= float(x_lo)) & (col(x) < float(x_hi)))
        hist_slice = computeWeightedHistogram(df_slice, y, "W", y_min, y_max, n_y).toPandas()
        hist2d.append(hist_slice["weighted_sum"])
    return [bins[0], bins[1], hist2d]

In [None]:
%%time

# plots - generate the 2-dimensional histograms --> in hists
if bPlots:

    range_plot = ((150-shift_fv, 400-shift_fv), (0, 0.4)) 
    nbins = (100, 50)

    # bin edges along (Vertex_xRec_Z, Vertex_pRecPi_T), shared by both panels
    hist_edges = tuple(
        tuple(np.linspace(lo, hi, n+1)) for (lo, hi), n in zip(range_plot, nbins)
    )

    # boolean columns selecting each class: (left panel, right panel); the right
    # panel is additionally restricted to bool_2mec & bool_rmin & bool_emin
    class_flags = {
        0: (("bool_bkg",), ("bool_bkg", "bool_psv")),
        1: (("bool_bkg", "bool_nofused", "bool_even"),) * 2,
        2: (("bool_bkg", "bool_nofused", "bool_odd"),) * 2,
        3: (("bool_bkg", "bool_fused"),) * 2,
    }

    # hists[0] collects the left-panel histograms, hists[1] the right-panel ones
    hists = [[], []]
    for i_class in [0]:

        df_R0 = df.filter("bool_2mec").filter("bool_rmin").filter("bool_emin")

        flags_L, flags_R = class_flags[i_class]
        df_L = df
        for flag in flags_L:
            df_L = df_L.filter(flag)
        df_R = df_R0
        for flag in flags_R:
            df_R = df_R.filter(flag)

        for i_panel, df_panel in enumerate((df_L, df_R)):
            hists[i_panel].append(
                custom_hist2d(df_panel, "Vertex_xRec_Z", "Vertex_pRecPi_T", hist_edges)
            )

In [None]:
# plots - plot the histograms
if bPlots:

    bPreBool_save = True  # save plots?
    output_path = "./output_misc/lambda_SPARK.png"

    bLog = False
    cmap_name = "jet"

    # empty bins are set to NaN below and drawn in white
    cmap = plt.get_cmap(cmap_name).copy()
    cmap.set_bad('white')

    # open box drawn on both panels: Z between 280-shift_fv and 350-shift_fv, pT above 0.14
    box = ((280-shift_fv, 1), (280-shift_fv, 0.14), (350-shift_fv, 0.14), (350-shift_fv, 1))
    x_box, y_box = zip(*box)

    for i_class in [0]:
        fig, ax = plt.subplots(num=i_class, nrows=1, ncols=2, figsize=(12, 4))

        # panel 0 = hists[0] (left), panel 1 = hists[1] (right)
        for i_panel in (0, 1):
            x_edges, y_edges, slices = hists[i_panel][i_class]

            # Work on a fresh (n_x, n_y) array so that `hists` is left untouched
            # and this cell can be re-run; non-positive bins become NaN.
            counts = np.array(slices, dtype=float)
            counts = np.where(counts <= 0, np.nan, counts)

            # x runs along the first array axis, hence the transpose;
            # origin="lower" puts the first y bin at the bottom
            ax[i_panel].imshow(
                counts.T, origin="lower",
                extent=(x_edges[0], x_edges[-1], y_edges[0], y_edges[-1]),
                aspect="auto", cmap=cmap, norm=LogNorm() if bLog else None, interpolation="none"
            )
            ax[i_panel].plot(x_box, y_box, color="0.7", lw=3)
            ax[i_panel].set_ylim((0, 0.4))

        fig.suptitle("lambda background, total")
        fig.supxlabel("Vertex_xRec_Z [m]")
        fig.supylabel("Vertex_pRecPi_T [GeV]")
        fig.tight_layout()
        if bPreBool_save:
            # make sure the output folder exists before writing into it
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            fig.savefig(output_path)

In [None]:
# stop Spark session
# (last cell: every cell above that uses `spark` or `df` must have finished before this runs)
spark.stop()