<img src="https://swan.web.cern.ch/sites/swan.web.cern.ch/files/pictures/logo_swan_letters.png" alt="SWAN" style="float: left; width: 20%; margin-right: 15%; margin-left: 15%; margin-bottom: 2.0em;">
<img src="https://spark.apache.org/images/spark-logo-trademark.png" alt="Apache Spark" style="float: left; width: 25%; margin-right: 15%; margin-bottom: 2.0em;">
<p style="clear: both;">
<div style="text-align:center"><h1>Physics analysis with Apache Spark using Coffea and Laurelin packages from Fermilab</h1></div>
<div style="text-align:center"><i>Author: Lindsey Gray; Contact: Lindsey Gray / Prasanth Kothuri</i></div>
<hr style="border-top-width: 4px; border-top-color: #34609b;">

# Dimuon spectrum

This code is a columnar adaptation of [a ROOT tutorial](https://root.cern.ch/doc/master/df102__NanoAODDimuonAnalysis_8py.html) showcasing the awkward array toolset, and utilizing Coffea [histograms](https://coffeateam.github.io/coffea/modules/coffea.hist.html).
This also shows the analysis object syntax implemented by Coffea [JaggedCandidateArray](https://coffeateam.github.io/coffea/api/coffea.analysis_objects.JaggedCandidateMethods.html), and the usage of custom [accumulators](https://coffeateam.github.io/coffea/api/coffea.processor.AccumulatorABC.html) other than histograms. Further, it introduces the [processor](https://coffeateam.github.io/coffea/api/coffea.processor.ProcessorABC.html) concept and the interface to Apache Spark.

# Configuration

In [1]:
# Switches read by the cells below.
# True  -> Spark master is "local[16]"; False -> "spark://hammer-c026:7077".
LOCAL_EXECUTION_MODE = True
# True  -> input ROOT files are taken from '../input/';
# False -> they are read from root://eospublic.cern.ch (CMS open data).
LOCAL_FILE_MODE = False

# Payload
Now let's make some Coffea

In [2]:
#
# Spark session setup.
# Every setting in this cell is either
#   * Coffea specific (and could be rolled into the library), or
#   * site-specific (and could be rolled into site-wide Spark configs).
# The goal is that a user never has to see any of this.
#

import pyspark
import os, os.path

# Work around arrow compatibility issue
os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = "1"

# Pick the Spark master from the execution-mode switch set in the
# configuration cell.
masterURL = "local[16]" if LOCAL_EXECUTION_MODE else "spark://hammer-c026:7077"

# The hadoop-xrootd jar is looked up one directory above the current working
# directory; the same path is put on the driver and executor class paths.
xrootd_jar = os.path.join(os.getcwd(), '..', 'hadoop-xrootd-1.0.4-jar-with-dependencies.jar')

spark = (
    pyspark.sql.SparkSession.builder
    .master(masterURL)
    # Laurelin is pulled in as a Spark package.
    .config('spark.jars.packages', 'edu.vanderbilt.accre:laurelin:1.0.2')
    .config('spark.driver.memory', '24g')
    .config('spark.executor.memory', '24g')
    .config('spark.sql.execution.arrow.enabled', 'false')
    .config('spark.speculation', 'true')
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 200000)
    .config('spark.driver.extraClassPath', xrootd_jar)
    .config('spark.executor.extraClassPath', xrootd_jar)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

print(spark)

<pyspark.sql.session.SparkSession object at 0x2b7cf736d310>


In [3]:
import time

%matplotlib inline
from coffea import hist
from coffea.analysis_objects import JaggedCandidateArray
import coffea.processor as processor

In [None]:
# NOTE: everything below is a single triple-quoted string, so this class is
# NOT defined when the cell runs. It is an inline DimuonProcessor kept for
# reference only; the DimuonProcessor actually used by the Spark job cell is
# imported from python.dimuon_processor.
'''
# Look at ProcessorABC documentation to see the expected methods and what they are supposed to do
class DimuonProcessor(processor.ProcessorABC):
    def __init__(self):
        self._columns = ['nMuon', 'Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass', 'Muon_charge']
        dataset_axis = hist.Cat("dataset", "Primary dataset")
        mass_axis = hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 30000, 0.25, 300)
        
        self._accumulator = processor.dict_accumulator({
            'mass': hist.Hist("Counts", dataset_axis, mass_axis),
            'cutflow': processor.defaultdict_accumulator(int),
        })
    
    @property
    def accumulator(self):
        return self._accumulator
    
    @property
    def columns(self):
        return self._columns
    
    def process(self, df):
        output = self.accumulator.identity()
        
        dataset = df['dataset']
        muons = JaggedCandidateArray.candidatesfromcounts(
            df['nMuon'],
            pt=df['Muon_pt'].content,
            eta=df['Muon_eta'].content,
            phi=df['Muon_phi'].content,
            mass=df['Muon_mass'].content,
            charge=df['Muon_charge'].content,
            )
        
        output['cutflow']['all events'] += muons.size
        
        twomuons = (muons.counts == 2)
        output['cutflow']['two muons'] += twomuons.sum()
        
        opposite_charge = twomuons & (muons['charge'].prod() == -1)
        output['cutflow']['opposite charge'] += opposite_charge.sum()
        
        dimuons = muons[opposite_charge].distincts()
        output['mass'].fill(dataset=dataset, mass=dimuons.mass.flatten())
        
        return output

    def postprocess(self, accumulator):
        return accumulator
'''

Prepare data samples
===

In [4]:
from python.samples_info import SamplesInfo

# Samples handed to SamplesInfo.load(); uncomment an entry to include it.
samples = [
    # ---------------- Data ----------------
    # data_A: not available for 2016; the output recorded below for
    # year 2017 also reports "Couldn't load data_A! Skipping."
    'data_A',
    'data_B',
    # 'data_C',
    # 'data_D', 'data_E',
    # 'data_F',
    # 'data_G', 'data_H',

    # ------------- Essential MC -----------
    # 'dy_m105_160_amc',
    # 'dy_m105_160_vbf_amc',
    # 'ggh_amcPS',
    # 'vbf_powhegPS',
    # 'ttjets_dl',
    # "ewk_lljj_mll105_160_ptj0",

    # ----------- Non-essential MC ---------
    # 'ttjets_sl',
    # 'ttz',
    # 'ttw',
    # 'st_tw_top', 'st_tw_antitop',
    # 'ww_2l2nu',
    # 'wz_2l2q',
    # 'wz_3lnu',
    # 'wz_1l1nu2q',
    # 'zz',
]

# XRootD server passed to SamplesInfo as `server`.
purdue = 'root://xrootd.rcac.purdue.edu/'

year = '2017'
# Change this to save to other directory (it is part of out_path below).
label = 'test'

samp_info = SamplesInfo(
    year=year,
    out_path=f'test_{year}_{label}',
    server=purdue,
    debug=True,
)

# 'outer' refers to parallelization by sample,
# 'inner' - by ROOT file in each sample.
samp_info.load(
    samples,
    nchunks=1,
    parallelize_outer=1,
    parallelize_inner=10,
)
samp_info.compute_lumi_weights()


year:  2017
Default lumi:  41530.0
Loading data_A
Couldn't load data_A! Skipping.
Loading data_B

Total events in 2017: 769080716
Loaded 144492 of 2017 data events
This is ~ 0.02% of 2017 data.
Integrated luminosity: 41530.0/pb

Missing samples: ['data_A']
Loading took 15.61 s


In [5]:
import pyspark.sql
from pyarrow.compat import guid
from coffea.processor.spark.detail import _spark_initialize, _spark_stop
from coffea.processor.spark.spark_executor import spark_executor
from python.dimuon_processor import DimuonProcessor

spark.conf.set("spark.sql.execution.arrow.enabled", "false")
print("PWD IS %s " % os.getcwd())

# Job sizing knobs forwarded to run_spark_job below.
# NOTE(review): the traceback saved with this cell ends in
#   "ValueError: maximum offset 493478 is beyond the length of the content (200000)".
# 200000 equals the spark.sql.execution.arrow.maxRecordsPerBatch value set when
# the session was built, while partitionsize here is 20000000 -- confirm
# whether these two settings need to agree.
partitionsize = 20000000
thread_workers = 16

# Input location: local copies under ../input/ when LOCAL_FILE_MODE is set,
# otherwise the CMS open-data files served over XRootD.
if LOCAL_FILE_MODE:
    file_prefix = '../input/'
else:
    file_prefix = 'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/'

# One dataset ('DoubleMuon') made of two ROOT files, read from the 'Events' tree.
fileset = {
    'DoubleMuon': {
        'files': [
            file_prefix + 'Run2012B_DoubleMuParked.root',
            file_prefix + 'Run2012C_DoubleMuParked.root',
        ],
        'treename': 'Events',
    }
}

# Build the processor before starting the clock so that `elapsed` covers
# only the Spark job itself.
dimuon_processor = DimuonProcessor(samp_info=samp_info, do_jecunc=False, use_spark=True)

tstart = time.time()
output = processor.run_spark_job(
    fileset,
    dimuon_processor,
    spark_executor,
    spark=spark,
    partitionsize=partitionsize,
    thread_workers=thread_workers,
    # Read the ROOT files through the Laurelin data source, without caching.
    executor_args={'file_type': 'edu.vanderbilt.accre.laurelin.Root', 'cache': False},
)
elapsed = time.time() - tstart

print(output)

PWD IS /home/spiperov/tmp/Jun5_SPARK_tests/hammer-slurm-spark-coffea/notebooks 


HBox(children=(FloatProgress(value=0.0, description='loading', max=1.0, style=ProgressStyle(description_width=…




HBox(children=(FloatProgress(value=0.0, description='pruning', max=1.0, style=ProgressStyle(description_width=…




HBox(children=(FloatProgress(value=0.0, description='Processing', max=1.0, style=ProgressStyle(description_wid…




Py4JJavaError: An error occurred while calling o527.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 673, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 290, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in verify_result_length
    result = f(*a)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<string>", line 33, in coffea_udf
  File "/home/spiperov/tmp/Jun5_SPARK_tests/hammer-slurm-spark-coffea/notebooks/python/dimuon_processor.py", line 273, in process
    df = df[hlt&(df.Muon.counts>1)]
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/virtual.py", line 407, in counts
    return self._util_counts(self.array)
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/base.py", line 440, in _util_counts
    return array.counts
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/jagged.py", line 378, in counts
    self._valid()
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/jagged.py", line 469, in _valid
    raise ValueError("maximum offset {0} is beyond the length of the content ({1})".format(self._offsets.max(), len(self._content)))
ValueError: maximum offset 493478 is beyond the length of the content (200000)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3263)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3260)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3260)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 290, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in verify_result_length
    result = f(*a)
  File "/home/spiperov/.conda/envs/cent7/5.3.1-py37/hmumu_coffea/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<string>", line 33, in coffea_udf
  File "/home/spiperov/tmp/Jun5_SPARK_tests/hammer-slurm-spark-coffea/notebooks/python/dimuon_processor.py", line 273, in process
    df = df[hlt&(df.Muon.counts>1)]
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/virtual.py", line 407, in counts
    return self._util_counts(self.array)
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/base.py", line 440, in _util_counts
    return array.counts
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/jagged.py", line 378, in counts
    self._valid()
  File "/home/spiperov/.local/lib/python3.7/site-packages/awkward/array/jagged.py", line 469, in _valid
    raise ValueError("maximum offset {0} is beyond the length of the content ({1})".format(self._offsets.max(), len(self._content)))
ValueError: maximum offset 493478 is beyond the length of the content (200000)

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
	at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
	at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
	at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:823)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
import matplotlib.pyplot as plt

# Dimuon mass histogram, one overlaid curve per dataset, on log-log axes.
ax = hist.plot1d(output['mass'], overlay='dataset')
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_ylim(0.1, 1e6)
ax.figure.set_size_inches(10,10)

# Labels are positioned in axes-fraction coordinates (transform=ax.transAxes)
# and centred on the given point.
txt_opts = {'horizontalalignment': 'center',
            'verticalalignment': 'center',
            'transform': ax.transAxes}

# (x, y, text) for each annotation drawn on the spectrum.
peak_labels = [
    (0.85, 0.75, 'Z'),
    (0.55, 0.77, r"$\Upsilon$(1,2,3S)"),
    (0.37, 0.95, r"J/$\Psi$"),
    (0.40, 0.77, r"$\Psi$'"),
    (0.22, 0.80, r"$\phi$"),
    (0.16, 0.83, r"$\rho,\omega$"),
    (0.11, 0.78, r"$\eta$"),
]
for peak_x, peak_y, peak_text in peak_labels:
    plt.text(peak_x, peak_y, peak_text, **txt_opts)

In [None]:
# Throughput of the Spark job: the 'all events' cutflow counter divided by
# the wall-clock time measured around the job.
events_per_second = output['cutflow']['all events'] / elapsed
print("Events/s:", events_per_second)