In [None]:
# NB: This is for spark running on parquet files converted from
#     baconbits skims. This is a prototype, there is lots of boilerplate.
#     We're making it better :-)

import pyspark.sql
import os
import sys
from pyarrow.compat import guid
from coffea.processor.spark.detail import _spark_initialize

# NOTE: when working in a virtualenv the following line is required;
# without it, executors will use the wrong interpreter. It is currently disabled.
#os.environ['PYSPARK_PYTHON'] = sys.executable

#    .config("spark.driver.extraClassPath","/home/cms.lgray/sparkMeasure/target/scala-2.12/spark-measure_2.12-0.14-SNAPSHOT.jar") \
#    .master('local[*]') \
#.master('') \
#spark://cmsspark-submit.fnal.gov:7077
# Start from the shared builder: unique application name, FNAL standalone master.
conf = pyspark.sql.SparkSession.builder \
    .appName('baconbits-spark-%s' % guid()) \
    .master('spark://cmsspark-submit.fnal.gov:7077')

# Grid proxy path; a missing X509_USER_PROXY raises KeyError here.
proxy_path = os.environ['X509_USER_PROXY']
# The same hadoop-xrootd jar is put on both the driver and executor class paths.
xrootd_jar = 'file:///opt/hadoop-xrootd/hadoop-xrootd-1.0.4-jar-with-dependencies.jar'
# Read/write buffer size handed to the xrootd hadoop filesystem settings.
xrootd_buffer = 100 * 1048576

# All remaining Spark options, applied to the builder one at a time below.
spark_settings = {
    'spark.driver.cores': 8,
    'spark.driver.memory': '10g',
    'spark.executor.memory': "20g",
    'spark.debug.maxToStringFields': 1000,
    'spark.executorEnv.XRD_RUNFORKHANDLER': '1',
    # Executors get the proxy path via env var and the proxy file via spark.files.
    'spark.executorEnv.X509_PROXY': proxy_path,
    'spark.files': 'file://%s' % proxy_path,
    'spark.sql.execution.arrow.enabled': 'true',
    "spark.blockManager.port": "8000",
    'spark.sql.execution.arrow.pyspark.enabled': 'true',
    'spark.sql.execution.arrow.maxRecordsPerBatch': 200000,
    'spark.hadoop.fs.xrootd.read.buffer': xrootd_buffer,
    'spark.hadoop.fs.xrootd.write.buffer': xrootd_buffer,
    'spark.driver.extraClassPath': xrootd_jar,
    'spark.executor.extraClassPath': xrootd_jar,
}
for key, value in spark_settings.items():
    conf = conf.config(key, value)

# Create the session through coffea's helper, then expose the usual handles.
session = _spark_initialize(config=conf, log_level='WARN', laurelin_version='0.4.2-SNAPSHOT')
spark = session
sc = spark.sparkContext
sc.setLogLevel("WARN")

#from sparkmeasure import TaskMetrics
#taskmetrics = TaskMetrics(spark)

# Job tuning knobs passed to run_spark_job further down.
thread_workers = 16
partitionsize = 200000


In [None]:
import pyspark.sql.functions as fn
from tqdm import tqdm
import json

# Name of the dataset group (top-level key of the samples JSON) to process.
dataset_group = 'Hbb_create_2017'

# The JSON maps group name -> {dataset name -> list of file paths}.
with open('metadata/samples_allyears.json') as f:
    sample_groups = json.load(f)

# Fail right away if the group is absent, instead of silently running the
# job on an empty dataset dictionary.
if dataset_group not in sample_groups:
    raise KeyError("dataset group %r not found in metadata/samples_allyears.json"
                   % dataset_group)
datasets = sample_groups[dataset_group]

# Build the per-dataset description handed to run_spark_job: the file list
# plus the tree name, which is chosen from the first file's path.
datasets_spark = {}
for ds, flist in datasets.items():
    if not flist:
        # Without a first file there is nothing to choose the tree name from.
        raise ValueError("dataset %r has an empty file list" % ds)
    treename = 'otree' if '/skim/' in flist[0] else 'Events'
    datasets_spark[ds] = {'files': flist, 'treename': treename}


In [None]:
# Get the Hbb analysis worker: load the processor object stored in
# 'boostedHbbProcessor.coffea' (path is relative to the working directory).
# NOTE(review): coffea's load presumably deserialises a pickled object, so
# only load files from a trusted source — confirm.
from coffea.util import load

processor_instance = load('boostedHbbProcessor.coffea')


In [None]:
import time
from coffea.processor import run_spark_job
from coffea.processor.spark.spark_executor import spark_executor

# Run the processor over all datasets on Spark and time the whole job.
# perf_counter() is monotonic, so the measured interval cannot be distorted
# by system clock adjustments the way time.time() can.
tic = time.perf_counter()
final_accumulator = run_spark_job(datasets_spark, processor_instance, spark_executor,
                                  spark=spark, partitionsize=partitionsize, thread_workers=thread_workers,
                                  executor_args={'file_type': 'root', 'cache': True})
# Elapsed wall-clock time of the job, in seconds.
dt = time.perf_counter() - tic


In [None]:
# Throughput summary for the Spark job.
# nevt is the sum of the per-key event counts recorded on spark_executor;
# dt is the elapsed job time in seconds from the timing cell.
nevt = sum(spark_executor.counts.values())
print('processed:',nevt,'events')
print('total time: ',dt/60)  # dt is in seconds, so this value is in minutes
if nevt > 0 and dt > 0:
    print('μs/evt', dt/nevt*1e6)
    print('Mevt/s', nevt/dt/1e6)
else:
    # Avoid ZeroDivisionError when nothing was processed or no time elapsed.
    print('no events processed; per-event rates are undefined')


In [None]:
from coffea import hist
from coffea.util import save
import gzip
import pickle
import numexpr
import numpy as np

# Histogram statistics over the accumulator: only hist.Hist entries are
# inspected, through their per-key sum-of-weights arrays (_sumw).
histograms = [h for h in final_accumulator.values() if isinstance(h, hist.Hist)]
# Total number of bins across all histograms.
nbins = sum(arr.size for h in histograms for arr in h._sumw.values())
# Number of those bins whose summed weight is strictly positive.
nfilled = sum(np.sum(arr>0) for h in histograms for arr in h._sumw.values())
print("Processed %.1fM events" % (nevt/1e6, ))
print("Filled %.1fM bins" % (nbins/1e6, ))
if nbins > 0:
    print("Nonzero bins: %.1f%%" % (100*nfilled/nbins, ))
else:
    # No histogram bins at all: the fraction is undefined, so skip the division.
    print("Nonzero bins: n/a (no histogram bins found)")

# Pickle is not very fast or memory efficient, will be replaced by something better soon
save(final_accumulator,'hists.coffea')

#dt = time.time() - tstart
#print("%.2f us*cpu/event overall" % (1e6*dt*nworkers/final_accumulators['nentries'], ))


In [None]:
# Stop the Spark session created in the first cell (`spark` is the same
# object as `session`).
spark.stop()
