In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np
from NumpyImageDataset import NumpyImageDataset
from sklearn.model_selection import train_test_split

# One seed drives both the hold-out split and torch's global RNG, so every
# fresh-kernel run starts from the same random state.
SEED = 42
torch.manual_seed(SEED)

# Prefer Apple's MPS backend (the original target), then CUDA, then CPU, so the
# notebook still runs on machines that have no MPS device.
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Move axis 1 of the stored arrays to the last position
# (presumably channels-first -> channels-last for the image transforms; confirm
# against NumpyImageDataset).
train_data = np.load('./data/cifar_train_data.npy').transpose((0,2,3,1))
train_label = np.load('./data/cifar_train_label.npy')

# Hold out 20% of the labelled data; the "test" names below refer to this
# hold-out split, not to the unlabelled submission data.
X_train,X_test,y_train,y_test = train_test_split(train_data,train_label,test_size=0.2,random_state=SEED)

# Per-channel statistics shared by every pipeline that feeds the model.
NORM_MEAN = [0.4914,0.4822,0.4465]
NORM_STD = [0.2471,0.2435,0.2616]

# Data Scaling & Augmentation
trainset = NumpyImageDataset(X_train, y_train, transform=transforms.Compose([
    # scale and ratio are pinned to 1.0, so the "random" crop always spans the
    # whole image and this step only resizes to 32x32.
    transforms.RandomResizedCrop(32, scale=(1.0, 1.0), ratio=(1.0, 1.0)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandAugment(num_ops=2, magnitude=12),
    transforms.ColorJitter(0.2,0.2,0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN,std=NORM_STD),
    # RandomErasing operates on tensors, hence it comes after ToTensor/Normalize.
    transforms.RandomErasing(p=0.2)
]))

# Hold-out data gets the same scaling as training data but no augmentation.
testset = NumpyImageDataset(X_test, y_test, transform=transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=NORM_MEAN,std=NORM_STD)
]))

# Residual Block
class Residual(nn.Module):
    """Skip-connection wrapper that adds the input back onto the wrapped module's output."""

    def __init__(self, fn):
        """Keep `fn`, the module (or callable) applied on the residual branch."""
        super().__init__()
        self.fn = fn

    def forward(self, x):
        """Return ``fn(x) + x``; `fn` must produce something that can be added to `x`."""
        branch = self.fn(x)
        return branch + x

# Neural Network Structure
def _mixer_block(dim, kernel_size):
    """Build one mixer layer: a residual depthwise conv followed by a 1x1 conv."""
    return nn.Sequential(
        Residual(nn.Sequential(
            # groups=dim gives every channel its own filter (depthwise conv);
            # padding="same" keeps the spatial size so the skip-add is valid.
            nn.Conv2d(dim, dim, kernel_size, groups=dim, padding="same"),
            nn.GELU(),
            nn.BatchNorm2d(dim)
        )),
        # kernel_size=1 mixes information across channels at each position.
        nn.Conv2d(dim, dim, kernel_size=1),
        nn.GELU(),
        nn.BatchNorm2d(dim)
    )


def ConvMixer(dim, kernel_size=5, patch_size=2, n_classes=20, depth=4):
    """Assemble a ConvMixer classifier as a flat ``nn.Sequential``.

    Args:
        dim: Number of channels used throughout the network.
        kernel_size: Kernel size of the depthwise convolution in each mixer block.
        patch_size: Kernel size and stride of the stem convolution that turns
            the 3-channel input into `dim`-channel patch embeddings.
        n_classes: Number of output logits.
        depth: Number of mixer blocks. The default of 4 reproduces the
            previously hard-coded architecture, including module indices and
            state_dict keys.

    Returns:
        nn.Sequential: stem (conv, GELU, BatchNorm), `depth` mixer blocks, then
        global average pooling, flatten and a linear classification head.
    """
    return nn.Sequential(
        nn.Conv2d(3, dim, kernel_size=patch_size, stride=patch_size),
        nn.GELU(),
        nn.BatchNorm2d(dim),
        # Blocks are built left to right, so parameter-initialisation order
        # matches the original hand-unrolled version.
        *[_mixer_block(dim, kernel_size) for _ in range(depth)],
        nn.AdaptiveAvgPool2d((1,1)),
        nn.Flatten(),
        nn.Linear(dim, n_classes),
    )

# Training batches: reshuffled every epoch, loaded in the main process.
trainloader = torch.utils.data.DataLoader(
    dataset=trainset,
    batch_size=512,
    shuffle=True,
    num_workers=0,
)

