In [None]:
import torch
import optuna
import numpy as np
import pandas as pd
import torch.nn as nn
import torch_directml
import torch.optim as optim
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc
from optuna.integration import PyTorchLightningPruningCallback

# Device selection: DirectML or fallback to CUDA/CPU
device = torch_directml.device() if torch_directml.is_available() else torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

In [None]:
# URL to the dataset
data_url = 'https://raw.githubusercontent.com/KeithGalli/pandas/master/pokemon_data.csv'

# Load dataset
df = pd.read_csv(data_url, index_col='Name')
df = df.drop(['#'], axis=1)

# Map Legendary to binary values
df['Legendary'] = df['Legendary'].map({False: 0, True: 1})

# One-hot encode categorical variables and handle missing values
df = pd.get_dummies(df, columns=['Type 1', 'Type 2'], dummy_na=True)

# Separate features and target
x_columns = df.drop('Legendary', axis=1).columns
x = df[x_columns].astype(float).values
y = df['Legendary'].values

# Convert to PyTorch tensors
x = torch.tensor(x, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).view(-1, 1)

In [1]:
# Constants
BATCH_SIZE = 32
TEST_SIZE = 0.25
RANDOM_STATE = 42

# Split into train/test sets
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=TEST_SIZE, random_state=RANDOM_STATE)

# Move data to the appropriate device (DirectML, CUDA, or CPU)
x_train, y_train, x_test, y_test = [t.to(device) for t in [x_train, y_train, x_test, y_test]]

# DataLoader creation
train_dataset = TensorDataset(x_train, y_train)
test_dataset = TensorDataset(x_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

Using device: privateuseone:0


## Advance Model

In [2]:
# Model Definition
class ClassificationModel(nn.Module):
    def __init__(self, input_dim, hidden_layers, dropout_rate):
        super(ClassificationModel, self).__init__()
        layers = []
        last_dim = input_dim
        
        for hidden_dim in hidden_layers:
            layers.append(nn.Linear(last_dim, hidden_dim))
            layers.append(nn.BatchNorm1d(hidden_dim))
            layers.append(nn.LeakyReLU())
            layers.append(nn.Dropout(dropout_rate))
            last_dim = hidden_dim
            
        layers.append(nn.Linear(last_dim, 1))
        layers.append(nn.Sigmoid())
        
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

# Hyperparameter Optimization Function
def objective(trial):
    # Suggest hyperparameters
    hidden_layers = trial.suggest_categorical('hidden_layers', [[256, 128], [512, 256, 128], [512, 256]])
    dropout_rate = trial.suggest_float('dropout_rate', 0.2, 0.5)
    lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
    weight_decay = trial.suggest_loguniform('weight_decay', 1e-6, 1e-3)

    # Model and optimizer
    model = ClassificationModel(input_dim=x_train.shape[1], hidden_layers=hidden_layers, dropout_rate=dropout_rate).to(device)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.BCELoss()
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=MAX_EPOCHS)
    
    # Training loop
    best_loss = float('inf')
    no_improvement = 0
    patience = 10
    
    for epoch in range(MAX_EPOCHS):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        
        model.eval()
        with torch.no_grad():
            val_loss = sum(criterion(model(inputs.to(device)), labels.to(device)).item() for inputs, labels in test_loader) / len(test_loader)
        
        scheduler.step()
        
        # Early stopping
        if val_loss < best_loss - 1e-3:
            best_loss = val_loss
            no_improvement = 0
        else:
            no_improvement += 1

        if no_improvement >= patience:
            break

    return best_loss

# Create study and optimize
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10) #n_trials recommended = 50

# Best model parameters
best_params = study.best_params
print(f"Best hyperparameters: {best_params}")




[I 2024-08-24 12:16:52,455] A new study created in memory with name: no-name-0d18d360-28d6-4d03-ad59-46d9de17b810
  lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
  weight_decay = trial.suggest_loguniform('weight_decay', 1e-6, 1e-3)
  return torch._C._nn.binary_cross_entropy(input, target, weight, reduction_enum)
[I 2024-08-24 12:18:15,483] Trial 0 finished with value: 0.09839651081711054 and parameters: {'hidden_layers': [512, 256], 'dropout_rate': 0.33210047754426975, 'lr': 0.00051556418093953, 'weight_decay': 0.0002151813490372811}. Best is trial 0 with value: 0.09839651081711054.
  lr = trial.suggest_loguniform('lr', 1e-5, 1e-2)
  weight_decay = trial.suggest_loguniform('weight_decay', 1e-6, 1e-3)
