In [None]:
import os
import pickle
import pandas as pd
import numpy as np
import time

from torch.utils.data import Dataset, TensorDataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.model_selection import train_test_split

# Wall-clock reference for the whole notebook run; the last cell subtracts it
# from time.time() to report the total elapsed time.
start_time = time.time()

In [None]:
class ExtendedMNISTDataset(Dataset):
    """Dataset backed by one of the competition's pickle files.

    The selected pickle is read fully into memory on construction; ``len()``
    and indexing are delegated to the unpickled object. The loading cell of
    this notebook unpacks every item as an ``(image, label)`` pair.

    Args:
        is_local: When True, the Kaggle input directory is resolved relative
            to the current working directory (``./kaggle/input/...``) instead
            of the filesystem root.
        is_train: When truthy, ``extended_mnist_train.pkl`` is loaded;
            otherwise ``extended_mnist_test.pkl``.

    Raises:
        OSError: If the selected pickle file cannot be opened.
    """

    def __init__(self, is_local: bool = False, is_train: bool = True):
        root = "/kaggle/input/fii-nn-2025-homework-4/"
        if is_local:
            root = "." + root

        # Plain truthiness test instead of comparing against the False literal.
        file_name = "extended_mnist_train.pkl" if is_train else "extended_mnist_test.pkl"
        file_path = os.path.join(root, file_name)

        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file. Only point this at the trusted competition files.
        with open(file_path, "rb") as fp:
            self.data = pickle.load(fp)

    def __len__(self) -> int:
        """Return the length of the unpickled data container."""
        return len(self.data)

    def __getitem__(self, i: int):
        """Return item ``i`` exactly as stored in the pickle."""
        return self.data[i]

In [None]:
# Read the training split once, flattening every image into a 1-D vector and
# keeping its label alongside.
train_pairs = [
    (image.flatten(), label)
    for image, label in ExtendedMNISTDataset(is_train=True)
]
train_data = np.array([vector for vector, _ in train_pairs], dtype=np.float32)
train_labels = np.array([target for _, target in train_pairs], dtype=np.int64)

# The second element of each test item is ignored; only the flattened images
# are kept.
test_data = np.array(
    [image.flatten() for image, _ in ExtendedMNISTDataset(is_train=False)],
    dtype=np.float32,
)

print("Data loaded.")
print(f"Train: {train_data.shape}, Test: {test_data.shape}")

In [None]:
# Normalization: divide by 255 (presumably 8-bit pixel intensities, which would
# put the values in [0, 1] -- confirm against the dataset description).
train_data /= 255.0
test_data /= 255.0

# Standardization with per-feature statistics estimated on the training set
# only; the same statistics are then applied to the test set.
mean = train_data.mean(axis=0, keepdims=True).astype(np.float32)
std = train_data.std(axis=0, keepdims=True).astype(np.float32)

# A feature that is constant over the training set has std == 0. Dividing by a
# tiny epsilon (the previous `std + 1e-8`) turns any non-zero value of that
# feature in the test set into a number on the order of 1e5-1e8, which then
# dominates the network input. Such features are left unscaled (divisor 1)
# instead.
STD_FLOOR = 1e-6
std = np.where(std < STD_FLOOR, np.float32(1.0), std).astype(np.float32)

train_data = (train_data - mean) / std
test_data = (test_data - mean) / std

# Data Augmentation
print("Augmenting data...")
augmented_data = [train_data]
augmented_labels = [train_labels]

# One augmentation with noise. A seeded generator makes the augmented set the
# same on every run, matching the fixed random_state used by the train/val split.
AUG_SEED = 42
AUG_NOISE_STD = 0.07
aug_rng = np.random.default_rng(AUG_SEED)
noisy = train_data + aug_rng.normal(0, AUG_NOISE_STD, train_data.shape).astype(np.float32)
augmented_data.append(noisy)
augmented_labels.append(train_labels)

# Layout: all original rows first, then the noisy copies in the same order.
train_data_aug = np.vstack(augmented_data).astype(np.float32)
train_labels_aug = np.concatenate(augmented_labels).astype(np.int64)

print(f"Augmented: {train_data_aug.shape}")

