# Exploratory Data Analysis & Preprocessing

## Data Preprocessing

### Importing the Dataset

In [None]:
import pandas as pd
# Let pandas show up to 100 rows / 100 columns when a frame is displayed.
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_columns", 100)

# Read one Oracle's Elixir CSV per year (2020 through 2024) and stack them
# into a single frame with a fresh 0..n-1 index. `url` is forced to string.
yearly_frames = []
for year in range(2020, 2025):
    csv_path = f"data/OraclesElixir/{year}_LoL_esports_match_data_from_OraclesElixir.csv"
    yearly_frames.append(pd.read_csv(csv_path, dtype={"url": "str"}))
df = pd.concat(yearly_frames, ignore_index=True)

rows, cols = df.shape
print(f"The CSV file has {rows} rows and {cols} columns.")

print(list(df.columns))


### Filter for Complete Matches Only

In [None]:
# Boolean mask of the rows whose `datacompleteness` flag is exactly 'complete'.
is_complete = df["datacompleteness"] == 'complete'

total_rows = len(df)
num_complete_rows = int(is_complete.sum())
ratio = num_complete_rows / total_rows
print(f"Number of rows where datacompleteness is 'complete': {num_complete_rows}")
print(f"Ratio of 'complete' rows to total rows: {ratio:.4f}")

# Keep only the complete rows from here on.
df = df[is_complete]


### Aggregate Individual Stats to Team-Level Rows

In [None]:
# Split the frame into team summary rows and per-player rows.
is_team_row = df['position'] == 'team'
team_rows = df[is_team_row].copy()
player_rows = df[~is_team_row]

positions = ['top', 'jng', 'mid', 'bot', 'sup']

# For every role, attach that role's champion to the matching team row
# (matched on game id + side) as a new `<role>_champ` column.
for pos in positions:
    role_rows = player_rows[player_rows['position'] == pos]
    champ_col = role_rows[['gameid', 'side', 'champion']].rename(
        columns={'champion': f'{pos}_champ'}
    )
    team_rows = pd.merge(team_rows, champ_col, how='left', on=['gameid', 'side'])

df = team_rows


### Visualize Feature Correlations

In [None]:
def _corr_with_result(column):
    """Return the correlation between ``df[column]`` and ``df['result']``."""
    return df[column].corr(df['result'])


# Each feature is correlated with the match outcome and reported on its own line.
correlation_void = _corr_with_result('void_grubs')
print("Correlation with void:", correlation_void)

correlation_monsterkillsownjungle = _corr_with_result('monsterkillsownjungle')
print("Correlation with monsterkillsownjungle:", correlation_monsterkillsownjungle)

correlation_turretplates = _corr_with_result('turretplates')
print("Correlation with turretplates:", correlation_turretplates)

correlation_heralds = _corr_with_result('heralds')
print("Correlation with heralds:", correlation_heralds)

correlation_visionscore = _corr_with_result('visionscore')
print("Correlation with visionscore:", correlation_visionscore)

correlation_vspm = _corr_with_result('vspm')
print("Correlation with vspm:", correlation_vspm)

correlation_minionkills = _corr_with_result('minionkills')
print("Correlation with minionkills:", correlation_minionkills)

correlation_cspm = _corr_with_result('cspm')
print("Correlation with cspm:", correlation_cspm)

### Remove Unnecessary or Redundant Features

In [None]:
df.drop(columns=["atakhans", "opp_atakhans"], inplace=True)

# Column positions below refer to df.columns as it stands right after the
# drop above; they are purely positional, so any change to the upstream
# column layout silently changes what gets removed.
extra_positions = (
    slice(12, 18),    # Additional metadata columns
    slice(18, 28),    # BP data
    slice(30, 43),    # End game data columns
    slice(48, 57),    # Drake-related columns
    slice(40, 43),    # Individual data columns (already inside 30:43)
    [78],             # Specific column (damageshare)
    [91],             # Specific column (earnedgoldshare)
    [95],             # Specific column (total cs)
    [28],             # Specific column (gamelength)
    slice(131, 161),  # Data after 20 minutes
)

columns_to_drop = df.columns[1:11]  # Metadata columns
for positions in extra_positions:
    columns_to_drop = columns_to_drop.union(df.columns[positions])

df.drop(columns=columns_to_drop, axis=1, inplace=True)



### Identify Null Values

In [None]:
# Report, per column, how many values are missing in the current frame.
print("Null values in each column:")
null_counts = df.isnull().sum()
null_columns = null_counts[null_counts > 0]
print(null_columns)
print("----------------------------------------------------")

# The ratio is taken against the frame being inspected. `total_rows` is not
# used here: it holds the row count of the raw concatenated CSVs, measured
# before the 'complete' filter and the team-level aggregation, so dividing
# by it would understate every ratio.
null_ratio = null_counts / len(df)

null_columns_with_ratio = null_ratio[null_ratio > 0]
print("Columns with null values and their ratios:")
print(null_columns_with_ratio)


### Drop or fill null values

In [None]:
# Objective counters: a missing value is replaced with 0.
# NOTE(review): presumably these are absent for seasons where the objective
# was not tracked -- confirm against the data source.
zero_filled_cols = (
    'void_grubs', 'opp_void_grubs',
    'turretplates', 'opp_turretplates',
    'heralds', 'opp_heralds',
)
for zero_col in zero_filled_cols:
    df[zero_col] = df[zero_col].fillna(0)

# Rate / score features: a missing value is replaced with the column median.
for median_col in ('cspm', 'vspm', 'visionscore'):
    df[median_col] = df[median_col].fillna(df[median_col].median())

# The jungle monster kill columns are removed rather than imputed.
df.drop(columns=['monsterkillsownjungle', 'monsterkillsenemyjungle'], inplace=True)

# Any row that still has a missing value is discarded, then the join keys
# (no longer needed after the champion merge) are removed.
df.dropna(inplace=True)
df.drop(columns=['gameid', 'side'], inplace=True)

# # Print the count of null values in each column
# print("Null values in each column:")
# null_counts = df.isnull().sum()
# null_columns = null_counts[null_counts > 0]
# print(null_columns)
# print("----------------------------------------------------")

# # Calculate the ratio of null values for each column
# null_ratio = (null_counts / total_rows)

# # Filter and print only the columns where the ratio of null values is greater than 0
# null_columns_with_ratio = null_ratio[null_ratio > 0]
# print("Columns with null values and their ratios:")
# print(null_columns_with_ratio)


In [None]:
# Show the first rows (pandas' default of five) of the frame after null handling.
df.head()