# Hold-out batches: served in fixed order, no shuffling.
testloader = torch.utils.data.DataLoader(
    dataset=testset,
    batch_size=512,
    shuffle=False,
    num_workers=0,
)


In [None]:
# Training
EPOCHS = 150    # total number of passes over the training split
LR_MAX = 0.05   # peak learning rate of the schedule below

model = ConvMixer(dim=256, patch_size=2, kernel_size=5, n_classes=20)
model = model.to(device)


# Dynamic Learning Rate
def lr_schedule(t):
    """Return the learning rate at (fractional) epoch `t`.

    Piecewise-linear: warm up from 0 to LR_MAX over the first 2/5 of training,
    decay to LR_MAX/20 by 4/5, then anneal to 0 at the final epoch.
    """
    return np.interp([t], [0, EPOCHS*2//5, EPOCHS*4//5, EPOCHS],
                     [0, LR_MAX, LR_MAX/20.0, 0])[0]


opt = optim.AdamW(model.parameters(), lr=LR_MAX, weight_decay=0.005)
criterion = nn.CrossEntropyLoss()

# The CUDA autocast/GradScaler pair only does anything on a CUDA device. On
# MPS or CPU it used to warn and silently disable itself; make that explicit
# so those devices run in plain full precision without warnings.
use_amp = device.type == "cuda"
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

# Total of EPOCHS Epochs
if __name__ == "__main__":
    steps_per_epoch = len(trainloader)
    for epoch in range(EPOCHS):
        # Evaluation at the end of the previous epoch left the model in eval
        # mode; switch back once per epoch instead of once per batch.
        model.train()
        train_acc, n = 0, 0
        for i, (X, y) in enumerate(trainloader):
            X, y = X.to(device), y.to(device)

            # The schedule is evaluated per step (fractional epoch), not per epoch.
            lr = lr_schedule(epoch + (i + 1)/steps_per_epoch)
            for group in opt.param_groups:
                group["lr"] = lr

            opt.zero_grad()
            with torch.autocast(device_type="cuda", enabled=use_amp):
                output = model(X)
                loss = criterion(output, y)

            scaler.scale(loss).backward()
            # Gradients must be unscaled before clipping so the 1.0 threshold
            # applies to their true magnitude.
            scaler.unscale_(opt)
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            scaler.step(opt)
            scaler.update()

            train_acc += (output.argmax(dim=1) == y).sum().item()
            n += y.size(0)

        # Accuracy on the hold-out split served by testloader.
        model.eval()
        test_acc, m = 0, 0
        with torch.no_grad():
            for X, y in testloader:
                X, y = X.to(device), y.to(device)
                with torch.autocast(device_type="cuda", enabled=use_amp):
                    output = model(X)
                test_acc += (output.argmax(dim=1) == y).sum().item()
                m += y.size(0)

        print(f'Epoch: {epoch} : Train Acc: {train_acc/n:.4f}, Test Acc: {test_acc/m:.4f}')

In [None]:
# Testing
from generate_submission import writeSubmissionFile
model.eval()
pred_data = np.load('./data/cifar_test_data.npy').transpose((0,2,3,1))
# NumpyImageDataset is constructed with a label array; the submission data has
# no labels, so pass all-ones placeholders that are never read below.
tmp_data = np.ones(pred_data.shape[0])

# Scale Test Data (same ToTensor + Normalize as the hold-out pipeline, no augmentation)
pred_set = NumpyImageDataset(pred_data, tmp_data, transform=transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.4914,0.4822,0.4465],std=[0.2471,0.2435,0.2616])
]))

# Batch the images instead of pushing them through the model one at a time;
# shuffle=False keeps predictions in the same order as the input file.
pred_loader = torch.utils.data.DataLoader(pred_set, batch_size=512,
                                          shuffle=False, num_workers=0)

batch_preds = []

# Inference only: no_grad avoids building an autograd graph for every batch.
with torch.no_grad():
    for images, _ in pred_loader:
        logits = model(images.to(device))
        batch_preds.append(logits.argmax(dim=1).cpu().numpy())

# Keep the (N, 1) layout that the previous per-sample loop produced, since
# that is the shape handed to writeSubmissionFile.
all_pred = np.concatenate(batch_preds).reshape(-1, 1)

In [None]:
# Write Submission: hand the collected predictions to the project helper.
# (The second argument is presumably the output CSV path; see generate_submission.)
writeSubmissionFile(all_pred, 'submission.csv')