In [None]:
# Train/val split
#
# The augmented matrix is built as [originals; noisy copies] with every copy in
# the same row order as the originals, so rows i, i + n_orig, ... all derive
# from the same source image. Splitting the augmented rows directly would put
# an image in training and its near-duplicate in validation, inflating the
# validation accuracy that drives LR scheduling, early stopping and checkpoint
# selection. Instead the split is made on the ORIGINAL sample indices:
#   * training gets every copy (clean + noisy) of the training originals,
#   * validation gets only the clean originals that were held out.
n_orig = len(train_labels)
assert len(train_labels_aug) % n_orig == 0, (
    "augmented set is expected to be a whole number of copies of the originals"
)
n_copies = len(train_labels_aug) // n_orig

train_idx, val_idx = train_test_split(
    np.arange(n_orig),
    test_size=0.1,
    random_state=42,
    stratify=train_labels
)

train_rows = np.concatenate([train_idx + copy * n_orig for copy in range(n_copies)])
X_train = train_data_aug[train_rows]
y_train = train_labels_aug[train_rows]
X_val = train_data_aug[val_idx]      # rows < n_orig are the clean originals
y_val = train_labels_aug[val_idx]

# Convert to PyTorch tensors
X_train_t = torch.from_numpy(X_train).float()
y_train_t = torch.from_numpy(y_train).long()
X_val_t = torch.from_numpy(X_val).float()
y_val_t = torch.from_numpy(y_val).long()
X_test_t = torch.from_numpy(test_data).float()

train_ds = TensorDataset(X_train_t, y_train_t)
val_ds = TensorDataset(X_val_t, y_val_t)

# DataLoaders
batch_size = 512
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)

print(f"Train batches: {len(train_loader)}, Val batches: {len(val_loader)}")

