In [1]:
import uproot
import yaml
import json
import numpy as np
import awkward as ak
from tqdm.notebook import tqdm
import multiprocessing as mp
import time
import os
import shutil

from pprint import pprint

# How do we construct a generic dataset processor for vectors of various lengths and scalars?

## Create a dictionary of feature groups (their branch names and whether each is a variable-length set) and save it as YAML

In [2]:
# Feature groups to extract from the NanoAOD "Events" tree.
# "subfeatures" lists the branches that make up the group; "set": True marks a
# variable-length per-object collection, "set": False a fixed group of per-event branches.
features = {
    "Jet_set": {
        "subfeatures": ["Jet_pt", "Jet_eta", "Jet_phi", "Jet_mass", "Jet_btagCSVV2"],
        "set": True
    },
    "FatJet_set": {
        "subfeatures": ["FatJet_pt", "FatJet_eta", "FatJet_phi", "FatJet_mass", "FatJet_btagCSVV2"],
        "set": True
    },
    "Electron_set": {
        "subfeatures": ["Electron_pt", "Electron_eta", "Electron_phi", "Electron_mass"],
        "set": True
    },
    "Muon_set": {
        "subfeatures": ["Muon_pt", "Muon_eta", "Muon_phi", "Muon_mass"],
        "set": True
    },
    "Photon_set": {
        "subfeatures": ["Photon_pt", "Photon_eta", "Photon_phi", "Photon_mass"],
        "set": True
    },
    "MET_sumEt": {
        "subfeatures": ["MET_sumEt"],
        "set": False
    }
}

# Branches whose per-event product forms the MC event weight.
weight_features = ["genWeight", "btagWeight_CSVV2"]

# Persist the spec so it can be reloaded later; the context manager guarantees the file is flushed and closed.
with open("carl_features.yaml", 'w') as features_file:
    yaml.dump(features, features_file)

In [3]:
# Peek at the branch names available in one nominal NanoAOD file; the context manager closes the remote handle.
with uproot.open("https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_00000_0000.root") as sample_file:
    sample_branch_names = sample_file["Events"].keys()
sample_branch_names

