<!-- <img  src="https://swan.web.cern.ch/sites/swan.web.cern.ch/files/pictures/logo_swan_letters.png" alt="SWAN" style="float: left; width: 15%; margin-right: 5%; margin-left: 17%; margin-top: 1.0em; margin-bottom: 2.0em;">
<img src="https://spark.apache.org/images/spark-logo-trademark.png" alt="EP-SFT" style="float: left; width: 25%; margin-right: 0%; margin-left: 0%; margin-bottom: 2.0em;">
<img src="https://cms-docdb.cern.ch/cgi-bin/PublicDocDB/RetrieveFile?docid=3045&filename=CMSlogo_color_label_1024_May2014.png&version=3" alt="CMS" style="float: left; width: 12%; margin-left: 5%; margin-right: 5%; margin-bottom: 2.0em;"> -->
<p style="clear: both;">
<div style="text-align:center"><h1>CMS H&#8594;µµ analysis  
     <br> with Coffea package from Fermilab</h1></div>
<div style="text-align:center"><i>Author: Dmitry Kondratyev, based on example code by Lindsey Gray and adapted for the demonstration by Stefan Piperov</i></div>
<hr style="border-top-width: 4px; border-top-color: #34609b;">

# Search for Higgs boson decaying into two muons

This code uses the awkward-array toolset and Coffea [histograms](https://coffeateam.github.io/coffea/modules/coffea.hist.html).
It also shows the analysis-object syntax implemented by Coffea [JaggedCandidateArray](https://coffeateam.github.io/coffea/api/coffea.analysis_objects.JaggedCandidateMethods.html), and the use of custom [accumulators](https://coffeateam.github.io/coffea/api/coffea.processor.AccumulatorABC.html) other than histograms. Further, it introduces the [processor](https://coffeateam.github.io/coffea/api/coffea.processor.ProcessorABC.html) concept and the interface to Apache Spark.


Instructions to run on Hammer at Purdue:
===
It is assumed that this demonstration is being run in a Jupyter Lab session on Hammer, as described in the README file.  


In [None]:
%%bash -l
# Diagnostic cell: report the node and the software/grid environment.
# Note: %%bash runs in a separate subprocess, so the module load and
# `source activate` below only affect this cell's shell, not the Python kernel.
hostname
# Load the Anaconda module (Lmod `ml`) and activate the analysis conda env
ml anaconda/5.3.1-py37
source activate hmumu_coffea
# Spark installation seen by the shell (prints an empty line if unset)
echo $SPARK_HOME
pwd
# Show the grid proxy status (presumably needed for the xrootd reads below)
voms-proxy-info

Prepare directories for output files  

**Note:** these paths must also be changed further down, in `out_dir` in the "Run the SPARK executor" cell.

In [None]:
%%bash -l 
# Create the output directories; -p also creates parents and does not fail
# if the directories already exist.
# NOTE(review): these paths use 2018, but the processing cell sets year = '2017'
# (out_path 'test_2017_test') and the plotting cell reads 'test_2016_test' --
# confirm which year is intended and keep all three in sync.
# mkdir -p /tmp/spiperov/hmm/coffea/test_2016_test/unbinned
mkdir -p /depot/cms/hmm/coffea/Stefan/test_2018_test/unbinned
mkdir -p /depot/cms/hmm/coffea/Stefan/test_2018_test/binned

In [None]:
# Standard-library helpers used throughout the notebook
import multiprocessing as mp
import socket
import time

# Coffea: processor framework and output (de)serialization utilities
import coffea
import coffea.processor as processor
from coffea import util

# Record the software version and the node / CPU resources of this session
print("Coffea version: ", coffea.__version__)
print(socket.gethostname())
print(f"{mp.cpu_count()} CPUs")

Prepare data samples
===


In [None]:
from python.samples_info import SamplesInfo

# Datasets to process. Uncomment entries to add them to the run.
samples = (
    # --- Data: run eras A..H ('data_A' is not available for 2016) ---
    [f'data_{era}' for era in 'ABCDEFGH']
    # --- Essential MC ---
    + [
        'dy_m105_160_amc',
        'dy_m105_160_vbf_amc',
        # 'ggh_amcPS',
        # 'vbf_powhegPS',
        # 'ttjets_dl',
        # 'ewk_lljj_mll105_160_ptj0',
        # --- Non-essential MC ---
        # 'ttjets_sl',
        # 'ttz',
        # 'ttw',
        # 'st_tw_top', 'st_tw_antitop',
        # 'ww_2l2nu',
        # 'wz_2l2q',
        # 'wz_3lnu',
        # 'wz_1l1nu2q',
        # 'zz',
    ]
)

# xrootd redirector used to read the input ROOT files
purdue = 'root://xrootd.rcac.purdue.edu/'

year = '2017'
label = 'test'  # change this to save to another directory

samp_info = SamplesInfo(
    year=year,
    out_path=f'test_{year}_{label}',
    server=purdue,
    debug=True,
)

# Parallelization: 'outer' = over samples, 'inner' = over ROOT files within a sample
samp_info.load(samples, nchunks=1, parallelize_outer=1, parallelize_inner=10)
samp_info.compute_lumi_weights()


Prepare SPARK executor
===


In [None]:
# Run this cell before establishing the Spark connection (presumably so that
# Spark processes launched from this kernel inherit the extended PYTHONPATH).

import os

# Directories to append to PYTHONPATH.
# NOTE(review): this is a python3.6 site-packages path while the environment
# loaded above is anaconda py37 -- confirm it matches the Spark workers' Python.
EXTRA_PYTHONPATHS = [
    '/usr/local/lib/python3.6/site-packages',
    # '/home/spiperov/dmitry/hmumu-coffea_20May2020',
]

# Tolerate an unset PYTHONPATH and keep the cell idempotent: re-running it
# must not append the same directory again.
_current_pythonpath = os.environ.get('PYTHONPATH', '')
_pythonpath_entries = _current_pythonpath.split(os.pathsep) if _current_pythonpath else []
for _extra_path in EXTRA_PYTHONPATHS:
    if _extra_path not in _pythonpath_entries:
        _pythonpath_entries.append(_extra_path)
os.environ['PYTHONPATH'] = os.pathsep.join(_pythonpath_entries)

In [None]:
import uuid

import pyspark.sql
from coffea.processor.spark.detail import _spark_initialize, _spark_stop
from coffea.processor.spark.spark_executor import spark_executor
from python.dimuon_processor import DimuonProcessor

# True:  connect to the distributed Spark cluster started in advance on hammer-c019
# False: create a local Spark instance on the spot, using all cores of this node
distributed_cluster = True

spark_master = ('spark://hammer-c019.rcac.purdue.edu:7077'
                if distributed_cluster else 'local[*]')

# Both modes share every setting except the master URL, so build the session
# config once. A random uuid4 hex keeps each application name unique (stdlib
# replacement for pyarrow.compat.guid, which newer pyarrow releases do not ship).
spark_config = (
    pyspark.sql.SparkSession.builder
    .appName('spark-executor-test-%s' % uuid.uuid4().hex)
    .master(spark_master)
    .config('spark.driver.memory', '20g')
    .config('spark.executor.memory', '20g')
    .config('spark.sql.execution.arrow.enabled', 'true')
    .config('spark.sql.execution.arrow.maxRecordsPerBatch', 200000)
)

spark = _spark_initialize(config=spark_config, log_level='WARN',
                          spark_progress=False, laurelin_version='0.5.1')
print(spark)

# Settings passed to processor.run_spark_job in the executor cell
partitionsize = 200000
thread_workers = 2




In [None]:
# Inspect the complete fileset handed to the Spark job; pretty-print it so the
# nested per-dataset entries are readable instead of one very long line.
import pprint
pprint.pprint(samp_info.full_fileset)

In [None]:
# Inspect the per-dataset chunking. Printing `.items()` directly shows an opaque
# `dict_items([...])` wrapper, so convert to a dict and pretty-print it.
import pprint
pprint.pprint(dict(samp_info.filesets_chunked.items()))

Run the SPARK executor
===


In [None]:
tstart = time.time()

# Which workflow to run:
#   'full'     - one Spark job over samp_info.full_fileset (default)
#   'chunked'  - one Spark job per dataset chunk; every output mode is saved
#                with util.save under out_dir (set below)
#   'opendata' - quick test on two public CMS 2012 open-data NanoAOD files
RUN_MODE = 'full'


def _run_spark(fileset, processor_instance):
    """Run one Spark job on `fileset` with the notebook's executor settings.

    A fresh executor_args dict is built per call so no call can observe
    changes another call might make to it.
    """
    return processor.run_spark_job(
        fileset, processor_instance, spark_executor, spark=spark,
        partitionsize=partitionsize, thread_workers=thread_workers,
        executor_args={'file_type': 'edu.vanderbilt.accre.laurelin.Root',
                       'cache': False, 'nano': True, 'retries': 5},
    )


if RUN_MODE == 'full':
    output = _run_spark(
        samp_info.full_fileset,
        DimuonProcessor(samp_info=samp_info, do_jecunc=False, use_spark=True),
    )

elif RUN_MODE == 'opendata':
    fileset_opendata = {
        'DoubleMuon': {
            'files': [
                'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root',
                'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012C_DoubleMuParked.root',
            ],
            'treename': 'Events',
        }
    }
    output = _run_spark(fileset_opendata,
                        DimuonProcessor(samp_info=samp_info, do_jecunc=False))

elif RUN_MODE == 'chunked':
    # Output location -- keep in sync with the directories created at the top.
    out_dir = f"/depot/cms/hmm/coffea/Stefan/{samp_info.out_path}/"
    # out_dir = f"/depot/cms/hmm/coffea/{samp_info.out_path}/"
    # out_dir = f"/tmp/spiperov/hmm/coffea/{samp_info.out_path}/"

    for ds_name, fileset_ in samp_info.filesets_chunked.items():
        for ichunk, ifileset in enumerate(fileset_):
            print(f"Processing {ds_name}, chunk {ichunk+1}/{samp_info.nchunks} ...")
            print(ifileset)

            output = _run_spark(ifileset,
                                DimuonProcessor(samp_info=samp_info, do_jecunc=False))

            # One sub-directory per output mode, one file per dataset chunk.
            for mode in output.keys():
                out_dir_ = os.path.join(out_dir, str(mode))
                # exist_ok=True tolerates existing directories but still
                # surfaces real failures such as missing permissions.
                os.makedirs(out_dir_, exist_ok=True)
                util.save(output[mode], os.path.join(out_dir_, f"{ds_name}_{ichunk}.coffea"))

            # Free the accumulated output before the next chunk.
            output.clear()
            print(f"Saved output to {out_dir}")

else:
    raise ValueError(f"Unknown RUN_MODE: {RUN_MODE!r}")

# Computed for every RUN_MODE so the timing report below always works.
elapsed = time.time() - tstart
print(f"Total time: {elapsed} s")

Plot Data/MC comparison
---

In [None]:
import os,glob
import argparse
from python.postprocessing import postprocess, plot, save_shapes
from config.variables import variables
from config.datasets import datasets
import pandas as pd

# NOTE(review): these override the year/label set in the sample cell ('2017');
# the input path below therefore points at test_2016_test -- confirm intended.
year = '2016'
label = 'test'

# Variables to plot; names must match entries of config.variables
to_plot = ['dimuon_mass']
# to_plot = ['dimuon_mass','mu1_pt']
vars_to_plot = {v.name: v for v in variables if v.name in to_plot}
_unknown_vars = [name for name in to_plot if name not in vars_to_plot]
if _unknown_vars:
    raise ValueError(f"Variables not found in config.variables: {_unknown_vars}")

samples = [
    'data_A',
    'data_B',
    'data_C',
    'data_D',
    'data_E',
    'data_F',
    'data_G',
    'data_H',
    'dy_m105_160_amc',
    'dy_m105_160_vbf_amc',
    'ewk_lljj_mll105_160_ptj0',
    'ttjets_dl',
    'ttjets_sl',
    'ttz',
    'ttw',
    'st_tw_top','st_tw_antitop',
    'ww_2l2nu',
    'wz_2l2q',
    'wz_3lnu',
    'zz',
    'ggh_amcPS',
    'vbf_powhegPS',
]


postproc_args = {
    'modules': ['to_pandas',  'get_hists'],
    'year': year,
    'label': label,
#    'in_path': f'/depot/cms/hmm/coffea/test_{year}_{label}/',
    'in_path': f'/depot/cms/hmm/coffea/Stefan/test_{year}_{label}/',
#    'in_path': f'/tmp/spiperov/hmm/coffea/test_{year}_{label}/',
    'syst_variations': ['nominal'],
    'samples':samples,
    'channels': ['vbf'],
    'regions': ['h-peak', 'h-sidebands'],
    'vars_to_plot': list(vars_to_plot.values()),
    'wgt_variations': False,
}


dfs, hist_dfs, edges = postprocess(postproc_args)

# Merge the list of histogram frames of each variable into a single frame.
hist = {var: pd.concat(hists, ignore_index=True) for var, hists in hist_dfs.items()}

# Plot every requested variable, so editing `to_plot` is all that is needed.
for vname, var in vars_to_plot.items():
    plot(var, hist, 'wgt_nominal', edges[vname], postproc_args, save=False, show=True, plotsize=8)

# Per-region variant:
# for vname, var in vars_to_plot.items():
#     for r in postproc_args['regions']:
#        plot(var, hist, 'wgt_nominal', edges[vname], postproc_args, r, save=False, show=True, plotsize=8)