### Verify That All Missing Values Are Handled

In [None]:
# Print the count of null values in each column
print("Null values in each column:")
null_counts = df.isnull().sum()
null_columns = null_counts[null_counts > 0]
print(null_columns)
print("")

# Calculate the ratio of null values for each column relative to the frame
# being checked. `total_rows` is deliberately not used: it is the row count
# of the raw CSV data taken before filtering/aggregation/dropna, so it no
# longer matches the number of rows in `df`.
null_ratio = null_counts / len(df)

# Filter and print only the columns where the ratio of null values is greater than 0
null_columns_with_ratio = null_ratio[null_ratio > 0]
print("Columns with null values and their ratios:")
print(null_columns_with_ratio)


### Convert Categorical Variables into Numerical Format

In [None]:
from sklearn.preprocessing import LabelEncoder

champ_cols = [f"{role}_champ" for role in ('top', 'jng', 'mid', 'bot', 'sup')]

# Gather every champion seen in any role (column by column) so that a single
# encoder assigns the same integer to a champion regardless of role.
all_champs = pd.unique(df[champ_cols].to_numpy().ravel(order="F"))

le = LabelEncoder().fit(all_champs)

# Replace the champion names with their integer labels in place.
for col in champ_cols:
    df[col] = le.transform(df[col])


### Save champion-label mapping

In [None]:
import pandas as pd

# One row per champion known to the encoder: its name and its integer label.
encoded_labels = le.transform(le.classes_)
champ_mapping = pd.DataFrame(
    list(zip(le.classes_, encoded_labels)),
    columns=["champion", "label"],
)

champ_mapping_path = "data/champion_label_mapping.csv"

# Written without the index so the file holds only the two mapping columns.
champ_mapping.to_csv(champ_mapping_path, index=False)
print(f"📝 Champion-label mapping saved to: {champ_mapping_path}")

### Preview the Dataset

In [None]:
# Concatenate the head and tail of the dataframe once and display that
# single frame (the concat result was previously built twice and the named
# copy was never used).
head_and_tail = pd.concat([df.head(), df.tail()])

display(head_and_tail)


## EDA

### Class Imbalance

In [None]:
import matplotlib.pyplot as plt

# Absolute number of rows per outcome class.
class_counts = df['result'].value_counts()
display(class_counts)

# Same breakdown as fractions of the whole, drawn as a bar chart.
class_share = df['result'].value_counts(normalize=True)
class_share.plot.bar(title='Class Distribution')
plt.show()

### Feature Distributions 

In [None]:
display(df.describe())
print("Feature skewness: ")
skewness = df.skew()
display(skewness.sort_values(ascending=False))

# Columns whose absolute skewness exceeds 1 are treated as skewed.
skewed = skewness[skewness.abs() > 1].index
axes = df.hist(bins=30, figsize=(25, 20))

# Annotate skewed features: each histogram's title is the column name, so a
# title found in `skewed` is redrawn in red.
for ax in axes.flatten():
    title = ax.get_title()
    if title in skewed:
        ax.set_title(title, color='red')

plt.suptitle("Feature Distributions (Red = Skewed)", fontsize=20)
plt.tight_layout(rect=(0, 0, 1, 0.96))
plt.show()

#### Apply normalization to skewed data

In [None]:
import numpy as np
import joblib
import json

# Features with |skewness| >= 1, measured once before any transform.
skewness = df.skew()
skewed_features = skewness[skewness.abs() >= 1].index.tolist()

# log1p is only defined here for columns without negative values; transformed
# columns get a `_normalized` suffix, the rest are reported and left as-is.
# Note that `skewed_features` (saved below) keeps the ORIGINAL names and also
# lists the columns that were skipped.
renamed_columns = {}
for col in skewed_features:
    if not (df[col] >= 0).all():
        print(f"Feature {col} contains negative values, skip log transform")
        continue
    df[col] = np.log1p(df[col])
    renamed_columns[col] = f"{col}_normalized"
df.rename(columns=renamed_columns, inplace=True)

# Columns that must not be min-max scaled: the label, the 0/1 "first ..."
# flags, and the label-encoded champion ids.
label_col = 'result'
binary_cols = [
    'firstdragon', 'firstherald', 'firstbaron', 'firsttower',
    'firstmidtower', 'firsttothreetowers'
]
categorical_cols = [
    'top_champ', 'jng_champ', 'mid_champ', 'bot_champ', 'sup_champ'
]
exclude_cols = [label_col, *binary_cols, *categorical_cols]
normalize_cols = df.columns.difference(exclude_cols)

# Fit the scaler on the remaining columns and overwrite them with the result.
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
df[normalize_cols] = scaler.fit_transform(df[normalize_cols])

# Persist the fitted scaler and the two column lists next to the data.
joblib.dump(scaler, "data/minmax_scaler.pkl")
artifacts = (
    ("data/skewed_features.json", skewed_features),
    ("data/normalize_cols.json", list(normalize_cols)),
)
for artifact_path, payload in artifacts:
    with open(artifact_path, "w") as f:
        json.dump(payload, f)


#### Feature Distribution after normalization

In [None]:
display(df.describe())

# One histogram per column of the transformed frame.
axes = df.hist(figsize=(25, 20), bins=30)

# The histogram call leaves its figure current; title and layout go on it.
hist_figure = plt.gcf()
hist_figure.suptitle("Feature Distributions after normalization", fontsize=20)
hist_figure.tight_layout(rect=(0, 0, 1, 0.96))
plt.show()

### Correlation with Target

In [None]:
# Mean of every feature within each outcome class.
grouped_means = df.groupby('result').mean()
display(grouped_means)

# Transposed so each feature becomes one group of bars (one bar per class).
grouped_means.T.plot.bar(figsize=(25, 20), title='Feature Mean by Result')


### Feature Correlation Matrix / Heatmap

In [None]:
import seaborn as sns

# correlation_matrix = df.corr()
# display(correlation_matrix)

# Pairwise correlation of all columns, drawn without cell annotations on a
# diverging colour map centred at zero.
heatmap_fig, heatmap_ax = plt.subplots(figsize=(25, 20))
sns.heatmap(df.corr(), cmap='coolwarm', center=0, annot=False, ax=heatmap_ax)
heatmap_ax.set_title('Correlation Heatmap', fontsize=20)
plt.show()

## Save the processed data to Parquet

In [None]:
# Persist the fully preprocessed frame; index=False leaves the row index out of the file.
df.to_parquet("data/processed_lol_data.parquet", index=False)

