In [None]:
import pickle
import torch
import torchvision
import torchvision.transforms as transforms
import tqdm
import numpy as np

def _load_split(path):
    """Unpickle and return the object stored at ``path``."""
    # NOTE(review): pickle.load can execute arbitrary code while decoding;
    # only point this at files from a trusted source.
    with open(path, "rb") as handle:
        return pickle.load(handle)


train = _load_split("./train.pkl")  # a dictionary
val = _load_split("./validation.pkl")  # a dictionary
test = _load_split("./test.pkl")  # a dictionary

In [None]:
# Report how many samples each split holds.
train_count = len(train['images'])
print("Train dataset size", train_count)

val_count = len(val['images'])
print("Validation dataset size", val_count)

# The test split stores its images under 'qry_images' rather than 'images'.
test_query_count = len(test['qry_images'])
print("Test dataset size", test_query_count)

In [None]:
# Merge the train and validation splits into one larger training set by
# concatenating their arrays along the sample axis (axis 0).
# NOTE(review): every validation sample ends up inside new_train, so metrics
# later computed on `val` are measured on data the model was trained on.
new_train = {
    key: np.concatenate((train[key], val[key]), axis=0)
    for key in ('images', 'labels')
}
merged_images = new_train['images']
print("New train dataset size", len(merged_images))
print("New train dataset shape", merged_images.shape)

In [None]:
# Sanity check: report the label count two ways (len() and the first shape
# entry) so it can be compared against the image count printed earlier.
merged_labels = new_train['labels']
print("New train label size", len(merged_labels))
print("New train label shape", merged_labels.shape[0])

In [None]:
# Spot-check a single label by position.
# NOTE(review): 47999 is a hard-coded index — presumably the last sample of
# the original train split; confirm against the sizes printed above.
probe_index = 47999
print(new_train['labels'][probe_index])

In [None]:
# Tally how many samples carry each label value.
class_num = {}
for label in new_train['labels']:
    # Start unseen labels at zero, then count this occurrence.
    class_num[label] = class_num.get(label, 0) + 1
# First line: number of distinct labels; second line: the per-label counts.
print("Class number", len(class_num))
print("Class number", class_num)

In [None]:
# transform = torchvision.transforms.ToPILImage()
# img2 = transform(torch.tensor(new_train["images"][38400])*255)
# img2

In [None]:
# Device selection, transform definitions, and DataLoader construction.

# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

print(device)

# Pipeline meant for training images: 3-channel grayscale, tensor conversion,
# random horizontal flip / affine jitter, then normalization.
# NOTE(review): neither pipeline defined here is passed to the datasets built
# below, so no augmentation or normalization is applied to the tensors in
# this cell.
train_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.15),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.Normalize((0.5,), (0.5,)),
])

# Same pipeline without the random steps, for evaluation data.
val_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
])


def _as_device_tensors(split):
    """Return ``(images, labels)`` of ``split`` as float / long tensors on ``device``."""
    images = torch.tensor(split["images"]).float().to(device)
    labels = torch.tensor(split["labels"]).long().to(device)
    return images, labels


# Wrap the raw arrays as tensor datasets; the whole split is moved to `device`
# up front rather than batch by batch.
train_dataset = torch.utils.data.TensorDataset(*_as_device_tensors(new_train))
val_dataset = torch.utils.data.TensorDataset(*_as_device_tensors(val))

# Both loaders reshuffle on every pass and yield batches of 64 samples.
loader_options = dict(batch_size=64, shuffle=True)
train_loader = torch.utils.data.DataLoader(train_dataset, **loader_options)
val_loader = torch.utils.data.DataLoader(val_dataset, **loader_options)

In [None]:
# Report how many mini-batches each loader yields per pass.
train_batches = len(train_loader)
print("Train loader size", train_batches)

val_batches = len(val_loader)
print("Validation loader size", val_batches)

In [None]:
class ResNetBlock(torch.nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages with a skip connection.

    Args:
        in_channels: Channel count of the incoming feature map.
        out_channels: Channel count produced by both convolutions.
        stride: Stride of the first convolution (and of the shortcut
            projection, when one is created). Defaults to 1.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = torch.nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1
        )
        self.bn1 = torch.nn.BatchNorm2d(out_channels)
        self.conv2 = torch.nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1
        )
        self.bn2 = torch.nn.BatchNorm2d(out_channels)
        self.relu = torch.nn.ReLU()

        # A 1x1 conv + batch-norm projection is only created when the main
        # path changes the stride or channel count; otherwise the input is
        # added back unchanged.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.downsample = torch.nn.Sequential(
                torch.nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride),
                torch.nn.BatchNorm2d(out_channels),
            )
        else:
            self.downsample = None

    def forward(self, x):
        """Run ``x`` through the block and return the activated residual sum."""
        # Main path: conv -> bn -> relu -> conv -> bn.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # Shortcut path: the raw input, or its projection when shapes differ.
        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu(out)
    
