In [None]:
# Uncomment and run this if you are running on Colab (remove only the "#", keep the "!").
# You can run it anyway, but it will do nothing if you have already installed all dependencies
# (and it will take some time to tell you it is not gonna do anything)


#from google.colab import drive
#drive.mount('/content/drive')
#%cd "/content/drive/MyDrive/"
#! git clone https://github.com/vischia/data_science_school_igfae2024.git
#%cd machine_learning_tutorial
#!pwd
#!ls
#!pip install livelossplot shap

In [None]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

matplotlib.rcParams['figure.figsize'] = (8, 6)
matplotlib.rcParams['axes.labelsize'] = 14

import glob
import os
import re
import math
import socket
import json
import pickle
import gzip
import copy
import array
import numpy as np
import numpy.lib.recfunctions as recfunc
from tqdm import tqdm

#from scipy.optimize import newton
#from scipy.stats import norm

import uproot

import datetime
from timeit import default_timer as timer

import sklearn
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from sklearn.metrics import roc_curve, auc, accuracy_score
from sklearn.tree import export_graphviz
from sklearn.inspection import permutation_importance
try:
    # See #1137: this allows compatibility for scikit-learn >= 0.24
    from sklearn.utils import safe_indexing
except ImportError:
    from sklearn.utils import _safe_indexing

import pandas as pd
import torchinfo
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

import torch.nn.functional as F

# Import data

We will use simulated events corresponding to three physics processes.

- ttH production
- ttW production
- Drell-Yan production

We will select the multilepton final state, which is a challenging final state with a rich structure and nontrivial background separation.

<img src="figs/2lss.png" alt="ttH multilepton 2lss" style="width:40%;"/>


In [None]:
INPUT_FOLDER = '/eos/user/c/cmsdas/2024/short-ex-mlg'
sig = uproot.open(os.path.join(INPUT_FOLDER,'signal.root'))['Friends'].arrays(library="pd")
bk1 = uproot.open(os.path.join(INPUT_FOLDER,'background_1.root'))['Friends'].arrays(library="pd")
bk2 = uproot.open(os.path.join(INPUT_FOLDER,'background_2.root'))['Friends'].arrays(library="pd")

signal = sig.drop(["Hreco_Lep2_pt", "Hreco_Lep2_eta", "Hreco_Lep2_phi", "Hreco_Lep2_mass", "Hreco_evt_tag", "Hreco_HTXS_Higgs_pt", "Hreco_HTXS_Higgs_y"], axis=1 )
bkg1 = bk1.drop(["Hreco_Lep2_pt", "Hreco_Lep2_eta", "Hreco_Lep2_phi", "Hreco_Lep2_mass", "Hreco_evt_tag","Hreco_HTXS_Higgs_pt", "Hreco_HTXS_Higgs_y"], axis=1 )
bkg2 = bk2.drop(["Hreco_Lep2_pt", "Hreco_Lep2_eta", "Hreco_Lep2_phi", "Hreco_Lep2_mass", "Hreco_evt_tag","Hreco_HTXS_Higgs_pt", "Hreco_HTXS_Higgs_y"], axis=1 )

In [None]:
# This remains the same, but you will need to call it with one-vs-another or one-vs-all inputs
def plot_rocs(scores_labels_names):
    plt.figure()
    for score, label, name  in scores_labels_names:
        fpr, tpr, thresholds = roc_curve(label, score)
        plt.plot(
            fpr, tpr, 
            linewidth=2, 
            label=f"{name} (AUC = {100.*auc(fpr, tpr): .2f} %)"
        )
    plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
    plt.grid()
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Receiver Operating Characteristic curve")
    plt.legend(loc="lower right")
    plt.show()
    plt.close()

In [None]:
device = torch.device("cpu")

if torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device("mps")
if torch.cuda.is_available() and torch.cuda.device_count()>0:
    device = torch.device("cuda")
    
print ("Available device: ",device)

## Multiclass

Go back to the original dataset, but now assign different labels to the two backgrounds

In [None]:
signal['label'] = 2
bkg1['label'] = 1
bkg2['label'] = 0
bkg = pd.concat([bkg1, bkg2])

Now you need to apply the technique of **one-hot encoding** to convert a categorical label (=0,1,2) into a vector (one dimension per category/class):


| Sample | Categorical | One-hot encoding |
| --- | --- | --- |
| Background 2 | $0$ | $(1,0,0)$ |
| Background 1 | $1$ | $(0,1,0)$ |
| Signal | $2$ | $(0,0,1)$ |


You can use the function `one_hot = torch.nn.functional.one_hot(target)` to one-hot encode the target labels (both for signal and background)

In [None]:
#check encoding
for label in [0,1,2]:
    one_hot = torch.nn.functional.one_hot(torch.tensor(label), num_classes=3)
    print (f"Encoding label '{label}' as '{one_hot.numpy(force=True)}'")

In [None]:
data = pd.concat([signal,bkg]).sample(frac=1).reset_index(drop=True)
X = data.drop(["label"], axis=1)
y = data["label"]
    
print(f"Original label shape {y.shape}")
y = torch.nn.functional.one_hot(torch.tensor(y), num_classes=3)
y = y.to(dtype=torch.float32)
print(f"Encoded label shape {y.shape}")

X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X, y, test_size=0.33, random_state=42)
print("We have", len(X_train), "training samples and ", len(X_test), "testing samples")


