# In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import pickle
import numpy as np
import pandas as pd
import os

#############################################################################
# 1. Data Preparation: CIFAR-10
#    - Includes Cutout in the training transform
#############################################################################
class Cutout:
    """
    Tensor transform that zeroes out randomly placed square patches.

    Each patch is centred on a pixel drawn with ``np.random.randint`` and is
    clipped at the image border, so a patch near an edge covers less than
    ``length`` x ``length`` pixels. Operates on tensors, not PIL Images.
    """
    def __init__(self, n_holes=1, length=16):
        # n_holes: patches cut per image; length: nominal patch side in pixels.
        self.length = length
        self.n_holes = n_holes

    def __call__(self, img):
        """
        Return a copy of ``img`` (tensor of size (C, H, W)) with patches set to 0.

        One spatial mask is shared by every channel.
        """
        height, width = img.size(1), img.size(2)
        half = self.length // 2

        def clamp(value, upper):
            # Keep a patch edge inside the closed range [0, upper].
            return min(max(value, 0), upper)

        # 1 = keep the pixel, 0 = erase it.
        keep = torch.ones((height, width), dtype=torch.float32)

        for _ in range(self.n_holes):
            # Row is drawn before column; this order fixes the RNG stream.
            centre_y = np.random.randint(height)
            centre_x = np.random.randint(width)

            top = clamp(centre_y - half, height)
            bottom = clamp(centre_y + half, height)
            left = clamp(centre_x - half, width)
            right = clamp(centre_x + half, width)

            keep[top:bottom, left:right] = 0.

        keep = keep.to(img.device).expand_as(img)
        return img * keep

    def __repr__(self):
        return f'{type(self).__name__}(n_holes={self.n_holes}, length={self.length})'


def get_cifar10_dataloaders(batch_size=128, num_workers=2):
    """
    Build the CIFAR-10 train and test DataLoaders.

    The dataset is read from (and downloaded to, if needed) ``./data``.
    Training images go through random crop, horizontal flip, AutoAugment
    (CIFAR10 policy), ToTensor, Cutout and normalization; test images are
    only converted to tensors and normalized.

    Returns:
        (train_loader, test_loader). Only the training loader shuffles.
    """
    # Standard CIFAR-10 per-channel mean and std
    mean = (0.4914, 0.4822, 0.4465)
    std = (0.2470, 0.2435, 0.2616)

    # PIL-level augmentations first, then tensor-level Cutout, then normalize
    train_steps = [
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
        transforms.ToTensor(),
        Cutout(n_holes=1, length=16),
        transforms.Normalize(mean, std),
    ]
    # No augmentation at test time
    test_steps = [
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ]

    def make_loader(train, steps):
        # One dataset + loader per split; shuffling is enabled for training only.
        dataset = torchvision.datasets.CIFAR10(
            root='./data', train=train, download=True,
            transform=transforms.Compose(steps)
        )
        return torch.utils.data.DataLoader(
            dataset, batch_size=batch_size, shuffle=train,
            num_workers=num_workers
        )

    train_loader = make_loader(True, train_steps)
    test_loader = make_loader(False, test_steps)
    return train_loader, test_loader

#############################################################################
# 2. MixUp Data Augmentation
#############################################################################
def mixup_data(x, y, alpha=1.0):
    """
    Blend each sample of a batch with a randomly chosen partner (MixUp).

    The mixing weight ``lam`` is drawn from Beta(alpha, alpha) when
    ``alpha > 0`` and is fixed to 1 (no mixing) otherwise.

    Returns:
        (mixed_x, y_a, y_b, lam): the blended inputs, the original targets,
        the partners' targets, and the weight applied to the originals.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1

    # Pair every sample with the one at the permuted position.
    perm = torch.randperm(x.size(0)).to(x.device)
    partner_x = x[perm, :]

    mixed_x = lam * x + (1 - lam) * partner_x
    return mixed_x, y, y[perm], lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """
    Combine the losses against both MixUp target sets.

    Returns ``lam`` times the loss against ``y_a`` plus ``1 - lam`` times
    the loss against ``y_b``, using ``criterion(pred, target)`` for each.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