In [None]:
class FastMLP(nn.Module):
    """Fully connected classifier: input -> 512 -> 256 -> 128 -> num_classes.

    Every hidden stage is Linear -> BatchNorm1d -> ReLU -> Dropout(0.25); the
    final Linear layer returns raw logits (no softmax is applied here).

    Args:
        input_dim: Width of each input feature vector.
        num_classes: Number of output logits.
    """

    def __init__(self, input_dim=784, num_classes=10):
        super().__init__()
        # Layers are created in the same order as they are applied.
        self.fc1 = nn.Linear(input_dim, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.fc3 = nn.Linear(256, 128)
        self.bn3 = nn.BatchNorm1d(128)
        self.fc4 = nn.Linear(128, num_classes)
        # A single Dropout module is reused after each hidden stage.
        self.dropout = nn.Dropout(0.25)

    def forward(self, x):
        """Map a batch of feature vectors to a batch of class logits."""
        hidden_stages = (
            (self.fc1, self.bn1),
            (self.fc2, self.bn2),
            (self.fc3, self.bn3),
        )
        for linear, norm in hidden_stages:
            x = self.dropout(F.relu(norm(linear(x))))
        return self.fc4(x)

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Device: {device}")

# The network's input width follows the second dimension of the training matrix.
n_features = train_data.shape[1]
model = FastMLP(input_dim=n_features)
model = model.to(device)

# Count parameters
total_params = 0
for param in model.parameters():
    total_params += param.numel()
print(f"Total parameters: {total_params:,}")

In [None]:
# Loss computed on the raw logits returned by the model.
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.AdamW(
    params=model.parameters(),
    lr=1e-4,
    weight_decay=1e-4,
)

# The training loop feeds validation accuracy to the scheduler, hence mode="max":
# the learning rate is halved once the metric has stopped increasing for more
# than `patience` epochs.
plateau_options = dict(mode="max", factor=0.5, patience=3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, **plateau_options)

In [None]:
def evaluate(loader):
    """Return the accuracy of the notebook-level ``model`` over ``loader``.

    The model is switched to eval mode (and left there) and gradients are
    disabled while iterating. Relies on the globals ``model`` and ``device``.

    Args:
        loader: Iterable yielding ``(inputs, targets)`` tensor pairs.

    Returns:
        Fraction of samples whose arg-max logit equals the target.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for features, targets in loader:
            features, targets = features.to(device), targets.to(device)
            predicted = model(features).argmax(dim=1)
            hits += predicted.eq(targets).sum().item()
            seen += features.size(0)
    return hits / seen

In [None]:
print("Training...")
start_training = time.time()

epochs = 25
best_val_acc = 0.0
best_state = None      # snapshot of the weights at the best validation accuracy
patience = 7           # early-stopping patience, in epochs without improvement
no_improve = 0
current_lr = optimizer.param_groups[0]["lr"]

for epoch in range(1, epochs + 1):
    model.train()
    running_loss = 0.0   # sample-weighted loss sum; accumulated but not printed
    total_train = 0
    correct_train = 0

    for xb, yb in train_loader:
        xb = xb.to(device)
        yb = yb.to(device)

        optimizer.zero_grad()
        logits = model(xb)
        loss = criterion(logits, yb)
        loss.backward()

        # Cap the global gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        running_loss += loss.item() * xb.size(0)
        preds = logits.argmax(dim=1)
        correct_train += (preds == yb).sum().item()
        total_train += xb.size(0)

    # Training accuracy is measured in train mode (dropout active, batch-norm
    # using batch statistics) on the fly; validation accuracy in eval mode.
    train_acc = correct_train / total_train
    val_acc = evaluate(val_loader)

    # Check if LR changed
    old_lr = current_lr
    scheduler.step(val_acc)
    current_lr = optimizer.param_groups[0]["lr"]

    print(f"# {epoch:02d}/{epochs} | "
          f"training : {train_acc*100:.2f}% | "
          f"vall: {val_acc*100:.2f}%", end="")

    if current_lr != old_lr:
        print(f" | LR: {old_lr:.6f} , {current_lr:.6f}", end="")

    print()  # New line

    if val_acc > best_val_acc:
        best_val_acc = val_acc
        # state_dict() hands back references to the live parameter/buffer
        # tensors, and dict.copy() is shallow, so a plain copy would keep
        # changing with every later optimizer step. Clone each tensor so the
        # snapshot really freezes the best weights.
        best_state = {
            name: tensor.detach().clone()
            for name, tensor in model.state_dict().items()
        }
        no_improve = 0
    else:
        no_improve += 1

    if no_improve >= patience:
        print(f"\nEarly stopping at epoch {epoch}")
        break

training_time = time.time() - start_training
print(f"\nTraining time: {training_time:.2f}s ({training_time/60:.2f} min)")
print(f"Best validation accuracy: {best_val_acc*100:.2f}%")

# Load best model
if best_state is not None:
    model.load_state_dict(best_state)

In [None]:
print("Applying TTA...")
model.eval()


def _predict_proba(inputs):
    """Return softmax probabilities of ``model`` for every row of ``inputs``.

    Rows are processed in order in chunks of ``batch_size``; the per-chunk
    results are moved to the CPU and stacked into a single NumPy array.
    """
    loader = DataLoader(TensorDataset(inputs), batch_size=batch_size, shuffle=False)
    chunks = [
        F.softmax(model(batch.to(device)), dim=1).cpu().numpy()
        for (batch,) in loader
    ]
    return np.vstack(chunks)


with torch.no_grad():
    # Original predictions
    tta_predictions = [_predict_proba(X_test_t)]

    # 2 noisy augmentations
    for _ in range(2):
        X_test_noisy = X_test_t + torch.randn_like(X_test_t) * 0.04
        tta_predictions.append(_predict_proba(X_test_noisy))

# Average predictions
avg_proba = np.mean(tta_predictions, axis=0)
final_predictions = np.argmax(avg_proba, axis=1)

print(f"Predictions shape: {final_predictions.shape}")
print(f"Unique predictions: {np.unique(final_predictions)}")

In [None]:
# Elapsed wall-clock time since the notebook's first cell ran.
total_time = time.time() - start_time
print(f"total time: {total_time:.2f}s ({total_time/60:.2f} minutes)")

# One row per test sample: sequential 0-based ID plus the predicted class.
row_ids = list(range(len(final_predictions)))
predicted_targets = final_predictions.tolist()
submission = pd.DataFrame({"ID": row_ids, "target": predicted_targets})

submission.to_csv("submission.csv", index=False)
print("submission done")