In [2]:
import math
from math import pi
import random
import glob

import tqdm
from tqdm.notebook import tqdm_notebook
import time

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn

import dask
import dask.dataframe as dd
from dask.distributed import Client
from dask.distributed import get_client
from dask.distributed import wait

import uproot
import awkward as ak
import vector
vector.register_awkward()

from ROOT import TCanvas, TH1F, TLegend, TFile

## Checking, if the voms proxy is set

Set the valid proxy, as we are going to access the data using xrootd

In [3]:
!voms-proxy-info


subject   : /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=piperov/CN=422973/CN=Stefcho Piperov/CN=3014286933
issuer    : /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=piperov/CN=422973/CN=Stefcho Piperov
identity  : /DC=ch/DC=cern/OU=Organic Units/OU=Users/CN=piperov/CN=422973/CN=Stefcho Piperov
type      : RFC compliant proxy
strength  : 1024 bits
path      : /home/spiperov/x509up_u638764
timeleft  : 167:07:38


## Connecting to the dask cluster

Please follow the instructions carefully for setting up the dask cluster and pass the ip address and port number in the below cell

Please note the ip-address and the port for the dashboard of the scheduler and connect a browser there to monitor your dask cluster


In [5]:
node_ip = "128.211.149.206"  ## this is the ip-address of the cluster
slurm_port = "40696"        ## port to communicate to the scheduler
client = Client(f"{node_ip}:{slurm_port}")
print(f"Connected to cluster!")
print(client)



Connected to cluster!
<Client: 'tcp://128.211.149.206:40696' processes=10 threads=10, memory=36.30 GiB>


## Reading input files using xrootd

It might be useful to add this line to your bash initialization files

`export XRD_REQUESTTIMEOUT=300`

In [6]:
xrootdserver = 'root://xrootd.rcac.purdue.edu/'


files_dy =    !xrdfs root://xrootd.rcac.purdue.edu/ ls -R /store/mc/RunIISummer20UL18NanoAODv2/DYJetsToLL_M-50_TuneCP5_13TeV-madgraphMLM-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/|grep '.root$'

files_tt =    !xrdfs root://xrootd.rcac.purdue.edu/ ls -R /store/mc/RunIISummer20UL18NanoAODv2/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/|grep '.root$'

files_data =  !xrdfs root://xrootd.rcac.purdue.edu/ ls -R /store/data/Run2018D/DoubleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9_GT36-v1/|grep '.root$'  

all_files = {
    "dy": [xrootdserver+ str(f) for f in files_dy],
    "tt": [xrootdserver+ str(f) for f in files_tt],
    "data": [xrootdserver+ str(f) for f in files_data]
}


max_events = 5000

## Set max_events to -1 to run over entire dataset


In [None]:
##if the files are present locally, you can read directly from the filesystem

# all_files = []
# path = "/mnt/hadoop/store/mc/RunIISummer20UL18NanoAODv2/DYJetsToLL_M-50_TuneCP5_13TeV-madgraphMLM-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/50000/"
# for f in glob.glob(path+"*.root"):
#     all_files.append(f)
    
# all_files1 = []
# path = "/mnt/hadoop//store/mc/RunIISummer20UL18NanoAODv2/TTTo2L2Nu_TuneCP5_13TeV-powheg-pythia8/NANOAODSIM/106X_upgrade2018_realistic_v15_L1v1-v1/270000/"
# for f in glob.glob(path+"*.root"):
#     all_files1.append(f)
    
# all_files2 = []
# path = "/mnt/hadoop//store/data/Run2018D/DoubleMuon/NANOAOD/UL2018_MiniAODv2_NanoAODv9_GT36-v1/2820000/"
# for f in glob.glob(path+"*.root"):
#     all_files2.append(f)
    


##  Lets apply some selections

Here we are selecting two oppositely charged muons and saving the information in dask dataframes.