# Model Training


## Define Numerical Columns

In [None]:
# Labels for the model's input features, in input order. They document which
# position each mask entry refers to; they are not checked against the
# parquet file's columns, so keep this tuple in sync with the data by hand.
_FEATURE_LABELS = (
    "team kpm", "ckpm",
    "firstdragon", "dragons", "opp_dragons", "elders_normalized", "opp_elders_normalized",
    "firstherald", "heralds", "opp_heralds", "void_grubs_normalized", "opp_void_grubs_normalized",
    "firstbaron", "barons", "opp_barons",
    "firsttower", "towers", "opp_towers", "firstmidtower", "firsttothreetowers",
    "turretplates", "opp_turretplates", "inhibitors_normalized", "opp_inhibitors_normalized",
    "damagetochampions", "dpm", "damagetakenperminute", "damagemitigatedperminute_normalized",
    "wardsplaced", "wpm", "wardskilled", "wcpm", "controlwardsbought", "visionscore", "vspm",
    "totalgold", "earnedgold", "earned gpm", "goldspent", "gspd", "gpr",
    "minionkills", "monsterkills", "cspm",
    "goldat10", "xpat10", "csat10", "opp_goldat10", "opp_xpat10", "opp_csat10",
    "golddiffat10", "xpdiffat10", "csdiffat10",
    "killsat10_normalized", "assistsat10_normalized", "deathsat10_normalized",
    "opp_killsat10_normalized", "opp_assistsat10_normalized", "opp_deathsat10_normalized",
    "goldat15", "xpat15", "csat15", "opp_goldat15", "opp_xpat15", "opp_csat15",
    "golddiffat15", "xpdiffat15", "csdiffat15",
    "killsat15_normalized", "assistsat15_normalized", "deathsat15_normalized",
    "opp_killsat15_normalized", "opp_assistsat15_normalized", "opp_deathsat15_normalized",
    "top_champ", "jng_champ", "mid_champ", "bot_champ", "sup_champ",
)

# Features that are flags or encoded categories rather than continuous
# quantities; their mask entry is False.
_NON_NUMERICAL_FEATURES = frozenset({
    "firstdragon", "firstherald", "firstbaron", "firsttower",
    "firstmidtower", "firsttothreetowers",
    "top_champ", "jng_champ", "mid_champ", "bot_champ", "sup_champ",
})

# Positional mask over the input features: True where the feature is numerical.
is_numerical = [label not in _NON_NUMERICAL_FEATURES for label in _FEATURE_LABELS]

## Load Processed data

In [None]:
import pandas as pd
import os

# Reload the frame written to this path by the preprocessing section.
df = pd.read_parquet("data/processed_lol_data.parquet")

In [None]:
import torch
# Print the name of CUDA device 0 when CUDA is usable, otherwise a notice.
cuda_status = (
    torch.cuda.get_device_name(0)
    if torch.cuda.is_available()
    else "CUDA is not available."
)
print(cuda_status)

## Define data loader