['run',
 'luminosityBlock',
 'event',
 'HTXS_Higgs_pt',
 'HTXS_Higgs_y',
 'HTXS_stage1_1_cat_pTjet25GeV',
 'HTXS_stage1_1_cat_pTjet30GeV',
 'HTXS_stage1_1_fine_cat_pTjet25GeV',
 'HTXS_stage1_1_fine_cat_pTjet30GeV',
 'HTXS_stage1_2_cat_pTjet25GeV',
 'HTXS_stage1_2_cat_pTjet30GeV',
 'HTXS_stage1_2_fine_cat_pTjet25GeV',
 'HTXS_stage1_2_fine_cat_pTjet30GeV',
 'HTXS_stage_0',
 'HTXS_stage_1_pTjet25',
 'HTXS_stage_1_pTjet30',
 'HTXS_njets25',
 'HTXS_njets30',
 'btagWeight_CSVV2',
 'btagWeight_DeepCSVB',
 'CaloMET_phi',
 'CaloMET_pt',
 'CaloMET_sumEt',
 'ChsMET_phi',
 'ChsMET_pt',
 'ChsMET_sumEt',
 'nCorrT1METJet',
 'CorrT1METJet_area',
 'CorrT1METJet_eta',
 'CorrT1METJet_muonSubtrFactor',
 'CorrT1METJet_phi',
 'CorrT1METJet_rawPt',
 'DeepMETResolutionTune_phi',
 'DeepMETResolutionTune_pt',
 'DeepMETResponseTune_phi',
 'DeepMETResponseTune_pt',
 'nElectron',
 'Electron_deltaEtaSC',
 'Electron_dr03EcalRecHitSumEt',
 'Electron_dr03HcalDepth1TowerSumEt',
 'Electron_dr03TkSumPt',
 'Electron_dr03T

## Get the relevant metadata

In [4]:
# Dataset-size limits recorded by an earlier metadata pass.
# NOTE(review): only max_data_index is used in the visible notebook; the max_*_size values appear unused here.
data_metadata_dict = {}
with open("carl_data_metadata.json", 'r') as metadata_file:
    data_metadata_dict = json.load(metadata_file)
    max_data_index = data_metadata_dict["max_data_index"]
    max_jet_size = data_metadata_dict["max_jet_size"]
    max_electron_size = data_metadata_dict["max_electron_size"]
    max_muon_size = data_metadata_dict["max_muon_size"]

## Load the filenames

In [5]:
# Sample -> variation -> list of {"path": ...} file records.
with open("nanoaod_inputs.json", 'r') as inputs_file:
    file_dict = json.load(inputs_file)

In [6]:
# Paths of every nominal ttbar file, shown before shuffling.
all_nominal_files = [record["path"] for record in file_dict["ttbar"]["nominal"]["files"]]
print("{} {}".format(len(all_nominal_files), all_nominal_files[0]))
np.random.shuffle(all_nominal_files)
all_nominal_files[0]

243 https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_00000_0000.root


'https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19981_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext4-v1_00000_0033.root'

In [7]:
# Paths of every parton-shower-variation (PS_var) ttbar file, shown before shuffling.
all_variation_files = [record["path"] for record in file_dict["ttbar"]["PS_var"]["files"]]
print("{} {}".format(len(all_variation_files), all_variation_files[0]))
np.random.shuffle(all_variation_files)
all_variation_files[0]

15 https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0000.root


'https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0002.root'

## Define the training, validation, and testing splits

In [8]:
train_frac = 0.6
val_frac = 0.2
test_frac = 0.2
# The fractions must partition the data; catch typos before hours of preprocessing.
assert abs(train_frac + val_frac + test_frac - 1.0) < 1e-9, "split fractions must sum to 1"

# Cumulative global event boundaries:
# [0, max_train_index) -> training, [max_train_index, max_val_index) -> validation,
# [max_val_index, max_test_index) -> testing.
max_train_index = int(train_frac * max_data_index)
max_val_index = max_train_index + int(val_frac * max_data_index)
max_test_index = max_val_index + int(test_frac * max_data_index)

# Sorting first and shuffling with a seeded generator makes the file order reproducible,
# regardless of any unseeded shuffles run earlier. (Which events land in which split still
# depends on how the worker processes are scheduled.)
SPLIT_SEED = 42
split_rng = np.random.default_rng(SPLIT_SEED)
all_nominal_files.sort()
all_variation_files.sort()
split_rng.shuffle(all_nominal_files)
split_rng.shuffle(all_variation_files)
print(max_train_index, max_val_index, max_test_index)

11602238 15469650 19337062


## Load the features and define useful functions

In [9]:
# Use the fast libyaml-backed safe loader when PyYAML was built with it, otherwise the pure-Python one.
# The features file holds only plain dicts/lists/strings/bools, so a safe loader is sufficient.
_yaml_loader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)
with open("carl_features.yaml", 'r') as features_file:
    features = yaml.load(features_file, Loader=_yaml_loader)
pprint(features)

