In [7]:
"""
--- SECTION 1: LIBRARY IMPORTS ---
Importing necessary libraries for deep learning (PyTorch), machine learning (Scikit-learn, XGBoost, CatBoost),
data manipulation (Pandas, NumPy), and system operations.
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, KFold
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.metrics import mean_squared_error, mean_absolute_error, accuracy_score
import random
import math
import copy
import os

from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from xgboost import XGBClassifier, XGBRegressor
from catboost import CatBoostClassifier, CatBoostRegressor

from sklearn.neural_network import MLPRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.linear_model import Ridge, Lasso
from sklearn.kernel_ridge import KernelRidge

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

DEVICE = torch.device("mps") if torch.backends.mps.is_available() else (torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"))
print(f"Using device: {DEVICE}")

IMPORTANT_FEATURES = [
    'MagpieData maximum MendeleevNumber', 'MagpieData mean AtomicWeight',
    'MagpieData minimum MeltingT', 'MagpieData maximum MeltingT',
    'MagpieData mean MeltingT', 'MagpieData minimum Column',
    'MagpieData range Column', 'MagpieData avg_dev Column',
    'MagpieData mode Column', 'MagpieData range Row', 'MagpieData mean Row',
    'MagpieData range Electronegativity', 'MagpieData avg_dev Electronegativity',
    'MagpieData mode Electronegativity', 'MagpieData mean NpValence',
    'MagpieData maximum NdValence', 'MagpieData range NdValence',
    'MagpieData mean NdValence', 'MagpieData maximum NfValence',
    'MagpieData mean NfValence', 'MagpieData mean NValence',
    'MagpieData mode NValence', 'MagpieData maximum NpUnfilled',
    'MagpieData range NpUnfilled', 'MagpieData mean NpUnfilled',
    'MagpieData range NUnfilled', 'MagpieData mean NUnfilled',
    'MagpieData mode NUnfilled', 'MagpieData minimum GSvolume_pa',
    'MagpieData mode GSvolume_pa', 'MagpieData maximum GSbandgap',
    'MagpieData range GSbandgap', 'MagpieData mode GSbandgap',
    'MagpieData mean GSmagmom', 'MagpieData mode SpaceGroupNumber'
]

Using device: mps


In [8]:
"""
--- SECTION 2: PREDICTION MODEL DEFINITIONS (REGRESSION) ---
Definitions for Deep Learning regression models (CNN, ViT, Diffusion) and a factory function
to instantiate traditional machine learning regressors.
"""
class Simple2DCNN(nn.Module):
    def __init__(self, input_side_len):
        super(Simple2DCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU()
        )
        if input_side_len == 12: flatten_dim = 128 * 3 * 3
        elif input_side_len == 6: flatten_dim = 128 * 1 * 1
        else: raise ValueError("Unsupported image size")
        self.regressor = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flatten_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )
    def forward(self, x):
        return self.regressor(self.features(x))

class ImageViT(nn.Module):
    def __init__(self, image_size, dim=64, depth=4, heads=4, mlp_dim=128):
        super().__init__()
        num_patches = image_size * image_size 
        self.pixel_embedding = nn.Linear(1, dim)
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim))
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches + 1, dim))
        encoder_layer = nn.TransformerEncoderLayer(d_model=dim, nhead=heads, dim_feedforward=mlp_dim, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=depth)
        self.mlp_head = nn.Sequential(nn.LayerNorm(dim), nn.Linear(dim, 1))
    def forward(self, x):
        b, c, h, w = x.shape
        x = x.view(b, h*w, 1)
        x = self.pixel_embedding(x)
        cls_tokens = self.cls_token.repeat(b, 1, 1)
        x = torch.cat((cls_tokens, x), dim=1)
        x += self.pos_embedding[:, :(h*w + 1)]
        x = self.transformer(x)
        return self.mlp_head(x[:, 0])

class DiffusionNet(nn.Module):
    def __init__(self, input_dim, hidden_dim=128):
        super().__init__()
        self.time_mlp = nn.Sequential(nn.Linear(1, hidden_dim), nn.SiLU(), nn.Linear(hidden_dim, hidden_dim))
        self.cond_mlp = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.SiLU(), nn.Linear(hidden_dim, hidden_dim))
        self.input_mlp = nn.Linear(1, hidden_dim)
        self.mid = nn.Sequential(nn.Linear(hidden_dim, hidden_dim), nn.SiLU(), nn.Linear(hidden_dim, hidden_dim), nn.SiLU(), nn.Linear(hidden_dim, 1))
    def forward(self, x, t, condition):
        t_emb = self.time_mlp(t.view(-1, 1) / 100.0)
        c_emb = self.cond_mlp(condition)
        x_emb = self.input_mlp(x)
        return self.mid(x_emb + t_emb + c_emb)

class DiffusionWrapper(nn.Module):
    def __init__(self, input_dim, n_steps=100):
        super().__init__()
        self.n_steps = n_steps
        self.net = DiffusionNet(input_dim)
        self.register_buffer('betas', torch.linspace(1e-4, 0.02, n_steps))
        self.register_buffer('alphas', 1 - self.betas)
        self.register_buffer('alphas_cumprod', torch.cumprod(self.alphas, dim=0))
    def compute_loss(self, x0, condition):
        batch_size = x0.shape[0]
        t = torch.randint(0, self.n_steps, (batch_size,), device=x0.device)
        noise = torch.randn_like(x0)
        a_bar = self.alphas_cumprod[t].view(-1, 1)
        x_t = torch.sqrt(a_bar) * x0 + torch.sqrt(1 - a_bar) * noise
        pred_noise = self.net(x_t, t.float(), condition)
        return F.mse_loss(pred_noise, noise)
    @torch.no_grad()
    def sample(self, condition, n_ensembles=20):
        batch_size = condition.shape[0]
        cond_expanded = condition.repeat_interleave(n_ensembles, dim=0)
        x = torch.randn(cond_expanded.shape[0], 1, device=condition.device)
        for t in reversed(range(self.n_steps)):
            t_batch = torch.full((x.shape[0],), t, device=condition.device, dtype=torch.float)
            pred_noise = self.net(x, t_batch, cond_expanded)
            beta = self.betas[t]
            alpha = self.alphas[t]
            a_bar = self.alphas_cumprod[t]
            if t > 0: noise = torch.randn_like(x)
            else: noise = 0
            mean = (1 / torch.sqrt(alpha)) * (x - (beta / torch.sqrt(1 - a_bar)) * pred_noise)
            x = mean + torch.sqrt(beta) * noise
        x = x.view(batch_size, n_ensembles, 1)
        return x.mean(dim=1)

def get_regressor(reg_type):
    if reg_type == 'XGBoost': return XGBRegressor(n_estimators=500, learning_rate=0.05, max_depth=6, n_jobs=-1, random_state=42)
    if reg_type == 'CatBoost': return CatBoostRegressor(iterations=1000, learning_rate=0.05, verbose=0, random_seed=42)
    if reg_type == 'RandomForest': return RandomForestRegressor(n_estimators=200, n_jobs=-1, random_state=42)
    if reg_type == 'KRR': return KernelRidge(kernel='rbf', alpha=0.1, gamma=0.01)
    if reg_type == 'SVR': return SVR(kernel='rbf', C=10, gamma=0.01)
    if reg_type == 'MLP': return MLPRegressor(hidden_layer_sizes=(256, 128, 64), max_iter=500, random_state=42)
    if reg_type == 'Ridge': return Ridge(alpha=1.0)
    if reg_type == 'Lasso': return Lasso(alpha=0.01)
    if reg_type == 'DecisionTree': return DecisionTreeRegressor(max_depth=10, random_state=42)
    raise ValueError(f"Unknown regressor: {reg_type}")

In [9]:
"""
--- SECTION 3: CLASSIFICATION MODEL DEFINITIONS ---
Factory function to instantiate various traditional machine learning classifiers
used for the two-step metal/non-metal classification task.
"""
def get_classifier(clf_type):
    if clf_type == 'XGBoost': return XGBClassifier(n_estimators=300, learning_rate=0.05, max_depth=6, random_state=42, n_jobs=-1)
    if clf_type == 'CatBoost': return CatBoostClassifier(iterations=500, verbose=0, random_seed=42)
    if clf_type == 'RandomForest': return RandomForestClassifier(n_estimators=200, n_jobs=-1, random_state=42)
    if clf_type == 'DecisionTree': return DecisionTreeClassifier(max_depth=10, random_state=42)
    if clf_type == 'SVM': return SVC(kernel='rbf', probability=True, random_state=42)
    if clf_type == 'LogisticRegression': return LogisticRegression(max_iter=1000, random_state=42)
    if clf_type == 'LDA': return LinearDiscriminantAnalysis()
    raise ValueError(f"Unknown classifier: {clf_type}")

In [10]:
"""
--- SECTION 4: DATASET PROCESSING DEFINITIONS ---
Functions to reshape and pad feature matrices based on the model architecture requirements
(e.g., transforming 1D vectors to 2D image formats for CNNs/ViTs).
"""
def process_features(X_in, model_type, use_important_features):
    is_image_model = model_type in ['CNN', 'ViT']
    
    if is_image_model:
        if use_important_features:
            pad_width = ((0,0), (1,0)) 
            X_out = np.pad(X_in, pad_width, mode='constant').reshape(-1, 1, 6, 6)
        else:
            target = 144
            if X_in.shape[1] > target: X_in = X_in[:, :target]
            pad = target - X_in.shape[1]
            X_out = np.pad(X_in, ((0,0), (0, pad)), mode='constant').reshape(-1, 1, 12, 12)
    else:
        X_out = X_in
        
    return X_out

In [11]:
"""
--- SECTION 5: TRAIN-TEST FUNCTION DEFINITION ---
The main execution loop implementing a 5-Fold Cross-Validation strategy.
Handles data splitting, optional two-step classification, feature scaling,
model training (both DL and ML), and final evaluation.
"""
def train_test_cv(model_type='XGBoost', classifier_type=None, use_important_features=True, 
                  csv_path='team-a.csv', n_splits=5, epochs=50, batch_size=64):
    print(f"\n{'='*70}")
    print(f"5-FOLD CV: Regressor={model_type} | Classifier={classifier_type} | Features={'Important' if use_important_features else 'All'}")
    print(f"{'='*70}")

    if not os.path.exists(csv_path):
        print("CSV not found, generating dummy data.")
        df = pd.DataFrame(np.random.rand(500, 100), columns=[f'feat_{i}' for i in range(100)])
        for c in IMPORTANT_FEATURES: df[c] = np.random.rand(500)
        df['gap expt'] = np.abs(np.random.randn(500))
    else:
        df = pd.read_csv(csv_path)

    y_raw = df.iloc[:, 1].values
    
    if use_important_features:
        cols = [c for c in IMPORTANT_FEATURES if c in df.columns]
        X_raw = df[cols].values
    else:
        X_raw = df.iloc[:, 2:].values

    X_train_val, X_holdout_test, y_train_val, y_holdout_test = train_test_split(X_raw, y_raw, test_size=0.2, random_state=42)

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    
    fold_mae_scores = []
    
    for fold, (train_idx, val_idx) in enumerate(kf.split(X_train_val)):
        print(f"\n--- Fold {fold+1}/{n_splits} ---")
        
        X_fold_train, X_fold_val = X_train_val[train_idx], X_train_val[val_idx]
        y_fold_train, y_fold_val = y_train_val[train_idx], y_train_val[val_idx]

        scaler_x = StandardScaler()
        X_fold_train = scaler_x.fit_transform(X_fold_train)
        X_fold_val = scaler_x.transform(X_fold_val)
        
        if classifier_type is not None:
            y_cls_train = (y_fold_train > 0.001).astype(int)
            
            clf = get_classifier(classifier_type)
            clf.fit(X_fold_train.reshape(X_fold_train.shape[0], -1), y_cls_train)
            
            y_cls_pred_val = clf.predict(X_fold_val.reshape(X_fold_val.shape[0], -1))
            
            non_metal_mask = y_cls_train == 1
            X_reg_train = X_fold_train[non_metal_mask]
            y_reg_train = y_fold_train[non_metal_mask]
            
        else:
            y_cls_pred_val = np.ones(len(y_fold_val))
            X_reg_train = X_fold_train
            y_reg_train = y_fold_train

        scaler_y = QuantileTransformer(output_distribution='normal', n_quantiles=min(500, len(y_reg_train)//2))
        y_reg_train_scaled = scaler_y.fit_transform(y_reg_train.reshape(-1, 1)).ravel()

        X_reg_train_processed = process_features(X_reg_train, model_type, use_important_features)
        X_fold_val_processed = process_features(X_fold_val, model_type, use_important_features)
        
        input_shape = X_reg_train_processed.shape
        
        if model_type in ['CNN', 'ViT', 'Diffusion']:
            train_ds = TensorDataset(torch.tensor(X_reg_train_processed, dtype=torch.float32), 
                                     torch.tensor(y_reg_train_scaled, dtype=torch.float32))
            y_fold_val_scaled = scaler_y.transform(y_fold_val.reshape(-1, 1)).ravel()
            val_ds = TensorDataset(torch.tensor(X_fold_val_processed, dtype=torch.float32), 
                                   torch.tensor(y_fold_val_scaled, dtype=torch.float32))
            
            train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
            val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
            
            if model_type == 'CNN': model = Simple2DCNN(input_shape[2]).to(DEVICE)
            elif model_type == 'ViT': model = ImageViT(input_shape[2]).to(DEVICE)
            elif model_type == 'Diffusion': model = DiffusionWrapper(input_shape[1]).to(DEVICE)
            
            opt = optim.AdamW(model.parameters() if model_type!='Diffusion' else model.net.parameters(), lr=1e-3)
            crit = nn.MSELoss()
            
            best_val_mse = float('inf')
            best_w = None
            patience = 0
            
            for ep in range(epochs):
                model.train()
                for bx, by in train_loader:
                    bx, by = bx.to(DEVICE), by.to(DEVICE)
                    opt.zero_grad()
                    loss = model.compute_loss(by.unsqueeze(1), bx) if model_type=='Diffusion' else crit(model(bx), by.unsqueeze(1))
                    loss.backward()
                    opt.step()
                
                model.eval()
                val_mse = 0
                with torch.no_grad():
                    for bx, by in val_loader:
                        bx, by = bx.to(DEVICE), by.to(DEVICE)
                        loss = model.compute_loss(by.unsqueeze(1), bx) if model_type=='Diffusion' else crit(model(bx), by.unsqueeze(1))
                        val_mse += loss.item()
                
                if val_mse < best_val_mse:
                    best_val_mse = val_mse
                    best_w = copy.deepcopy(model.net.state_dict() if model_type=='Diffusion' else model.state_dict())
                    patience = 0
                else: patience += 1
                if patience >= 10: break
            
            if model_type=='Diffusion': model.net.load_state_dict(best_w)
            else: model.load_state_dict(best_w)
            model.eval()
            
            X_val_tensor = torch.tensor(X_fold_val_processed, dtype=torch.float32).to(DEVICE)
            with torch.no_grad():
                if model_type=='Diffusion': pred_scaled = model.sample(X_val_tensor, n_ensembles=10).cpu().numpy()
                else: pred_scaled = model(X_val_tensor).cpu().numpy()
        
        else:
            reg = get_regressor(model_type)
            if X_reg_train_processed.ndim > 2:
                reg.fit(X_reg_train_processed.reshape(len(X_reg_train_processed), -1), y_reg_train_scaled)
                pred_scaled = reg.predict(X_fold_val_processed.reshape(len(X_fold_val_processed), -1)).reshape(-1, 1)
            else:
                reg.fit(X_reg_train_processed, y_reg_train_scaled)
                pred_scaled = reg.predict(X_fold_val_processed).reshape(-1, 1)

        pred_raw = scaler_y.inverse_transform(pred_scaled).flatten()
        
        final_pred = pred_raw * y_cls_pred_val
        final_pred = np.maximum(final_pred, 0.0)
        
        mae = mean_absolute_error(y_fold_val, final_pred)
        fold_mae_scores.append(mae)
        print(f"  [Fold {fold+1}] MAE: {mae:.4f}")

    avg_mae = np.mean(fold_mae_scores)
    std_mae = np.std(fold_mae_scores)
    print(f"\n{'='*30}")
    print(f"CV Results (Aligns with Baseline):")
    print(f"Mean MAE: {avg_mae:.4f} (+/- {std_mae * 2:.4f})")
    print(f"{'='*30}")
    
    return avg_mae

In [12]:
train_test(model_type='CNN', use_important_features=False)


Running: Regressor=CNN | Classifier=None
Features: All
Using all 132 features.
Step 1: Skipped (Pure Regression Mode)
Step 2: Training CNN Regressor...
Ep 10 | Val Loss: 3.7435
Ep 20 | Val Loss: 3.6084
Ep 30 | Val Loss: 3.4448

Final Test Results:
Regressor: CNN + Classifier: None
MAE:  0.5776
RMSE: 1.2735


0.5776020898835795

In [6]:
train_test(model_type='CatBoost', use_important_features=False, classifier_type='SVM')


Running: Regressor=CatBoost | Classifier=SVM
Features: All
Using all 132 features.
Step 1: Training SVM Classifier for Metal/Non-Metal...
Classifier Accuracy on Test Set: 0.8838
Data Filtered: Regressor will train on 1530/3222 samples (Non-Metals).
Step 2: Training CatBoost Regressor...

Final Test Results:
Regressor: CatBoost + Classifier: SVM
MAE:  0.4484
RMSE: 1.1104


0.4483820563341684

In [31]:
train_test(model_type='MLP', use_important_features=False)


Running Experiment: Model=MLP, Important_Features=False
Using all 132 features.
Kept features as 1D vectors with dimension 132.
Epoch 10/50 | Train: 2.9797 | Val: 3.6832
Epoch 20/50 | Train: 2.4301 | Val: 3.3990
Epoch 30/50 | Train: 2.1635 | Val: 3.4347
Epoch 40/50 | Train: 2.0290 | Val: 3.3087
Epoch 50/50 | Train: 1.8008 | Val: 3.1892
Early stopping at epoch 50
Testing with Best Model...
Results for MLP (ImportantFeats=False):
MSE: 1.4561
RMSE: 1.2067
MAE: 0.5676


0.5676350153069265

In [32]:
train_test(model_type='MLP', use_important_features=True)


Running Experiment: Model=MLP, Important_Features=True
Using 35 important features.
Kept features as 1D vectors with dimension 35.
Epoch 10/50 | Train: 3.4125 | Val: 3.7716
Epoch 20/50 | Train: 3.1117 | Val: 3.6147
Epoch 30/50 | Train: 2.8713 | Val: 3.6025
Epoch 40/50 | Train: 2.8316 | Val: 3.4818
Epoch 50/50 | Train: 2.6356 | Val: 3.4947
Testing with Best Model...
Results for MLP (ImportantFeats=True):
MSE: 1.4731
RMSE: 1.2137
MAE: 0.6180


0.6180294933973991