class ResNet18(torch.nn.Module):
    """ResNet-18-style classifier assembled from ``ResNetBlock`` stages.

    A 3x3 stem convolution on 3-channel input feeds four stages of two
    residual blocks each (64, 128, 256 and 512 channels; the last three
    stages use stride 2), followed by global average pooling and a linear
    layer.

    Args:
        num_classes: Length of the output logit vector. Defaults to 80.
    """

    def __init__(self, num_classes=80):
        super().__init__()
        # Stem: 3x3 conv + batch norm + ReLU.
        self.conv1 = torch.nn.Conv2d(3, 64, kernel_size=3, padding=1)
        self.bn1 = torch.nn.BatchNorm2d(64)
        self.relu = torch.nn.ReLU()
        # Four residual stages, two blocks each.
        self.layer1 = self._make_layer(64, 64, 2)
        self.layer2 = self._make_layer(64, 128, 2, stride=2)
        self.layer3 = self._make_layer(128, 256, 2, stride=2)
        self.layer4 = self._make_layer(256, 512, 2, stride=2)
        # Head: pool each channel to 1x1, then classify.
        self.avgpool = torch.nn.AdaptiveAvgPool2d((1, 1))
        self.fc = torch.nn.Linear(512, num_classes)

    def _make_layer(self, in_channels, out_channels, block_num, stride=1):
        """Stack ``block_num`` residual blocks into one sequential stage.

        Only the first block changes the channel count and applies
        ``stride``; the remaining blocks map ``out_channels`` to itself.
        """
        first = ResNetBlock(in_channels, out_channels, stride=stride)
        rest = [ResNetBlock(out_channels, out_channels) for _ in range(block_num - 1)]
        return torch.nn.Sequential(first, *rest)

    def forward(self, x):
        """Return the logits computed for the input batch ``x``."""
        x = self.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        x = self.avgpool(x)
        # Collapse everything after the batch dimension into one feature axis.
        batch = x.size(0)
        x = x.view(batch, -1)
        return self.fc(x)
    

In [None]:
# Build the network, loss function, optimizer and learning-rate schedule.

model = ResNet18()
model = model.to(device)

criterion = torch.nn.CrossEntropyLoss()

# NOTE(review): lr=0.1 is 100x Adam's default of 1e-3 — confirm it is intended.
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.1)

# Multiply the learning rate by 0.1 after every 1280 scheduler steps.
# NOTE(review): the training loop calls scheduler.step() once per batch, so
# the 1280 is counted in batches rather than epochs — confirm.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=1280, gamma=0.1)

In [None]:
# Train the model, printing train and validation metrics after every epoch.
train_epoch = 100

for epoch in range(train_epoch):
    print('Epoch', epoch)
    train_loss_sum = 0.0  # running sum of per-sample losses
    train_acc_sum = 0     # running count of correct predictions

    model.train()

    for data, target in train_loader:
        # Move the batch once and reuse it (a no-op when it already lives on `device`).
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        # NOTE(review): the scheduler advances once per batch, so its
        # step_size is measured in batches — confirm this is intended.
        scheduler.step()
        predicted = output.argmax(dim=1)
        # `criterion` is CrossEntropyLoss with its default reduction, so `loss`
        # is the batch mean. Weight it by the batch size so that dividing by
        # the dataset size yields a true per-sample average, even when the
        # final batch is smaller than the rest.
        train_loss_sum += loss.item() * target.size(0)
        train_acc_sum += (predicted == target).sum().item()
    print('Train Accuracy', train_acc_sum / len(train_loader.dataset))
    print('Train Loss', train_loss_sum / len(train_loader.dataset))

    model.eval()
    val_loss_sum = 0.0
    val_acc_sum = 0

    # Evaluation needs no gradients; no_grad stops autograd from recording the
    # forward pass, which saves memory and time.
    with torch.no_grad():
        for data, target in val_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            val_loss_sum += loss.item() * target.size(0)
            predicted = output.argmax(dim=1)
            val_acc_sum += (predicted == target).sum().item()
    print('Validation Accuracy', val_acc_sum / len(val_loader.dataset))
    print('Validation Loss', val_loss_sum / len(val_loader.dataset))
        