<img src="https://swan.web.cern.ch/sites/swan.web.cern.ch/files/pictures/logo_swan_letters.png" alt="SWAN" style="float: left; width: 20%; margin-right: 15%; margin-left: 15%; margin-bottom: 2.0em;">
<img src="https://spark.apache.org/images/spark-logo-trademark.png" alt="EP-SFT" style="float: left; width: 25%; margin-right: 15%; margin-bottom: 2.0em;">
<p style="clear: both;">
<div style="text-align:center"><h1>Physics analysis with Apache Spark using Coffea and Laurelin packages from Fermilab</h1></div>
<div style="text-align:center"><i>Author: Lindsey Gray; Contact: Lindsey Gray / Prasanth Kothuri</i></div>
<hr style="border-top-width: 4px; border-top-color: #34609b;">

# Dimuon spectrum

This code is a columnar adaptation of [a ROOT tutorial](https://root.cern.ch/doc/master/df102__NanoAODDimuonAnalysis_8py.html) showcasing the awkward array toolset, and utilizing Coffea [histograms](https://coffeateam.github.io/coffea/modules/coffea.hist.html).
This also shows the analysis object syntax implemented by Coffea [JaggedCandidateArray](https://coffeateam.github.io/coffea/api/coffea.analysis_objects.JaggedCandidateMethods.html), and the usage of custom [accumulators](https://coffeateam.github.io/coffea/api/coffea.processor.AccumulatorABC.html) other than histograms.  Further, it introduces the [processor](https://coffeateam.github.io/coffea/api/coffea.processor.ProcessorABC.html) concept and the interface to apache spark.

# Configuration

In [None]:
LOCAL_EXECUTION_MODE=True
LOCAL_FILE_MODE=False

# Payload
Now let's make some Coffea

In [None]:
#
# All of these configs are either:
#   * Coffea specific and rolled into the library
#   * Site-specific and rolled into site-wide Spark configs
# The goal is that a user never sees any of this
#

import pyspark
import os, os.path

# Work around arrow compatibility issue
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = "1"

if LOCAL_EXECUTION_MODE:
    masterURL = "local[16]"
else:
    masterURL = "spark://localhost:7077"

spark = pyspark.sql.SparkSession.builder \
    .master(masterURL) \
    .config('spark.jars.packages',
            'edu.vanderbilt.accre:laurelin:1.0.2') \
    .config('spark.driver.memory', '24g') \
    .config('spark.executor.memory', '4g') \
    .config('spark.sql.execution.arrow.enabled', 'true') \
    .config('spark.speculation', 'true') \
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 200000) \
    .config('spark.driver.extraClassPath', os.path.join(os.getcwd(), '..', 'hadoop-xrootd-1.0.4-jar-with-dependencies.jar')) \
    .config('spark.executor.extraClassPath', os.path.join(os.getcwd(), '..', 'hadoop-xrootd-1.0.4-jar-with-dependencies.jar')) \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()

In [None]:
import time

%matplotlib inline
from coffea import hist
from coffea.analysis_objects import JaggedCandidateArray
import coffea.processor as processor

In [None]:
# Look at ProcessorABC documentation to see the expected methods and what they are supposed to do
class DimuonProcessor(processor.ProcessorABC):
    def __init__(self):
        self._columns = ['nMuon', 'Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass', 'Muon_charge']
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
        
        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })
    
    @property
    def accumulator(self):
        return self._accumulator
    
    @property
    def columns(self):
        return self._columns
    
    def process(self, df):
        output = self.accumulator.identity()
        
        dataset = df['dataset']
        muons = JaggedCandidateArray.candidatesfromcounts(
            df['nMuon'],
            pt=df['Muon_pt'].content,
            eta=df['Muon_eta'].content,
            phi=df['Muon_phi'].content,
            mass=df['Muon_mass'].content,
            charge=df['Muon_charge'].content,
            )
        
        output['cutflow']['all events'] += muons.size
        
        twomuons = (muons.counts == 2)
        output['cutflow']['two muons'] += twomuons.sum()
        
        opposite_charge = twomuons & (muons['charge'].prod() == -1)
        output['cutflow']['opposite charge'] += opposite_charge.sum()
        
        dimuons = muons[opposite_charge].distincts()
        output['mass'].fill(dataset=dataset, mass=dimuons.mass.flatten())
        
        return output

    def postprocess(self, accumulator):
        return accumulator

In [None]:
import pyspark.sql
from pyarrow.compat import guid
from coffea.processor.spark.detail import _spark_initialize, _spark_stop
from coffea.processor.spark.spark_executor import spark_executor

print("PWD IS %s " % os.getcwd())

partitionsize = 200000
thread_workers = 16

tstart = time.time()    
if LOCAL_FILE_MODE == False:
    file_prefix = 'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/'
else:
    file_prefix = '../input/'
    
fileset = {
    'DoubleMuon': { 'files': [
        file_prefix + 'Run2012B_DoubleMuParked.root',
        file_prefix + 'Run2012C_DoubleMuParked.root',
                             ], 
                    'treename': 'Events'
                  }
}

output = processor.run_spark_job(fileset, DimuonProcessor(), spark_executor, 
                                 spark=spark, partitionsize=partitionsize, thread_workers=thread_workers,
                                 executor_args={'file_type': 'edu.vanderbilt.accre.laurelin.Root', 'cache': False}
                                )

elapsed = time.time() - tstart
print(output)

In [None]:
ax = hist.plot1d(output['mass'], overlay='dataset')
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_ylim(0.1, 1e6)

import matplotlib.pyplot as plt
ax.figure.set_size_inches(10,10)
txt_opts = {'horizontalalignment': 'center',
            'verticalalignment': 'center',
            'transform': ax.transAxes}
plt.text(0.85, 0.75, 'Z', **txt_opts)
plt.text(0.55, 0.77, r"$\Upsilon$(1,2,3S)", **txt_opts)
plt.text(0.37, 0.95, r"J/$\Psi$", **txt_opts)
plt.text(0.40, 0.77, r"$\Psi$'", **txt_opts)
plt.text(0.22, 0.80, r"$\phi$", **txt_opts)
plt.text(0.16, 0.83, r"$\rho,\omega$", **txt_opts)
plt.text(0.11, 0.78, r"$\eta$", **txt_opts);

In [None]:
print("Events/s:", output['cutflow']['all events']/elapsed)