# VGGNet 구현 - 20192253 Hongchan Yoon

In [1]:
import torch
# Seed PyTorch's global RNG with a fixed value and switch the cuDNN backend off.
random_seed = 1
torch.manual_seed(random_seed)
torch.backends.cudnn.enabled = False

# Use the GPU when PyTorch reports one as available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('current device: ',device)

current device:  cpu


## 1. Dataset Preparation

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
from PIL import Image

### 1-1. Load CIFAR100 Dataset & perform PCA Analysis(밑의 RGB ColourShift에 사용)

In [None]:
# Raw CIFAR-100 splits with no transform attached (downloaded into ./ when missing).
train_dataset = torchvision.datasets.CIFAR100(root='./', train=True, download=True)
test_dataset = torchvision.datasets.CIFAR100(root='./', train=False, download=True)

# Flatten every training image into rows of (R, G, B) values and stack all rows
# into one (num_pixels, 3) matrix.
pixels = np.concatenate(
    [np.asarray(img).reshape(-1, 3) for img, _ in train_dataset],
    axis=0,
)

# VGGNet paper: "The only pre-processing we do is subtracting the mean RGB value,
# computed on the training set, from each pixel."
# Per-channel mean over every training pixel.
mean_rgb = pixels.mean(axis=0)

# 3x3 covariance of the colour channels (each row of `pixels` is one observation).
cov_matrix = np.cov(pixels, rowvar=False)

# Eigen-decomposition of the symmetric covariance matrix.
eig_vals, eig_vecs = np.linalg.eigh(cov_matrix)

# Reorder so the largest eigenvalue (and its eigenvector column) comes first.
sorted_indices = eig_vals.argsort()[::-1]
eig_vals, eig_vecs = eig_vals[sorted_indices], eig_vecs[:, sorted_indices]

print("Mean RGB values:", mean_rgb)
print("Eigenvalues:", eig_vals)
print("Eigenvectors:\n", eig_vecs)

### 1-2. Define Data Augmentation

In [4]:
def convert2numpy(image):
    """Return `image` as a NumPy array.

    Torch tensors are moved to the CPU and exposed through `.numpy()`;
    anything else (for example a PIL image or a nested list) is passed to
    `np.array`, which builds a new array.
    """
    if not torch.is_tensor(image):
        return np.array(image)
    return image.data.cpu().numpy()

'''
VGGNet Paper - To further augment the training set, the crops underwent random horizontal flipping and random RGB colour shift

다만 Dataset을 논문과 같이 ILSVRC-2014를 사용하지 않고, 이미지 크기가 32x32인 CIFAR100 을 사용할 것이기에 논문과 같이 Training Scale S에 맞춰 Resize후
Crop 하는 방식은 사용하지 않고, 32x32 image 부분에 5x5 crop(구멍)을 내준다.
'''
class RandomCrop(object):
    """Zero out one randomly placed square patch of an image.

    Despite the name, nothing is cropped away: the image keeps its size and a
    `crop_pixel` x `crop_pixel` window is overwritten with 0.
    """

    def __init__(self, crop_pixel:int = 5):
        # Side length, in pixels, of the square that gets blanked.
        self.crop_pixel = crop_pixel

    def __call__(self, image):
        """Return `image` as a NumPy array with one square region set to 0."""
        # Assumes a Height x Width x Channel layout (per the original author's note).
        pixels = convert2numpy(image)
        side = self.crop_pixel
        # Both offsets are drawn from the length of the first axis, so the
        # window is only guaranteed to fit when the image is square.
        row, col = np.random.choice(pixels.shape[0] - side, 2)
        pixels[row:row + side, col:col + side, :] = 0.0
        return pixels
    