#############################################################################
# 3. Squeeze-and-Excitation Block
#############################################################################
class SEBlock(nn.Module):
    """
    Squeeze-and-Excitation block for channel attention.

    Global-average-pools each channel to a scalar, passes the resulting
    vector through a two-layer bottleneck (Linear -> ReLU -> Linear ->
    Sigmoid) and rescales the input channels by the gates it produces.

    Args:
        channel: number of input (and output) channels.
        reduction: bottleneck ratio; the hidden layer has
            ``channel // reduction`` units, but never fewer than 1.
    """
    def __init__(self, channel, reduction=16):
        super(SEBlock, self).__init__()
        # Without the floor of 1, channel < reduction would give a 0-unit
        # bottleneck and every gate would be the constant sigmoid(0) = 0.5.
        hidden = max(1, channel // reduction)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` (4-D, batch and channel first) scaled per channel."""
        b, c, _, _ = x.size()
        # Squeeze: one scalar per (sample, channel)
        y = self.avg_pool(x).view(b, c)
        # Excite: per-channel gates in (0, 1), shaped for broadcasting
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

#############################################################################
# 4. Narrow ResNet-18 Implementation with SE Blocks
#############################################################################
def conv3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3x3 ``nn.Conv2d`` with padding 1 (meant to precede BatchNorm)."""
    conv_options = dict(kernel_size=3, stride=stride, padding=1, bias=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)

class BasicBlockWithSE(nn.Module):
    """
    Two-convolution residual block with a Squeeze-and-Excitation gate.

    Main path: conv3x3 -> BN -> ReLU -> conv3x3 -> BN -> SE. The shortcut
    (optionally passed through ``downsample``) is added to the main path
    and a final ReLU is applied.

    Args:
        in_planes: channels of the incoming tensor.
        planes: channels produced by the block.
        stride: stride of the first convolution.
        downsample: optional module applied to the shortcut.
        bn_momentum: forwarded as ``momentum`` to both BatchNorm2d layers.
        se_reduction: forwarded as ``reduction`` to the SE block.
    """
    expansion = 1

    def __init__(self, in_planes, planes, stride=1, downsample=None, bn_momentum=0.9, se_reduction=16):
        super().__init__()
        # First conv carries the stride; the second keeps the resolution.
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes, momentum=bn_momentum)
        self.downsample = downsample
        self.se = SEBlock(planes, reduction=se_reduction)

    def forward(self, x):
        # Shortcut branch: identity unless a projection was supplied.
        shortcut = x if self.downsample is None else self.downsample(x)

        # Main branch, re-weighted per channel by the SE gate.
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.se(self.bn2(self.conv2(residual)))

        residual += shortcut
        return self.relu(residual)

class NarrowResNet18(nn.Module):
    """
    ResNet-18-style classifier with scaled-down channel widths.

    A 3x3 stem is followed by four stages of two ``BasicBlockWithSE`` blocks
    each, then global average pooling, dropout and a linear classifier.
    """
    def __init__(self, num_classes=10, width_multiplier=0.6,
                 bn_momentum=0.9, dropout=0.3, se_reduction=16):
        """
        Args:
            num_classes: size of the output layer.
            width_multiplier: factor applied to the base widths
                (64, 128, 256, 512); results are truncated to int.
            bn_momentum: ``momentum`` passed to every BatchNorm2d layer.
            dropout: drop probability applied before the classifier.
            se_reduction: reduction ratio passed to every SE block.
        """
        super().__init__()
        self.block_layers = [2, 2, 2, 2]
        self.channels = [int(width * width_multiplier) for width in (64, 128, 256, 512)]
        # Running input width; _make_layer updates it after every stage.
        self.in_planes = self.channels[0]

        # Stem: stride-1 3x3 convolution
        self.conv1 = nn.Conv2d(3, self.in_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes, momentum=bn_momentum)
        self.relu = nn.ReLU(inplace=True)

        # Stages layer1..layer4; every stage after the first uses stride 2
        stage_strides = (1, 2, 2, 2)
        stage_specs = zip(self.channels, self.block_layers, stage_strides)
        for number, (width, depth, stride) in enumerate(stage_specs, start=1):
            stage = self._make_layer(width, depth, stride=stride,
                                     bn_momentum=bn_momentum, se_reduction=se_reduction)
            setattr(self, f"layer{number}", stage)

        # Head: global average pooling + dropout + linear
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.dropout = nn.Dropout(p=dropout)
        self.fc = nn.Linear(self.channels[3] * BasicBlockWithSE.expansion, num_classes)

        self._init_weights()

    def _make_layer(self, planes, blocks, stride=1, bn_momentum=0.9, se_reduction=16):
        """Build one stage of ``blocks`` residual blocks producing ``planes`` channels."""
        # The shortcut needs a 1x1 projection whenever resolution or width changes.
        downsample = None
        if stride != 1 or self.in_planes != planes:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_planes, planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes, momentum=bn_momentum),
            )

        # Only the first block changes stride/width; the rest map planes -> planes.
        first_block = BasicBlockWithSE(self.in_planes, planes, stride, downsample,
                                       bn_momentum=bn_momentum, se_reduction=se_reduction)
        self.in_planes = planes
        later_blocks = [
            BasicBlockWithSE(self.in_planes, planes, bn_momentum=bn_momentum,
                             se_reduction=se_reduction)
            for _ in range(1, blocks)
        ]

        return nn.Sequential(first_block, *later_blocks)

    def _init_weights(self):
        """Kaiming-normal init for conv/linear weights; BatchNorm set to weight 1, bias 0."""
        for module in self.modules():
            if isinstance(module, (nn.Conv2d, nn.Linear)):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                # Only Linear biases are zeroed, and only when present.
                if isinstance(module, nn.Linear) and module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def forward(self, x):
        """Return the class logits for a batch of images."""
        x = self.relu(self.bn1(self.conv1(x)))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        x = torch.flatten(self.avgpool(x), 1)

        # Dropout before final FC
        return self.fc(self.dropout(x))

#############################################################################
# 5. Training and Evaluation Routines
#############################################################################
def train_one_epoch(model, device, train_loader, criterion, optimizer, use_mixup=True, mixup_alpha=1.0):
    """
    Train ``model`` for one pass over ``train_loader``.

    Args:
        model: network to train; switched to train mode.
        device: device the batches are moved to.
        train_loader: iterable of ``(inputs, targets)`` batches.
        criterion: loss callable ``criterion(outputs, targets)``.
        optimizer: optimizer stepped once per batch.
        use_mixup: when True, batches are blended with ``mixup_data`` and
            the loss is computed with ``mixup_criterion``.
        mixup_alpha: ``alpha`` forwarded to ``mixup_data``.

    Returns:
        (epoch_loss, epoch_acc): the sample-weighted mean loss and the
        accuracy in percent. Accuracy is not tracked under MixUp and is
        reported as 0.0 in that case. An empty loader yields (0.0, 0.0).
    """
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        batch_size = inputs.size(0)

        optimizer.zero_grad()

        if use_mixup:
            # Apply mixup
            inputs, targets_a, targets_b, lam = mixup_data(inputs, targets, alpha=mixup_alpha)
            outputs = model(inputs)
            loss = mixup_criterion(criterion, outputs, targets_a, targets_b, lam)
        else:
            # Standard forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

        # criterion returns a batch mean, so weight it by the batch size to
        # keep the epoch average exact when the last batch is smaller.
        running_loss += loss.item() * batch_size
        total += batch_size

        if not use_mixup:
            # For mixup, we don't track accuracy during training
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()

    if total == 0:
        # Empty loader: nothing was trained; avoid dividing by zero.
        return 0.0, 0.0

    epoch_loss = running_loss / total
    epoch_acc = 100.0 * correct / total if not use_mixup else 0.0
    return epoch_loss, epoch_acc


def evaluate(model, device, test_loader, criterion):
    """
    Evaluate ``model`` on ``test_loader`` without computing gradients.

    Args:
        model: network to evaluate; switched to eval mode.
        device: device the batches are moved to.
        test_loader: iterable of ``(inputs, targets)`` batches.
        criterion: loss callable ``criterion(outputs, targets)``.

    Returns:
        (epoch_loss, epoch_acc): the sample-weighted mean loss and the
        top-1 accuracy in percent. An empty loader yields (0.0, 0.0).
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)

            # Weight the batch-mean loss by the batch size so the final
            # average is exact when the last batch is smaller.
            running_loss += loss.item() * inputs.size(0)
            _, predicted = outputs.max(1)
            correct += predicted.eq(targets).sum().item()
            total += targets.size(0)

    if total == 0:
        # Empty loader: nothing to average; avoid dividing by zero.
        return 0.0, 0.0

    epoch_loss = running_loss / total
    epoch_acc = 100.0 * correct / total
    return epoch_loss, epoch_acc