In [7]:
def processor(file):
    import awkward as ak
    import vector
    vector.register_awkward()
    

    tree = uproot.open(file)["Events"].arrays(entry_start=0, entry_stop=max_events)

    muons = ak.Array(
        ak.zip(
            {
                "nmu": tree["nMuon"],
                "pt": tree["Muon_pt"],
                "eta": tree["Muon_eta"],
                "charge": tree["Muon_charge"],
                "id": tree["Muon_isGlobal"],
                "phi": tree["Muon_phi"],
                "mass": tree["Muon_mass"],
                "met": tree["MET_pt"]
            }
        )
    )

    
    muon_mask = (muons.id==1) & (muons.pt > 20) & (abs(muons.eta)<2.4)
    good_muons = muons[muon_mask]
    two_muons = good_muons[(ak.sum(muon_mask, axis=-1)==2)]
    opp_muons = two_muons.charge[:,0] != two_muons.charge[:,1]

    two_opp_good_muons = two_muons[opp_muons]

    mu_p4 = ak.Array(
        ak.zip(
            {
                "pt":two_opp_good_muons.pt,
                "eta":two_opp_good_muons.eta,
                "phi":two_opp_good_muons.phi,
                "mass":two_opp_good_muons.mass
            }
        ), with_name = "Momentum4D"
    )

    dimuon_p4 = mu_p4[:,0] + mu_p4[:,1]

    mu_cols = ["pt", "eta", "phi"]
    
    ## Converting awkward arrays to panda dataframes
    df_mu1 = ak.to_pandas(two_opp_good_muons[:,0][mu_cols])
    df_mu1 = df_mu1.add_prefix('mu1_')
    
    df_mu2 = ak.to_pandas(two_opp_good_muons[:,1][mu_cols])
    df_mu2 = df_mu2.add_prefix('mu2_')
    
    df_met = pd.DataFrame(two_opp_good_muons.met[:,0], columns=['MET'])

    df = pd.concat([df_mu1, df_mu2, df_met], axis=1)
    df["dimuon_mass"] = dimuon_p4.M
    
    df = df.astype(float)
    
    ## Converting the panda dataframes to dask dataframes to distribute to workers
    
    return dd.from_pandas(df, npartitions=1)





## Sending the task to the worker nodes

Here we are sending the task using dask distributor to the worker nodes to apply selections on the dataset

In [None]:
futures = {}
for dataset, paths in all_files.items():
    futures[dataset] = client.map(processor, paths)

for dataset, paths in all_files.items():
    if(wait(futures[dataset])):
        print(f"{dataset}: DONE")



----------
We are gathering the output of the previous step from each worker node.
Remember: The output is stored as dask dataframes


In [None]:
results = {}

for key, future in futures.items():
    results[key] = dd.concat(client.gather(future))

   

## Plotting the invariant mass of the two muons

While plotting, the dask dataframes will be computed to get the real values


In [None]:
'''Plot dimuon invariant mass'''
plt.figure(figsize=(5,4))


for key, res in results.items():
    mass = res.dimuon_mass
    
    if(key=='dy'):
        plt.hist(mass, bins=150, range=[0, 500], histtype="step",linewidth=2, color='blue', label='DY+Jets')
    elif(key == 'tt'):
        plt.hist(mass, bins=150, range=[0,500], histtype='step',linewidth=2, color='orange', label='ttbar')
    else:
        n, bins, patches = plt.hist(mass, bins=150, range=[0,500], histtype='step',linewidth=0)

        
errory = np.sqrt(n)
plt.errorbar(np.linspace(0,500,150), n,yerr= errory, fmt='o', markersize=3, color='k', label='Data')
    
plt.title('Dimuon invariant mass')
plt.xlabel('Dimuon invariant mass [GeV]')
plt.ylabel('Events')
plt.yscale('log')
plt.legend()
plt.savefig(f"dimuon_mass_parallel.pdf")
plt.show()
plt.clf()



# Training a DNN using PyTorch to select the Drell-Yan events 

In [None]:
load_features = ['mu1_pt', 'mu1_eta', 'mu2_pt', 'mu2_eta', 'dimuon_mass','MET']

df_sig = results['dy'][load_features]
df_bkg = results['tt'][load_features]

df_sig["label"] = 1.0
df_bkg["label"] = 0.0

dataset = dd.concat([df_sig, df_bkg], ignore_index=True)

##Sampling the training and validation data

train_data, val_data = dataset.random_split([0.7, 0.3], shuffle=True)


train_labels = train_data["label"].compute().values
val_labels = val_data["label"].compute().values

train = train_data.drop(columns=["label"]).compute()
train = train.dropna().values

val = val_data.drop(columns=["label"]).compute()
val = val.dropna().values

train_size = len(train_data)


In [None]:
'''Lets train a network and apply some selections'''

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print("****Model will run on", device)

# Seed
seed = 123
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

class NeuralNet(nn.Module):
    def __init__(self, input_size, hidden_sizes, num_classes):
        super(NeuralNet, self).__init__()
        layers = []
        layers.append(nn.Linear(input_size, hidden_sizes[0]))
        layers.append(nn.ReLU())
        for i in range(len(hidden_sizes)-1):

            layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
            layers.append(nn.ELU())
            #layers.append(nn.Dropout(p=0.1))

        layers.append(nn.Linear(hidden_sizes[-1], num_classes))
        layers.append(nn.Sigmoid())
        self.layers = nn.ModuleList(layers)

    def get_weights(self):
        return self.weight    
    def forward(self, x):
        out = self.layers[0](x)
        for i in range(1, len(self.layers)):
            out = self.layers[i](out)

        return out


In [None]:
input_size = len(load_features)
hidden_sizes = [16, 8]
learning_rate = 0.001
num_classes = 1
num_epochs =  10
batch_size =  512