# Random Horizontal Flipping
# 'probability'의 확률로 좌우 flipping 실행
class RandomHorizontalFlip(object):
    """Mirror an image left-to-right with a fixed probability.

    Parameters:
    - probability: chance in [0, 1] that a given call flips the image
    """

    def __init__(self, probability = 0.3):
        assert 0.0 <= probability <= 1.0
        self.probability = probability

    def __call__(self, image):
        """Return the flipped image, or `image` itself when no flip is drawn."""
        # The outcome of the draw is kept on the instance as `execute`.
        self.execute = np.random.rand() < self.probability
        # `transpose(Image.FLIP_LEFT_RIGHT)` is the PIL image API, so a PIL
        # image is presumably expected here.
        return image.transpose(Image.FLIP_LEFT_RIGHT) if self.execute else image
        
# Random RGB Colour Shift
# VGGNet 논문에서는 RGB Colour Shift에 대한 자세한 내용은 없고, AlexNet 논문만 인용
# 따라서 AlexNet의 RGB Colour Shift(PCA 연산 후 더하기)로 구현
'''AlexNet - To each training image, we add multiples of the found principal components
with magnitudes proportional to the corresponding eigenvalues times a random variable drawn from
a Gaussian with mean zero and standard deviation 0.1'''
class RandomRGBColorShift(object):
    """AlexNet-style PCA colour augmentation.

    Every call draws three Gaussian coefficients, scales them by the stored
    eigenvalues, projects them through the stored eigenvectors and adds the
    resulting per-channel offset to the whole image.
    """

    def __init__(self, eig_vecs, eig_vals, alpha_std=0.1):
        """
        Store the precomputed PCA of the RGB pixel values.

        Parameters:
        - eig_vecs: eigenvectors of the covariance matrix of RGB pixel values
        - eig_vals: eigenvalues of the covariance matrix of RGB pixel values
        - alpha_std: standard deviation of the zero-mean Gaussian the
          coefficients are drawn from
        """
        self.eig_vecs = eig_vecs
        self.eig_vals = eig_vals
        self.alpha_std = alpha_std

    def __call__(self, image):
        """Return a colour-shifted copy of `image` as a uint8 NumPy array."""
        alpha = np.random.normal(0, self.alpha_std, 3)
        scaled = alpha * self.eig_vals
        quantity = np.dot(self.eig_vecs, scaled)
        # Do the arithmetic in float32; the last axis is indexed as the
        # colour channel.
        shifted = np.asarray(image).astype(np.float32)
        for channel in range(3):  # R, G, B
            shifted[:, :, channel] += quantity[channel]
        # Clamp into the 8-bit range before casting back.
        return np.clip(shifted, 0, 255).astype(np.uint8)

'''VGGNet - The only pre- processing we do is subtracting the mean RGB value, computed on the training set, from each pixel
'''
class SubtractMeanRGB(object):
    """Subtract a fixed mean colour from an image and return it as a PIL image."""

    def __init__(self, mean):
        # Value(s) subtracted from the image; broadcast by NumPy against the
        # pixel array.
        self.mean = mean

    def __call__(self, image):
        """Return `image - mean`, clamped to [0, 255], as an 8-bit PIL image."""
        shifted = np.asarray(image).astype(np.float32)
        shifted -= self.mean
        # NOTE(review): clamping at 0 throws away every value that went
        # negative, so pixels darker than the mean all collapse to 0. A true
        # zero-centred result cannot be stored in an 8-bit image; confirm
        # whether this lossy behaviour is intended.
        clamped = shifted.clip(0, 255)
        return Image.fromarray(np.uint8(clamped))
    
def imshow(image):
    """Display `image` with matplotlib after moving axis 0 to the last position."""
    # (C, H, W) -> (H, W, C), the layout plt.imshow expects for colour images.
    channels_last = np.transpose(image, (1,2,0))
    plt.imshow(channels_last)

### 1-3. Apply augmentation to the dataset

In [None]:
# Training pipeline: flip -> mean subtraction -> colour jitter -> patch blanking
# -> tensor conversion -> resize.
# RandomRGBColorShift(eig_vecs, eig_vals) was tried in place of ColorJitter but
# did not seem to work well, so ColorJitter is used instead.
train_steps = [
    RandomHorizontalFlip(),
    SubtractMeanRGB(mean_rgb),
    torchvision.transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    RandomCrop(),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Resize(32),
]
train_transform = torchvision.transforms.Compose(train_steps)

