Installing for google colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Be careful with the spark version

In [None]:
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
!pip install -q findspark

In [None]:
# Environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Starting a pyspark session

In [None]:
import findspark
findspark.init()

Be careful with the spark_root_jar version, look the next link: https://github.com/diana-hep/spark-root/tree/master/jars

In [None]:
pyspark_python = "/usr/local/bin/python"
spark_root_jar = "https://github.com/diana-hep/spark-root/blob/master/jars/spark-root_2.11-0.1.18.jar?raw=true"

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("1-Data-Ingestion") \
        .master("local") \
        .config("spark.jars",spark_root_jar) \
        .config("spark.jars.packages","org.diana-hep:root4j:0.1.6") \
        .config("spark.pyspark.python",pyspark_python) \
        .getOrCreate()

In [None]:
spark

In [None]:
spark.sparkContext.addPyFile("utilFunctions.py")

Read the data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
PATH = "/content/drive/MyDrive/UCSP/Big-Data/Final-project-data/"

samples = ["qcd_lepFilter_13TeV", "ttbar_lepFilter_13TeV", "Wlnu_lepFilter_13TeV"]

requiredColumns = [
    "EFlowTrack",
    "EFlowNeutralHadron",
    "EFlowPhoton",
    "Electron",
    "MuonTight",
    "MuonTight_size",
    "Electron_size",
    "MissingET",
    "Jet"
]

In [None]:
from pyspark.sql.functions import lit

dfList = []

for label, sample in enumerate(samples):
  print("Loading {} sample...".format(sample))
  tmpDF = spark.read \
              .format("org.dianahep.sparkroot.experimental") \
              .load(PATH + sample + "/*.root") \
              .select(requiredColumns) \
              .withColumn("label", lit(label))
  dfList.append(tmpDF)

Loading qcd_lepFilter_13TeV sample...
Loading ttbar_lepFilter_13TeV sample...
Loading Wlnu_lepFilter_13TeV sample...


In [None]:
# Merge all samples into a single dataframe
df = dfList[0]
for tmpDF in dfList[1:]:
    df = df.union(tmpDF)

In [None]:
df.select("EFlowTrack").printSchema()

In [None]:
# Import Lorentz Vector and other functions for pTmaps
from utilFunctions import *