model = NeuralNet(input_size, hidden_sizes, num_classes).to(device)
model = model.float()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
decayRate = 0.6
my_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=decayRate)

total_step = train_size

print("RUNNING... ")
pbar = None
pbar = tqdm.tqdm(range(num_epochs), position=0, leave=True, colour="green")
for epoch in pbar:
    mean_loss = 0
    tot_wgt = 0
    val_mean_loss = 0
    val_tot_wgt = 0
    for i in range(int(train_size/batch_size)):
        # Move tensors to the configured device
        data = torch.from_numpy(train[i*batch_size: (i+1)*batch_size]).to(device)
        label = torch.from_numpy(train_labels[i*batch_size: (i+1)*batch_size].reshape((batch_size,1))).to(device)

        outputs = model(data.float())
        loss = criterion(outputs, label.float())
        weight_loss = loss

        # Backward and optimize
        optimizer.zero_grad()
        weight_loss.mean().backward()
        optimizer.step()
        mean_loss += weight_loss.mean().item()*batch_size
        if i%4 == 0:
            j = int(i/4)
            val_data = torch.from_numpy(val[j*batch_size: (j+1)*batch_size]).to(device)
            val_label = torch.from_numpy(val_labels[j*batch_size: (j+1)*batch_size].reshape(val_labels[j*batch_size: (j+1)*batch_size].shape[0],1)).to(device)
            val_outputs = model(val_data.float())


            val_loss = criterion(val_outputs, val_label.float())

        if (i+1) % 150 == 0:
            #print ('Epoch [{}/{}], Step [{}/{}], Train Loss: {:.4f}, Val Loss: {:.4f}'
            #       .format(epoch+1, num_epochs, i+1, int(total_step/batch_size), mean_loss, val_loss))
            mean_loss=0
            tot_wgt=0
            val_mean_loss=0
            val_tot_wgt=0
    my_lr_scheduler.step()
    pbar.set_postfix({"val_loss": val_loss.item()})
    #pbar.update()


torch.save(model.state_dict(), 'model.ckpt')

'''The model has been saved. Lets now evaluate the results'''

## Evaluating the model output

Training of the model is done locally as it needs the full chunk of training data, but the evaluation can be done on the worker nodes by sending model on each worker node and then getting the output

In [None]:
'''Loading the model'''

df_sig = results['dy'][load_features]
df_bkg = results['tt'][load_features]
df_data = results['data'][load_features]


def inference(df):
    device = torch.device('cpu')
    model = NeuralNet(6, [16, 8], 1).to(device)
    model.load_state_dict(torch.load("model.ckpt", map_location=torch.device('cpu')))
    model.eval()
    df = torch.from_numpy(df.compute().values).to(device).float()
    scores = model(df) 
    scores = scores.cpu().detach().numpy()
    return scores.ravel()  

##As the evaluation data is big, first the data is scattered on the nodes and then evaluation
## is performed using dask

scattered_data = client.scatter([df_sig, df_bkg, df_data])
futures = client.map(inference, scattered_data)
dnn_sig, dnn_bkg, dnn_data = client.gather(futures)


In [None]:
'''Checking the DNN Score'''
bins = np.linspace(0, 1, 100)
plt.figure(figsize=(5,4))


plt.hist(dnn_sig, bins, alpha=0.3, label='sig')
plt.hist(dnn_bkg, bins, alpha=0.3, label='bkg')
plt.xlabel('DNN Score')
plt.ylabel('Events')
plt.legend(loc='upper left')


## Lets check the dimuon mass after selection with DNN based discriminator

In [None]:
DNN_CUT = 0.5

dy_mass_dnn = df_sig["dimuon_mass"].compute()[dnn_sig>DNN_CUT]
tt_mass_dnn = df_bkg["dimuon_mass"].compute()[dnn_bkg>DNN_CUT]
data_mass_dnn = df_data["dimuon_mass"].compute()[dnn_data>DNN_CUT]


'''Plot dimuon invariant mass'''
plt.figure(figsize=(5,4))

plt.hist(dy_mass_dnn, bins=150, range=[0,500], histtype='step',linewidth=2, color='blue', label='DY+Jets')
plt.hist(tt_mass_dnn, bins=150, range=[0,500], histtype='step',linewidth=2, color='orange', label='ttbar')
n, bins, patches = plt.hist(data_mass_dnn, bins=150, range=[0,500], histtype='step',linewidth=0)

errory = np.sqrt(n)
plt.errorbar(np.linspace(0,500,150), n,yerr= errory, fmt='o', markersize=3, color='k', label='Data')

plt.title('Dimuon invariant mass')
plt.xlabel('Dimuon invariant mass [GeV]')
plt.ylabel('Events')
#plt.xscale('log')
plt.yscale('log')
plt.legend()
plt.savefig(f"dimuon_mass_dnncut_parallel.pdf")
plt.show()
plt.clf()

## We see that the selected data events are populated with Drell-Yan events