In [None]:
import parsl
import os
from parsl.app.app import python_app, bash_app
from parsl.configs.local_threads import config

from parsl.providers import LocalProvider,CondorProvider
from parsl.channels import LocalChannel,SSHChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor

from parsl.addresses import address_by_hostname

# File name of the current user's grid proxy, keyed on the numeric uid.
x509_proxy = 'x509up_u%s' % os.getuid()

# Shell snippet handed to the provider as `worker_init`: sources the LCG view,
# puts the transferred `.local` tree on PATH/PYTHONPATH, points
# X509_USER_PROXY at the transferred proxy file and creates ./htex_Local.
wrk_init = '''
source /cvmfs/sft.cern.ch/lcg/views/LCG_95apython3/x86_64-centos7-gcc7-opt/setup.sh
export PATH=`pwd`/.local/bin:$PATH
export PYTHONPATH=`pwd`/.local/lib/python3.6/site-packages:$PYTHONPATH

export X509_USER_PROXY=`pwd`/%s
mkdir -p ./htex_Local
''' % x509_proxy

twoGB = 2048  # presumably megabytes (HTCondor RequestMemory unit) -- TODO confirm
nproc = 4     # used both as RequestCpus and as max_workers below

# Extra submit-description lines handed to the provider as `scheduler_options`;
# memory is requested as twoGB per worker.
condor_cfg = '''
transfer_output_files = htex_Local
RequestMemory = %d
RequestCpus = %d
''' % (twoGB * nproc, nproc)

# Paths handed to the provider as `transfer_input_files`.
xfer_files = [
    '/afs/hep.wisc.edu/home/lgray/.local',
    '/tmp/%s' % x509_proxy,
]

# Alternative kept for reference: pass the environment explicitly instead of
# exporting it from worker_init.
#envs={'PYTHONPATH':'/afs/hep.wisc.edu/home/lgray/.local/lib/python3.6/site-packages:%s'%os.environ['PYTHONPATH'],
#      'X509_USER_PROXY':'./%s'%x509_proxy,
#      'PATH':'/afs/hep.wisc.edu/home/lgray/.local/bin:%s'%os.environ['PATH']}

# Build the configuration bottom-up: provider -> executor -> Config.
condor_provider = CondorProvider(
    channel=LocalChannel(),
    init_blocks=16,
    max_blocks=200,
    nodes_per_block=1,
    worker_init=wrk_init,
    transfer_input_files=xfer_files,
    scheduler_options=condor_cfg,
)

htex_executor = HighThroughputExecutor(
    label="htex_Local",
    address=address_by_hostname(),
    prefetch_capacity=0,
    worker_debug=True,
    cores_per_worker=1,
    max_workers=nproc,
    # max_blocks=200,
    # workers_per_node=1,
    worker_logdir_root='./',
    provider=condor_provider,
)

local_htex = Config(
    executors=[htex_executor],
    # strategy=None,
)

#parsl.set_stream_logger() # <-- log everything to stdout

print(parsl.__version__)
print(parsl.load(local_htex))

In [None]:
@python_app
def hello():
    """Print a fixed greeting and return the same string.

    Decorated with ``python_app``, so calling it yields an object whose
    ``.result()`` gives the returned string (see the call in the next cell).
    """
    greeting = 'Hello World!'
    print(greeting)
    return greeting

In [None]:
# Smoke test: submit the trivial app and block until its result is available.
hello_future = hello()
print(hello_future.result())


In [None]:
import lz4.frame as lz4f
import cloudpickle as cpkl
import pprint
import numpy as np
from fnal_column_analysis_tools import hist, processor