In [None]:
def create_dataloaders(df, batch_size=64, test_size=0.2, seed=42):
    """Split ``df`` into train/test parts and wrap each in a ``DataLoader``.

    The split is stratified on the ``result`` column and seeded with ``seed``.
    Every column except ``result`` becomes a float32 feature; ``result``
    becomes the int64 label.

    Returns:
        ``(train_loader, test_loader)``; only the training loader shuffles.
    """
    from sklearn.model_selection import train_test_split
    from torch.utils.data import Dataset, DataLoader
    import torch

    class LoLDataset(Dataset):
        """In-memory dataset holding all features and labels as tensors."""

        def __init__(self, frame):
            feature_values = frame.drop(columns=['result']).values
            label_values = frame['result'].values
            self.X = torch.tensor(feature_values, dtype=torch.float32)
            self.y = torch.tensor(label_values, dtype=torch.long)

        def __len__(self):
            return len(self.X)

        def __getitem__(self, idx):
            return self.X[idx], self.y[idx]

    train_df, test_df = train_test_split(
        df,
        test_size=test_size,
        random_state=seed,
        stratify=df['result'],
    )

    train_loader = DataLoader(LoLDataset(train_df), batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(LoLDataset(test_df), batch_size=batch_size, shuffle=False)
    return train_loader, test_loader


## Define DNN Model

In [None]:
import torch.nn as nn

from lolnet import LoLNet


## Define Training Loop

In [None]:
import torch
from sklearn.metrics import accuracy_score
import numpy as np
from trades import trades_loss
from jacobian import JacobianReg


def add_input_noise(x, sigma=0.01, is_numerical=None):
    """Return ``x`` plus zero-mean Gaussian noise, clamped to ``[0, 1]``.

    Args:
        x: Input tensor.
        sigma: Standard deviation of the noise.
        is_numerical: Optional per-feature booleans; features marked False
            receive no noise. When None, every feature is perturbed.
    """
    perturbation = torch.randn_like(x) * sigma
    if is_numerical is not None:
        numeric_mask = torch.tensor(is_numerical, dtype=torch.float32, device=x.device)
        perturbation = perturbation * numeric_mask
    return (x + perturbation).clamp(0.0, 1.0)


def generate_pgd_adversarial(model, x, y, epsilon=0.02, alpha=0.002, steps=10, is_numerical=None):
    """Craft L-infinity PGD adversarial examples for a batch.

    Starting from ``x``, takes ``steps`` signed-gradient ascent steps of size
    ``alpha`` on the cross-entropy loss. After every step the result is
    projected back into the ``epsilon`` ball around ``x`` and clamped to
    ``[0, 1]``.

    Args:
        model: Callable mapping a feature batch to class logits.
        x: Clean input batch.
        y: Integer class labels for ``x``.
        epsilon: Maximum per-feature perturbation.
        alpha: Step size.
        steps: Number of PGD iterations.
        is_numerical: Optional per-feature booleans; features marked False
            are never perturbed. When None (the default) every feature may
            be perturbed.

    Returns:
        A detached tensor with the same shape as ``x``.
    """
    if is_numerical is None:
        # No mask supplied: perturb everything. (Passing None straight to
        # torch.tensor would raise.)
        mask = torch.ones_like(x)
    else:
        mask = torch.tensor(is_numerical, dtype=torch.float32, device=x.device)

    loss_fn = torch.nn.CrossEntropyLoss()
    x_adv = x.clone().detach().requires_grad_(True)

    for _ in range(steps):
        loss = loss_fn(model(x_adv), y)
        grad = torch.autograd.grad(loss, x_adv, retain_graph=False, create_graph=False)[0]
        # Ascend the loss, but only along the maskable (numerical) features.
        x_adv = x_adv + alpha * grad.sign() * mask
        # Project back into the epsilon ball, then into the valid input range.
        x_adv = torch.min(torch.max(x_adv, x - epsilon), x + epsilon)
        x_adv = torch.clamp(x_adv, 0.0, 1.0).detach().requires_grad_(True)

    return x_adv.detach()


def train_model_with_perturbation(
    model, train_loader, test_loader, criterion, optimizer,
    num_epochs=10, device='cpu', patience=3,
    pgd=False, noise=False, trades=False, jacobian_reg=False,
    is_numerical=None, pgd_config=None, noise_sigma=0.01, lambda_JR=0.01
):
    """Train ``model`` with optional robustness techniques and early stopping.

    Args:
        model: Network to train; moved to ``device``.
        train_loader: Iterable of ``(features, labels)`` training batches.
        test_loader: Iterable of ``(features, labels)`` batches used to score
            every epoch and to pick the best checkpoint.
        criterion: Loss used when ``trades`` is False.
        optimizer: Optimizer over ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        device: Device name or ``torch.device``.
        patience: Stop once this many epochs pass without a new best test
            accuracy.
        pgd: Append PGD adversarial copies of each batch to the batch.
        noise: Append Gaussian-noised copies of each batch to the batch.
        trades: Use ``trades_loss`` on the clean inputs instead of
            ``criterion``.
        jacobian_reg: Add ``lambda_JR`` times a Jacobian penalty to the loss
            (only applied when ``trades`` is False).
        is_numerical: Per-feature booleans forwarded to the perturbation
            helpers and to ``trades_loss``.
        pgd_config: Dict with ``epsilon``, ``alpha`` and ``steps``; defaults
            to 0.02 / 0.002 / 10.
        noise_sigma: Standard deviation for the Gaussian noise.
        lambda_JR: Weight of the Jacobian penalty.

    Returns:
        ``(model, best_acc, epoch_log)`` where ``model`` carries the weights
        of the best-scoring epoch, ``best_acc`` is that epoch's test accuracy
        and ``epoch_log`` is a list of per-epoch metric dicts.

    NOTE(review): the checkpoint is selected on ``test_loader``, so
    ``best_acc`` is an optimistic estimate unless a separate hold-out exists.
    """
    if pgd_config is None:
        pgd_config = {'epsilon': 0.02, 'alpha': 0.002, 'steps': 10}

    print("🔧 Training Configuration:")
    print(f"📌 Device       : {device}")
    print(f"📌 Epochs       : {num_epochs}")
    print(f"📌 Patience     : {patience}")
    print(f"📌 PGD          : {pgd}")
    if pgd:
        print(f"   ↳ epsilon    : {pgd_config['epsilon']}")
        print(f"   ↳ alpha      : {pgd_config['alpha']}")
        print(f"   ↳ steps      : {pgd_config['steps']}")
    print(f"📌 Noise        : {noise}")
    if noise:
        print(f"   ↳ sigma      : {noise_sigma}")
    print(f"📌 TRADES       : {trades}")
    print(f"📌 Jacobian Reg : {jacobian_reg}")
    print(f"📌 Numerical Mask Present: {is_numerical is not None}")
    print("-" * 40)

    device = torch.device(device)
    model.to(device)
    best_acc, best_epoch = 0, -1
    best_model_state = None
    epoch_log = []
    reg = JacobianReg(n=1) if jacobian_reg else None

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0.0
        all_preds, all_labels = [], []

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            X_clean = X_batch
            X_aug, y_aug = [], []

            if pgd:
                X_pgd = generate_pgd_adversarial(model, X_clean, y_batch, **pgd_config, is_numerical=is_numerical)
                X_aug.append(X_pgd)
                y_aug.append(y_batch)

            if noise:
                X_noisy = add_input_noise(X_clean, sigma=noise_sigma, is_numerical=is_numerical)
                X_aug.append(X_noisy)
                y_aug.append(y_batch)

            if X_aug:
                # Clean samples first, followed by each augmented copy.
                X_batch = torch.cat([X_clean] + X_aug, dim=0)
                y_batch = torch.cat([y_batch] + y_aug, dim=0)

            # Input gradients are needed by the Jacobian penalty.
            X_batch.requires_grad = True
            optimizer.zero_grad()

            if trades:
                # TRADES builds its own adversarial inputs from the clean
                # samples, so only the clean slice of the labels is passed.
                loss = trades_loss(
                    model=model,
                    x_natural=X_clean,
                    y=y_batch[:len(X_clean)],
                    optimizer=optimizer,
                    step_size=pgd_config['alpha'],
                    epsilon=pgd_config['epsilon'],
                    perturb_steps=pgd_config['steps'],
                    distance='l_inf',
                    is_numerical=is_numerical
                )
            else:
                outputs = model(X_batch)

                loss = criterion(outputs, y_batch)

                if jacobian_reg and reg is not None:
                    R = reg(X_batch, outputs)
                    loss += lambda_JR * R

            loss.backward()
            optimizer.step()

            total_loss += loss.item()

            # Training accuracy is measured on the (possibly augmented)
            # batch with the freshly updated weights.
            with torch.no_grad():
                preds = model(X_batch).argmax(dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(y_batch.cpu().numpy())

        train_acc = accuracy_score(all_labels, all_preds)
        avg_loss = total_loss / len(train_loader)

        model.eval()
        test_preds, test_labels = [], []
        with torch.no_grad():
            for X_test, y_test in test_loader:
                X_test, y_test = X_test.to(device), y_test.to(device)
                preds = model(X_test).argmax(dim=1)
                test_preds.extend(preds.cpu().numpy())
                test_labels.extend(y_test.cpu().numpy())

        test_acc = accuracy_score(test_labels, test_preds)

        print(f"🧪 Epoch {epoch+1:>2}/{num_epochs:<2} | "
              f"Loss: {avg_loss:10.4f} | "
              f"Train Acc: {train_acc:7.4f} | "
              f"Test Acc: {test_acc:7.4f}")

        epoch_log.append({
            'epoch': epoch + 1,
            'train_acc': train_acc,
            'test_acc': test_acc,
            'loss': avg_loss
        })

        if test_acc > best_acc:
            best_acc = test_acc
            best_epoch = epoch
            # state_dict() hands back references to the live tensors, which
            # later optimizer steps overwrite in place. Clone them so the
            # snapshot really is the best epoch's weights.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
        elif epoch - best_epoch >= patience:
            print(f"⏹️ Early stopping at epoch {epoch+1} (no improvement for {patience} epochs)")
            break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, best_acc, epoch_log


## Define Training Function

In [None]:
import os
import torch
import torch.nn as nn
from sklearn.metrics import accuracy_score

def train_lolnet_model(df, 
                       model_name, 
                       is_numerical, 
                       pgd=False,
                       noise=False,
                       trades=False,
                       jacobian_reg=False
                       ):
    """Train one LoLNet variant and save it as ``.pth`` and ``.onnx``.

    Args:
        df: Processed frame containing the ``result`` label column.
        model_name: Base file name (no extension) used under ``models/``.
        is_numerical: Per-feature booleans forwarded to the training loop.
        pgd, noise, trades, jacobian_reg: Switches forwarded to
            ``train_model_with_perturbation``.

    Side effects:
        Creates ``models/`` if needed and writes ``models/<model_name>.pth``
        and ``models/<model_name>.onnx``. Returns nothing.
    """
    # Fixed settings shared by every run; training is pinned to the CPU.
    device = "cpu"
    settings = {"batch_size": 64, "lr": 0.001, "num_epochs": 30, "patience": 10}

    train_loader, test_loader = create_dataloaders(df, batch_size=settings["batch_size"])
    # Every column except the label is a model input.
    input_dim = len(df.columns.drop('result'))

    model = LoLNet(input_dim=input_dim).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=settings["lr"])

    model, best_acc, _epoch_log = train_model_with_perturbation(
        model, train_loader, test_loader,
        criterion, optimizer,
        num_epochs=settings["num_epochs"],
        device=device,
        patience=settings["patience"],
        pgd=pgd,
        noise=noise,
        trades=trades,
        jacobian_reg=jacobian_reg,
        is_numerical=is_numerical,
        pgd_config={'epsilon': 0.02, 'alpha': 0.002, 'steps': 10},
        noise_sigma=0.01
    )

    # PyTorch checkpoint.
    os.makedirs("models", exist_ok=True)
    model_path = f"models/{model_name}.pth"
    torch.save(model.state_dict(), model_path)
    print(f"✅ Training complete. Best test accuracy: {best_acc:.4f}")
    print(f"💾 Model saved to: {model_path}")

    # ONNX export traced with a single random row; the batch axis is dynamic.
    onnx_path = f"models/{model_name}.onnx"
    example_input = torch.randn(1, input_dim).to(device)
    torch.onnx.export(
        model,
        (example_input,),
        onnx_path,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
        opset_version=11
    )
    print(f"🧠 ONNX model exported to: {onnx_path}")


## Start Training

In [None]:
# (checkpoint name, extra keyword flags) for every training variant, in run order.
training_runs = [
    ("lolnet", {}),
    ("lolnet_pgd", {"pgd": True}),
    ("lolnet_noise", {"noise": True}),
    ("lolnet_trades", {"trades": True}),
    ("lolnet_jacobian", {"jacobian_reg": True}),
]

for run_name, run_flags in training_runs:
    train_lolnet_model(df, run_name, is_numerical, **run_flags)

# Model Evaluation

## Define numerical columns

In [None]:
# Labels for the model's input features, in input order. They document which
# position each mask entry refers to; they are not checked against the
# parquet file's columns, so keep this tuple in sync with the data by hand.
_FEATURE_LABELS = (
    "team kpm", "ckpm",
    "firstdragon", "dragons", "opp_dragons", "elders_normalized", "opp_elders_normalized",
    "firstherald", "heralds", "opp_heralds", "void_grubs_normalized", "opp_void_grubs_normalized",
    "firstbaron", "barons", "opp_barons",
    "firsttower", "towers", "opp_towers", "firstmidtower", "firsttothreetowers",
    "turretplates", "opp_turretplates", "inhibitors_normalized", "opp_inhibitors_normalized",
    "damagetochampions", "dpm", "damagetakenperminute", "damagemitigatedperminute_normalized",
    "wardsplaced", "wpm", "wardskilled", "wcpm", "controlwardsbought", "visionscore", "vspm",
    "totalgold", "earnedgold", "earned gpm", "goldspent", "gspd", "gpr",
    "minionkills", "monsterkills", "cspm",
    "goldat10", "xpat10", "csat10", "opp_goldat10", "opp_xpat10", "opp_csat10",
    "golddiffat10", "xpdiffat10", "csdiffat10",
    "killsat10_normalized", "assistsat10_normalized", "deathsat10_normalized",
    "opp_killsat10_normalized", "opp_assistsat10_normalized", "opp_deathsat10_normalized",
    "goldat15", "xpat15", "csat15", "opp_goldat15", "opp_xpat15", "opp_csat15",
    "golddiffat15", "xpdiffat15", "csdiffat15",
    "killsat15_normalized", "assistsat15_normalized", "deathsat15_normalized",
    "opp_killsat15_normalized", "opp_assistsat15_normalized", "opp_deathsat15_normalized",
    "top_champ", "jng_champ", "mid_champ", "bot_champ", "sup_champ",
)

# Features that are flags or encoded categories rather than continuous
# quantities; their mask entry is False.
_NON_NUMERICAL_FEATURES = frozenset({
    "firstdragon", "firstherald", "firstbaron", "firsttower",
    "firstmidtower", "firsttothreetowers",
    "top_champ", "jng_champ", "mid_champ", "bot_champ", "sup_champ",
})

# Positional mask over the input features: True where the feature is numerical.
is_numerical = [label not in _NON_NUMERICAL_FEATURES for label in _FEATURE_LABELS]

## Define Verified Robustness Rate Testing

In [None]:
from maraboupy import Marabou
from maraboupy.MarabouNetworkONNX import MarabouNetworkONNX
import pandas as pd
from typing import List
import sys
import os
from contextlib import contextmanager
from tqdm import tqdm
# Register tqdm's pandas integration (adds `progress_apply` and friends).
tqdm.pandas()

@contextmanager
def suppress_stdout():
    """Temporarily point ``sys.stdout`` at the null device.

    Inside the ``with`` body, anything written through ``sys.stdout`` is
    discarded. On exit -- normal or via an exception -- the previous stream
    is put back and the null-device handle is closed.
    """
    saved_stream = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            yield
        finally:
            sys.stdout = saved_stream
 

def test_lolnet_verified_robustness_rate(
    df: pd.DataFrame,
    onnx_path: str,
    epsilon: float,
    is_numerical:list[bool],
    num_samples: int = 100,
    verbosity: int = 1
):
    """Measure how many samples Marabou can prove robust for an ONNX model.

    For each sample, numerical features may move by up to ``epsilon`` in
    either direction and all other features are pinned to their value. The
    solver is then asked for a point in that box where the logit of the
    *other* class is at least the logit of the true class. ``sat`` means
    such a counterexample exists; ``unsat`` means the sample is verified
    robust.

    Args:
        df: Frame whose first column is the 0/1 label and whose remaining
            columns are the model inputs, in input order.
        onnx_path: Path to the two-output ONNX network.
        epsilon: Perturbation radius for numerical features.
        is_numerical: Per-feature booleans; False features are not perturbed.
        num_samples: Number of rows to sample (seeded), or -1 for all rows.
        verbosity: 0 prints only the summary, 1 prints one line per sample,
            2 additionally prints the counterexample assignment.

    Returns:
        Dict with ``total_samples``, ``num_verified_robust``,
        ``num_adversarial_found``, ``num_inconclusive``,
        ``verified_robustness_rate`` (NaN when no sample was tested) and the
        per-sample ``results`` list of ``(idx, true_label, status)``.

    Raises:
        AssertionError: if ``num_samples`` exceeds ``len(df)`` or a row's
            width does not match the network's input width.
    """
    options = Marabou.createOptions(verbosity=0)

    if num_samples == -1:
        sampled = df
    else:
        assert num_samples <= len(df), "Number of samples exceeds the size of the dataframe."
        sampled = df.sample(n=num_samples, random_state=42)
    samples = list(sampled.iterrows())

    results = []

    for idx, row in tqdm(samples, desc="Verifying", unit="sample", leave=True):
        # Column 0 is the label, everything after it is a model input.
        x0 = row[1:].values.tolist()
        true_label = int(row.iloc[0])

        # Parse a fresh network for every sample. Constraints added with
        # addInequality stay on the network object, so reusing one instance
        # would stack the output constraints of all earlier samples onto
        # the current query.
        network = Marabou.read_onnx(onnx_path)
        inputVars = network.inputVars[0][0]
        outputVars = network.outputVars[0][0]

        assert len(x0) == len(inputVars), "Input dimension mismatch."

        for i, x_i in enumerate(x0):
            eps_i = epsilon if is_numerical[i] else 0.0
            network.setLowerBound(inputVars[i], x_i - eps_i)
            network.setUpperBound(inputVars[i], x_i + eps_i)

        # addInequality(vars, coeffs, c) encodes sum(coeff * var) <= c.
        # Ask for a misclassification: y_true - y_other <= 0.
        y0, y1 = outputVars[0], outputVars[1]
        if true_label == 0:
            network.addInequality([y0, y1], [1, -1], 0)
        else:
            network.addInequality([y1, y0], [1, -1], 0)

        # Silence anything the solver prints through Python's stdout.
        saved_stdout = sys.stdout
        with open(os.devnull, 'w') as devnull:
            sys.stdout = devnull
            try:
                status, assignments, _stats = network.solve(options=options)
            finally:
                sys.stdout = saved_stdout

        results.append((idx, true_label, status))

        if verbosity in (1, 2):
            if status == "sat":
                print(f"⚠️ SAT – idx {idx} | label: {true_label}")
                if verbosity == 2:
                    for prefix, variables in (("x", inputVars), ("y", outputVars)):
                        for i, var in enumerate(variables):
                            val = assignments.get(var, None)
                            try:
                                print(f"  {prefix}{i}: {float(val):.5f}")
                            except (TypeError, ValueError):
                                # Variable missing from the assignment.
                                print(f"  {prefix}{i}: {val}")
            elif status == "unsat":
                print(f"✅ UNSAT – idx {idx} | label: {true_label}")
            else:
                print(f"❓ {status} – idx {idx} | label: {true_label}")

    num_total = len(results)
    num_sat = sum(1 for r in results if r[2] == "sat")
    # Only a proven "unsat" counts as verified; any other status (timeout,
    # error, ...) is reported separately instead of being counted as robust.
    num_unsat = sum(1 for r in results if r[2] == "unsat")
    num_inconclusive = num_total - num_sat - num_unsat
    print(f"\n📊 Summary:")
    print(f"  Total samples tested: {num_total}")
    print(f"  SAT (adversarial found): {num_sat}")
    print(f"  UNSAT (robust): {num_unsat}")
    if num_inconclusive:
        print(f"  Inconclusive: {num_inconclusive}")

    return {
        "total_samples": num_total,
        "num_verified_robust": num_unsat,
        "num_adversarial_found": num_sat,
        "num_inconclusive": num_inconclusive,
        "verified_robustness_rate": num_unsat / num_total if num_total else float("nan"),
        "results": results
    }



## Define Robustness Accuracy Testing

In [None]:
import torch
from torch import nn

def test_lolnet_robustness_accuracy(
    model,
    df,
    is_numerical,
    epsilon_val=0.01,
    batch_size=64,
    alpha=0.005,
    steps=20,
    device='cpu'
):
    """Return the model's accuracy on masked-PGD adversarial inputs.

    Args:
        model: Classifier returning logits; moved to ``device`` and put in
            eval mode.
        df: Frame whose first column is the label and whose remaining
            columns are the inputs.
        is_numerical: Per-feature booleans; only True features are attacked.
        epsilon_val: L-infinity radius applied to the attackable features.
        batch_size: Number of rows attacked per batch.
        alpha: PGD step size.
        steps: Number of PGD iterations.
        device: Device to run on.

    Returns:
        Fraction of rows still classified correctly after the attack. The
        value is also printed.
    """
    model = model.to(device).eval()

    features = torch.tensor(df.iloc[:, 1:].values, dtype=torch.float32)
    targets = torch.tensor(df.iloc[:, 0].values, dtype=torch.long)

    n_features = features.shape[1]
    # Per-feature radius: epsilon_val where attackable, zero elsewhere.
    per_feature_eps = []
    for i in range(n_features):
        per_feature_eps.append(epsilon_val if is_numerical[i] else 0.0)
    epsilon = torch.tensor(per_feature_eps, dtype=torch.float32, device=device)
    mask = torch.tensor(is_numerical, dtype=torch.float32, device=device)

    loss_fn = nn.CrossEntropyLoss()

    def masked_pgd(x, y):
        """Run PGD from ``x`` while staying inside the per-feature box."""
        x_orig = x.detach()
        lower = x_orig - epsilon
        upper = x_orig + epsilon
        x_adv = x.clone().detach().requires_grad_(True)

        for _ in range(steps):
            loss = loss_fn(model(x_adv), y)
            grad = torch.autograd.grad(loss, x_adv, retain_graph=False, create_graph=False)[0]

            stepped = x_adv + alpha * (grad.sign() * mask)
            boxed = torch.min(torch.max(stepped, lower), upper)
            x_adv = torch.clamp(boxed, 0.0, 1.0).detach().requires_grad_(True)

        return x_adv.detach()

    n_correct = 0
    n_seen = 0

    for start in range(0, len(features), batch_size):
        stop = start + batch_size
        x_batch = features[start:stop].to(device)
        y_batch = targets[start:stop].to(device)

        adv_x = masked_pgd(x_batch, y_batch)

        with torch.no_grad():
            preds = model(adv_x).argmax(dim=1)
            n_correct += (preds == y_batch).sum().item()
            n_seen += y_batch.size(0)

    robust_accuracy = n_correct / n_seen
    print(f"🔐 Robustness Accuracy: {robust_accuracy:.2%}")
    return robust_accuracy


## Define Gradient Norm Function

In [None]:
import torch
import pandas as pd

def get_gradient_norms(model, df: pd.DataFrame, is_numerical: list[bool], norm_type=2, batch_size=64):
    """Compute per-sample norms of the input gradient of the top logit.

    For every row, the gradient of the largest output score with respect to
    the input is taken, features marked False in ``is_numerical`` are zeroed,
    and the norm of what remains is recorded.

    Args:
        model: Network returning one score per class; it is put in eval mode
            and its parameters' device is used for all tensors.
        df: Frame with a ``result`` column; all other columns are inputs.
        is_numerical: Per-feature booleans selecting which gradient entries
            count.
        norm_type: ``p`` for the p-norm, or the string ``'inf'`` for the
            maximum absolute entry.
        batch_size: Rows processed per forward/backward pass.

    Returns:
        1-D tensor with one norm per row of ``df``. Mean, min and max are
        also printed.
    """
    device = next(model.parameters()).device

    X_test = df.drop(columns=["result"]).values
    mask = torch.tensor(is_numerical, dtype=torch.float32, device=device)
    X_tensor = torch.tensor(X_test, dtype=torch.float32, device=device)

    model.eval()
    batch_norms = []
    for start in range(0, len(X_tensor), batch_size):
        inputs = X_tensor[start:start + batch_size].clone().detach().requires_grad_(True)
        top_scores = model(inputs).max(1)[0]
        # grad_outputs of ones yields each sample's own gradient in one pass.
        input_grads = torch.autograd.grad(
            top_scores, inputs,
            grad_outputs=torch.ones_like(top_scores),
            retain_graph=False, create_graph=False,
        )[0]
        kept_grads = input_grads * mask
        if norm_type == 'inf':
            batch_norms.append(kept_grads.abs().max(dim=1)[0])
        else:
            batch_norms.append(kept_grads.norm(p=norm_type, dim=1))

    all_norms_tensor = torch.cat(batch_norms)
    print(f"✅ Gradient Norms — Mean: {all_norms_tensor.mean():.4f} | Min: {all_norms_tensor.min():.4f} | Max: {all_norms_tensor.max():.4f}")
    return all_norms_tensor



## Define Clean Accuracy Testing

In [None]:
import torch
from torch import nn

def test_lolnet_clean_accuracy(
    model,
    df,
    batch_size=64,
    device='cpu'
):
    """Return the model's accuracy on unperturbed data.

    Args:
        model: Classifier returning logits; moved to ``device`` and put in
            eval mode.
        df: Frame whose first column is the label and whose remaining
            columns are the inputs.
        batch_size: Rows scored per forward pass.
        device: Device to run on.

    Returns:
        Fraction of rows whose arg-max prediction equals the label. The
        value is also printed.
    """
    model = model.to(device).eval()

    features = torch.tensor(df.iloc[:, 1:].values, dtype=torch.float32)
    targets = torch.tensor(df.iloc[:, 0].values, dtype=torch.long)

    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for start in range(0, len(features), batch_size):
            stop = start + batch_size
            x_batch = features[start:stop].to(device)
            y_batch = targets[start:stop].to(device)

            hits = model(x_batch).argmax(dim=1) == y_batch
            n_correct += hits.sum().item()
            n_seen += y_batch.size(0)

    accuracy = n_correct / n_seen
    print(f"✅ Clean Accuracy: {accuracy:.2%}")
    return accuracy


## Test verified robustness rate

In [None]:
import os
import glob
import pandas as pd
from sklearn.model_selection import train_test_split

df = pd.read_parquet("data/processed_lol_data.parquet")
# Same split arguments as the training data loader, so `test_df` is the
# held-out part.
_, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['result'])

