In [None]:
import os
import torch
import torchvision
import numpy as np
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image

In [None]:
# Compute device: the first CUDA GPU when one is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

# Filesystem locations used throughout the notebook
DATA_DIR = '../2021VRDL_HW1_datasets'          # dataset root
CKPT_DIR = '../checkpoints/resnext101_32x8d'   # per-epoch training checkpoints
RLS_CKPT_DIR = '../checkpoints/RELEASE'        # checkpoints loaded for the ensemble
OUT_DIR = '.'                                  # where answer.txt is written

In [None]:
class VLDR_HW1_Dataset(Dataset):
    """Image dataset described by a whitespace-separated labels file.

    Every non-blank line of the labels file must contain at least two
    whitespace-separated fields: the image file name and a class token of the
    form ``<int>.<anything>``. The stored label is that integer minus one
    (presumably converting a 1-based class id to a 0-based index).

    Args:
        labels_file_dir: Path to the labels text file.
        img_dir: Directory containing the image files named in the labels file.
        transform: Optional callable applied to the loaded RGB PIL image.
        target_transform: Optional callable applied to the integer label.

    Raises:
        IndexError: If a non-blank line has fewer than two fields.
        ValueError: If the class token does not start with an integer.
    """

    def __init__(self, labels_file_dir, img_dir, transform=None, target_transform=None):
        imgs = []
        with open(labels_file_dir) as f:
            for row in f:
                words = row.split()
                # Blank / whitespace-only lines (e.g. a trailing newline at
                # the end of the file) carry no sample; skip them instead of
                # failing on words[0].
                if not words:
                    continue
                image_name, class_token = words[0], words[1]
                imgs.append((image_name, int(class_token.split('.')[0]) - 1))

        self.imgs = imgs
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of (image, label) samples."""
        return len(self.imgs)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)`` after transforms."""
        image_name, label = self.imgs[idx]
        img_path = os.path.join(self.img_dir, image_name)

        # The context manager closes the file handle right away; convert()
        # returns a fully loaded image that stays valid after the file closes.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)

        return image, label

In [None]:
class AddNoise(object):
    """Transform that adds scaled standard-normal noise to a tensor.

    Args:
        intensity: Factor the sampled noise is multiplied by before it is
            added to the input.
    """

    def __init__(self, intensity=0.05):
        self.intensity = intensity

    def __call__(self, tensor):
        """Return ``tensor`` plus freshly sampled noise of the same shape."""
        noise = torch.randn_like(tensor)
        return tensor + noise * self.intensity

    def __repr__(self):
        """Return ``ClassName(intensity=<value>)``."""
        class_name = type(self).__name__
        return class_name + f'(intensity={self.intensity})'

In [None]:
# Load the class names: one stripped entry per line of classes.txt,
# kept in file order so that a label index can be used to look up its name
with open(f'{DATA_DIR}/classes.txt') as f:
    classes = [line.strip() for line in f]

def get_class_display(pred):
    """Look up the class name stored at index ``pred`` of the global ``classes`` list."""
    class_name = classes[pred]
    return class_name

def validate(net, val_dataloader):
    """Return the top-1 accuracy of ``net`` on ``val_dataloader``.

    Args:
        net: Model called as ``net(inputs)``; its output is arg-maxed over
            dimension 1 to obtain the predicted labels.
        val_dataloader: Iterable of batches where ``batch[0]`` holds the
            inputs and ``batch[1]`` the integer labels.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the dataloader yields no samples.

    Note:
        The net is switched to eval mode and left that way; callers that keep
        training must call ``net.train()`` again.
    """
    net.eval()

    correct_cnt = 0
    all_cnt = 0

    with torch.no_grad():
        for data in val_dataloader:
            # Put inputs and labels to the global device
            inputs, labels = data[0].to(device), data[1].to(device)

            # Predict: index of the largest score per sample
            outputs = net(inputs)
            _, preds = torch.max(outputs, 1)

            correct_cnt += int((preds == labels).sum())
            all_cnt += len(labels)

    # Guard against an empty validation set instead of dividing by zero
    if all_cnt == 0:
        return 0.0

    return correct_cnt / all_cnt