#############################################################################
# 6. Test Time Augmentation
#############################################################################
def test_time_augmentation(model, inputs, num_augmentations=10, device='cuda'):
    """
    Average softmax predictions over several augmented views of ``inputs``.

    The views are: the unmodified batch, its horizontal flip, and
    ``num_augmentations - 2`` randomly shifted copies. A shifted copy is a
    crop of the original size taken at a random offset (0..8 pixels per
    axis, drawn with ``np.random.randint`` per image) from the batch
    reflect-padded by 4 pixels. The first two views are always evaluated,
    even when ``num_augmentations`` is below 2.

    Args:
        model: network to evaluate; switched to eval mode.
        inputs: image batch of shape (N, C, H, W), already on the model's
            device. Any H and W larger than the 4-pixel pad are supported
            when shifted views are requested.
        num_augmentations: total number of views to average.
        device: unused; inference runs on whatever device ``inputs`` is on.
            Kept so existing callers keep working.

    Returns:
        Tensor of shape (N, num_classes) with the averaged probabilities.
    """
    model.eval()
    batch_size = inputs.size(0)
    height, width = inputs.shape[-2:]
    pad_size = 4
    num_shifts = max(0, num_augmentations - 2)  # -2: base and flip come first
    predictions = []

    with torch.no_grad():
        # Base prediction (no augmentation)
        predictions.append(F.softmax(model(inputs), dim=1))

        # Horizontal flip (last axis is width)
        flipped = torch.flip(inputs, dims=[3])
        predictions.append(F.softmax(model(flipped), dim=1))

        # Random shifts (small translations)
        if num_shifts:
            # The padded batch is the same for every shifted view: build it once.
            padded = F.pad(inputs, (pad_size, pad_size, pad_size, pad_size), mode='reflect')

        for _ in range(num_shifts):
            shifted = torch.empty_like(inputs)

            # Each image gets its own offset; the crop keeps the input size.
            for i in range(batch_size):
                h_shift, w_shift = np.random.randint(0, 2*pad_size+1, size=2)
                shifted[i] = padded[i, :, h_shift:h_shift+height, w_shift:w_shift+width]

            predictions.append(F.softmax(model(shifted), dim=1))

    # Average predictions over all views
    avg_preds = torch.stack(predictions).mean(dim=0)
    return avg_preds