onnx_files = sorted(glob.glob("models/*.onnx"))

for onnx_path in onnx_files:
    model_name = os.path.splitext(os.path.basename(onnx_path))[0]
    print(f"\n🧪 Verifying: {model_name}")
    # The radius that is printed is now the radius that is verified. The
    # value 0.05 is the one the call actually used before; the loop used to
    # announce 0.01 while passing a hard-coded 0.05.
    for epsilon in [0.05]:
        print(f"{epsilon = }")
        test_lolnet_verified_robustness_rate(
            test_df,
            onnx_path=onnx_path,
            epsilon=epsilon,
            num_samples=1000,
            is_numerical=is_numerical,
            verbosity=0
        )

## Test Robustness Accuracy

In [None]:
import os
import glob
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
from lolnet import LoLNet

# Rebuild the held-out test split: 20% of the rows, stratified on `result`,
# with a fixed seed (42) so the split is reproducible.
df = pd.read_parquet("data/processed_lol_data.parquet")
_, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['result'])

# Network input width = number of columns other than `result`.
input_dim = df.drop(columns=["result"]).shape[1]
device = "cpu"
print(f"{device = }")

# Every saved PyTorch checkpoint under models/, in sorted filename order.
pth_files = sorted(glob.glob("models/*.pth"))