In [None]:
# Input the training data
# Random augmentations are applied only to the samples used for training.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomAffine(degrees=(-30, 30)),
    transforms.RandomPerspective(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    AddNoise()
])

# Validation uses the same deterministic preprocessing as the testing cell, so
# the reported accuracy is repeatable and not distorted by augmentation/noise.
val_transform = transforms.Compose([
    transforms.Resize([224, 224]),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

dataset = VLDR_HW1_Dataset(
    f'{DATA_DIR}/training_labels.txt',
    f'{DATA_DIR}/training_images/',
    transform=train_transform
)
# Same samples in the same order, but with the deterministic transform
val_view_dataset = VLDR_HW1_Dataset(
    f'{DATA_DIR}/training_labels.txt',
    f'{DATA_DIR}/training_images/',
    transform=val_transform
)

# Split the data to train set and validation set
train_size_ratio = 0.9
train_size = int(len(dataset) * train_size_ratio)
validate_size = len(dataset) - train_size

# Fixed seed: every run (including runs resumed from a checkpoint) holds out
# the same images. With an unseeded split, a resumed run would train on images
# that an earlier run used for validation.
SPLIT_SEED = 0
train_dataset, val_split = torch.utils.data.random_split(
    dataset, [train_size, validate_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED)
)
# Re-point the held-out indices at the deterministic-transform view
val_dataset = torch.utils.data.Subset(val_view_dataset, val_split.indices)

# Dataloader
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=False)

In [None]:
# Set training params
from_epoch = 400       # epoch of the checkpoint to resume from; 0 starts fresh
to_epoch = 600         # training stops after this epoch
save_per = 1           # save a checkpoint and validate every `save_per` epochs
learning_rate = 0.001  # 0.001
momentum = 0.9  # 0.9

# Initial net and set the size of final layer to 200.
# The ImageNet weights are only needed when starting from scratch: when
# resuming (from_epoch > 0) the checkpoint-loading cell overwrites every
# parameter, so skip loading (and possibly downloading) them.
net = torchvision.models.resnext101_32x8d(pretrained=(from_epoch == 0))
num_features = net.fc.in_features
net.fc = torch.nn.Linear(num_features, 200)

In [None]:
# Load checkpoint if necessary
if from_epoch > 0:
    PATH = f'{CKPT_DIR}/{net.__class__.__name__}_{from_epoch}.pth'
    # map_location='cpu' lets a checkpoint saved on a GPU be restored on a
    # CPU-only machine; the net is moved to the target device right below.
    # NOTE: torch.load unpickles the file, so only load trusted checkpoints.
    net.load_state_dict(torch.load(PATH, map_location='cpu'))
    print('Load checkpoint from', PATH)

# Put net to device
net = net.to(device)

# Set criterion and optimizer
# (only model weights are stored in the checkpoint, so the SGD momentum
# buffers start from zero on every resume)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(net.parameters(), lr=learning_rate, momentum=momentum)

In [None]:
# Training
print(f'Start training from epoch {from_epoch} to {to_epoch} on device {device}')
print(f'lr = {learning_rate}')

# Make sure the checkpoint directory exists before the first save
os.makedirs(CKPT_DIR, exist_ok=True)

for epoch in range(from_epoch, to_epoch):
    # validate() leaves the net in eval mode, so switch back to training mode
    # once at the start of every epoch
    net.train()
    running_loss = torch.tensor(0.0, device=device)

    for data in train_dataloader:
        # Put inputs and labels to device
        inputs, labels = data[0].to(device), data[1].to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward, backward, and optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # detach() keeps the running total out of the autograd graph (no
        # growing chain of graph nodes, no grad_fn in the printed value)
        # without forcing a host/device sync on every step
        running_loss += loss.detach()

    # Print statistics
    epoch_loss = (running_loss / len(train_dataloader)).item()
    print(f'Epoch {epoch + 1} loss: {epoch_loss}')

    if (epoch + 1) % save_per == 0:
        # Save the checkpoint and test
        PATH = f'{CKPT_DIR}/{net.__class__.__name__}_{epoch + 1}.pth'
        torch.save(net.state_dict(), PATH)
        val_acc = validate(net, val_dataloader)
        print(f'Epoch {epoch + 1} saved. Acc on validation data: {val_acc}')

print('Finished Training')

In [None]:
# Ensemble used to predict the testing data.
# Each spec is (model constructor, checkpoint file stem, ensemble weight).
_ENSEMBLE_SPECS = [
    (torchvision.models.resnet152, 'resnet152_130e', 0.622156),
    (torchvision.models.resnext101_32x8d, 'resnext101_32x8d_advtrsf_160e', 0.679196),
    (torchvision.models.resnext101_32x8d, 'resnext101_32x8d_advtrsf_nonoised_250e', 0.661721),
    (torchvision.models.resnext101_32x8d, 'resnext101_32x8d_70e', 0.631058),
    (torchvision.models.resnext101_32x8d, 'resnext101_32x8d_400e', 0.666667),
]

# Instantiate every architecture without pretrained weights, in list order,
# producing (model, checkpoint file stem, ensemble weight) tuples
nets = [
    (build_model(pretrained=False), ckpt_name, ensemble_weight)
    for build_model, ckpt_name, ensemble_weight in _ENSEMBLE_SPECS
]

In [None]:
# Initialize the nets
# The loop variable is deliberately not called `net`, so that the training
# network defined earlier in the notebook is not overwritten.
for ensemble_net, ckpt, _ in nets:
    # Set the size of final layer to 200
    ensemble_net.fc = torch.nn.Linear(ensemble_net.fc.in_features, 200)

    # Load the trained weights.
    # map_location='cpu' lets a checkpoint saved on a GPU be restored on a
    # CPU-only machine; the net is moved to the target device right below.
    # NOTE: torch.load unpickles the file, so only load trusted checkpoints.
    PATH = f'{RLS_CKPT_DIR}/{ckpt}.pth'
    ensemble_net.load_state_dict(torch.load(PATH, map_location='cpu'))
    print('Load checkpoint from', PATH)

    # Put net to device. Module.to() moves the parameters in place, so the
    # instance stored in `nets` is the one that ends up on the device.
    ensemble_net.to(device)

    # These nets are only used for prediction
    ensemble_net.eval()

In [None]:
# Testing
submission = []

# Data transform (deterministic: no augmentation at prediction time)
transform = transforms.Compose([
    transforms.Resize([224, 224]),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

with open(f'{DATA_DIR}/testing_img_order.txt') as f:
    # Read all the testing image names, ignoring blank lines (a blank name
    # would otherwise resolve to the image directory itself)
    test_image_names = [line.strip() for line in f if line.strip()]

# Switching to eval mode does not depend on the image, so do it once up front.
# The loop variable is not called `net`, so the training network is left alone.
for ensemble_net, _, _ in nets:
    ensemble_net.eval()

with torch.no_grad():
    for img_name in test_image_names:
        # image order is important to your result
        img_path = os.path.join(f'{DATA_DIR}/testing_images/', img_name)
        # Close the file handle as soon as the pixels have been read
        with Image.open(img_path) as raw_image:
            img = transform(raw_image.convert('RGB'))
        # Add the batch dimension and move to the device
        img = img[None, :].to(device)

        # Weighted sum of every model's raw outputs, shaped (1, 200) like a
        # single-image batch of model outputs
        outputs_list = torch.zeros(1, 200, device=device)
        for ensemble_net, _, weight in nets:
            outputs = ensemble_net(img)
            outputs_list = outputs_list + outputs * weight

        predicted_class = int(torch.argmax(outputs_list, dim=1))
        predicted_class_display = get_class_display(predicted_class)
        submission.append([img_name, predicted_class_display])

In [None]:
# Write the submission as text, one row per entry of `submission`,
# each value formatted as a string
answer_path = f'{OUT_DIR}/answer.txt'
np.savetxt(answer_path, submission, fmt='%s')