class ParslTestProcessor(processor.ProcessorABC):
    """Fill one multi-dimensional tagger histogram for the highest-pT AK8 Puppi jet.

    The accumulator is a ``processor.dict_accumulator`` with a single entry,
    ``'tagtensor_def'``, whose axes are: dataset, jet pT, jet mass
    (``AK8Puppijet0_msd``) and the three coarse double-b / double-c /
    double-cvb tagger-score axes.
    """

    def __init__(self, debug=False):
        """Build the histogram axes and the empty accumulator.

        Parameters
        ----------
        debug : bool
            Stored on the instance as ``_debug``; not read anywhere in this class.
        """
        self._debug = debug

        dataset = hist.Cat("dataset", "Primary dataset")
        jetpt = hist.Bin("AK8Puppijet0_pt", "Jet $p_T$", [450, 500, 550, 600, 675, 800, 1000])
        jetmass = hist.Bin("AK8Puppijet0_msd", "Jet $m_{sd}$", 23, 40, 201)

        # The score thresholds are written high-to-low; they are reversed below
        # so the bin edges are passed in ascending order.
        doubleb_edges = [1., 0.93, 0.92, 0.89, 0.85, 0.7, 0.]
        doublec_edges = [0.87, 0.84, 0.83, 0.79, 0.69, 0.58, 0.]
        doublecvb_edges = [0.93, 0.91, 0.86, 0.76, 0.6, 0.17, 0.12, 0.]
        doubleb_coarse = hist.Bin("AK8Puppijet0_deepdoubleb", "Double-b", doubleb_edges[::-1])
        doublec_coarse = hist.Bin("AK8Puppijet0_deepdoublec", "Double-c", doublec_edges[::-1])
        doublecvb_coarse = hist.Bin("AK8Puppijet0_deepdoublecvb", "Double-cvb", doublecvb_edges[::-1])

        hists = processor.dict_accumulator()
        # NOTE: this sets a class attribute on hist.Hist, so it is not scoped
        # to this processor instance.
        hist.Hist.DEFAULT_DTYPE = 'f'  # save some space by keeping float bin counts instead of double

        hists['tagtensor_def'] = hist.Hist("Events", dataset,
                                           jetpt, jetmass,
                                           doubleb_coarse, doublec_coarse,
                                           doublecvb_coarse)

        self._accumulator = hists

    @property
    def accumulator(self):
        """The accumulator built in ``__init__``."""
        return self._accumulator

    def process(self, df):
        """Fill a fresh copy of the accumulator from one chunk of events.

        Parameters
        ----------
        df : mapping-like
            Must provide ``'dataset'`` plus the branches ``'AK8Puppi.pt'``,
            ``'AddAK8Puppi.mass_sd0'``, ``'AddAK8Puppi.deepdoubleb'``,
            ``'AddAK8Puppi.deepdoublec'`` and ``'AddAK8Puppi.deepdoublecvb'``.
            The branch values are assumed to be per-event jagged arrays
            supporting ``argmax()`` and ``flatten()`` -- TODO confirm.

        Returns
        -------
        The filled accumulator (``self.accumulator.identity()`` plus this chunk).
        """
        dataset = df['dataset']

        AK8Puppijet_pt = df['AK8Puppi.pt']
        AK8Puppijet_msd = df['AddAK8Puppi.mass_sd0']
        AK8Puppijet_deepdoubleb = df['AddAK8Puppi.deepdoubleb']
        AK8Puppijet_deepdoublec = df['AddAK8Puppi.deepdoublec']
        AK8Puppijet_deepdoublecvb = df['AddAK8Puppi.deepdoublecvb']

        # Index of the highest-pT jet; reused to pick the same jet in every branch.
        leading_jet = AK8Puppijet_pt.argmax()
        AK8Puppijet0_pt = AK8Puppijet_pt[leading_jet].flatten()
        AK8Puppijet0_msd = AK8Puppijet_msd[leading_jet].flatten()
        AK8Puppijet0_deepdoubleb = AK8Puppijet_deepdoubleb[leading_jet].flatten()
        AK8Puppijet0_deepdoublec = AK8Puppijet_deepdoublec[leading_jet].flatten()
        AK8Puppijet0_deepdoublecvb = AK8Puppijet_deepdoublecvb[leading_jet].flatten()

        hout = self.accumulator.identity()
        hout['tagtensor_def'].fill(dataset=dataset,
                                   AK8Puppijet0_pt=AK8Puppijet0_pt,
                                   AK8Puppijet0_msd=AK8Puppijet0_msd,
                                   AK8Puppijet0_deepdoubleb=AK8Puppijet0_deepdoubleb,
                                   AK8Puppijet0_deepdoublec=AK8Puppijet0_deepdoublec,
                                   AK8Puppijet0_deepdoublecvb=AK8Puppijet0_deepdoublecvb)

        return hout

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged.

        Returning it (rather than ``None``) keeps the result available to any
        caller that uses the return value of ``postprocess``.
        """
        return accumulator
    
# Instantiate the processor once, then serialise it (cloudpickle) and compress
# it (lz4 frame); the resulting bytes are passed to every open_xrd_file call.
myprocessor = ParslTestProcessor()
_pickled_processor = cpkl.dumps(myprocessor)
myprocessorstr = lz4f.compress(_pickled_processor)

In [None]:
from concurrent.futures import ProcessPoolExecutor
import concurrent.futures
import numpy as np
from io import BytesIO
import time

import random
import os.path as path
from tqdm import tqdm_notebook as tqdm

@python_app
def open_xrd_file(eosroot,fpath,dataset,myprocessorstr):
    """Run the serialised processor over the 'Events' tree of one file.

    Parameters
    ----------
    eosroot, fpath : str
        Concatenated (``eosroot + fpath``) to form the location given to
        ``uproot.open``.
    dataset : str
        Stored in the data frame under the key ``'dataset'``.
    myprocessorstr : bytes
        lz4-frame-compressed cloudpickle of the processor instance.

    Returns
    -------
    tuple
        ``(histograms, number_of_entries, seconds)``; the timer starts after
        the processor has been deserialised.
    """
    # All imports are local to the function body; presumably needed because the
    # app body executes in a separate worker process -- TODO confirm.
    import time
    import random
    import uproot
    import lz4.frame as lz4f
    import cloudpickle as cpkl
    from fnal_column_analysis_tools import processor,hist

    proc = cpkl.loads(lz4f.decompress(myprocessorstr))

    started = time.time()
    accumulated = proc.accumulator.identity()
    nevents = 0
    with uproot.open(eosroot + fpath) as rootfile:
        events = rootfile['Events']
        nevents += events.numentries
        frame = processor.LazyDataFrame(events)
        frame['dataset'] = dataset
        accumulated += proc.process(frame)
    elapsed = time.time() - started

    return accumulated, nevents, elapsed

#@python_app
def open_fuse_file(fuseroot,fpath):
    """Read ``nevents_in_file`` from an ``.npz`` archive and time the read.

    Parameters
    ----------
    fuseroot, fpath : str
        Concatenated (``fuseroot + fpath``) to form the path given to ``np.load``.

    Returns
    -------
    tuple
        ``(nevents_in_file, seconds)`` where ``nevents_in_file`` is the array
        stored under that key and ``seconds`` is the wall-clock read time.

    Raises
    ------
    Whatever ``np.load`` raises for a missing/unreadable file, and ``KeyError``
    if the archive has no ``'nevents_in_file'`` entry.
    """
    import time
    import numpy as np
    tic = time.time()
    with np.load(fuseroot + fpath) as npzfile:
        nevents_in_file = npzfile['nevents_in_file']
    toc = time.time()
    return (nevents_in_file, toc - tic)

def recurse_dir(xrdfs,rootdir,out):
    """Recursively collect the paths of all ``.root`` files below ``rootdir``.

    Parameters
    ----------
    xrdfs : object
        Must provide ``dirlist(path)`` returning a 2-sequence whose second
        element is a dict with a ``'dirlist'`` list of ``{'name': ...}`` entries.
    rootdir : str
        Directory to list.
    out : list
        Extended in place with the full path of every entry whose name ends
        in ``.root``.

    Entries ending in ``.log.tar.gz`` are skipped; every other entry is
    treated as a directory and descended into.  The suffix is tested on the
    entry name only, so a directory whose name merely contains ".root"
    (anywhere along the path) no longer causes subdirectories and log
    tarballs to be collected as ROOT files.
    """
    dirinfo = xrdfs.dirlist(rootdir)
    for dirfile in dirinfo[1]['dirlist']:
        entry_name = dirfile['name']
        newdir = path.join(rootdir, entry_name)
        if entry_name.endswith('.root'):
            out.append(newdir)
        elif entry_name.endswith('.log.tar.gz'):
            continue
        else:
            recurse_dir(xrdfs, newdir, out)
            

def print_file(root,thefile):
    """Wait one second, then print ``root`` and ``thefile``; returns None."""
    time.sleep(1)
    print(root, thefile)

In [None]:
import time
import pyxrootd.client

# URL prefix used both for listing and (later) for opening files.
fnaleos = "root://cmseos.fnal.gov/"
# Presumably the local FUSE mount of the same storage; only referenced by the
# commented-out open_fuse_file call further down -- TODO confirm.
fuseroot = "/eos/uscms"

dataset = 'WJetsToQQ_HT-800toInf_qc19_3j_TuneCP5_13TeV-madgraphMLM-pythia8'
testdir = '/store/user/lpcbacon/15/WJetsToQQ_HT-800toInf_qc19_3j_TuneCP5_13TeV_10X/WJetsToQQ_HT-800toInf_qc19_3j_TuneCP5_13TeV-madgraphMLM-pythia8/CRAB3/190125_165530/'

# Walk the directory tree once, collecting every ROOT file path, then drop the
# filesystem handle.
xrdfs = pyxrootd.client.FileSystem(fnaleos)
outfiles = []
recurse_dir(xrdfs, testdir, outfiles)
del xrdfs


#with open('rootfiles.txt','w') as f:
#    for afile in outfiles:
#        f.write(afile)


In [None]:
#res = open_fuse_file(fuseroot,outfiles[0]).result()
#print('try fuse:',res[0],res[1])
# Submit one open_xrd_file app per ROOT file and remember which future belongs
# to which path.
tic = time.time()
future_to_file = {
    open_xrd_file(fnaleos, rootfile, dataset, myprocessorstr): rootfile
    for rootfile in outfiles
}
nfiles = len(future_to_file)

# Merge results in completion order.  Despite its name, time_per_file holds the
# *summed* per-file time until it is divided by the file count when printed.
time_per_file = 0.0
total_events = 0
outhists = myprocessor.accumulator.identity()
for future in tqdm(concurrent.futures.as_completed(future_to_file), total=nfiles):
    res = future.result()  # (histograms, number of entries, seconds)
    time_per_file += res[2]
    total_events += res[1]
    outhists += res[0]
toc = time.time()

print('read',nfiles,'files in',toc-tic,'seconds')
# Guard the average so an empty file list reports 0.0 instead of raising
# ZeroDivisionError.
mean_seconds_per_file = time_per_file/nfiles if nfiles else 0.0
print(mean_seconds_per_file,'seconds per file and',total_events,'total events')

In [None]:
from collections import OrderedDict

# New categorical axis onto which the per-dataset histograms are grouped.
process = hist.Cat("process", "Process", sorting='placement')
# Existing axes consumed by the grouping (just the dataset axis).
process_cats = ("dataset",)

# Maps each process label to a tuple with one pattern per axis in process_cats.
# Every value must be a tuple: note the trailing comma, without which
# ("ST*") is just the string "ST*".
process_map = OrderedDict()
process_map["Wqq"] = ("WJetsToQQ*",)
process_map["Zqq"] = ("ZJetsToQQ*",)
process_map["Single Top"] = ("ST*",)
process_map["TTbar"] = ("TTTo*",)
#process_map["QCD"] = ("QCD*",)
process_map["HToBB (x10)"] = ("GluGluHToBB*",)
process_map["HToCC (x100)"] = ("GluGluHToCC*",)

import pickle, gzip

#with gzip.open('gghbb_plots.pkl.gz','wb') as fout:
#    pickle.dump(gghbb_hists, fout)
# Work on a copy so the merged histogram in outhists is left untouched, then
# regroup the dataset axis into the process axis defined above.
tagtensor = outhists['tagtensor_def'].copy()
plots = tagtensor.group(process, process_cats, process_map)
print(plots)

In [None]:
from copy import deepcopy
# Every dense axis of the grouped histogram.
all_axes = [
    'AK8Puppijet0_pt',
    'AK8Puppijet0_msd',
    'AK8Puppijet0_deepdoubleb',
    'AK8Puppijet0_deepdoublec',
    'AK8Puppijet0_deepdoublecvb',
]
# The four axes that get a 1D projection, in panel order.
plots_list = [
    'AK8Puppijet0_pt',
    'AK8Puppijet0_msd',
    'AK8Puppijet0_deepdoubleb',
    'AK8Puppijet0_deepdoublec',
]
names = []
out_plots = []
# For each axis to plot, sum over every other axis to obtain its projection.
for name in plots_list:
    sum_over = [axis for axis in all_axes if axis != name]
    out_plots.append(plots.sum(*sum_over))

for aplot in out_plots:
    print(aplot)

In [None]:
import matplotlib.pyplot as plt
from cycler import cycler

%matplotlib inline
from fnal_column_analysis_tools import hist
from fnal_column_analysis_tools.hist import plot

# Global font sizes for the figure, then a 2x2 grid of panels.
plt.rcParams.update({
    'font.size': 14,
    'axes.titlesize': 18,
    'axes.labelsize': 18,
    'xtick.labelsize': 12,
    'ytick.labelsize': 12,
})
fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 15))
import matplotlib

# http://colorbrewer2.org/#type=qualitative&scheme=Paired&n=6
colors = ['#a6cee3','#1f78b4','#b2df8a','#33a02c','#fb9a99','#e31a1c']

# Styling dictionaries forwarded to plot.plot1d.  nostack_fill_opts and
# data_err_opts are only referenced by the commented-out data overlay below.
fill_opts = {'edgecolor': (0,0,0,0.3), 'alpha': 0.8}
error_opts = {'label':'Stat. Unc.', 'hatch':'///', 'facecolor':'none', 'edgecolor':(0,0,0,.5), 'linewidth': 0}
nostack_fill_opts = {'alpha': 0.2, 'label': '_nolabel_'}
data_err_opts = {'linestyle':'none', 'marker': '.', 'markersize': 10., 'color':'k', 'elinewidth': 1, 'emarker': '_'}

# Value substituted into the "fb^-1" label drawn on every panel.
ilumifb = 1.0

# One stacked 1D projection per panel; out_plots is indexed row-major.
for i in range(2):
    for j in range(2):
        panel = ax[i,j]
        panel.set_prop_cycle(cycler(color=colors))
        plot.plot1d(out_plots[i*2+j], ax=panel, overlay="process", clear=False, stack=True, line_opts=None, fill_opts=fill_opts, error_opts=error_opts)
        #ret = plot.plot1d(ax[0,0], data_plot, "m_J", error_opts=data_err_opts)
        panel.autoscale(axis='x', tight=True)
        panel.set_ylim(0, None)
        leg = panel.legend()
        # Annotate through the panel itself: plt.text would attach the text to
        # the *current* axes while positioning it with this panel's transform.
        coffee = panel.text(0., 1., u"☕", fontsize=28, horizontalalignment='left', verticalalignment='bottom', transform=panel.transAxes)
        lumi = panel.text(1., 1., r"%.1f fb$^{-1}$ (13 TeV)"%ilumifb, fontsize=16, horizontalalignment='right', verticalalignment='bottom', transform=panel.transAxes)

In [None]:
# Tear down Parsl: call cleanup() on the object returned by parsl.dfk(), then
# parsl.clear().  Presumably this releases the executor/blocks started by
# parsl.load(...) in the first cell and allows a new config to be loaded
# afterwards -- TODO confirm against the Parsl docs.  Order matters: cleanup
# must run before clear.
parsl.dfk().cleanup()
parsl.clear()

In [None]:
# Display the NumPy version of the notebook kernel (environment check).
np.__version__

In [None]:
# Show the proxy file name derived from the current uid in the first cell.
print(x509_proxy)