def selection(event, TrkPtMap, NeuPtMap, PhotonPtMap, PTcut=23., ISOcut=0.45):
    """
    This function simulates the trigger selection. 
    Foreach event the presence of one isolated muon or electron with pT >23 GeV is required
    """
    if event.Electron_size == 0 and event.MuonTight_size == 0: 
        return False, False, False
    
    foundMuon = None 
    foundEle =  None 
    
    l = LorentzVector()
    
    for ele in event.Electron:
        if ele.PT <= PTcut: continue
        l.SetPtEtaPhiM(ele.PT, ele.Eta, ele.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundEle == None and (pfisoCh+pfisoNeu+pfisoGamma)<ISOcut:
            foundEle = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                        0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                        0., 0., 0., 1., 0., float(ele.Charge)]
    
    for muon in event.MuonTight:
        if muon.PT <= PTcut: continue
        l.SetPtEtaPhiM(muon.PT, muon.Eta, muon.Phi, 0.)
        
        pfisoCh = PFIso(l, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(l, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(l, 0.3, PhotonPtMap, False)
        if foundMuon == None and (pfisoCh+pfisoNeu+pfisoGamma)<ISOcut:
            foundMuon = [l.E(), l.Px(), l.Py(), l.Pz(), l.Pt(), l.Eta(), l.Phi(),
                         0., 0., 0., pfisoCh, pfisoGamma, pfisoNeu,
                         0., 0., 0., 0., 1., float(muon.Charge)]
            
    if foundEle != None and foundMuon != None:
        if foundEle[5] > foundMuon[5]:
            return True, foundEle, foundMuon
        else:
            return True, foundMuon, foundEle
    if foundEle != None: return True, foundEle, foundMuon
    if foundMuon != None: return True, foundMuon, foundEle
    
    return False, None, None

In [None]:
import math
import numpy as np
from pyspark.ml.linalg import Vectors

def convert(event):
    """
    This function takes as input an event, applies trigger selection 
    and create LLF and HLF datasets
    """
    q = LorentzVector()
    particles = []
    TrkPtMap = ChPtMapp(0.3, event)
    NeuPtMap = NeuPtMapp(0.3, event)
    PhotonPtMap = PhotonPtMapp(0.3, event)
    if TrkPtMap.shape[0] == 0: return Row()
    if NeuPtMap.shape[0] == 0: return Row()
    if PhotonPtMap.shape[0] == 0: return Row()
    
    #
    # Get leptons
    #
    selected, lep, otherlep = selection(event, TrkPtMap, NeuPtMap, PhotonPtMap)
    if not selected: return Row()
    particles.append(lep)
    lepMomentum = LorentzVector(lep[1], lep[2], lep[3], lep[0])
    
    #
    # Select Tracks
    #
    nTrk = 0
    for h in event.EFlowTrack:
        if nTrk>=450: continue
        if h.PT<=0.5: continue
        q.SetPtEtaPhiM(h.PT, h.Eta, h.Phi, 0.)
        if lepMomentum.DeltaR(q) > 0.0001:
            pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
            pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
            pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
            particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                              h.PT, h.Eta, h.Phi, h.X, h.Y, h.Z,
                              pfisoCh, pfisoGamma, pfisoNeu,
                              1., 0., 0., 0., 0., float(np.sign(h.PID))])
            nTrk += 1
    
    #
    # Select Photons
    #
    nPhoton = 0
    for h in event.EFlowPhoton:
        if nPhoton >= 150: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 0., 1., 0., 0., 0.])
        nPhoton += 1
    
    #
    # Select Neutrals
    #
    nNeu = 0
    for h in event.EFlowNeutralHadron:
        if nNeu >= 200: continue
        if h.ET <= 1.: continue
        q.SetPtEtaPhiM(h.ET, h.Eta, h.Phi, 0.)
        pfisoCh = PFIso(q, 0.3, TrkPtMap, True)
        pfisoNeu = PFIso(q, 0.3, NeuPtMap, False)
        pfisoGamma = PFIso(q, 0.3, PhotonPtMap, False)
        particles.append([q.E(), q.Px(), q.Py(), q.Pz(),
                          h.ET, h.Eta, h.Phi, 0., 0., 0.,
                          pfisoCh, pfisoGamma, pfisoNeu,
                          0., 1., 0., 0., 0., 0.])
        nNeu += 1
        
    for iTrk in range(nTrk, 450):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0.,0.,
                          0.,0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iPhoton in range(nPhoton, 150):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
    for iNeu in range(nNeu, 200):
        particles.append([0., 0., 0., 0., 0., 0., 0., 0., 0.,
                          0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])        
    #
    # High Level Features
    #
    myMET = event.MissingET[0]
    MET = myMET.MET
    phiMET = myMET.Phi
    MT = 2.*MET*lepMomentum.Pt()*(1-math.cos(lepMomentum.Phi()-phiMET))
    HT = 0.
    nJets = 0.
    nBjets = 0.
    for jet in event.Jet:
        if jet.PT > 30 and abs(jet.Eta)<2.6:
            nJets += 1
            HT += jet.PT
            if jet.BTag>0: 
                nBjets += 1
    LepPt = lep[4]
    LepEta = lep[5]
    LepPhi = lep[6]
    LepIsoCh = lep[10]
    LepIsoGamma = lep[11]
    LepIsoNeu = lep[12]
    LepCharge = lep[18]
    LepIsEle = lep[16]
    hlf = Vectors.dense([HT, MET, phiMET, MT, nJets, nBjets, LepPt, LepEta, LepPhi,
           LepIsoCh, LepIsoGamma, LepIsoNeu, LepCharge, LepIsEle])     
    #
    # return the Row of low level features and high level features
    #
    return Row(lfeatures=particles, hfeatures=hlf, label=event.label)

In [None]:
features = df.rdd \
            .map(convert) \
            .filter(lambda row: len(row) > 0) \
            .toDF()

In [None]:
# features.printSchema()

In [None]:
dataset_path = "/content/drive/MyDrive/UCSP/Big-Data/Final-project-data/data-processed"
num_partitions = 3000 # used in DataFrame coalesce operation to limit number of output files

%time features.coalesce(num_partitions).write.partitionBy("label").parquet(dataset_path)

In [None]:
print("Number of events written to Parquet:", spark.read.parquet(dataset_path).count())

In [None]:
spark.stop()

References:
- https://colab.research.google.com/github/asifahmed90/pyspark-ML-in-Colab/blob/master/PySpark_Regression_Analysis.ipynb#scrollTo=lh5NCoc8fsSO
-