#############################################################################
# 7. Improved Test Data Loading
#############################################################################
def locate_test_file():
    """
    Return the first existing candidate path for the unlabeled test pickle.

    Candidates are checked in order: the working directory, two Colab
    locations under ``/content``, and ``sample_data/``. Prints the outcome
    and returns None when no candidate exists.
    """
    candidates = (
        "cifar_test_nolabel.pkl",
        "/content/cifar_test_nolabel.pkl",
        "/content/drive/MyDrive/cifar_test_nolabel.pkl",
        "sample_data/cifar_test_nolabel.pkl",
    )

    found = next((path for path in candidates if os.path.exists(path)), None)

    if found is None:
        # None of the known locations holds the file
        print("Could not locate test file. Please check the file exists and provide the correct path.")
    else:
        print(f"Found test file at: {found}")
    return found

def load_custom_test_set(filepath):
    """
    Load the test images from a pickle file as a float NCHW tensor.

    The pickle must hold a mapping with the images under ``b'data'`` or
    ``'data'``. Accepted layouts:

    * 2-D ``(N, 3072)``: flattened images, reshaped to ``(N, 3, 32, 32)``.
    * 4-D with 3 channels on axis 1 and not on the last axis: taken as
      NCHW and left as is.
    * any other 4-D array: taken as NHWC and transposed to NCHW.

    Args:
        filepath: path of the pickle file.

    Returns:
        ``torch.float32`` tensor of shape (N, C, H, W) holding the raw
        values divided by 255.

    Raises:
        KeyError: if the mapping has no ``data`` entry.
        ValueError: if the array is neither 2-D ``(N, 3072)`` nor 4-D.
    """
    print(f"Loading data from: {filepath}")

    # NOTE(security): pickle.load can execute arbitrary code embedded in the
    # file. Only open test files that come from a trusted source.
    with open(filepath, 'rb') as f:
        data = pickle.load(f, encoding='bytes')

    # Print keys to understand the structure
    print("Keys in data:", [k.decode() if isinstance(k, bytes) else k for k in data.keys()])

    # Keys may be bytes or str, so accept both spellings.
    for key in (b'data', 'data'):
        if key in data:
            images = np.asarray(data[key])
            break
    else:
        raise KeyError("data")
    print("Raw data shape:", images.shape)

    if images.ndim == 2:  # Flattened format (N, 3072)
        # 3072 = 3*32*32, the flattened CIFAR layout
        if images.shape[1] != 3072:
            raise ValueError(f"Unexpected data shape: {images.shape}")
        images = images.reshape(images.shape[0], 3, 32, 32)
        print("Reshaped flattened data to NCHW format:", images.shape)
    elif images.ndim == 4:
        if images.shape[1] == 3 and images.shape[-1] != 3:
            # Channels already sit on axis 1; transposing would scramble them.
            print("Data already in NCHW format:", images.shape)
        else:
            # Image format (N, H, W, C): convert to NCHW
            images = np.transpose(images, (0, 3, 1, 2))
            print("Converted from NHWC to NCHW format:", images.shape)
    else:
        raise ValueError(f"Unexpected data shape: {images.shape}")

    # Convert data to float32 and normalize to [0,1]
    images = images.astype(np.float32) / 255.0

    # Convert to PyTorch tensor
    return torch.tensor(images, dtype=torch.float32)