# Evaluation pipeline: the same preprocessing without any random augmentation.
test_steps = [
    SubtractMeanRGB(mean_rgb),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Resize(32),
]
test_transform = torchvision.transforms.Compose(test_steps)

# Only the training loader shuffles; both use batches of 128 and load in the
# main process (num_workers=0).
train_loader = torch.utils.data.DataLoader(
    torchvision.datasets.CIFAR100(root='./', train=True, download=True, transform=train_transform),
    shuffle=True,
    batch_size=128,
    num_workers=0,
)

test_loader = torch.utils.data.DataLoader(
    torchvision.datasets.CIFAR100(root='./', train=False, download=True, transform=test_transform),
    shuffle=False,
    batch_size=128,
    num_workers=0,
)

### 1-4. Plot the augmented images.

In [None]:
# Pull a single batch from the training loader.
dataiter = iter(train_loader)
images, labels = next(dataiter)
# matplotlib needs NumPy arrays, not tensors.
images = images.numpy()

# Display names, indexed by the integer label of each sample.
classes = [
    'apple', 'aquarium_fish', 'baby', 'bear', 'beaver',
    'bed', 'bee', 'beetle', 'bicycle', 'bottle',
    'bowl', 'boy', 'bridge', 'bus', 'butterfly',
    'camel', 'can', 'castle', 'caterpillar', 'cattle',
    'chair', 'chimpanzee', 'clock', 'cloud', 'cockroach',
    'couch', 'crab', 'crocodile', 'cup', 'dinosaur',
    'dolphin', 'elephant', 'flatfish', 'forest', 'fox',
    'girl', 'hamster', 'house', 'kangaroo', 'computer_keyboard',
    'lamp', 'lawn_mower', 'leopard', 'lion', 'lizard',
    'lobster', 'man', 'maple_tree', 'motorcycle', 'mountain',
    'mouse', 'mushroom', 'oak_tree', 'orange', 'orchid',
    'otter', 'palm_tree', 'pear', 'pickup_truck', 'pine_tree',
    'plain', 'plate', 'poppy', 'porcupine', 'possum',
    'rabbit', 'raccoon', 'ray', 'road', 'rocket',
    'rose', 'sea', 'seal', 'shark', 'shrew',
    'skunk', 'skyscraper', 'snail', 'snake', 'spider',
    'squirrel', 'streetcar', 'sunflower', 'sweet_pepper', 'table',
    'tank', 'telephone', 'television', 'tiger', 'tractor',
    'train', 'trout', 'tulip', 'turtle', 'wardrobe',
    'whale', 'willow_tree', 'wolf', 'woman', 'worm',
]

# One wide figure holding a 2 x 10 grid of thumbnails.
fig = plt.figure(figsize=(25, 4))