Ntrain=10000
Ntest=2000
X_train = X_train[:Ntrain]
y_train = y_train[:Ntrain]
X_test = X_test[:Ntest]
y_test = y_test[:Ntest]

# Try with and without!
#from sklearn.preprocessing import StandardScaler

#fit transformation to train dataset
#scaler = StandardScaler().fit(X_train)

#apply transformation for train and test
#X_train[X_train.columns] = scaler.transform(X_train[X_train.columns])
#X_test[X_test.columns] = scaler.transform(X_test[X_test.columns])


Next you have to modify the neural network model: the output must be a dimension-three vector rather than a scalar.

You can use `self.output = nn.Linear(8, 3)` as last layer, and here we use the `softmax` activation function, to ensure the outputs are interpretable as  probabilities per label, ie. the output will be a 3-dim vector with $(P_\textrm{bkg1},P_\textrm{bkg2},P_\textrm{sig})$ and the probabilities are normalized as $P_\textrm{bkg1}+P_\textrm{bkg2}+P_\textrm{sig}=1$.



In [None]:
class NeuralNetwork(nn.Module):
    def __init__(self, ninputs, device=torch.device("cpu")):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(ninputs, 512),
            nn.ReLU(),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128,64),
            nn.ReLU(),
            nn.Linear(64,8),
            nn.ReLU(),
            nn.Linear(8, 3),
            nn.Softmax(dim=1)
        )
        self.linear_relu_stack.to(device)

    def forward(self, x):
        # Pass data through conv1
        x = self.linear_relu_stack(x)
        return x

class MyDataset(Dataset):
    def __init__(self, X, y, device=torch.device("cpu")):
        self.X = torch.Tensor(X.values if isinstance(X, pd.core.frame.DataFrame) else X).to(device)
        self.y = y.to(device)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        label = self.y[idx]
        datum = self.X[idx]
        
        return datum, label

train_dataset = MyDataset(X_train, y_train, device=device)
test_dataset = MyDataset(X_test, y_test, device=device)
batch_size = 2048

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

random_batch_X, random_batch_y = next(iter(train_dataloader))

print("The new dataloader puts the batches in in", random_batch_X.get_device())

# Reinstantiate the model, on the chosen device
model = NeuralNetwork(X_train.shape[1], device)


def train_loop(dataloader, model, loss_fn, optimizer, scheduler, device):
    size = len(dataloader.dataset)
    losses=[] # Track the loss function
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    #for batch, (X, y) in enumerate(dataloader):
    #for batch, (X, y) in tqdm(enumerate(dataloader, 0), unit="batch", total=len(dataloader)):
    for (X,y) in tqdm(dataloader):
        # Reset gradients (to avoid their accumulation)
        optimizer.zero_grad()
        # Compute prediction and loss
        pred = model(X)
        #if (all_equal3(pred.detach().numpy())):
        #    print("All equal!")
        loss = loss_fn(pred, y)
        losses.append(loss.detach().cpu())
        # Backpropagation
        loss.backward()
        optimizer.step()

    scheduler.step()
    return np.mean(losses)
    
def test_loop(dataloader, model, loss_fn, device):
    losses=[] # Track the loss function
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating the model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True
    with torch.no_grad():
        #for X, y in dataloader:
        #for batch, (X, y) in tqdm(enumerate(dataloader, 0), unit="batch", total=len(dataloader)):
        for (X,y) in tqdm(dataloader):
            pred = model(X)
            loss = loss_fn(pred, y).item()
            losses.append(loss)
            test_loss += loss
            #correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            
    return np.mean(losses)

epochs=100
learningRate = 0.01
loss_fn = torch.nn.CrossEntropyLoss()

optimizer = torch.optim.SGD(model.parameters(), lr=learningRate)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)

train_losses=[]
test_losses=[]
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loss=train_loop(train_dataloader, model, loss_fn, optimizer, scheduler, device)
    test_loss=test_loop(test_dataloader, model, loss_fn, device)
    train_losses.append(train_loss)
    test_losses.append(test_loss)
    print("Avg train loss", train_loss, ", Avg test loss", test_loss, "Current learning rate", scheduler.get_last_lr())
print("Done!")

In [None]:
plt.figure()
plt.plot(train_losses, label="Average training loss")
plt.plot(test_losses, label="Average test loss")
plt.legend(loc="best")
plt.show()
plt.close()

After training, you will need to figure out how to get categorical predictions to be able to test performance (for instance to do confusion matrices for each pair of classes, or for one class against all the others.

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
pred_y = model(torch.tensor(X_test.values, device=device)).numpy(force=True)
pred_class = np.argmax(pred_y,axis=1)
true_class = np.argmax(y_test.numpy(force=True),axis=1)

print (f"{'true class':10s} | {'predicted class':15s} ")
print ('='*30)
for i in range(10):
    print (f"{true_class[i]:<10d} | {pred_class[i]:<15d}")

confusion_mat = confusion_matrix(true_class, pred_class, normalize='true')
                                 
plt.figure()
disp = ConfusionMatrixDisplay(confusion_matrix=confusion_mat, display_labels=['bkg2', 'bkg1', 'sig'])
disp.plot()
plt.show()
plt.close()

## Exercise

Calculate the predictions for each class, then plot the ROC curve for:

- signal vs bkg1
- signal vs bkg2
- bkg vs bkg2

Then, in another plot, plot:

- signal vs all backgrounds


### The end