# Setup
```
source /cvmfs/sft.cern.ch/lcg/views/LCG_88/x86_64-slc6-gcc49-opt/setup.sh
source /cvmfs/sft.cern.ch/lcg/etc/hadoop-confext/hadoop-setconf.sh analytix
export KRB5CCNAME=FILE:/tmp/${USER}_krb
kinit -c /tmp/${USER}_krb
export PYSPARK_DRIVER_PYTHON=jupyter-notebook
export PYSPARK_DRIVER_PYTHON_OPTS="--ip=`hostname` --browser='/dev/null' --port=8889"
pyspark --master yarn --packages org.diana-hep:spark-root_2.11:0.1.16 --files $KRB5CCNAME#krbcache --conf spark.executorEnv.KRB5CCNAME='FILE:$PWD/krbcache' --conf spark.driver.extraClassPath="/usr/lib/hadoop/EOSfs.jar"
```

In [1]:
from math import *
#import numpy as np
#from pyspark.sql import Row

In [2]:
filename = "root://eospublic.cern.ch//eos/opendata/cms/MonteCarlo2012/Summer12_DR53X/ZZTo4mu_8TeV-powheg-pythia6/AODSIM/PU_RD1_START53_V7N-v1/20000/"

In [3]:
ds = sqlContext.read.format("org.dianahep.sparkroot.experimental").option("tree","Events").load(filename)

In [4]:
ds.count()

1499064

In [5]:
ds.printSchema()