# Visualize the first 20 images of the batch, each titled with its class name.
for idx in range(20):
    ax = fig.add_subplot(2, 20 // 2, idx + 1, xticks=[], yticks=[])
    imshow(images[idx])
    ax.set_title(classes[labels[idx]])

## 2. VGGNet Implementation

### 2-1. VGGNet model

In [None]:
'''
VGGNet - The width of conv. layers (the number of channels) is rather small, starting from 64 
in the first layer and then increasing by a factor of 2 after each max-pooling layer, until it reaches 512.
'''
class VGGNet(nn.Module):
    """VGG-style classifier: a configurable conv stack plus three FC layers.

    Parameters:
    - layer_infos: sequence describing the convolutional stack. An int adds a
      3x3 convolution (padding 1) with that many output channels followed by
      a ReLU; the string 'M' adds a 2x2 max-pool with stride 2.
    - num_classes: size of the final linear layer's output.

    The first fully-connected layer takes 512 features, so the conv stack
    must end with 512 channels at 1x1 spatial size (e.g. a 32x32 input
    halved by five 'M' entries).
    """

    def __init__(self, layer_infos, num_classes=100):
        super(VGGNet, self).__init__()
        self.conv_layers = self._make_layers(layer_infos)
        self.fc_layers = nn.Sequential(
            # Input width depends on the image size: 32 (CIFAR-100 side) / 32
            # (five 2x poolings) = 1, hence 512 * 1 * 1 features.
            nn.Linear(512 * 1 * 1, 4096),
            nn.ReLU(inplace=True),
            # VGGNet paper: "dropout regularisation for the first two
            # fully-connected layers (dropout ratio set to 0.5)"
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, num_classes),
        )

    def _make_layers(self, layer_infos):
        """Build the convolutional stack described by `layer_infos`."""
        layers = []
        input_channels = 3                  # the network input is RGB
        for layer_info in layer_infos:
            if layer_info == 'M':
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            else:
                layers.append(nn.Conv2d(input_channels, layer_info, kernel_size=3, padding=1))
                input_channels = layer_info
                # The class is spelled `nn.ReLU`; `nn.ReLu` does not exist.
                layers.append(nn.ReLU(inplace=True))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the class scores, shape (batch, num_classes), for batch `x`."""
        out = self.conv_layers(x)
        # Flatten everything except the batch dimension for the linear layers.
        out = out.view(out.size(0), -1)
        return self.fc_layers(out)
        
# Layer plan for the 19-layer configuration: ints are conv output channels,
# 'M' marks a max-pooling step. One line per stage between poolings.
VGG19 = (
    [64] * 2 + ['M']
    + [128] * 2 + ['M']
    + [256] * 4 + ['M']
    + [512] * 4 + ['M']
    + [512] * 4 + ['M']
)

def VGGNet19():
    """Build a VGGNet from the VGG19 layer plan with default arguments."""
    model = VGGNet(VGG19)
    return model

### 2-3. Save & Load model parameters

In [None]:
def save_model(model, save_name):
    """Write the model's parameters to './<save_name>.pt'.

    Only the `state_dict` is stored (under the key 'model'), not the pickled
    module object. The checkpoint therefore does not depend on the model's
    class definition, and it can be read by `torch.load` when that function
    only accepts plain tensors and containers (`weights_only=True`, the
    default in recent PyTorch releases).
    """
    path = './' + str(save_name) + '.pt'
    ckpt = {'model': model.state_dict()}
    torch.save(ckpt, path)

def load_model(init_model, load_name):
    """Load './<load_name>.pt' into `init_model` and return `init_model`.

    Tensors are mapped onto the CPU while loading. Checkpoints written by the
    earlier format, which stored the whole module under 'model', are still
    accepted as long as `torch.load` is able to unpickle them.
    """
    path = './' + str(load_name) + '.pt'
    load_file = torch.load(path, map_location='cpu')
    saved = load_file['model']
    # Legacy checkpoint: a full module was stored, so pull its parameters out.
    if isinstance(saved, torch.nn.Module):
        saved = saved.state_dict()
    init_model.load_state_dict(saved)
    return init_model

## 3. Train & Test

### 3-1. Train & Test Definition

In [None]:
def train(total_epoch, network, criterion, optimizer, lr_schedule, train_loader, device = 'cpu', save_name = 'save_name'):
    """Train `network` for `total_epoch` epochs, checkpointing along the way.

    Each epoch runs `train_single_epoch` and then advances `lr_schedule` by
    one step. The model is saved under `save_name` after every tenth epoch
    and after the final one.
    """
    network = network.to(device)
    final_epoch = total_epoch - 1
    for epoch in range(total_epoch):
        train_single_epoch(epoch, network, criterion, optimizer, train_loader, device)
        lr_schedule.step()

        completed = epoch + 1
        # Checkpoint only on multiples of ten and on the very last epoch.
        if completed % 10 != 0 and epoch != final_epoch:
            continue
        save_model(network, save_name)
        print('Model saved at epoch {} with name {} '.format(completed, save_name + '.pt'))

def train_single_epoch(current_epoch, network, criterion, optimizer, train_loader, device='cpu'):
    """Run one optimisation pass over `train_loader` and print its statistics.

    Parameters:
    - current_epoch: epoch index, used only in the printed summary
    - network: model to train; switched to training mode here
    - criterion: loss called as `criterion(output, label)`
    - optimizer: optimizer stepped once per batch
    - train_loader: iterable yielding `(inputs, label)` batches
    - device: device every batch is moved to before the forward pass

    Prints the accuracy (in percent) and the mean per-batch loss. Returns
    None. An empty loader is reported instead of raising.
    """
    network.train()
    running_loss = 0.0
    correct, total_sample = 0.0, 0.0
    num_batches = 0
    # `inputs` rather than `input`, to avoid shadowing the builtin.
    for inputs, label in train_loader:
        inputs, label = inputs.to(device), label.to(device)
        optimizer.zero_grad()
        output = network(inputs)

        # Accuracy bookkeeping on a detached view so it stays out of autograd.
        _, pred = torch.max(output.detach(), 1)
        correct += (pred == label).sum().item()
        total_sample += label.size(0)

        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        num_batches += 1

    # Without this guard an empty loader would divide by zero below.
    if num_batches == 0 or total_sample == 0:
        print('Epoch: {} | no training samples were provided'.format(current_epoch))
        return

    print('Epoch: {} | Training Accuracy: {:.2f} % | Loss: {:.2f}'.format(current_epoch, 100*correct/total_sample, running_loss/num_batches))


def test(network, criterion, test_loader, device = 'cpu', load_name = None):
    """Evaluate `network` on `test_loader` and print the loss and accuracy.

    Parameters:
    - network: model to evaluate; switched to eval mode here
    - criterion: loss called as `criterion(output, label)`
    - test_loader: iterable yielding `(image, label)` batches
    - device: device the model and every batch are moved to
    - load_name: when not None, parameters are first loaded from that
      checkpoint name via `load_model`

    Prints the predictions of every batch, then the mean per-batch loss and
    the overall accuracy. Returns None. An empty loader is reported instead
    of raising.
    """
    if load_name is not None:
        network = load_model(network, load_name)

    print('Test start')
    network = network.to(device)
    network.eval()
    test_loss = 0.0
    correct, total_sample = 0.0, 0.0
    num_batches = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for image, label in test_loader:
            image, label = image.to(device), label.to(device)
            output = network(image)
            loss = criterion(output, label)
            _, pred = torch.max(output.data, 1)
            print('Prediction values: {}' .format(pred))
            total_sample += label.size(0)
            correct += (pred == label).sum().item()
            test_loss += loss.item()
            num_batches += 1

    # Without this guard an empty loader would divide by zero below.
    if num_batches == 0 or total_sample == 0:
        print('Test skipped: no test samples were provided')
        return

    print('Test loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(test_loss/num_batches ,correct, total_sample, 100 * correct / total_sample))

### 3-2. Train & Test VGGNet

In [None]:
# Model, schedule length and loss.
network = VGGNet19()
total_epoch = 100
criterion = nn.CrossEntropyLoss()

# VGGNet paper: "The learning rate was initially set to 10^-2, and then decreased
# by a factor of 10 when the validation set accuracy stopped improving. The batch
# size was set to 256, momentum to 0.9."
# Here the learning rate is raised from the paper's 10^-2 to 0.1 to speed up
# training; momentum stays at 0.9 and the batch size is changed to 128.
optimizer = optim.SGD(
    network.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=1e-4,
)
# Multiply the learning rate by 0.1 at epochs 50 and 75.
step_lr_scheduler = lr_scheduler.MultiStepLR(optimizer, milestones=[50, 75], gamma =0.1)

saved_model_name = '20192253_VGGNet19'

# Train, then evaluate the parameters reloaded from the saved checkpoint.
train(
    total_epoch, network, criterion, optimizer, step_lr_scheduler, train_loader,
    device = device, save_name = saved_model_name,
)
test(network, criterion, test_loader, device = device, load_name = saved_model_name)