# In [3]:
import yaml
import pickle

# Load the three YAML configuration files used by this script.
# ``yaml.safe_load`` replaces ``yaml.load(stream)``: calling ``yaml.load``
# without an explicit ``Loader`` is deprecated since PyYAML 5.1 and raises
# ``TypeError`` on PyYAML 6+. ``safe_load`` only builds plain Python
# containers and scalars (no arbitrary object construction).
with open("default_complete.yaml", 'r') as run_config:
    data_config = yaml.safe_load(run_config)
with open("data/database_ml_parameters.yml", 'r') as param_config:
    data_param = yaml.safe_load(param_config)
with open("data/config_model_parameters.yml", 'r') as mod_config:
    data_model = yaml.safe_load(mod_config)

# Selectors later passed to Processer: ``mcordata`` is used as a key and
# ``indexp`` as a list index into the per-case parameter dictionary.
mcordata = "data"
indexp = 0

# The run configuration names the case; its parameters live under that key
# in the parameter database.
case = data_config["case"]
param_case = data_param[case]

# In [5]:
from machine_learning_hep.listfiles import list_files_dir_lev2, list_files_lev2 
import multiprocessing as mp
import uproot

class Processer:
    """Turn ROOT trees into pickled DataFrames and skim them, in parallel.

    The two public pipelines are :meth:`unpacker` (ROOT file -> pickle) and
    :meth:`skimmer` (pickle -> skimmed pickle). Both build a list of
    ``(input, output)`` path pairs and hand it to :meth:`parallelizer`.
    """

    # Class Attribute
    species = 'processer'

    # Initializer / Instance Attributes
    def __init__(self, datap, mcordata, indexp):
        """Read file names, selections and directories from ``datap``.

        Args:
            datap: Nested mapping holding the ``files_names``, ``variables``,
                ``skimming_sel``, ``skimming2_sel``, ``inputs`` and
                ``output_folders`` entries read below.
            mcordata: Key selecting the sub-entry of ``inputs`` and of the
                output folders (presumably MC vs. data -- confirm with the
                configuration file).
            indexp: Index into the per-``mcordata`` directory lists.
        """
        # File and tree names.
        self.n_reco = datap["files_names"]["namefile_reco"]
        self.n_recosk = datap["files_names"]["namefile_reco_skim"]
        self.n_root = datap["files_names"]["namefile_unmerged_tree"]
        self.n_treereco = datap["files_names"]["treeoriginreco"]

        # Branches requested when converting the tree to a DataFrame.
        self.v_all = datap["variables"]["var_all"]

        # Query strings applied with ``DataFrame.query`` in unpack / skim.
        self.s_reco_unp = datap["skimming_sel"]
        self.s_reco_skim = datap["skimming2_sel"]

        # Input ROOT directory and the two pickle output directories.
        self.d_root = datap["inputs"][mcordata]["unmerged_tree_dir"][indexp]
        self.d_pkl = datap["output_folders"]["pkl_out"][mcordata][indexp]
        self.d_pklsk = datap["output_folders"]["pkl_skimmed"][mcordata][indexp]

        # File lists, filled by buildlist_root() / buildlist_pkl().
        self.l_pkl = None
        self.l_pklsk = None
        self.l_root = None

        # Maximum number of tasks (and worker processes) per pool.
        self.maxperchunk = 30

        self.indexsample = None

    def set_maxperchunk(self, maxperchunk):
        """Set the maximum number of tasks processed by one pool."""
        self.maxperchunk = maxperchunk

    def buildlist_root(self):
        """Fill ``l_root`` and ``l_pkl`` via ``list_files_dir_lev2``.

        NOTE(review): presumably returns the ROOT inputs found under
        ``d_root`` and the matching pickle outputs under ``d_pkl`` -- confirm
        against ``machine_learning_hep.listfiles``.
        """
        self.l_root, self.l_pkl = list_files_dir_lev2(
            self.d_root, self.d_pkl, self.n_root, self.n_reco)

    def buildlist_pkl(self):
        """Fill ``l_pkl`` and ``l_pklsk`` via ``list_files_dir_lev2``.

        NOTE(review): presumably returns the pickles found under ``d_pkl``
        and the matching skimmed outputs under ``d_pklsk`` -- confirm against
        ``machine_learning_hep.listfiles``.
        """
        self.l_pkl, self.l_pklsk = list_files_dir_lev2(
            self.d_pkl, self.d_pklsk, self.n_reco, self.n_recosk)

    def unpack(self, filein, fileout):
        """Convert one ROOT tree to a pickled, pre-selected DataFrame.

        Args:
            filein: Path of the ROOT file to open.
            fileout: Path of the pickle file to write.
        """
        tree = uproot.open(filein)[self.n_treereco]
        # NOTE(review): ``tree.pandas.df`` looks like the uproot 3 interface;
        # confirm the installed uproot version provides it.
        df = tree.pandas.df(branches=self.v_all)
        df = df.query(self.s_reco_unp)
        df.to_pickle(fileout)

    def skim(self, filein, fileout):
        """Apply the skimming selection to one pickled DataFrame.

        Args:
            filein: Path of the pickle file to read.
            fileout: Path of the skimmed pickle file to write.
        """
        # Context manager so the input handle is closed deterministically.
        # NOTE: unpickling executes code from the file; only use on pickles
        # produced by this pipeline.
        with open(filein, "rb") as handle:
            df = pickle.load(handle)
        df = df.query(self.s_reco_skim)
        df.to_pickle(fileout)

    def parallelizer(self, function, argument_list):
        """Call ``function(*args)`` for every tuple in ``argument_list``.

        The arguments are processed in consecutive chunks of at most
        ``maxperchunk`` entries; each chunk gets its own process pool and the
        next chunk only starts once the current one has finished.

        Args:
            function: Picklable callable executed in the worker processes.
            argument_list: Sequence of argument tuples, one per call.

        Raises:
            Exception: Whatever ``function`` raises in a worker is re-raised
                here by ``Pool.starmap``.
        """
        step = self.maxperchunk
        for start in range(0, len(argument_list), step):
            chunk = argument_list[start:start + step]
            # ``pool.apply`` blocks until each call returns, which made the
            # tasks run one after the other. ``starmap`` dispatches the whole
            # chunk at once and waits for all results; the ``with`` block
            # then tears the pool down so no workers are left behind.
            with mp.Pool(processes=len(chunk)) as pool:
                pool.starmap(function, chunk)

    def unpacker(self):
        """Unpack every ROOT file found by :meth:`buildlist_root`."""
        self.buildlist_root()
        # Inputs and outputs are expected to be paired element by element.
        arguments = list(zip(self.l_root, self.l_pkl))
        self.parallelizer(self.unpack, arguments)

    def skimmer(self):
        """Skim every pickle file found by :meth:`buildlist_pkl`."""
        self.buildlist_pkl()
        # Inputs and outputs are expected to be paired element by element.
        arguments = list(zip(self.l_pkl, self.l_pklsk))
        self.parallelizer(self.skim, arguments)
        
# Build the processer for the selected case, then run both stages in order:
# unpacking first (ROOT -> pickle), skimming second (pickle -> skimmed pickle).
myprocess = Processer(
    datap=data_param[case],
    mcordata=mcordata,
    indexp=indexp,
)
myprocess.unpacker()
myprocess.skimmer()