{'Electron_set': {'set': True,
                  'subfeatures': ['Electron_pt',
                                  'Electron_eta',
                                  'Electron_phi',
                                  'Electron_mass']},
 'FatJet_set': {'set': True,
                'subfeatures': ['FatJet_pt',
                                'FatJet_eta',
                                'FatJet_phi',
                                'FatJet_mass',
                                'FatJet_btagCSVV2']},
 'Jet_set': {'set': True,
             'subfeatures': ['Jet_pt',
                             'Jet_eta',
                             'Jet_phi',
                             'Jet_mass',
                             'Jet_btagCSVV2']},
 'MET_sumEt': {'set': False, 'subfeatures': ['MET_sumEt']},
 'Muon_set': {'set': True,
              'subfeatures': ['Muon_pt', 'Muon_eta', 'Muon_phi', 'Muon_mass']},
 'Photon_set': {'set': True,
                'subfeatures': ['Photon_pt',
                         

In [10]:
def fill_or_extend_tree(datafile, tree_dict, treename="Events"):
    """Create ``treename`` in ``datafile`` from ``tree_dict``, or append to it if it already exists.

    Args:
        datafile: writable mapping-like container (e.g. an uproot writable file).
        tree_dict: branch data for the first write or for the extension.
        treename: key of the tree inside ``datafile``.

    Returns:
        None
    """
    if datafile.get(treename) is not None:
        # Tree already present: append the new entries.
        datafile[treename].extend(tree_dict)
        return None
    # First write creates the tree.
    datafile[treename] = tree_dict
    return None

In [11]:
def build_data_dict(arrays, split_index=None, split_low=False, split_high=False):
    """Turn a record array into a ``{field_name: column}`` dict, optionally keeping only part of it.

    Note the naming: ``split_high=True`` keeps the rows *below* ``split_index``
    (``arrays[:split_index]``), while ``split_low=True`` keeps the rows from ``split_index``
    onward (``arrays[split_index:]``). With both flags False, every row is kept. If both are
    True, ``split_high`` wins.

    Args:
        arrays: awkward record array whose ``fields`` become the dict keys.
        split_index: row index to split at when one of the flags is set.
        split_low: keep ``arrays[split_index:]`` (only checked when ``split_high`` is not True).
        split_high: keep ``arrays[:split_index]``.

    Returns:
        dict mapping each field name to its (possibly sliced) column.
    """
    if split_low is False and split_high is False:
        selected = arrays
    elif split_high is True:
        selected = arrays[:split_index]
    elif split_low is True:
        selected = arrays[split_index:]
    return dict(zip(arrays.fields, ak.unzip(selected)))

In [12]:
def _combined_event_weights(events, weight_features, entry_start, entry_stop):
    """Return the per-event product of the ``weight_features`` branches for entries [entry_start, entry_stop)."""
    weight_arr = events.arrays(weight_features, entry_start=entry_start, entry_stop=entry_stop)
    # Stack each weight branch as a column, then multiply across the columns.
    return ak.concatenate(ak.unzip(weight_arr[weight_features][:, np.newaxis]), axis=1).to_numpy().prod(axis=1)


def _load_feature_chunk(events, features, standard_features, set_features, entry_start, entry_stop):
    """Read one chunk of input features and return them as a record ``ak.Array`` keyed by feature name.

    Non-set features become an (n_events, n_subfeatures) numpy block. Set features become, per
    event, a flat array that interleaves the subfeatures object by object
    (``[sub0_obj0, sub1_obj0, ..., sub0_obj1, sub1_obj1, ...]``).
    """
    array_dict = {}
    for feat in standard_features:
        subfeatures = features[feat]["subfeatures"]
        arr = events.arrays(subfeatures, entry_start=entry_start, entry_stop=entry_stop)
        # Stack this feature's own branches (not the weight branches) column-wise.
        array_dict[feat] = ak.concatenate(ak.unzip(arr[subfeatures][:, np.newaxis]), axis=1).to_numpy()
    if set_features:
        # Read all set branches in one call, then carve out each feature's subset.
        set_branches = [sub for feat in set_features for sub in features[feat]["subfeatures"]]
        arr = events.arrays(set_branches, entry_start=entry_start, entry_stop=entry_stop)
        for feat in set_features:
            feature_arr = arr[features[feat]["subfeatures"]]
            array_dict[feat] = [
                np.concatenate([x.to_numpy()[:, np.newaxis] for x in ak.unzip(feature_arr[j])], axis=1).flatten()
                for j in range(len(feature_arr))
            ]
    return ak.Array(array_dict)


def _write_chunk_to_splits(splits, chunk, weights, offset):
    """Write the slices of ``chunk`` that fall inside each split's global index range.

    ``splits`` is a sequence of ``(datafile, low, high)`` with half-open global ranges
    ``[low, high)``. ``offset`` is the global index of the chunk's first event. Events at or
    beyond the last split's ``high`` are dropped.

    Returns:
        Number of events from this chunk that were assigned to some split.
    """
    size = len(weights)
    for datafile, low, high in splits:
        start = max(low - offset, 0)
        stop = min(high - offset, size)
        if stop > start:  # only touch splits this chunk actually overlaps (avoids empty extends)
            data_dict = build_data_dict(chunk[start:stop])
            data_dict["weight_mc_combined"] = weights[start:stop]
            fill_or_extend_tree(datafile, data_dict)
    return max(0, min(size, splits[-1][2] - offset))


def update_dataset(lock, current_index, filename, train_pair, val_pair, test_pair, features, weight_features,
                   chunk_size=100000, scale_factor=3378.0):
    """Preprocess one NanoAOD file and append its events to per-process train/val/test ROOT files.

    Chunks are assigned global event positions in the order worker processes acquire ``lock``.
    ``current_index`` is the shared count of events assigned so far. Global positions
    ``[0, max_train_index)`` go to the training file, ``[max_train_index, max_val_index)`` to
    validation, and ``[max_val_index, max_test_index)`` to testing; anything beyond
    ``max_test_index`` is discarded. A chunk that straddles one or more boundaries is split
    accordingly.

    Args:
        lock: multiprocessing lock serialising writes, ``current_index`` updates and prints.
        current_index: shared ``multiprocessing.Value`` with the number of events assigned so far.
        filename: path/URL of the input ROOT file containing an ``Events`` tree.
        train_pair, val_pair, test_pair: ``(output_path, max_index)`` for each split.
        features: mapping feature name -> ``{"subfeatures": [...], "set": bool}``.
        weight_features: branches whose per-event product forms the event weight.
        chunk_size: number of input entries processed per iteration.
        scale_factor: constant multiplied into every event weight. It is a placeholder for
            luminosity * cross-section / normalisation.

    Returns:
        None. Output files are only created if the event budget was not already exhausted
        when this worker started.
    """
    df_train_name, max_train_index = train_pair
    df_val_name, max_val_index = val_pair
    df_test_name, max_test_index = test_pair

    with lock:
        print("Loading file: {}".format(filename), flush=True)
        if current_index.value >= max_test_index:
            # Budget already used up: nothing to do, and no empty output files are created.
            print("Finished file: {}".format(filename), flush=True)
            return None

    # The feature partitioning is the same for every chunk, so compute it once.
    standard_features = [feat for feat, spec in features.items() if spec["set"] is False]
    set_features = [feat for feat, spec in features.items() if spec["set"] is True]

    with uproot.open(filename) as input_file, \
            uproot.recreate(df_train_name) as df_train, \
            uproot.recreate(df_val_name) as df_val, \
            uproot.recreate(df_test_name) as df_test:
        events = input_file["Events"]
        # Half-open global index ranges owned by each output file.
        splits = ((df_train, 0, max_train_index),
                  (df_val, max_train_index, max_val_index),
                  (df_test, max_val_index, max_test_index))
        for entry_start in range(0, events.num_entries, chunk_size):
            entry_stop = entry_start + chunk_size
            weights = _combined_event_weights(events, weight_features, entry_start, entry_stop) * scale_factor
            chunk = _load_feature_chunk(events, features, standard_features, set_features, entry_start, entry_stop)

            # Preprocessing above runs in parallel; only the write and index update are serialised.
            with lock:
                offset = current_index.value
                if offset >= max_test_index:
                    # Another process filled the budget while this chunk was being prepared.
                    break
                current_index.value = offset + _write_chunk_to_splits(splits, chunk, weights, offset)
                print("Completed: {:.2f}%".format(current_index.value * 100 / max_test_index), flush=True)
            if current_index.value >= max_test_index:
                break

    with lock:
        print("Finished file: {}".format(filename), flush=True)

## Use multiprocessing to efficiently generate a combined dataset

In [27]:
chunk_size = 10000
num_processes = 25
nom_temp_template = "/data/mdrnevich/AGC/temp/CMS_ttbar_nominal_DeepSets_{}_data_{}.root"

# Race-free, idempotent creation of the scratch directory (and any missing parents).
os.makedirs("/data/mdrnevich/AGC/temp", exist_ok=True)

# Create the worker processes in batches of num_processes; each writes to its own subfiles.
nom_lock = mp.Lock()
nom_current_index = mp.Value('i', 0)
nom_failed_files = []
for batch_start in range(0, len(all_nominal_files), num_processes):
    # Once the shared counter reaches the test boundary, further files would be discarded anyway.
    if nom_current_index.value >= max_test_index:
        break
    batch_files = all_nominal_files[batch_start:batch_start + num_processes]
    nom_processes = []
    for file_index, name in enumerate(batch_files, start=batch_start):
        worker_args = (nom_lock, nom_current_index, name,
                       (nom_temp_template.format("training", file_index), max_train_index),
                       (nom_temp_template.format("validation", file_index), max_val_index),
                       (nom_temp_template.format("testing", file_index), max_test_index),
                       features, weight_features)
        process = mp.Process(target=update_dataset, args=worker_args, kwargs=dict(chunk_size=chunk_size))
        process.start()
        nom_processes.append(process)
    # Wait for the batch to finish before starting the next one.
    for p in nom_processes:
        p.join()
    # A crashed worker would otherwise silently drop (part of) its file.
    nom_failed_files.extend(name for name, p in zip(batch_files, nom_processes) if p.exitcode != 0)

if nom_failed_files:
    print("Workers failed for {} file(s):".format(len(nom_failed_files)), *nom_failed_files, sep='\n')
print(nom_current_index.value, max_test_index)

Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_70000_0006.root
Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19981_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext4-v1_00000_0007.root
Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_60000_0027.root
Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext3-v1_70000_0019.root

Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneCUETP8M1_13TeV-powheg-pythia8/cmsopendata2015_ttbar_19980_PU25nsData2015v1_76X_mcRun2_asymptotic_v12_ext

In [28]:
nom_output_paths = [
    "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_training_data_new.root",
    "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_validation_data_new.root",
    "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_testing_data_new.root",
]
nom_temp_template = "/data/mdrnevich/AGC/temp/CMS_ttbar_nominal_DeepSets_{}_data_{}.root"
split_names = ["training", "validation", "testing"]
update_chunk_size = 100000
merge_branches = list(features.keys()) + ["weight_mc_combined"]

nom_dfs = [uproot.recreate(path) for path in nom_output_paths]
try:
    # Pool the per-process temp files into one combined file per split.
    for i in tqdm(range(len(all_nominal_files))):
        for out_df, split in zip(nom_dfs, split_names):
            temp_path = nom_temp_template.format(split, i)
            # Workers that started after the event budget was exhausted write no files.
            if not os.path.exists(temp_path):
                continue
            with uproot.open(temp_path) as temp_file:
                try:
                    temp_ds = temp_file["Events"]
                except uproot.KeyInFileError:
                    # The file exists but received no events for this split.
                    continue
                for start in range(0, temp_ds.num_entries, update_chunk_size):
                    arr = temp_ds.arrays(merge_branches, entry_start=start, entry_stop=start + update_chunk_size)
                    fill_or_extend_tree(out_df, build_data_dict(arr))
finally:
    # Finalise the combined files even if merging fails part-way.
    for out_df in nom_dfs:
        out_df.close()

# Only reached when the merge succeeded, so the temp files survive a failure for inspection.
shutil.rmtree("/data/mdrnevich/AGC/temp")

  0%|          | 0/243 [00:00<?, ?it/s]

## Now do the same for the alternative (PS_var) sample

In [15]:
chunk_size = 10000
alt_temp_template = "/data/mdrnevich/AGC/temp/CMS_ttbar_PS_var_DeepSets_{}_data_{}.root"

# Race-free, idempotent creation of the scratch directory (and any missing parents).
os.makedirs("/data/mdrnevich/AGC/temp", exist_ok=True)

# Create one worker per variation file (there are few); each writes to its own subfiles.
alt_lock = mp.Lock()
alt_current_index = mp.Value('i', 0)
alt_processes = []
for i, name in enumerate(all_variation_files):
    worker_args = (alt_lock, alt_current_index, name,
                   (alt_temp_template.format("training", i), max_train_index),
                   (alt_temp_template.format("validation", i), max_val_index),
                   (alt_temp_template.format("testing", i), max_test_index),
                   features, weight_features)
    alt_processes.append(mp.Process(target=update_dataset, args=worker_args, kwargs=dict(chunk_size=chunk_size)))
    alt_processes[-1].start()

# Wait for them all to finish
for p in alt_processes:
    p.join()

# A crashed worker would otherwise silently drop (part of) its file.
alt_failed_files = [name for name, p in zip(all_variation_files, alt_processes) if p.exitcode != 0]
if alt_failed_files:
    print("Workers failed for {} file(s):".format(len(alt_failed_files)), *alt_failed_files, sep='\n')
print(alt_current_index.value, max_test_index)

Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0010.root

Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0011.rootLoading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0012.root
Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0001.root
Loading file: https://xrootd-local.unl.edu:1094//store/user/AGC/nanoAOD/TT_TuneEE5C_13TeV-powheg-herwigpp/cmsopendata2015_ttbar_19999_PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1_10000_0008.root
Loading file: https:

In [16]:
alt_output_paths = [
    "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_training_data_new.root",
    "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_validation_data_new.root",
    "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_testing_data_new.root",
]
alt_temp_template = "/data/mdrnevich/AGC/temp/CMS_ttbar_PS_var_DeepSets_{}_data_{}.root"
split_names = ["training", "validation", "testing"]
update_chunk_size = 100000
merge_branches = list(features.keys()) + ["weight_mc_combined"]

alt_dfs = [uproot.recreate(path) for path in alt_output_paths]
try:
    # Pool the per-process temp files into one combined file per split.
    for i in tqdm(range(len(all_variation_files))):
        for out_df, split in zip(alt_dfs, split_names):
            temp_path = alt_temp_template.format(split, i)
            # Workers that started after the event budget was exhausted write no files.
            if not os.path.exists(temp_path):
                continue
            with uproot.open(temp_path) as temp_file:
                try:
                    temp_ds = temp_file["Events"]
                except uproot.KeyInFileError:
                    # The file exists but received no events for this split.
                    continue
                for start in range(0, temp_ds.num_entries, update_chunk_size):
                    arr = temp_ds.arrays(merge_branches, entry_start=start, entry_stop=start + update_chunk_size)
                    fill_or_extend_tree(out_df, build_data_dict(arr))
finally:
    # Finalise the combined files even if merging fails part-way.
    for out_df in alt_dfs:
        out_df.close()

# Only reached when the merge succeeded, so the temp files survive a failure for inspection.
shutil.rmtree("/data/mdrnevich/AGC/temp")

  0%|          | 0/15 [00:00<?, ?it/s]

## Verify the files look good

In [31]:
# Reopen the combined per-split files (training, validation, testing) read-only for checks.
nom_dfs = [
    uproot.open(path)
    for path in (
        "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_training_data_new.root",
        "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_validation_data_new.root",
        "/data/mdrnevich/AGC/CMS_ttbar_nominal_DeepSets_testing_data_new.root",
    )
]

alt_dfs = [
    uproot.open(path)
    for path in (
        "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_training_data_new.root",
        "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_validation_data_new.root",
        "/data/mdrnevich/AGC/CMS_ttbar_PS_var_DeepSets_testing_data_new.root",
    )
]

In [39]:
# Each combined file should hold exactly its split's share, so the cumulative entry counts
# must reproduce the global split boundaries computed earlier.
expected_boundaries = [max_train_index, max_val_index, max_test_index]
for sample_name, sample_dfs in (("nominal", nom_dfs), ("PS_var", alt_dfs)):
    boundaries = np.cumsum([df["Events"].num_entries for df in sample_dfs])
    pprint(boundaries)
    assert boundaries.tolist() == expected_boundaries, (
        "{} cumulative split sizes {} do not match expected boundaries {}".format(
            sample_name, boundaries.tolist(), expected_boundaries))

array([11602238, 15469650, 19337062])
array([11602238, 15469650, 19337062])


In [34]:
# Global split boundaries, for comparison with the cumulative entry counts above.
(max_train_index, max_val_index, max_test_index)

(11602238, 15469650, 19337062)