for model_path in pth_files:
    # File name without directory or extension; used only for the log line.
    model_name = os.path.splitext(os.path.basename(model_path))[0]
    print(f"\n🧪 Verifying: {model_name}")

    # NOTE(review): hidden_dim=64 presumably matches how the checkpoints were
    # trained — confirm against the training cell.
    model = LoLNet(input_dim=input_dim, hidden_dim=64).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    # Sweep the perturbation size passed to the helper as `epsilon_val`.
    for epsilon in [0.01, 0.03, 0.05, 0.07, 0.09]:
        print(f"{epsilon = }")
        # `test_lolnet_robustness_accuracy` and `is_numerical` are not defined
        # in this cell; they must already exist in the notebook session.
        # The return value is overwritten on every iteration and never stored.
        robust_accuracy = test_lolnet_robustness_accuracy(
            model=model,
            df=test_df,
            is_numerical=is_numerical,
            epsilon_val=epsilon,
            device=device
        )


## Test Gradient Norms

In [None]:
import os
import glob
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from lolnet import LoLNet

# Rebuild the held-out test split: 20% of the rows, stratified on `result`,
# with a fixed seed (42) so the split is reproducible.
df = pd.read_parquet("data/processed_lol_data.parquet")
_, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['result'])

# Network input width = number of columns other than `result`.
input_dim = df.drop(columns=["result"]).shape[1]
device = "cpu"
print(f"{device = }")