root
 |-- EventAuxiliary: struct (nullable = true)
 |    |-- processHistoryID_: struct (nullable = true)
 |    |    |-- hash_: string (nullable = true)
 |    |-- id_: struct (nullable = true)
 |    |    |-- run_: integer (nullable = true)
 |    |    |-- luminosityBlock_: integer (nullable = true)
 |    |    |-- event_: integer (nullable = true)
 |    |-- processGUID_: string (nullable = true)
 |    |-- time_: struct (nullable = true)
 |    |    |-- timeLow_: integer (nullable = true)
 |    |    |-- timeHigh_: integer (nullable = true)
 |    |-- luminosityBlock_: integer (nullable = true)
 |    |-- isRealData_: boolean (nullable = true)
 |    |-- experimentType_: integer (nullable = true)
 |    |-- bunchCrossing_: integer (nullable = true)
 |    |-- orbitNumber_: integer (nullable = true)
 |    |-- storeNumber_: integer (nullable = true)
 |-- EventProductProvenance: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- branchID_: integer (nullable = true

In [6]:
dsMuons = ds.select("recoMuons_muons__RECO_.recoMuons_muons__RECO_obj").toDF("muons")

In [7]:
dsMuons.printSchema()

root
 |-- muons: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- qx3_: integer (nullable = true)
 |    |    |-- pt_: float (nullable = true)
 |    |    |-- eta_: float (nullable = true)
 |    |    |-- phi_: float (nullable = true)
 |    |    |-- mass_: float (nullable = true)
 |    |    |-- vertex_: struct (nullable = true)
 |    |    |    |-- fCoordinates: struct (nullable = true)
 |    |    |    |    |-- fX: float (nullable = true)
 |    |    |    |    |-- fY: float (nullable = true)
 |    |    |    |    |-- fZ: float (nullable = true)
 |    |    |-- pdgId_: integer (nullable = true)
 |    |    |-- status_: integer (nullable = true)
 |    |    |-- innerTrack_: struct (nullable = true)
 |    |    |    |-- recoMuons_muons__RECO_obj_innerTrack__product_: struct (nullable = true)
 |    |    |    |    |-- processIndex_: short (nullable = true)
 |    |    |    |    |-- productIndex_: short (nullable = true)
 |    |    |    |    |-- elementIndex_: inte

In [8]:
dsMuons.show()

+--------------------+
|               muons|
+--------------------+
|[[-3,20.111635,-1...|
|[[-3,49.427155,-2...|
|[[-3,2.2016237,-1...|
|[[3,33.72951,-2.3...|
|[[-3,3.9327354,-1...|
|[[3,21.213083,-2....|
|[[-3,6.4963756,-2...|
|[[-3,40.716217,1....|
|[[3,1.9911032,2.1...|
|[[3,70.31914,1.69...|
|[[-3,73.06647,-2....|
|[[3,9.637147,-1.0...|
|[[3,42.748066,-2....|
|[[-3,27.396252,1....|
|[[3,3.5388916,-1....|
|[[-3,36.111237,2....|
|[[-3,4.989803,-2....|
|[[-3,36.44466,-0....|
|[[3,34.97526,-1.2...|
|[[3,68.98857,-1.7...|
+--------------------+
only showing top 20 rows



In [9]:
# helper function to calculate isolation from the struct
def iso(isoStruct): 
    neutral = max(0.0, isoStruct.sumNeutralHadronEt + isoStruct.sumPhotonEt - 0.5 * isoStruct.sumPUPt)
    return isoStruct.sumChargedHadronPt + neutral

# just do the looser muon selection here.  will do a second selection for leading muon later
def passMuonSel(muon):
    return ((muon.pt_ > 10.0) and 
        (fabs(muon.eta_) < 2.4) and
        (iso(muon.pfIsolationR04_)/muon.pt_ < 0.5) and
        ((muon.type_ & (1<<1)) != 0)) # global muon 

# method to apply the event level selection cuts,
#  including tighter cuts on the leading muon
def passEventSel(muons):
    return ((len(muons) > 1) and
            (muons[0].pt_ > 25.0) and
            (fabs(muons[0].eta_) < 2.1) and
            (iso(muons[0].pfIsolationR04_)/muons[0].pt_ < 0.12) and
            (muons[0].pdgId_ * muons[1].pdgId_ < 0))

# simplified formula, assuming E >> m
def invariantMass(mu1, mu2):
    return sqrt(2*mu1.pt_*mu2.pt_*(cosh(mu1.eta_-mu2.eta_)-cos(mu1.phi_-mu2.phi_)))
    
def handleEvent(event):
    # first select muons
    selMuons = [muon for muon in event.muons if passMuonSel(muon)]
    # sort in decreasing order of muon pT - makes a noticeable difference in how many events pass
    # if not sorting, can reproduce the scala results exactly
    #sortedMuons = sorted(selMuons, key=lambda muon: -muon.pt_)
    sortedMuons = selMuons
    # check if event passes selection (including requiring at least 2 muons)
    if passEventSel(sortedMuons):
        return [invariantMass(sortedMuons[0], sortedMuons[1])]
        ### from victor's example:
        # muon1, muon2 = sortedMuons[:2]
        # return [Row(mass=invariantMass(muon1, muon2), pt1=muon1.pt_, phi1=muon1.phi_, eta1=muon1.eta_, pt2=muon2.pt_, phi2=muon2.phi_, eta2=muon2.eta_)]
    else:
        return []

In [10]:
#dsDimuons = dsMuons.rdd.filter(lambda event: len(event.muons) > 1)
#print dsDimuons.count()

In [11]:
# do all the steps at once.. didn't immediately see how to do object selection in a separate step
dsMll = dsMuons.rdd.flatMap(handleEvent).map(lambda x: (x, )).toDF().persist()

In [12]:
dsMll.show()

+------------------+
|                _1|
+------------------+
| 75.17152550288458|
| 72.40964018091776|
|   91.587498504177|
| 83.44084509897125|
| 93.57815817875573|
| 90.00495283242167|
| 82.86015951533207|
| 81.95454594633969|
| 83.10113910084489|
| 66.45596691200133|
| 79.47300208296096|
| 91.44308967574325|
| 68.42604644840561|
| 51.22710389646728|
| 51.72832692826305|
| 308.0664543788408|
| 51.83220175373299|
| 90.88472726085514|
|59.662699748378635|
| 36.30271355896827|
+------------------+
only showing top 20 rows



In [13]:
dsMll.write.parquet("file:/tmp/olivito/test_results_mll_pyspark.parquet", mode="overwrite")