[I 2024-08-24 12:24:57,763] Trial 1 finished with value: 0.16772862949541636 and parameters: {'hidden_layers': [512, 256, 128], 'dropout_rate': 0.4535601296897447, 'lr': 4.298653906155615e-05, 'weight_decay': 4.121577731662978e-05}. Best is trial 0 with value: 0.09839651081711054.
[I 202

Best hyperparameters: {'hidden_layers': [256, 128], 'dropout_rate': 0.29561935187464167, 'lr': 0.005535215564704708, 'weight_decay': 0.0001776278936979131}


In [19]:

# Training function
def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, max_epochs, patience):
    best_loss = float('inf')
    no_improvement = 0

    for epoch in range(max_epochs):
        model.train()
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        model.eval()
        with torch.no_grad():
            val_loss = sum(criterion(model(inputs.to(device)), labels.to(device)).item() for inputs, labels in test_loader) / len(test_loader)

        if val_loss < best_loss - 1e-3:
            best_loss = val_loss
            no_improvement = 0
        else:
            no_improvement += 1

        if no_improvement >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

        scheduler.step()

    return model


# Evaluate model
def evaluate_model(model, X_test, y_test):
    model.eval()
    with torch.no_grad():
        pred = model(X_test).cpu().numpy().flatten()
        pred = np.clip(pred, a_min=1e-6, a_max=1-1e-6)
        logloss = metrics.log_loss(y_test.cpu().numpy(), pred)
        pred_binary = (pred > 0.5).astype(int)
        accuracy = metrics.accuracy_score(y_test.cpu().numpy(), pred_binary)
        auc_roc = roc_auc_score(y_test.cpu().numpy(), pred)
        precision, recall, _ = precision_recall_curve(y_test.cpu().numpy(), pred)
        pr_auc = auc(recall, precision)
        return logloss, accuracy, auc_roc, pr_auc

# Feature importance
def perturbation_rank(model, x_test, y_test, feature_names, verbose=False):
    model.eval()
    baseline_loss = criterion(model(x_test), y_test).item()
    importance_scores = []

    for i in range(x_test.shape[1]):
        x_test_perturbed = x_test.clone()
        x_test_perturbed[:, i] = x_test_perturbed[torch.randperm(x_test_perturbed.size(0)), i]
        perturbed_loss = criterion(model(x_test_perturbed), y_test).item()
        importance = perturbed_loss - baseline_loss
        importance_scores.append(importance)

        if verbose:
            print(f"Feature {feature_names[i]} - Perturbed Loss: {perturbed_loss:.4f} - Importance: {importance:.4f}")

    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': importance_scores
    }).sort_values(by='Importance', ascending=False).reset_index(drop=True)

    return importance_df


In [16]:
# Train final model with best parameters

criterion = nn.BCELoss()
best_model = ClassificationModel(input_dim=x_train.shape[1], hidden_layers=best_params['hidden_layers'], 
                                 dropout_rate=best_params['dropout_rate']).to(device)
best_optimizer = optim.AdamW(best_model.parameters(), lr=best_params['lr'], weight_decay=best_params['weight_decay'])
best_scheduler = optim.lr_scheduler.CosineAnnealingLR(best_optimizer, T_max=MAX_EPOCHS)

final_model = train_model(best_model, train_loader, test_loader, criterion, best_optimizer, best_scheduler, MAX_EPOCHS, PATIENCE)

# Evaluate model
logloss, accuracy, auc_roc, pr_auc = evaluate_model(final_model, x_test, y_test)
print(f"Validation logloss: {logloss}")
print(f"Validation accuracy score: {accuracy}")
print(f"Validation AUC-ROC: {auc_roc}")
print(f"Validation Precision-Recall AUC: {pr_auc}")

Early stopping at epoch 14
Validation logloss: 0.12320013152055614
Validation accuracy score: 0.95
Validation AUC-ROC: 0.9806663924310983
Validation Precision-Recall AUC: 0.7879967334044045


In [20]:
# Feature Importance
importance_df = perturbation_rank(final_model, x_test, y_test, feature_names)
print(importance_df.head(10))

          Feature  Importance
0          Attack    0.101388
1         Sp. Atk    0.076471
2           Speed    0.066158
3         Defense    0.055552
4         Sp. Def    0.027226
5              HP    0.009540
6      Generation    0.002305
7  Type 1_Psychic    0.000918
8   Type 1_Ground    0.000780
9      Type 2_nan    0.000636