# Every saved PyTorch checkpoint under models/, in sorted filename order.
model_files = sorted(glob.glob("models/*.pth"))

for model_path in model_files:
    # File name without directory or extension; used only for the log line.
    model_name = os.path.splitext(os.path.basename(model_path))[0]
    print(f"\n🧪 Verifying: {model_name}")

    # NOTE(review): hidden_dim=64 presumably matches how the checkpoints were
    # trained — confirm against the training cell.
    model = LoLNet(input_dim=input_dim, hidden_dim=64) 
    model.load_state_dict(torch.load(model_path, map_location=device))
    model = model.to(device)
    model.eval()
    # `get_gradient_norms` and `is_numerical` are not defined in this cell;
    # they must already exist in the notebook session.
    # The return value is overwritten for each checkpoint and never stored.
    gradient_norm = get_gradient_norms(
        model=model,
        df=test_df,
        is_numerical=is_numerical
    )


## Test Accuracy on Clean Data

In [None]:
import os
import glob
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from lolnet import LoLNet

# Rebuild the held-out test split: 20% of the rows, stratified on `result`,
# with a fixed seed (42) so the split is reproducible.
df = pd.read_parquet("data/processed_lol_data.parquet")
_, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['result'])

# Network input width = number of columns other than `result`.
input_dim = df.drop(columns=["result"]).shape[1]
device = "cpu"
print(f"{device = }")