def preprocess_test_images(images):
    """
    Normalize test images using the same statistics as training.

    Args:
        images: tensor of shape (N, 3, H, W) with values scaled to [0, 1].

    Returns:
        A new tensor of the same shape holding ``(images - mean) / std``
        per channel. The input tensor is not modified.
    """
    # IMPORTANT: Use the same normalization values as in training
    mean = (0.4914, 0.4822, 0.4465)
    std = (0.2470, 0.2435, 0.2616)

    # Keep the statistics in the images' float dtype and on their device.
    stats_dtype = images.dtype if images.is_floating_point() else torch.float32
    mean_t = torch.tensor(mean, dtype=stats_dtype, device=images.device).view(1, -1, 1, 1)
    std_t = torch.tensor(std, dtype=stats_dtype, device=images.device).view(1, -1, 1, 1)

    # One broadcast over the whole batch instead of a per-image loop.
    return (images - mean_t) / std_t

#############################################################################
# 8. Main Training Function
#############################################################################
def train_model(num_epochs=300, batch_size=128, width_multiplier=0.6, dropout=0.3, se_reduction=16,
                use_mixup=True, mixup_alpha=1.0):
    """
    Train a NarrowResNet18 on CIFAR-10 and return the best-scoring weights.

    Uses SGD with Nesterov momentum, label-smoothed cross-entropy, a
    25-epoch linear learning-rate warmup and cosine annealing afterwards.
    After every epoch the model is evaluated on the CIFAR-10 test split,
    and a snapshot of the weights is kept whenever the accuracy improves.

    Args:
        num_epochs: total number of training epochs (warmup included).
        batch_size: batch size of both data loaders.
        width_multiplier: channel scaling passed to the model.
        dropout: dropout probability passed to the model.
        se_reduction: SE reduction ratio passed to the model.
        use_mixup: whether training batches are mixed with MixUp.
        mixup_alpha: Beta-distribution parameter used by MixUp.

    Returns:
        (model, best_acc): the model loaded with the weights of its best
        epoch, and that epoch's test accuracy in percent.

    Raises:
        AssertionError: if the model has 5 million parameters or more.
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print("Using device:", device)

    # Hyperparameters
    initial_lr = 0.1
    weight_decay = 5e-4
    # NOTE(review): nn.BatchNorm2d's `momentum` is the weight given to the
    # NEW batch statistic (PyTorch default 0.1). 0.9 makes the running stats
    # follow the latest batches almost entirely. If 0.9 was meant in the
    # "decay" sense used by other frameworks, this should be 0.1 - confirm.
    bn_momentum = 0.9

    # 1) Load Data
    train_loader, test_loader = get_cifar10_dataloaders(batch_size=batch_size)

    # 2) Create Model with SE blocks
    model = NarrowResNet18(
        num_classes=10,
        width_multiplier=width_multiplier,
        bn_momentum=bn_momentum,
        dropout=dropout,
        se_reduction=se_reduction
    ).to(device)

    total_params = sum(p.numel() for p in model.parameters())
    print(f"Total parameters: {total_params:,}")
    assert total_params < 5_000_000, "Model exceeds 5 million parameters!"

    # 3) Define Loss (with label smoothing) and Optimizer
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
    optimizer = optim.SGD(
        model.parameters(), lr=initial_lr, momentum=0.9, weight_decay=weight_decay, nesterov=True
    )

    # 4) Learning Rate Schedule with Warmup + Cosine
    warmup_epochs = 25
    scheduler_cosine = optim.lr_scheduler.CosineAnnealingLR(
        optimizer,
        # Floor of 1 keeps the schedule well-defined for runs that are not
        # longer than the warmup; the scheduler is never stepped for them.
        T_max=max(1, num_epochs - warmup_epochs),
        eta_min=1e-5
    )

    # 5) Training Loop
    best_acc = 0.0
    best_model_state = None

    for epoch in range(num_epochs):
        # -------------- Warmup stage --------------
        if epoch < warmup_epochs:
            # Linear ramp from just above 0.01 up to initial_lr, which is
            # reached on the last warmup epoch.
            warmup_start_lr = 0.01
            progress = (epoch + 1) / warmup_epochs  # goes from 1/warmup_epochs → 1.0
            new_lr = warmup_start_lr + (initial_lr - warmup_start_lr) * progress

            for param_group in optimizer.param_groups:
                param_group['lr'] = new_lr

        # -------------- Cosine stage --------------
        else:
            # Step the cosine scheduler
            scheduler_cosine.step()

        # ----- Training as usual -----
        train_loss, train_acc = train_one_epoch(
            model, device, train_loader, criterion, optimizer,
            use_mixup=use_mixup, mixup_alpha=mixup_alpha
        )

        test_loss, test_acc = evaluate(model, device, test_loader, criterion)

        # Track best accuracy
        if test_acc > best_acc:
            best_acc = test_acc
            # state_dict() tensors alias the live parameters and dict.copy()
            # is shallow, so clone every tensor; otherwise later optimizer
            # steps would silently overwrite the "best" snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }

        if (epoch + 1) % 10 == 0 or epoch == num_epochs - 1:
            print(f"Epoch [{epoch+1}/{num_epochs}] "
                  f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
                  f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")

    print(f"\nBest Test Accuracy: {best_acc:.2f}%")

    # Load best model weights. No snapshot exists when num_epochs is 0 or
    # accuracy never rose above 0; the current weights are kept then.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, best_acc

#############################################################################
# 9. Inference and Submission Generation
#############################################################################
def generate_submission(model, device='cuda', use_tta=True, num_augmentations=10):
    """
    Run inference on the unlabeled test set and write a submission CSV.

    The test pickle is found with ``locate_test_file``, loaded and
    normalized, and predicted in batches of 100. The result is written to
    ``improved_submission.csv`` with columns ``ID`` (0-based row index) and
    ``Labels`` (predicted class index).

    Args:
        model: trained network; switched to eval mode.
        device: fallback device, used only when ``model`` has no
            parameters. Otherwise batches are moved to the device the
            model's parameters are on, so a CPU-trained model works even
            though the default is ``'cuda'``.
        use_tta: average predictions with ``test_time_augmentation``
            instead of doing a single forward pass.
        num_augmentations: number of views used by test-time augmentation.

    Returns:
        The submission as a pandas DataFrame, or None when the test file
        cannot be located.
    """
    # 1) Find and load test data
    test_file_path = locate_test_file()
    if test_file_path is None:
        print("Failed to locate test file. Submission generation aborted.")
        return None

    # 2) Load and preprocess test data
    test_images = load_custom_test_set(test_file_path)
    test_images = preprocess_test_images(test_images)

    # 3) Create DataLoader
    test_dataset = torch.utils.data.TensorDataset(test_images)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False)

    # Inputs must live where the weights live; trusting the `device` default
    # would fail on machines without CUDA.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device(device)

    # 4) Run inference with or without TTA
    model.eval()
    all_predictions = []

    with torch.no_grad():
        for batch in test_loader:
            inputs = batch[0].to(run_device)

            if use_tta:
                # Averaged class probabilities over the augmented views
                outputs = test_time_augmentation(
                    model, inputs, num_augmentations=num_augmentations, device=run_device
                )
            else:
                # Standard inference (raw logits)
                outputs = model(inputs)

            # The arg-max class is the same for logits and probabilities.
            _, predicted = outputs.max(1)
            all_predictions.extend(predicted.cpu().tolist())

    # 5) Create submission file
    submission = pd.DataFrame({
        'ID': np.arange(len(all_predictions)),
        'Labels': all_predictions
    })

    # Save submission
    submission_file = 'improved_submission.csv'
    submission.to_csv(submission_file, index=False)
    print(f"Submission file saved as {submission_file}")

    return submission

#############################################################################
# 10. Full Training and Submission Pipeline
#############################################################################
def run_full_pipeline():
    """
    Train the model, save its weights, and generate the submission file.

    The weights are written to ``improved_resnet18_state.pth``.

    Returns:
        (model, submission): the trained model and the value returned by
        ``generate_submission``.
    """
    # 1) Train the model
    model, best_acc = train_model(
        num_epochs=300,
        batch_size=128,
        width_multiplier=0.6,
        dropout=0.3,
        se_reduction=16,        # SE block reduction ratio
        use_mixup=True,         # Enable mixup augmentation
        mixup_alpha=1.0         # Mixup alpha parameter
    )

    # 2) Save the trained model
    torch.save(model.state_dict(), "improved_resnet18_state.pth")
    print("Model saved to improved_resnet18_state.pth")

    # 3) Generate submission with test-time augmentation.
    # Run inference on the device the model was trained on; relying on
    # generate_submission's 'cuda' default fails on CPU-only machines.
    model_device = next(model.parameters()).device
    submission = generate_submission(
        model, device=model_device, use_tta=True, num_augmentations=10
    )

    return model, submission

# Entry point: run the full train -> save -> submit pipeline when this file is
# executed directly, and do nothing when it is imported as a module.
if __name__ == "__main__":
    run_full_pipeline()

# Captured notebook output from the last run of this cell:
# Using device: cpu
# Files already downloaded and verified
#
# KeyboardInterrupt: