# HNL CMS Analysis

Code from Leonardo Lunerti

In [None]:
distributed = False ## Default True - Enable DASK parallelisation

if distributed:
    sched_port = 22690 ## Change this from the DASK cluster information panel

### Dask cluster configuration

**NOTE**: The cell below must be changed every time the Dask cluster is recreated

In [None]:
from dask.distributed import Client
if distributed:  
    client = Client("localhost:" + str(sched_port))
    client

In [None]:
#client.restart() #Execute this only to restart the workers (to relaunch the notebook, for example)

Upload the X509 proxy in the Dask workers

In [None]:
from distributed.diagnostics.plugin import UploadFile
if distributed:
    client.register_worker_plugin(UploadFile("/tmp/x509up_u0"))

In [None]:
def set_proxy(dask_worker):
    import os
    import shutil
    working_dir = dask_worker.local_directory
    proxy_name = 'x509up_u0'
    os.environ['X509_USER_PROXY'] = working_dir + '/' + proxy_name
    os.environ['X509_CERT_DIR']="/cvmfs/grid.cern.ch/etc/grid-security/certificates/"
    return os.environ.get("X509_USER_PROXY"), os.environ.get("X509_CERT_DIR") 

In [None]:
if distributed:
    client.run(set_proxy)

Clear workers from any residual ROOT files (previous runs) and auxiliary files (SFs and configurations)

In [None]:
def clear_nodes(dask_worker, with_aux_files=False):
    import os
    os.popen('rm ./*.root')
    os.popen('rm ./*.csv')
    if with_aux_files:
        os.popen('rm -r ./aux_files')
        os.popen('rm -r ./python')
    return True

In [None]:
if distributed:
    client.run(clear_nodes, with_aux_files=True)

Define the operations for moving the output files to remote storage (via Davix):

In [None]:
def transfer_to_tier(dask_worker=None, remote_folder_name=""):
    import os
    os.popen('for filename in ./*.root; do davix-put "$filename" {}/"$filename" -E $X509_USER_PROXY --capath $X509_CERT_DIR; done'.format(remote_folder_name))
    return True

In [None]:
def transfer_to_tier_monitoring(dask_worker=None, remote_folder_name=""):
    import os
    os.popen('for filename in ./distrdf_*.csv; do davix-put "$filename" {}/monitoring/"$filename" -E $X509_USER_PROXY --capath $X509_CERT_DIR; done'.format(remote_folder_name))
    #os.popen('for filename in ./*.csv; do davix-put "$filename" {}/"$filename" -E $X509_USER_PROXY --capath $X509_CERT_DIR; done'.format(remote_folder_name))
    return True

### Import modules and local X509 proxy configuration

In [None]:
import ROOT
import sys
import math
import os
import json
import time
import subprocess
import numpy
import pandas as pd
from python import hnl_tools

In [None]:
# Loading PROXY locally
os.environ['X509_USER_PROXY'] = "/tmp/x509up_u0"
os.environ['X509_CERT_DIR'] = "/cvmfs/grid.cern.ch/etc/grid-security/certificates/"

In [None]:
# Opts for Lazy snapshot
#opts = ROOT.RDF.RSnapshotOptions()
#opts.fLazy = True

## HNL Analysis

### Set the analysis initial configuration

In [None]:
pre_start = time.time()
# Mandatory root files
configFileName = "/opt/workspace/persistent-storage/AF_HNL_analysis/cfg/DsToHnlMu_HnlToMuPi_prompt_UL_crab_cfg_dask.json"
dataset_to_process = "DsToNMu_NToMuPi_mN1p5_ctau1000p0mm_incl" #"ParkingBPH1_D" 

In [None]:
# Code options
saveOutputTree        = True  # Default False - Save an output tree (after all the cuts)
savePreselectedTree   = True  # Default False - Save an output tree (before the cuts and after best candidate selection)
doHistograms          = False #True Default False - Do not save histogram as output
addSPlotWeight        = False # Default False - Add splot weight for data 
skipPUrw              = False # Default False - Do not apply PU reweighting
skipTrigSF            = False # Default False - Do not apply trigger scale factors
skipMuIDsf            = False # Default False - Do not apply muon id scale factors
skipMuRecosf          = False # Default False - Do not apply muon reco scale factors
nThreads              = 1     # Default 1     - Number of threads
tag                   = ""    # Default ""    - Tag output files
ctauReweighting       = False # Default False - Include ctau reweighting
applyMuDsPtCorr       = False # Default False - Apply reweighting to mu from Ds to correct data/MC pt discrepancies
applyMuHnlPtCorr      = False # Default False - Apply reweighting to mu from Hnl to correct data/MC pt discrepancies
applyMuDsIPSCorr      = False # Default False - Apply reweighting to mu from Ds to correct data/MC IPS discrepancies
applyMuHnlIPSCorr     = False # Default False - Apply reweighting to mu from Hnl to correct data/MC IPS discrepancies
bestCandChecks        = False # Default False - Make checks for best candidate selection
varyMuIDSf            = 0.0   # Default 0.0 - Muon ID sf w/ variations: sf = sf+variation*error
varyMuRecoSf          = 0.0   # Default 0.0 - Muon reco sf w/ variations: sf = sf+variation*error
keep                  = []    # Default []    - Select which branches to keep in the final output tree
saveRemotely          = False # Default False - Enables remote save with davix - MANDATORY IF DISTRIBUTED

### Open the configuration files and get the input files

In [None]:
#open analyzer configuration file
with open(configFileName, "r") as f:
    config = json.loads(f.read())

#get ntuples configuration
with open(config["ntuples_cfg_file_full_path"], "r") as f:
    ntuples = json.loads(f.read())

#get histogram configuration
#with open(config["histogram_cfg_file_full_path"], "r") as f:
#    histos = json.loads(f.read())

#get selection and categorization cuts
with open(config["selection_cfg_file_full_path"], "r") as f:
    selection = json.loads(f.read())

In [None]:
#Get input files

input_file_list = "file_name_list"
tree_name = "wztree"

#if skipSlimCuts:
#    input_file_list = "slimmed_file_name_list"
#    tree_name = "slimmed_tree"
#
#if addSPlotWeight and skipSelCuts and skipSlimCuts:
#    input_file_list = "final_file_name_list"
#    tree_name = "final_tree"

#get input files
inputFileName_list = ntuples[dataset_to_process][input_file_list]
dataset_category = ntuples[dataset_to_process]["dataset_category"]

### Push and set auxiliary files

In [None]:
if distributed:
    client.register_worker_plugin(UploadFile(config["pu_weight_input_file_signal"]))
    client.register_worker_plugin(UploadFile(config["pu_weight_input_file_bkg"]))
    client.register_worker_plugin(UploadFile(config["mu_id_sf_input_file"]))
    client.register_worker_plugin(UploadFile(config["mu_reco_sf_input_file"]))
    client.register_worker_plugin(UploadFile(config["trigger_sf_input_file"]))
    client.register_worker_plugin(UploadFile(config["trigger_eff_data_input_file"]))
    client.register_worker_plugin(UploadFile(config["trigger_eff_mc_input_file"]))
    client.register_worker_plugin(UploadFile(config["hnl_tools"]))

Move the auxiliary files in the HOME directory

In [None]:
#def test(dask_worker):
#    import os
#    return os.listdir("./")
#
#client.run(test)

In [None]:
def set_auxiliary_files(dask_worker):
    import os
    import shutil
    working_dir = dask_worker.local_directory
    os.mkdir(working_dir + '/../../../aux_files/')
    os.mkdir(working_dir + '/../../../python/')
    shutil.copyfile(working_dir + "/" + config["pu_weight_input_file_signal"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["pu_weight_input_file_signal"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["pu_weight_input_file_bkg"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["pu_weight_input_file_bkg"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["mu_id_sf_input_file"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["mu_id_sf_input_file"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["mu_reco_sf_input_file"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["mu_reco_sf_input_file"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["trigger_sf_input_file"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["trigger_sf_input_file"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["trigger_eff_data_input_file"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["trigger_eff_data_input_file"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["trigger_eff_mc_input_file"].split("/")[-1] , working_dir + '/../../../aux_files/' + config["trigger_eff_mc_input_file"].split("/")[-1])
    shutil.copyfile(working_dir + "/" + config["hnl_tools"].split("/")[-1] , working_dir + '/../../../python/' + config["hnl_tools"].split("/")[-1])
    return os.listdir("/srv/python/")

In [None]:
if distributed:
    client.run(set_auxiliary_files)

### Declare custom functions in RDF

In [None]:
# let private function to be avaliable in RDataFrame evn
text_file = open(config["user_defined_function_path"], "r")
data = text_file.read()

def my_initialization_function():
    ROOT.gInterpreter.AddIncludePath("/cvmfs/cms.dodas.infn.it/boost_1_77_0")
    ROOT.gInterpreter.Declare('{}'.format(data))
    #get pu weights histogram
    if dataset_category != "data" and not skipPUrw:
        if dataset_category=="signal":
            ROOT.gInterpreter.Declare("""
            auto pu_weights_file = TFile::Open("{}");
            auto h_pu_weights = pu_weights_file->Get<TH1D>("{}");
            """.format(config["pu_weight_node_input_file_signal"],config["pu_weight_histo_name"])
            )
        else:
            ROOT.gInterpreter.Declare("""
            auto pu_weights_file = TFile::Open("{}");
            auto h_pu_weights = pu_weights_file->Get<TH1D>("{}");
            """.format(config["pu_weight_node_input_file_bkg"],config["pu_weight_histo_name"])
            )
    
    #get trigger scale factors histogram
    if dataset_category != "data" and not skipTrigSF:
        ROOT.gInterpreter.Declare("""
        auto trigger_eff_data_file = TFile::Open("{edata_file}");
        auto trigger_eff_mc_file   = TFile::Open("{emc_file}");
        auto h_trigger_eff_data = trigger_eff_data_file->Get<TH2D>("{edatahisto_name}");
        auto h_trigger_eff_mc   = trigger_eff_mc_file->Get<TH2D>("{emchisto_name}");
        """.format(edata_file=config["trigger_eff_data_node_input_file"],
                   emc_file=config["trigger_eff_mc_node_input_file"],
                   edatahisto_name=config["trigger_eff_data_histo_name"],
                   emchisto_name=config["trigger_eff_mc_histo_name"])
        )

    #get pu weights histogram
    if dataset_category != "data" and (not skipMuIDsf or not skipMuRecosf):
        ROOT.gInterpreter.Declare("""
        #include <boost/property_tree/ptree.hpp>
        #include <boost/property_tree/json_parser.hpp>
        """
        )
        if not skipMuIDsf:
            ROOT.gInterpreter.Declare("boost::property_tree::ptree mu_id_sf_cfg;")
            ROOT.gInterpreter.ProcessLine("""
            boost::property_tree::read_json("{infile}",mu_id_sf_cfg);
            """.format(infile=config["mu_id_sf_node_input_file"])
            )
        if not skipMuRecosf:
            ROOT.gInterpreter.Declare("boost::property_tree::ptree mu_reco_sf_cfg;")
            ROOT.gInterpreter.ProcessLine("""
            boost::property_tree::read_json("{infile}",mu_reco_sf_cfg);
            """.format(infile=config["mu_reco_sf_node_input_file"])
            )
        
    #get mu_Ds pt shape scale factors histogram
    if dataset_category != "data" and applyMuDsPtCorr:
        ROOT.gInterpreter.Declare("""
        auto ds_pt_shape_sf_file = TFile::Open("{}");
        auto h_ds_pt_shape_sf = ds_pt_shape_sf_file->Get<TH1D>("{}");
        """.format(config["ds_pt_shape_sf_input_file"],config["ds_pt_shape_sf_histo_name"])
        )
    #get mu_Hnl pt shape scale factors histogram
    if dataset_category != "data" and applyMuHnlPtCorr:
        ROOT.gInterpreter.Declare("""
        auto hnl_pt_shape_sf_file = TFile::Open("{}");
        auto h_hnl_pt_shape_sf = hnl_pt_shape_sf_file->Get<TH1D>("{}");
        """.format(config["hnl_pt_shape_sf_input_file"],config["hnl_pt_shape_sf_histo_name"])
        )
    #get mu_Ds IPS shape scale factors histogram
    if dataset_category != "data" and applyMuDsIPSCorr:
        ROOT.gInterpreter.Declare("""
        auto ds_ips_shape_sf_file = TFile::Open("{}");
        auto h_ds_ips_shape_sf = ds_ips_shape_sf_file->Get<TH1D>("{}");
        """.format(config["ds_ips_shape_sf_input_file"],config["ds_ips_shape_sf_histo_name"])
        )
    #get mu_Hnl IPS shape scale factors histogram
    if dataset_category != "data" and applyMuHnlIPSCorr:
        ROOT.gInterpreter.Declare("""
        auto hnl_ips_shape_sf_file = TFile::Open("{}");
        auto h_hnl_ips_shape_sf = hnl_ips_shape_sf_file->Get<TH1D>("{}");
        """.format(config["hnl_ips_shape_sf_input_file"],config["hnl_ips_shape_sf_histo_name"])
        )

In [None]:
if distributed:
    ROOT.RDF.Experimental.Distributed.initialize(my_initialization_function)
else:
    my_initialization_function()

### Continue with the analysis flow

Print information on the auxiliary files

In [None]:
if dataset_category != "data" and not skipTrigSF:
    print("eff data histogram from {}, {}".format(config["trigger_eff_data_input_file"],config["trigger_eff_data_histo_name"]))
    print("eff mc histogram from {}, {}".format(config["trigger_eff_mc_input_file"],config["trigger_eff_mc_histo_name"]))
if dataset_category != "data" and applyMuDsPtCorr:
    print("pt shape correction sf from {}, {}".format(config["ds_pt_shape_sf_input_file"],config["ds_pt_shape_sf_histo_name"]))
if dataset_category != "data" and applyMuHnlPtCorr:
    print("pt shape correction sf from {}, {}".format(config["hnl_pt_shape_sf_input_file"],config["hnl_pt_shape_sf_histo_name"]))
if dataset_category != "data" and applyMuDsIPSCorr:
    print("ips shape correction sf from {}, {}".format(config["ds_ips_shape_sf_input_file"],config["ds_ips_shape_sf_histo_name"]))
if dataset_category != "data" and applyMuHnlIPSCorr:
    print("ips shape correction sf from {}, {}".format(config["hnl_ips_shape_sf_input_file"],config["hnl_ips_shape_sf_histo_name"]))

Define unit weights and generator weights

In [None]:
#define unit weights
mc_weight  = 1.
pu_weight  = 1.

# get generator weight for MC
if dataset_category != "data":
    cross_section      = float(ntuples[dataset_to_process]["cross_section"])
    filter_efficiency  = float(ntuples[dataset_to_process]["filter_efficiency"])
    total_events       = float(ntuples[dataset_to_process]["processed_events"])
    mc_weight = cross_section*filter_efficiency/total_events

print("mc_weight: {}".format(mc_weight))

List of columns selected for the analysis as well as entire column list from the ntuples

In [None]:
ListOfColumns = { "C_Ds_mass", "C_Ds_pt", "C_Ds_px", "C_Ds_py", "C_Ds_pz", "C_Ds_vertex_2DDist_BS", 
                 "C_Ds_vertex_2DDist_PV", "C_Ds_vertex_2DErr_BS", "C_Ds_vertex_2DErr_PV", "C_Ds_vertex_2DSig_BS", 
                 "C_Ds_vertex_2DSig_PV", "C_Ds_vertex_3DDist_BS", "C_Ds_vertex_3DDist_PV", "C_Ds_vertex_3DErr_BS", 
                 "C_Ds_vertex_3DErr_PV", "C_Ds_vertex_3DSig_BS", "C_Ds_vertex_3DSig_PV", "C_Ds_vertex_cos2D", 
                 "C_Ds_vertex_cos3D", "C_Ds_vertex_prob", "C_Ds_vertex_x", "C_Ds_vertex_xErr", "C_Ds_vertex_y", 
                 "C_Ds_vertex_yErr", "C_Ds_vertex_z", "C_Ds_vertex_zErr", "C_Hnl_gen_l", "C_Hnl_gen_l_prop", 
                 "C_Hnl_l_prop", "C_Hnl_mass", "C_Hnl_p", "C_Hnl_pt", "C_Hnl_px", "C_Hnl_py", "C_Hnl_pz", 
                 "C_Hnl_vertex_2DDist_BS", "C_Hnl_vertex_2DDist_PV", "C_Hnl_vertex_2DErr_BS", "C_Hnl_vertex_2DErr_PV", 
                 "C_Hnl_vertex_2DSig_BS", "C_Hnl_vertex_2DSig_PV", "C_Hnl_vertex_3DDist_BS", "C_Hnl_vertex_3DDist_PV", 
                 "C_Hnl_vertex_3DErr_BS", "C_Hnl_vertex_3DErr_PV", "C_Hnl_vertex_3DSig_BS", "C_Hnl_vertex_3DSig_PV", 
                 "C_Hnl_vertex_cos2D", "C_Hnl_vertex_cos3D", "C_Hnl_vertex_prob", "C_Hnl_vertex_x", "C_Hnl_vertex_xErr", 
                 "C_Hnl_vertex_y", "C_Hnl_vertex_yErr", "C_Hnl_vertex_z", "C_Hnl_vertex_zErr", "C_mu1mu2_dr", 
                 "C_mu1mu2_mass", "C_mu1pi_dr", "C_mu2pi_dr", "C_mu_Ds_BS_ip", "C_mu_Ds_BS_ip_xy", "C_mu_Ds_BS_ips", 
                 "C_mu_Ds_BS_ips_xy", "C_mu_Ds_BS_x", "C_mu_Ds_BS_y", "C_mu_Ds_BS_z", "C_mu_Ds_PV_ip", "C_mu_Ds_PV_ip_xy", 
                 "C_mu_Ds_PV_ip_z", "C_mu_Ds_PV_ips", "C_mu_Ds_PV_ips_xy", "C_mu_Ds_PV_ips_z", "C_mu_Ds_charge", 
                 "C_mu_Ds_dptopt_MU10p5_IP3p5", "C_mu_Ds_dptopt_MU12_IP6", "C_mu_Ds_dptopt_MU7_IP4", "C_mu_Ds_dptopt_MU8_IP3", 
                 "C_mu_Ds_dptopt_MU8_IP5", "C_mu_Ds_dptopt_MU8_IP6", "C_mu_Ds_dptopt_MU8p5_IP3p5", "C_mu_Ds_dptopt_MU9_IP4", 
                 "C_mu_Ds_dptopt_MU9_IP5", "C_mu_Ds_dptopt_MU9_IP6", "C_mu_Ds_dr_MU10p5_IP3p5", "C_mu_Ds_dr_MU12_IP6", 
                 "C_mu_Ds_dr_MU7_IP4", "C_mu_Ds_dr_MU8_IP3", "C_mu_Ds_dr_MU8_IP5", "C_mu_Ds_dr_MU8_IP6", "C_mu_Ds_dr_MU8p5_IP3p5", 
                 "C_mu_Ds_dr_MU9_IP4", "C_mu_Ds_dr_MU9_IP5", "C_mu_Ds_dr_MU9_IP6", "C_mu_Ds_eta", "C_mu_Ds_fitted_E", 
                 "C_mu_Ds_fitted_px", "C_mu_Ds_fitted_py", "C_mu_Ds_fitted_pz", "C_mu_Ds_iBestPV",
                 "C_mu_Ds_idx", "C_mu_Ds_isGlobal", "C_mu_Ds_isHnlBrother", "C_mu_Ds_isLoose", "C_mu_Ds_isMedium", 
                 "C_mu_Ds_isSoft", "C_mu_Ds_isStandAlone", "C_mu_Ds_isTracker", "C_mu_Ds_matched_MU10p5_IP3p5",
                 "C_mu_Ds_matched_MU12_IP6", "C_mu_Ds_matched_MU7_IP4", "C_mu_Ds_matched_MU8_IP3", "C_mu_Ds_matched_MU8_IP5", 
                 "C_mu_Ds_matched_MU8_IP6", "C_mu_Ds_matched_MU8p5_IP3p5", "C_mu_Ds_matched_MU9_IP4", "C_mu_Ds_matched_MU9_IP5", 
                 "C_mu_Ds_matched_MU9_IP6", "C_mu_Ds_phi", "C_mu_Ds_pt", "C_mu_Hnl_BS_ip", "C_mu_Hnl_BS_ip_xy",
                 "C_mu_Hnl_BS_ips", "C_mu_Hnl_BS_ips_xy", "C_mu_Hnl_PV_ip", "C_mu_Hnl_PV_ip_xy", "C_mu_Hnl_PV_ip_z", 
                 "C_mu_Hnl_PV_ips", "C_mu_Hnl_PV_ips_xy", "C_mu_Hnl_PV_ips_z", "C_mu_Hnl_charge", "C_mu_Hnl_dptopt_MU10p5_IP3p5", 
                 "C_mu_Hnl_dptopt_MU12_IP6", "C_mu_Hnl_dptopt_MU7_IP4", "C_mu_Hnl_dptopt_MU8_IP3", "C_mu_Hnl_dptopt_MU8_IP5", 
                 "C_mu_Hnl_dptopt_MU8_IP6", "C_mu_Hnl_dptopt_MU8p5_IP3p5", "C_mu_Hnl_dptopt_MU9_IP4", "C_mu_Hnl_dptopt_MU9_IP5", 
                 "C_mu_Hnl_dptopt_MU9_IP6", "C_mu_Hnl_dr_MU10p5_IP3p5", "C_mu_Hnl_dr_MU12_IP6", "C_mu_Hnl_dr_MU7_IP4", 
                 "C_mu_Hnl_dr_MU8_IP3", "C_mu_Hnl_dr_MU8_IP5", "C_mu_Hnl_dr_MU8_IP6", "C_mu_Hnl_dr_MU8p5_IP3p5", 
                 "C_mu_Hnl_dr_MU9_IP4", "C_mu_Hnl_dr_MU9_IP5", "C_mu_Hnl_dr_MU9_IP6", "C_mu_Hnl_eta", "C_mu_Hnl_fitted_E", 
                 "C_mu_Hnl_fitted_px", "C_mu_Hnl_fitted_py", "C_mu_Hnl_fitted_pz", "C_mu_Hnl_idx",
                 "C_mu_Hnl_isGlobal", "C_mu_Hnl_isHnlDaughter", "C_mu_Hnl_isLoose", "C_mu_Hnl_isMedium", "C_mu_Hnl_isSoft", 
                 "C_mu_Hnl_isStandAlone", "C_mu_Hnl_isTracker", "C_mu_Hnl_matched_MU10p5_IP3p5",
                 "C_mu_Hnl_matched_MU12_IP6", "C_mu_Hnl_matched_MU7_IP4", "C_mu_Hnl_matched_MU8_IP3", "C_mu_Hnl_matched_MU8_IP5", 
                 "C_mu_Hnl_matched_MU8_IP6", "C_mu_Hnl_matched_MU8p5_IP3p5", "C_mu_Hnl_matched_MU9_IP4", "C_mu_Hnl_matched_MU9_IP5", 
                 "C_mu_Hnl_matched_MU9_IP6", "C_mu_Hnl_phi", "C_mu_Hnl_pt", "C_pi_BS_ip",
                 "C_pi_BS_ip_xy", "C_pi_BS_ip_z", "C_pi_BS_ips", "C_pi_BS_ips_xy", "C_pi_BS_ips_z", "C_pi_PV_ip_xy", "C_pi_PV_ip_z", 
                 "C_pi_PV_ips_xy", "C_pi_PV_ips_z", "C_pi_charge", "C_pi_eta", "C_pi_fitted_E", "C_pi_fitted_px", "C_pi_fitted_py", 
                 "C_pi_fitted_pz", "C_pi_idx", "C_pi_isHnlDaughter", "C_pi_phi", "C_pi_pt", "HLT_mu10p5_ip3p5_dr",
                 "HLT_mu10p5_ip3p5_eta", "HLT_mu10p5_ip3p5_matched", "HLT_mu10p5_ip3p5_prescale", "HLT_mu10p5_ip3p5_pt", "HLT_mu12_ip6_dr", 
                 "HLT_mu12_ip6_eta", "HLT_mu12_ip6_matched", "HLT_mu12_ip6_prescale", "HLT_mu12_ip6_pt", "HLT_mu7_ip4_dr", 
                 "HLT_mu7_ip4_eta", "HLT_mu7_ip4_matched", "HLT_mu7_ip4_prescale", "HLT_mu7_ip4_pt", "HLT_mu8_ip3_dr", "HLT_mu8_ip3_eta", 
                 "HLT_mu8_ip3_matched", "HLT_mu8_ip3_prescale", "HLT_mu8_ip3_pt", "HLT_mu8_ip5_dr", "HLT_mu8_ip5_eta", 
                 "HLT_mu8_ip5_matched", "HLT_mu8_ip5_prescale", "HLT_mu8_ip5_pt", "HLT_mu8_ip6_dr", "HLT_mu8_ip6_eta", "HLT_mu8_ip6_matched", 
                 "HLT_mu8_ip6_prescale", "HLT_mu8_ip6_pt", "HLT_mu8p5_ip3p5_dr", "HLT_mu8p5_ip3p5_eta", "HLT_mu8p5_ip3p5_matched", 
                 "HLT_mu8p5_ip3p5_prescale", "HLT_mu8p5_ip3p5_pt", "HLT_mu9_ip4_dr", "HLT_mu9_ip4_eta", "HLT_mu9_ip4_matched", 
                 "HLT_mu9_ip4_prescale", "HLT_mu9_ip4_pt", "HLT_mu9_ip5_dr", "HLT_mu9_ip5_eta", "HLT_mu9_ip5_matched", "HLT_mu9_ip5_prescale", 
                 "HLT_mu9_ip5_pt", "HLT_mu9_ip6_dr", "HLT_mu9_ip6_eta", "HLT_mu9_ip6_matched", "HLT_mu9_ip6_prescale", "HLT_mu9_ip6_pt", 
                 "HLT_mu_trig_eta", "HLT_mu_trig_idx", "HLT_mu_trig_phi", "HLT_mu_trig_pt", "PV_prob", "PV_x", "PV_xErr", "PV_y", "PV_yErr", 
                 "PV_z", "PV_zErr", "event", "lumi", "mc_weight", "nCand", "nPU_trueInt", "nPV", "numTrack", "pass_presel_0", "pass_presel_1", 
                 "pass_presel_2", "pu_weight", "run", "tot_weight" }

good_cols = {'C_Ds_mass', 'C_Ds_pt', 'C_Ds_px', 'C_Ds_py', 'C_Ds_pz', 'C_Ds_vertex_2DDist_BS', 'C_Ds_vertex_2DDist_PV', 'C_Ds_vertex_2DErr_BS', 
             'C_Ds_vertex_2DErr_PV', 'C_Ds_vertex_2DSig_BS', 'C_Ds_vertex_2DSig_PV', 'C_Ds_vertex_3DDist_BS', 'C_Ds_vertex_3DDist_PV', 
             'C_Ds_vertex_3DErr_BS', 'C_Ds_vertex_3DErr_PV', 'C_Ds_vertex_3DSig_BS', 'C_Ds_vertex_3DSig_PV', 'C_Ds_vertex_cos2D', 
             'C_Ds_vertex_cos3D', 'C_Ds_vertex_prob', 'C_Ds_vertex_x', 'C_Ds_vertex_xErr', 'C_Ds_vertex_y', 'C_Ds_vertex_yErr', 
             'C_Ds_vertex_z', 'C_Ds_vertex_zErr', 'C_Hnl_gen_l', 'C_Hnl_gen_l_prop', 'C_Hnl_l_prop', 'C_Hnl_mass', 'C_Hnl_p', 'C_Hnl_pt', 
             'C_Hnl_px', 'C_Hnl_py', 'C_Hnl_pz', 'C_Hnl_vertex_2DDist_BS', 'C_Hnl_vertex_2DDist_PV', 'C_Hnl_vertex_2DErr_BS', 
             'C_Hnl_vertex_2DErr_PV', 'C_Hnl_vertex_2DSig_BS', 'C_Hnl_vertex_2DSig_PV', 'C_Hnl_vertex_3DDist_BS', 'C_Hnl_vertex_3DDist_PV', 
             'C_Hnl_vertex_3DErr_BS', 'C_Hnl_vertex_3DErr_PV', 'C_Hnl_vertex_3DSig_BS', 'C_Hnl_vertex_3DSig_PV', 'C_Hnl_vertex_cos2D', 
             'C_Hnl_vertex_cos3D', 'C_Hnl_vertex_prob', 'C_Hnl_vertex_x', 'C_Hnl_vertex_xErr', 'C_Hnl_vertex_y', 'C_Hnl_vertex_yErr', 
             'C_Hnl_vertex_z', 'C_Hnl_vertex_zErr', 'C_mu1mu2_dr', 'C_mu1mu2_mass', 'C_mu1pi_dr', 'C_mu2pi_dr', 'C_mu_Ds_BS_ip_xy', 
             'C_mu_Ds_BS_ips_xy', 'C_mu_Ds_BS_x', 'C_mu_Ds_BS_y', 'C_mu_Ds_BS_z', 'C_mu_Ds_PV_ip_xy', 'C_mu_Ds_PV_ip_z', 'C_mu_Ds_PV_ips_xy', 
             'C_mu_Ds_PV_ips_z', 'C_mu_Ds_charge', 'C_mu_Ds_dptopt_MU10p5_IP3p5', 'C_mu_Ds_dptopt_MU12_IP6', 'C_mu_Ds_dptopt_MU7_IP4', 
             'C_mu_Ds_dptopt_MU8_IP3', 'C_mu_Ds_dptopt_MU8_IP5', 'C_mu_Ds_dptopt_MU8_IP6', 'C_mu_Ds_dptopt_MU8p5_IP3p5', 'C_mu_Ds_dptopt_MU9_IP4', 
             'C_mu_Ds_dptopt_MU9_IP5', 'C_mu_Ds_dptopt_MU9_IP6', 'C_mu_Ds_dr_MU10p5_IP3p5', 'C_mu_Ds_dr_MU12_IP6', 'C_mu_Ds_dr_MU7_IP4', 
             'C_mu_Ds_dr_MU8_IP3', 'C_mu_Ds_dr_MU8_IP5', 'C_mu_Ds_dr_MU8_IP6', 'C_mu_Ds_dr_MU8p5_IP3p5', 'C_mu_Ds_dr_MU9_IP4', 
             'C_mu_Ds_dr_MU9_IP5', 'C_mu_Ds_dr_MU9_IP6', 'C_mu_Ds_eta', 'C_mu_Ds_fitted_E', 'C_mu_Ds_fitted_px', 'C_mu_Ds_fitted_py', 
             'C_mu_Ds_fitted_pz', 'C_mu_Ds_iBestPV', 'C_mu_Ds_idx', 'C_mu_Ds_isGlobal', 'C_mu_Ds_isHnlBrother',
             'C_mu_Ds_isLoose', 'C_mu_Ds_isMedium', 'C_mu_Ds_isSoft', 'C_mu_Ds_isStandAlone', 'C_mu_Ds_isTracker', 
             'C_mu_Ds_matched_MU10p5_IP3p5', 'C_mu_Ds_matched_MU12_IP6', 'C_mu_Ds_matched_MU7_IP4', 'C_mu_Ds_matched_MU8_IP3', 
             'C_mu_Ds_matched_MU8_IP5', 'C_mu_Ds_matched_MU8_IP6', 'C_mu_Ds_matched_MU8p5_IP3p5', 'C_mu_Ds_matched_MU9_IP4', 
             'C_mu_Ds_matched_MU9_IP5', 'C_mu_Ds_matched_MU9_IP6', 'C_mu_Ds_phi', 'C_mu_Ds_pt', 'C_mu_Hnl_BS_ip_xy',
             'C_mu_Hnl_BS_ips_xy', 'C_mu_Hnl_PV_ip_xy', 'C_mu_Hnl_PV_ip_z', 'C_mu_Hnl_PV_ips_xy', 'C_mu_Hnl_PV_ips_z', 'C_mu_Hnl_charge', 
             'C_mu_Hnl_dptopt_MU10p5_IP3p5', 'C_mu_Hnl_dptopt_MU12_IP6', 'C_mu_Hnl_dptopt_MU7_IP4', 'C_mu_Hnl_dptopt_MU8_IP3', 
             'C_mu_Hnl_dptopt_MU8_IP5', 'C_mu_Hnl_dptopt_MU8_IP6', 'C_mu_Hnl_dptopt_MU8p5_IP3p5', 'C_mu_Hnl_dptopt_MU9_IP4', 
             'C_mu_Hnl_dptopt_MU9_IP5', 'C_mu_Hnl_dptopt_MU9_IP6', 'C_mu_Hnl_dr_MU10p5_IP3p5', 'C_mu_Hnl_dr_MU12_IP6', 'C_mu_Hnl_dr_MU7_IP4', 
             'C_mu_Hnl_dr_MU8_IP3', 'C_mu_Hnl_dr_MU8_IP5', 'C_mu_Hnl_dr_MU8_IP6', 'C_mu_Hnl_dr_MU8p5_IP3p5', 'C_mu_Hnl_dr_MU9_IP4', 
             'C_mu_Hnl_dr_MU9_IP5', 'C_mu_Hnl_dr_MU9_IP6', 'C_mu_Hnl_eta', 'C_mu_Hnl_fitted_E', 'C_mu_Hnl_fitted_px', 'C_mu_Hnl_fitted_py', 
             'C_mu_Hnl_fitted_pz' , 'C_mu_Hnl_idx', 'C_mu_Hnl_isGlobal', 'C_mu_Hnl_isHnlDaughter', 'C_mu_Hnl_isLoose',
             'C_mu_Hnl_isMedium', 'C_mu_Hnl_isSoft', 'C_mu_Hnl_isStandAlone', 'C_mu_Hnl_isTracker', 
             'C_mu_Hnl_matched_MU10p5_IP3p5', 'C_mu_Hnl_matched_MU12_IP6', 'C_mu_Hnl_matched_MU7_IP4', 'C_mu_Hnl_matched_MU8_IP3', 
             'C_mu_Hnl_matched_MU8_IP5', 'C_mu_Hnl_matched_MU8_IP6', 'C_mu_Hnl_matched_MU8p5_IP3p5', 'C_mu_Hnl_matched_MU9_IP4', 
             'C_mu_Hnl_matched_MU9_IP5', 'C_mu_Hnl_matched_MU9_IP6', 'C_mu_Hnl_phi', 'C_mu_Hnl_pt',
             'C_pi_BS_ip', 'C_pi_BS_ip_xy', 'C_pi_BS_ip_z', 'C_pi_BS_ips', 'C_pi_BS_ips_xy', 'C_pi_BS_ips_z', 'C_pi_PV_ip_xy', 
             'C_pi_PV_ip_z', 'C_pi_PV_ips_xy', 'C_pi_PV_ips_z', 'C_pi_charge', 'C_pi_eta', 'C_pi_fitted_E', 'C_pi_fitted_px', 'C_pi_fitted_py', 
             'C_pi_fitted_pz', 'C_pi_idx', 'C_pi_isHnlDaughter', 'C_pi_phi', 'C_pi_pt'}

In [None]:
if distributed:
    numWorkers= len(client.scheduler_info()['workers'])

### DataFrame creation and execution on Dask workers

In [None]:
input_file_name = inputFileName_list[0].split("/")[-1].split(".")[0]
dataset_name_label = input_file_name[input_file_name.find("_")+1:input_file_name.rfind("_")]

######################
###### ANALYSIS ######
######################
pre_stop = time.time()
if distributed:
    client.run(clear_nodes, with_aux_files=False)
    
#initialize data frame
if distributed:
    df = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(tree_name, inputFileName_list, npartitions=3*numWorkers, daskclient=client, monitor_label = "HNL") #len(inputFileName_list)
else:
    df = ROOT.RDataFrame(tree_name, inputFileName_list)

nEvents = df.Count()
#define new variables
for var in selection["new_variables"]:
    df = df.Define(var["name"],var["definition"])

# operations on signal samples
if dataset_category == "signal":
    #define GEN matching variables and reject events w/o GEN matched candidates
    gen_matching_cuts =list()
    mask_var = "C_pass_gen_matching"
    ListOfColumns.add(mask_var)
    good_cols.add(mask_var)
    for sel in selection["gen_matching_cuts"]:
        gen_matching_cuts.append(sel["cut"])
    df = df.Define(mask_var," && ".join(gen_matching_cuts))
    df = df.Filter("ROOT::VecOps::Any("+mask_var+")","pass GEN matching cuts")

    # define ctau weights
    if ctauReweighting and dataset_category == "signal":
        old_ctau_label = dataset_name_label[dataset_name_label.find("ctau")+4:dataset_name_label.find("mm")]
        hnl_mass_label = dataset_name_label[dataset_name_label.find("mN")+2:dataset_name_label.find("_ctau")]
        for new_ctau in selection["mN"+hnl_mass_label+"_ctau"+old_ctau_label+"mm_rw_points"]:
          old_ctau = float(old_ctau_label.replace("p","."))
          w_expr   = "("+str(old_ctau)+"/"+str(new_ctau)+")*"+"exp(C_Hnl_gen_l_prop*("+str(1./old_ctau)+"-"+str(1./new_ctau)+"))"
          df = df.Define("C_ctau_weight_"+old_ctau_label+"TO"+str(new_ctau).replace(".","p"),w_expr)


#################
#### WEIGHTS ####
#################

#define cross section normalization mc_weight
df = df.Define("mc_weight",str(mc_weight))    

# define pu weight for MC only
if dataset_category != "data" and not skipPUrw:
    pu_weight = "h_pu_weights->GetBinContent(h_pu_weights->FindBin(nPU_trueInt))"
df = df.Define("pu_weight",str(pu_weight)) 
df = df.Define("tot_weight","mc_weight*pu_weight")

# define trigger scale factors for MC only
if dataset_category != "data" and not skipTrigSF:
    trigger_eff_data_ds  = "get_2D_binContent(h_trigger_eff_data,C_{mu1l}_pt,C_{mu1l}_BS_ips_xy,99.0,499.0)".format(mu1l=config["mu1_label"])
    trigger_eff_mc_ds    = "get_2D_binContent(h_trigger_eff_mc,C_{mu1l}_pt,C_{mu1l}_BS_ips_xy,99.0,499.0)".format(mu1l=config["mu1_label"])
    trigger_eff_data_hnl = "get_2D_binContent(h_trigger_eff_data,C_{mu2l}_pt,C_{mu2l}_BS_ips_xy,99.0,499.0)".format(mu2l=config["mu2_label"])
    trigger_eff_mc_hnl   = "get_2D_binContent(h_trigger_eff_mc,C_{mu2l}_pt,C_{mu2l}_BS_ips_xy,99.0,499.0)".format(mu2l=config["mu2_label"])
    trigger_sf_columns = {"C_mu_Ds_matched_HLT", "C_mu_Hnl_matched_HLT","C_trigger_eff_mc_ds", "C_trigger_sf", "C_trigger_eff_mc_hnl","C_trigger_eff_data_hnl", "C_trigger_eff_data_ds"}
    ListOfColumns.update(trigger_sf_columns)
    good_cols.update(trigger_sf_columns)
    df = df.Define("C_trigger_eff_data_ds",str(trigger_eff_data_ds)) 
    df = df.Define("C_trigger_eff_data_hnl",str(trigger_eff_data_hnl)) 
    df = df.Define("C_trigger_eff_mc_ds",str(trigger_eff_mc_ds))
    df = df.Define("C_trigger_eff_mc_hnl",str(trigger_eff_mc_hnl)) 
    df = df.Define("C_{mu2l}_matched_HLT".format(mu2l=config["mu2_label"]),"(C_{mu2l}_matched_MU7_IP4>0 && C_{mu2l}_dr_MU7_IP4<0.05 && C_{mu2l}_dptopt_MU7_IP4<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU8_IP3>0 && C_{mu2l}_dr_MU8_IP3<0.05 && C_{mu2l}_dptopt_MU8_IP3<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU8_IP5>0 && C_{mu2l}_dr_MU8_IP5<0.05 && C_{mu2l}_dptopt_MU8_IP5<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU8_IP6>0 && C_{mu2l}_dr_MU8_IP6<0.05 && C_{mu2l}_dptopt_MU8_IP6<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU9_IP4>0 && C_{mu2l}_dr_MU9_IP4<0.05 && C_{mu2l}_dptopt_MU9_IP4<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU9_IP5>0 && C_{mu2l}_dr_MU9_IP5<0.05 && C_{mu2l}_dptopt_MU9_IP5<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU9_IP6>0 && C_{mu2l}_dr_MU9_IP6<0.05 && C_{mu2l}_dptopt_MU9_IP6<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU10p5_IP3p5>0 && C_{mu2l}_dr_MU10p5_IP3p5<0.05 && C_{mu2l}_dptopt_MU10p5_IP3p5<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45) || (C_{mu2l}_matched_MU12_IP6>0 && C_{mu2l}_dr_MU12_IP6<0.05 && C_{mu2l}_dptopt_MU12_IP6<0.1 && C_{mu2l}_pt >7.5 && C_{mu2l}_eta>-1.45 && C_{mu2l}_eta<1.45)".format(mu2l=config["mu2_label"])) 
    df = df.Define("C_{mu1l}_matched_HLT".format(mu1l=config["mu1_label"]),"(C_{mu1l}_matched_MU7_IP4>0 && C_{mu1l}_dr_MU7_IP4<0.05 && C_{mu1l}_dptopt_MU7_IP4<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU8_IP3>0 && C_{mu1l}_dr_MU8_IP3<0.05 && C_{mu1l}_dptopt_MU8_IP3<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU8_IP5>0 && C_{mu1l}_dr_MU8_IP5<0.05 && C_{mu1l}_dptopt_MU8_IP5<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU8_IP6>0 && C_{mu1l}_dr_MU8_IP6<0.05 && C_{mu1l}_dptopt_MU8_IP6<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU9_IP4>0 && C_{mu1l}_dr_MU9_IP4<0.05 && C_{mu1l}_dptopt_MU9_IP4<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU9_IP5>0 && C_{mu1l}_dr_MU9_IP5<0.05 && C_{mu1l}_dptopt_MU9_IP5<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU9_IP6>0 && C_{mu1l}_dr_MU9_IP6<0.05 && C_{mu1l}_dptopt_MU9_IP6<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU10p5_IP3p5>0 && C_{mu1l}_dr_MU10p5_IP3p5<0.05 && C_{mu1l}_dptopt_MU10p5_IP3p5<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45) || (C_{mu1l}_matched_MU12_IP6>0 && C_{mu1l}_dr_MU12_IP6<0.05 && C_{mu1l}_dptopt_MU12_IP6<0.1 && C_{mu1l}_pt >7.5 && C_{mu1l}_eta>-1.45 && C_{mu1l}_eta<1.45)".format(mu1l=config["mu1_label"])) 
    df = df.Define("C_trigger_sf","vcompute_total_sf(C_trigger_eff_data_ds,C_trigger_eff_mc_ds,C_{mu1l}_matched_HLT,C_trigger_eff_data_hnl,C_trigger_eff_mc_hnl,C_{mu2l}_matched_HLT)".format(mu1l=config["mu1_label"],mu2l=config["mu2_label"]))

# define mu id factors for MC only
if dataset_category != "data" and not skipMuIDsf:
    mu_id_sf_columns = {"C_mu_Ds_id_sf","C_mu_Hnl_id_sf"}
    ListOfColumns.update(mu_id_sf_columns)
    good_cols.update(mu_id_sf_columns)
    variation = varyMuIDSf
    mu1_id_sf = "vget_mu_id_sf(mu_id_sf_cfg,C_{mu1l}_pt,C_{mu1l}_eta,{variation})".format(mu1l=config["mu1_label"],variation=variation)
    mu2_id_sf = "vget_mu_id_sf(mu_id_sf_cfg,C_{mu2l}_pt,C_{mu2l}_eta,{variation})".format(mu2l=config["mu2_label"],variation=variation)
    df = df.Define("C_{mu1l}_id_sf".format(mu1l=config["mu1_label"]),mu1_id_sf)
    df = df.Define("C_{mu2l}_id_sf".format(mu2l=config["mu2_label"]),mu2_id_sf)

# define mu reco factors for MC only
if dataset_category != "data" and not skipMuRecosf:
    mu_reco_sf_columns = {"C_mu_Ds_reco_sf","C_mu_Hnl_reco_sf"}
    ListOfColumns.update(mu_reco_sf_columns)
    good_cols.update(mu_reco_sf_columns)
    variation = varyMuRecoSf
    mu1_reco_sf = "vget_mu_reco_sf(mu_reco_sf_cfg,C_{mu1l}_pt,C_{mu1l}_eta,{variation})".format(mu1l=config["mu1_label"],variation=variation)
    mu2_reco_sf = "vget_mu_reco_sf(mu_reco_sf_cfg,C_{mu2l}_pt,C_{mu2l}_eta,{variation})".format(mu2l=config["mu2_label"],variation=variation)
    df = df.Define("C_{mu1l}_reco_sf".format(mu1l=config["mu1_label"]),mu1_reco_sf) 
    df = df.Define("C_{mu2l}_reco_sf".format(mu2l=config["mu2_label"]),mu2_reco_sf) 

# define mu_Ds pt shape scale factors for MC only
if dataset_category != "data" and applyMuDsPtCorr:
    ds_pt_shape_sf  = "get_1D_binContent(h_ds_pt_shape_sf,C_{}_pt)".format(config["mu1_label"])
    df = df.Define("C_ds_pt_shape_sf",str(ds_pt_shape_sf)) 

# define mu_Hnl pt shape scale factors for MC only
if dataset_category != "data" and applyMuHnlPtCorr:
    hnl_pt_shape_sf  = "get_1D_binContent(h_hnl_pt_shape_sf,C_{}_pt)".format(config["mu2_label"])
    df = df.Define("C_hnl_pt_shape_sf",str(hnl_pt_shape_sf)) 

# define mu_Ds IPS shape scale factors for MC only
if dataset_category != "data" and applyMuDsIPSCorr:
    ds_ips_shape_sf  = "get_1D_binContent(h_ds_ips_shape_sf,C_{}_BS_ips_xy)".format(config["mu1_label"])
    df = df.Define("C_ds_ips_shape_sf",str(ds_ips_shape_sf)) 

# define mu_Hnl IPS shape scale factors for MC only
if dataset_category != "data" and applyMuHnlIPSCorr:
    hnl_ips_shape_sf  = "get_1D_binContent(h_hnl_ips_shape_sf,C_{}_BS_ips_xy)".format(config["mu2_label"])
    df = df.Define("C_hnl_ips_shape_sf",str(hnl_ips_shape_sf)) 

if bestCandChecks:
    hmodel = ("h_mult_input",";Candidate multiplicity;Normalized to unit",20,1.,21.)
    df_check = df.Define("mult_input","get_cand_multiplicity(C_Hnl_mass)")
    df_check.Histo1D(hmodel,"mult_input").SaveAs("h_cand_mult_input_{}.root".format(dataset_to_process))
    df_check = df_check.Filter("mult_input>1","fraction of input events with more than 1 candidate")
    # define a index selecting best candidate in the event
    df_check = df_check.Define(selection["best_cand_var"]["name"],selection["best_cand_var"]["definition"])
    
    # redefine variables so that only best candidate is saved
    #for c in df_check.GetColumnNames():
    #    col_name = str(c)
    #    col_type = df_check.GetColumnType(col_name)
        # choose candidate branches (beginning with 'C_')
    #    if (not col_name.find("C_")<0) and (not col_type.find("ROOT::VecOps")<0):
    #        idx = str(selection["best_cand_var"]["name"])
    #        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")
    #        continue
    for col_name in good_cols:
        idx = str(selection["best_cand_var"]["name"])
        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")

    if dataset_category == "signal":
        df_check = df_check.Filter("C_pass_gen_matching","best-cand-input events have a GEN-matched candidate")
#    df_check.Report().Print()

######################
#### PRESELECTION ####
######################

#apply common pre-selection
presel_i = 0
presel_cuts = list()
for sel in selection["preselection_cuts"]:
    mask_var = "pass_presel_"+str(presel_i)
    presel_cuts.append(mask_var)
    df = df.Define(mask_var,sel["cut"])
    df = df.Filter("ROOT::VecOps::Any("+mask_var+")",sel["printout"])
    presel_i+=1

# skim vectors to retain only candidates passing preselection
#for c in df.GetColumnNames():
#    col_name = str(c)
#    col_type = df.GetColumnType(col_name)
    # choose candidate branches (beginning with 'C_')
#    if(hnl_tools.is_good_cand_var(col_name) and (not col_type.find("ROOT::VecOps")<0)): 
#        presel_cuts_AND = "&&".join(presel_cuts)  
#        df = df.Redefine(col_name,col_name+"["+presel_cuts_AND+"]")
#        continue
for col_name in good_cols:
    # choose candidate branches (beginning with 'C_')
    if hnl_tools.is_good_cand_var(col_name): 
        presel_cuts_AND = "&&".join(presel_cuts)  
        df = df.Redefine(col_name,col_name+"["+presel_cuts_AND+"]")

# count how many preselected events have at least a GEN-matched candidate
if bestCandChecks:
    hmodel = ("h_mult_presel",";Candidate multiplicity;Normalized to unit",20,1.,21.)
    df_check = df.Define("mult_presel","get_cand_multiplicity(C_Hnl_mass)")
    df_check.Histo1D(hmodel,"mult_presel").SaveAs("h_cand_mult_presel_{}.root".format(dataset_to_process))
    df_check = df_check.Filter("mult_presel>1","fraction of preselected events with more than 1 candidate")
    # define a index selecting best candidate in the event
    df_check = df_check.Define(selection["best_cand_var"]["name"],selection["best_cand_var"]["definition"])
    
    # redefine variables so that only best candidate is saved
    #for c in df_check.GetColumnNames():
    #    col_name = str(c)
    #    col_type = df_check.GetColumnType(col_name)
        # choose candidate branches (beginning with 'C_')
    #    if (not col_name.find("C_")<0) and (not col_type.find("ROOT::VecOps")<0):
    #        idx = str(selection["best_cand_var"]["name"])
    #        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")
    #        continue
    for col_name in good_cols:   
        idx = str(selection["best_cand_var"]["name"])
        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")

    if dataset_category == "signal":
        df_check = df_check.Filter("C_pass_gen_matching","best-cand-preselected events have at least a GEN-matched candidate")
    #df_check.Report().Print()

#save preselected tree
if savePreselectedTree:
    finalTree_outputFileName = "tree_"+dataset_name_label+".root"
    if not saveRemotely:
        finalTree_outputDirName = os.path.join(config["preselected_tree_output_dir_name"],dataset_name_label)
    else:
        finalTree_outputDirName = os.path.join(config["remote_preselected_tree_output_dir_name"],dataset_name_label)
    if tag!="":
        if not saveRemotely:
            finalTree_outputFileName = "tree_"+tag+"_"+dataset_name_label+".root"
        else:
            finalTree_outputFileName = "tree_"+tag+"_"+dataset_name_label[:-5]+".root"
    if not saveRemotely and not distributed:
        subprocess.call(['mkdir','-p',finalTree_outputDirName])
    finalTree_outFullPath = os.path.join(finalTree_outputDirName,finalTree_outputFileName)
    var_list = ListOfColumns #df.GetColumnNames()
    var_keep_list = keep
    if len(var_keep_list)>0:
        var_list = var_keep_list
    if not saveRemotely:
        if not distributed:
            df.Snapshot(config["tree_output_name"],finalTree_outFullPath,var_list)
            nevt = nEvents.GetValue()
        else:
            print("Impossible to save the preselected tree locally when running distributed. Turn on saveRemotely option and re-run.")
    else:
        finalTree_outFullPath2 = "https://" + config["redirector"] + finalTree_outputDirName
        finalTree_outFullPath = "root://t2-xrdcms.lnl.infn.it:7070/" + finalTree_outputDirName
        df.Snapshot(config["tree_output_name"],finalTree_outputFileName, var_list)
        nevt = nEvents.GetValue()
    post_start_step2 = time.time() 
    if saveRemotely:
        if distributed:
            client.run(transfer_to_tier, remote_folder_name=finalTree_outFullPath2)
        else:
            transfer_to_tier(remote_folder_name=finalTree_outFullPath2)
    if not(not saveRemotely and distributed):
        print("Preselected tree saved in {}".format(finalTree_outFullPath))
        
post_stop_step2 = time.time()
###################
#### SELECTION ####
###################

#apply optimized selection for each category
sel_cuts = dict()
for cat in selection["categories"]:
    l = list()
    sel_i = 0
    for cut in cat["selection_cuts"]:
        mask_var = "pass_sel_"+cat["label"]+"_"+str(sel_i)
        l.append(mask_var)
        df = df.Define(mask_var,cut["cut"]+' && '+cat["cut"])
        sel_i+=1
    sel_cuts[cat["label"]] = l

# build AND of each category's selection cuts
sel_cuts_AND = list()
for cat in sel_cuts:
    s = "&&".join(sel_cuts[cat])
    sel_cuts_AND.append(s)

# filter events which do not pass any of the categories' selection cuts
df = df.Filter("ROOT::VecOps::Any("+"||".join(sel_cuts_AND)+")","selection cuts") 

# skim vectors to retain only candidates passing selection cuts
#for c in df.GetColumnNames():
#    col_name = str(c)
#    col_type = df.GetColumnType(col_name)
    # choose candidate branches (beginning with 'C_')
#    if hnl_tools.is_good_cand_var(col_name) and col_type.find("ROOT::VecOps")==0: 
#        df = df.Redefine(col_name,col_name+"["+"||".join(sel_cuts_AND)+"]")
for col_name in good_cols:
    if hnl_tools.is_good_cand_var(col_name): 
        df = df.Redefine(col_name,col_name+"["+"||".join(sel_cuts_AND)+"]")

# count how many selected events have at least a GEN-matched candidate
if bestCandChecks:
    hmodel = ("h_mult_sel",";Candidate multiplicity;Normalized to unit",20,1.,21.)
    df_check = df.Define("mult_sel","get_cand_multiplicity(C_Hnl_mass)")
    df_check.Histo1D(hmodel,"mult_sel").SaveAs("h_cand_mult_sel_{}.root".format(dataset_to_process))
    df_check = df_check.Filter("mult_sel>1","fraction of selected events with more than 1 candidate")
    #vl = df.GetColumnNames()
    #df_check.Snapshot("test","test.root",vl)
    # define a index selecting best candidate in the event
    df_check = df_check.Define(selection["best_cand_var"]["name"],selection["best_cand_var"]["definition"])
    
    # redefine variables so that only best candidate is saved
    #for c in df_check.GetColumnNames():
    #    col_name = str(c)
    #    col_type = df_check.GetColumnType(col_name)
        # choose candidate branches (beginning with 'C_')
    #    if (not col_name.find("C_")<0) and (not col_type.find("ROOT::VecOps")<0):
    #        idx = str(selection["best_cand_var"]["name"])
    #        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")
    #        continue
    for col_name in good_cols:
        idx = str(selection["best_cand_var"]["name"])
        df_check = df_check.Redefine(col_name,col_name+"["+idx+"]")
    if dataset_category == "signal":
        df_check = df_check.Filter("C_pass_gen_matching","best-cand-selected events have at least a GEN-matched candidate")
    #df_check.Report().Print()

###################
#### BEST CAND ####
###################

# define a index selecting best candidate in the event
df = df.Define(selection["best_cand_var"]["name"],selection["best_cand_var"]["definition"])

# redefine variables so that only best candidate is saved
#for c in df.GetColumnNames():
#    col_name = str(c)
#    col_type = df.GetColumnType(col_name)
    # choose candidate branches (beginning with 'C_')
#    if col_name.find("C_")==0 and col_type.find("ROOT::VecOps")==0:
#        idx = str(selection["best_cand_var"]["name"])
#        df = df.Redefine(col_name,col_name+"["+idx+"]")
#        continue
for col_name in good_cols:
    idx = str(selection["best_cand_var"]["name"])
    df = df.Redefine(col_name,col_name+"["+idx+"]")

if dataset_category == "signal":
    df = df.Filter("C_pass_gen_matching","best-candidate-selected events have at least a GEN-matched candidate")

#define categories
df = df.Define("C_cat","get_lxy_categories(C_Hnl_vertex_2DDist_BS,C_mu_Hnl_charge,C_mu_Ds_charge)")

# redefine here the total weight as the product of all weights previously defined

# Include trigger SF
if dataset_category != "data" and not skipTrigSF:
    df = df.Redefine("tot_weight","tot_weight*C_trigger_sf")

# Include ID scale factors
if dataset_category != "data" and not skipMuIDsf:
    df = df.Redefine("tot_weight","tot_weight*C_{mu1l}_id_sf*C_{mu2l}_id_sf".format(mu1l=config["mu1_label"],mu2l=config["mu2_label"]))

# Include RECO scale factors
if dataset_category != "data" and not skipMuRecosf:
    df = df.Redefine("tot_weight","tot_weight*C_{mu1l}_reco_sf*C_{mu2l}_reco_sf".format(mu1l=config["mu1_label"],mu2l=config["mu2_label"]))

# Include mu_Ds pt shape scale factors
if dataset_category != "data" and applyMuDsPtCorr:
    df = df.Redefine("tot_weight","tot_weight*C_ds_pt_shape_sf")

# Include mu_Hnl pt shape scale factors
if dataset_category != "data" and applyMuHnlPtCorr:
    df = df.Redefine("tot_weight","tot_weight*C_hnl_pt_shape_sf")

# Include mu_Ds IPS shape scale factors
if dataset_category != "data" and applyMuDsIPSCorr:
    df = df.Redefine("tot_weight","tot_weight*C_ds_ips_shape_sf")
 
# Include mu_Hnl IPS shape scale factors
if dataset_category != "data" and applyMuHnlIPSCorr:
    df = df.Redefine("tot_weight","tot_weight*C_hnl_ips_shape_sf")

#################
##### SAVE ######
#################

if saveOutputTree:
    finalTree_outputFileName = "tree_"+dataset_name_label+".root"
    finalCSV_outputFileName  = "tree_"+dataset_name_label+".csv"
    if not saveRemotely:
        finalTree_outputDirName = os.path.join(config["tree_output_dir_name"],dataset_name_label)
    else:
        finalTree_outputDirName = os.path.join(config["remote_tree_output_dir_name"],dataset_name_label)
    if tag!="":
        if not saveRemotely:
            finalTree_outputFileName = "tree_"+tag+"_"+dataset_name_label+".root"
            finalCSV_outputFileName  = "tree_"+tag+"_"+dataset_name_label+".csv"
        else:
            finalTree_outputFileName = "tree_"+tag+"_"+dataset_name_label[:-5]+".root"
            finalCSV_outputFileName  = "tree_"+tag+"_"+dataset_name_label[:-5]+".csv"
    if not saveRemotely and not distributed:
        subprocess.call(['mkdir','-p',finalTree_outputDirName])
    finalTree_outFullPath = os.path.join(finalTree_outputDirName,finalTree_outputFileName)
    finalCSV_outFullPath = os.path.join(finalTree_outputDirName,finalCSV_outputFileName)

    #save output tree
    var_list = ListOfColumns #df.GetColumnNames()
    csv_var_list = [x for x in ListOfColumns if hnl_tools.is_good_cand_var(x) or x.find("tot_weight")==0 or x.find("mc_weight")==0] #[x for x in df.GetColumnNames() if hnl_tools.is_good_cand_var(x) or x.find("tot_weight")==0 or x.find("mc_weight")==0]
    var_keep_list = keep
    if len(var_keep_list)>0:
        var_list = var_keep_list
        csv_var_list = var_keep_list  
    if not saveRemotely:
        if not distributed:
            df.Snapshot(config["tree_output_name"],finalTree_outFullPath,var_list)
            nevt = nEvents.GetValue()
        else:
            print("Impossible to save the output tree locally when running distributed. Turn on saveRemotely option and re-run.")      
    else:
        finalTree_outFullPath2 = "https://" + config["redirector"] + finalTree_outputDirName
        finalTree_outFullPath = "root://t2-xrdcms.lnl.infn.it:7070/" + finalTree_outputDirName
        df.Snapshot(config["tree_output_name"],finalTree_outputFileName,var_list)
        nevt = nEvents.GetValue()
    post_start_step3 = time.time()
    
    if saveRemotely:
        if distributed:
            client.run(transfer_to_tier, remote_folder_name=finalTree_outFullPath2)
        else:
            transfer_to_tier(remote_folder_name=finalTree_outFullPath2)
    
    #save output csv
    if not(not saveRemotely and distributed):
        a = df.AsNumpy(csv_var_list)
        pdf = pd.DataFrame(a)
        
    if not saveRemotely:
        if not distributed:
            pdf.to_csv(finalCSV_outFullPath, index=False)
    else:
        finalCSV_outFullPath2 = "https://" + config["redirector"] + finalCSV_outFullPath
        finalCSV_outFullPath = "root://t2-xrdcms.lnl.infn.it:7070/" + finalCSV_outFullPath
        pdf.to_csv(finalCSV_outputFileName, index=False)
        os.system("davix-put " + finalCSV_outputFileName + " " + finalCSV_outFullPath2 + " -E $X509_USER_PROXY --capath $X509_CERT_DIR")
        
    if not(not saveRemotely and distributed):
        print("Output tree saved in {}".format(finalTree_outFullPath))
        print("Output csv saved in {}".format(finalCSV_outFullPath))

        #update json
        key = "final_file_name_list"
        if tag != "":
            key = key+"_"+str(tag)
        if saveRemotely:
            ls_output = os.popen("davix-ls " + finalTree_outFullPath2 + " -E $X509_USER_PROXY --capath $X509_CERT_DIR")
            fileNames = ls_output.read().splitlines()
            ntuples[dataset_to_process][key] = [str(finalTree_outFullPath) + "/" + fileName for fileName in fileNames if fileName.endswith(".root")]
        else:
            ntuples[dataset_to_process][key] = [str(finalTree_outFullPath)]
        with open(config["ntuples_cfg_file_full_path"], "w") as f:
            json.dump(ntuples,f, indent=4, sort_keys=True)
        print("{} updated".format(config["ntuples_cfg_file_full_path"]))
    post_stop_step3 = time.time()
##################
##### SPLOT ######
##################

if dataset_category=="data" and addSPlotWeight:
    print("Input splot weight file: {}".format(ntuples[dataset_to_process]["splot_weight_input_file"]))
    sdf = ROOT.RDataFrame(ntuples[dataset_to_process]["splot_weight_tree_name"],ntuples[dataset_to_process]["splot_weight_input_file"]) # get tree containing splot weights
    asw = sdf.AsNumpy([ntuples[dataset_to_process]["splot_weight_variable"]])[ntuples[dataset_to_process]["splot_weight_variable"]] # get column of splot weights
    print("entries in splot tree: {}".format(len(asw)))
    print("entries in analyzed tree: {}".format(df.Count().GetValue()))
    if len(asw) != df.Count().GetValue():
        print("!!! splot tree and analyzed tree do not have the same number of entries !!!")
        print("!!! please check that you are using the correct input tree !!!")
        exit()
    df = df.Define("splot_weight",'auto to_eval = std::string("asw[") + std::to_string(rdfentry_) + "]"; return float(TPython::Eval(to_eval.c_str()));') 
    df = df.Redefine("tot_weight","tot_weight*splot_weight")

#######################
##### HISTOGRAMS ######
#######################
histo_start = time.time()
if doHistograms:
    if not(not saveRemotely and distributed):
        hnl_tools.do_histos(df, config, dataset_name_label, saveRemotely=saveRemotely, tag="")
    else:
        print("Impossible to save the histograms locally when running distributed. Turn on saveRemotely option and re-run.")
        
histo_stop = time.time()
# TODO:define per category reports
# print("+++ FINAL REPORT +++")
# report= df.Report()
# report.Print()

###################################
####### SAVE MONITORING DATA ######
# (Use correct Dask sched image!) #
###################################

if distributed and saveRemotely:
    client.run(transfer_to_tier_monitoring, remote_folder_name=finalTree_outFullPath2)
    
############################################
# CLEAR LOCAL COPIES (when saved remotely) #
############################################

if saveRemotely:
    os.system("rm ./tree_*.csv")
    if doHistograms:
        os.system("rm ./histograms_*.root")
    if not distributed:
        os.system("rm ./tree_*.root")

#############
#### END ####
#############
stop_time = time.time()

## END

In [None]:
print("--- Analysis completed in {} seconds ---".format(stop_time - pre_start))

## Writing time information on JSON

In [None]:
#PRESELECTION
pre_time = pre_stop - pre_start
loop_time = post_start_step2 - pre_stop
post_time = post_stop_step2 - post_start_step2

print(pre_time)
print(loop_time)
print(post_time)

print("Total time of execution: " + str(stop_time-pre_start))
print("Sum of components: " + str(pre_time + loop_time + post_time))

with open('metrics.json','w') as out_file:
    json.dump({'pre_time': pre_time, 
                'analysis_time': loop_time, 
                'post_time': post_time,
                'n_events': nevt
               }, out_file, sort_keys=True, indent=4)     

In [None]:
#SELECTION
pre_time = pre_stop - pre_start
loop_time = (post_start_step3 - post_stop_step2) + (histo_stop - post_stop_step3)
post_time = (post_stop_step3 - post_start_step3) + (stop_time-histo_stop)

print(pre_time)
print(loop_time)
print(post_time)

print("Total time of execution: " + str(stop_time-pre_start))
print("Sum of components: " + str(pre_time + loop_time + post_time))

with open('metrics.json','w') as out_file:
    json.dump({'pre_time': pre_time, 
                'loop_time': loop_time, 
                'post_time': post_time,
                'n_events': nevt
                }, out_file, sort_keys=True, indent=4)        

In [None]:
#PRESELECTION + SELECTION
pre_time = pre_stop - pre_start
loop_time = (post_start_step2 - pre_stop) + (post_start_step3 - post_stop_step2) + (histo_stop - post_stop_step3)
post_time = (post_stop_step2 - post_start_step2) + (post_stop_step3 - post_start_step3) + (stop_time-histo_stop)

print(pre_time)
print(loop_time)
print(post_time)

print("Total time of execution: " + str(stop_time-pre_start))
print("Sum of components: " + str(pre_time + loop_time + post_time))

with open('metrics.json','w') as out_file:
    json.dump({'pre_time': pre_time, 
                'loop_time': loop_time, 
                'post_time': post_time,
                'n_events': nevt
                }, out_file, sort_keys=True, indent=4)        

# Additional Commands and tests

In [None]:
!pip show dask

In [None]:
slim_outFullPath2

In [None]:
!pip install ipywidgets

In [None]:
!rootls -lt root://cms-xrd-global.cern.ch//store/user/llunerti/DsToNMu_NToMuPi_SoftQCD_mN1p5_ctau100p0mm_TuneCP5_13TeV-pythia8-evtgen/DsToHnlMu_HnlToMuPi_prompt_UL/221107_101012/0000/DsToHnlMu_HnlToMuPi_prompt_DsToNMu_NToMuPi_SoftQCD_mN1p5_ctau100p0mm_TuneCP5_13TeV-pythia8-evtgen_tree_1.root

In [None]:
!davix-ls

In [None]:
numWorkers= len(client.scheduler_info()['workers'])

In [None]:
#initialize chain
#chain = ROOT.TChain(tree_name)
#tot_file = len(inputFileName_list)
#files_in_the_chain = int(0)

#add file to the chain
#for inputFileName in inputFileName_list:
#    print("Adding {} to the chain...".format(inputFileName))
#    chain.Add(inputFileName)
#    files_in_the_chain += 1
#    print("[{}/{}] files added to the chain".format(files_in_the_chain,tot_file))
#    print("{} entries now in the chain".format(chain.GetEntries()))

#print("\n")
#print("{} total entries ...".format(chain.GetEntries()))