# Every saved PyTorch checkpoint under models/, in sorted filename order.
model_files = sorted(glob.glob("models/*.pth"))

for model_path in model_files:
    # File name without directory or extension; used only for the log line.
    model_name = os.path.splitext(os.path.basename(model_path))[0]
    print(f"\n🧪 Verifying: {model_name}")

    # NOTE(review): hidden_dim=64 presumably matches how the checkpoints were
    # trained — confirm against the training cell.
    model = LoLNet(input_dim=input_dim, hidden_dim=64).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    # test_lolnet_clean_accuracy reads column 0 of the frame as the label, so
    # this assumes `result` is the first column of the parquet — TODO confirm.
    # The return value is overwritten for each checkpoint and never stored.
    clean_accuracy = test_lolnet_clean_accuracy(
        model=model,
        df=test_df,
        device=device
    )


## Estimate Lipschitz Constant

In [None]:
import torch.nn.functional as F

def spectral_norm(layer, n_iter=10):
    """Estimate the largest singular value of a linear layer's weight matrix.

    Uses power iteration starting from a random vector, so the result is an
    approximation that improves with ``n_iter`` and can vary slightly between
    calls.

    Args:
        layer: Any module. Only ``nn.Linear`` layers are analysed.
        n_iter: Number of power-iteration rounds; must be at least 1.

    Returns:
        The estimated spectral norm as a Python float for ``nn.Linear``
        layers, and ``1.0`` for every other layer type.

    Raises:
        ValueError: If ``n_iter`` is smaller than 1 for an ``nn.Linear`` layer.
    """
    if not isinstance(layer, nn.Linear):
        # Non-linear layers contribute a neutral factor of 1.0.
        # NOTE(review): that is exact for e.g. ReLU but not for every layer
        # type — confirm it holds for all layers in the model.
        return 1.0

    if n_iter < 1:
        # Without at least one round the right vector is never computed.
        raise ValueError("n_iter must be at least 1")

    W = layer.weight.data
    u = torch.randn(W.size(0), 1, device=W.device)
    for _ in range(n_iter):
        v = F.normalize(torch.matmul(W.t(), u), dim=0)
        u = F.normalize(torch.matmul(W, v), dim=0)

    # reshape(-1) keeps both operands 1-D even when the layer has a single
    # output unit; squeeze() would collapse them to 0-D and torch.dot would
    # reject them.
    sigma = torch.dot(u.reshape(-1), torch.matmul(W, v).reshape(-1))
    return sigma.item()

def compute_lipschitz_constant(model):
    """Multiply the per-layer factors from ``spectral_norm`` over ``model.model``.

    Args:
        model: Object whose ``model`` attribute can be iterated layer by layer.

    Returns:
        The product of ``spectral_norm(layer)`` over all layers, as a float.
        The value is also printed.
    """
    factors = (spectral_norm(layer) for layer in model.model)

    bound = 1.0
    for factor in factors:
        bound = bound * factor

    print(f"\n✅ Estimated Global Lipschitz Constant: {bound:.4f}")
    return bound


In [None]:
import os
import glob
import pandas as pd
from sklearn.model_selection import train_test_split
import torch
from lolnet import LoLNet

# Rebuild the same dataset/split as the other evaluation cells. Only `df` is
# needed here (for the input width); `test_df` is not used by this cell.
df = pd.read_parquet("data/processed_lol_data.parquet")
_, test_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df['result'])

# Network input width = number of columns other than `result`.
input_dim = df.drop(columns=["result"]).shape[1]
device = "cpu"
print(f"{device = }")

# Every saved PyTorch checkpoint under models/, in sorted filename order.
model_files = sorted(glob.glob("models/*.pth"))

for model_path in model_files:
    # File name without directory or extension; used only for the log line.
    model_name = os.path.splitext(os.path.basename(model_path))[0]
    print(f"\n🧪 Verifying: {model_name}")

    # NOTE(review): hidden_dim=64 presumably matches how the checkpoints were
    # trained — confirm against the training cell.
    model = LoLNet(input_dim=input_dim, hidden_dim=64).to(device)
    # Load onto `device` (not a hard-coded "cpu") so this cell follows the
    # same convention as the other evaluation cells when `device` changes.
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()

    # The return value is overwritten for each checkpoint and never stored.
    lips_score = compute_lipschitz_constant(model)
