In [1]:
import torch
import torch.nn as nn
from torch import optim
from torch.optim.lr_scheduler import StepLR

from torchvision import datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import os

from torchvision import utils
import matplotlib.pyplot as plt
%matplotlib inline

import numpy as np
from datetime import datetime
import copy

In [13]:
path2data = 'data'


def _per_channel_stats(dataset):
    """Return (mean, std) lists with one float per channel.

    Each statistic is computed per image over the two spatial axes and then
    averaged over all images of ``dataset``. ``dataset`` must yield
    ``(tensor, label)`` pairs whose tensors are laid out channel-first.
    """
    means, stds = [], []
    for image, _label in dataset:
        pixels = image.numpy()
        means.append(pixels.mean(axis=(1, 2)))
        stds.append(pixels.std(axis=(1, 2)))
    return np.mean(means, axis=0).tolist(), np.mean(stds, axis=0).tolist()


# Normalize needs the channel statistics *before* the datasets below are built,
# and they must be measured on un-normalized pixels. Computing them here (from a
# ToTensor-only view of the training split) keeps this cell runnable on a fresh
# kernel instead of depending on names defined by a later cell.
RGB_mean, RGB_std = _per_channel_stats(
    datasets.STL10(path2data, split='train', download=True,
                   transform=transforms.ToTensor())
)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),   # the classifier head expects a 7x7 feature map
    transforms.Normalize(RGB_mean, RGB_std)
])
train_dataset = datasets.STL10(path2data, split='train',
                               download=True, transform=transform)
valid_dataset = datasets.STL10(path2data, split='test',
                               download=True, transform=transform)

In [5]:
# Inspect the tensor shape of one training sample.
# The sample is indexed rather than unpacked into `_`: at notebook top level `_`
# is IPython's "last output" variable, and once user code rebinds it IPython
# stops updating it.
img = train_dataset[1][0]
img.shape

In [9]:
# Measure the statistics on a ToTensor-only view of the training split.
# `train_dataset` already applies Normalize(RGB_mean, RGB_std), so reading it
# here would measure normalized pixels and overwrite the statistics with
# roughly 0 / 1 whenever this cell is re-run.
raw_train_dataset = datasets.STL10(path2data, split='train', download=True,
                                   transform=transforms.ToTensor())

# One pass over the data: per-image, per-channel mean and std over the two
# spatial axes (axis 0 of each image tensor is the channel axis).
per_image_stats = [
    (pixels.mean(axis=(1, 2)), pixels.std(axis=(1, 2)))
    for pixels in (x.numpy() for x, _ in raw_train_dataset)
]
meanRGB = [mean for mean, _ in per_image_stats]
stdRGB = [std for _, std in per_image_stats]

# Average the per-image statistics over the whole split, channel by channel.
meanR = np.mean([m[0] for m in meanRGB])
meanG = np.mean([m[1] for m in meanRGB])
meanB = np.mean([m[2] for m in meanRGB])

stdR = np.mean([s[0] for s in stdRGB])
stdG = np.mean([s[1] for s in stdRGB])
stdB = np.mean([s[2] for s in stdRGB])

RGB_mean = [meanR, meanG, meanB]
RGB_std = [stdR, stdG, stdB]

In [14]:
# Both loaders use the same mini-batch size; only the training loader
# reshuffles its samples, the validation loader keeps dataset order.
BATCH_SIZE = 4

train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
valid_dataloader = DataLoader(
    dataset=valid_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
# from torchsummary import summary

# Use the first CUDA device when one is available, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Layer plan consumed by VGG16.create_conv_layers: an integer adds a 3x3
# convolution with that many output channels, 'M' adds a 2x2 max-pool.
VGG_16 = [
    64, 64, 'M',
    128, 128, 'M',
    256, 256, 256, 'M',
    512, 512, 512, 'M',
    512, 512, 512, 'M',
]

class VGG16(nn.Module):
    """VGG-16 style image classifier with batch normalization.

    The convolutional stack is built from the module-level ``VGG_16`` layer
    plan; its output is pooled to 7x7, flattened and fed to a three-layer
    fully connected head.

    Args:
        in_channels: number of channels of the input images.
        n_classes: size of the final linear layer (one logit per class).
    """

    def __init__(self, in_channels, n_classes):
        super().__init__()
        self.in_channels = in_channels

        self.feature_extractor = self.create_conv_layers(VGG_16)

        # Parameter-free, so the state_dict is unchanged. For 224x224 inputs the
        # feature map is already 7x7 and this pooling is the identity; for any
        # other input size it guarantees the 512*7*7 width the head requires.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        self.classification = nn.Sequential(
            nn.Linear(512*7*7, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(4096, n_classes)
        )

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        x = self.feature_extractor(x)
        x = self.avgpool(x)
        # Flatten everything except the batch axis. Unlike view(-1, 512*7*7)
        # this can never fold spatial positions into the batch dimension.
        x = torch.flatten(x, 1)
        x = self.classification(x)

        return x

    def create_conv_layers(self, architecture):
        """Build the convolutional stack described by ``architecture``.

        Args:
            architecture: sequence whose integer entries each add a
                3x3 Conv2d -> BatchNorm2d -> ReLU group with that many output
                channels, and whose ``'M'`` entries each add a 2x2 max-pool.

        Returns:
            An ``nn.Sequential`` holding the layers in order.

        Raises:
            ValueError: if an entry is neither an integer nor ``'M'``.
        """
        layers = []
        in_channels = self.in_channels

        for x in architecture:
            if isinstance(x, int):
                out_channels = x

                layers += [nn.Conv2d(in_channels = in_channels, out_channels = out_channels,
                                    kernel_size = (3, 3), stride = (1, 1), padding = (1, 1)),
                          nn.BatchNorm2d(x),
                          nn.ReLU()]
                # The next convolution consumes what this one produced.
                in_channels = x
            elif x == 'M':
                layers += [nn.MaxPool2d(kernel_size = (2, 2), stride = (2, 2))]
            else:
                # Fail loudly instead of silently dropping a mistyped entry.
                raise ValueError(f"Unsupported architecture entry: {x!r}")

        return nn.Sequential(*layers)
    


# Build the network for 3-channel inputs and 10 output classes, then move its
# parameters to the selected device.
model = VGG16(in_channels=3, n_classes=10)
model = model.to(device)
# Optional layer overview (needs torchsummary):
# summary(model, input_size = (3, 224, 224))

In [16]:
def get_accuracy(model, data_loader, device):
    """Return the fraction of samples in ``data_loader`` that ``model`` classifies correctly.

    The model is switched to eval mode and left in it. A prediction is the
    index of the largest value along dimension 1 of the model output
    (assumes the output is laid out as (batch, classes) -- TODO confirm for
    other models).

    Args:
        model: module called as ``model(X)``.
        data_loader: iterable of ``(X, y_true)`` tensor batches.
        device: device the batches and the result live on.

    Returns:
        A 0-dim float tensor on ``device``; NaN when ``data_loader`` yields no
        samples (previously this case raised AttributeError).
    """
    # Tensor accumulator: stays on the device and keeps the return type a
    # tensor even when the loop body never runs.
    correct_pred = torch.zeros((), dtype=torch.long, device=device)
    n = 0

    model.eval()
    with torch.no_grad():
        for X, y_true in data_loader:
            X, y_true = X.to(device), y_true.to(device)
            predicted_labels = model(X).argmax(dim=1)

            n += y_true.size(0)
            correct_pred += (predicted_labels == y_true).sum()

    if n == 0:
        return torch.tensor(float("nan"), device=device)
    return correct_pred.float() / n

In [17]:
def train(train_loader, model, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        train_loader: iterable of ``(X, y_true)`` tensor batches.
        model: module being trained; switched to train mode.
        criterion: callable ``criterion(y_hat, y_true)`` returning a scalar loss.
        optimizer: optimizer bound to ``model``'s parameters.
        device: device the batches are moved to.

    Returns:
        ``(model, optimizer, epoch_loss)`` where ``epoch_loss`` is the
        batch-size-weighted mean of the batch losses over the samples actually
        processed (NaN if the loader yielded nothing).
    """
    model.train()
    running_loss = 0.0
    n_samples = 0
    for X, y_true in train_loader:
        X, y_true = X.to(device), y_true.to(device)

        # Gradients accumulate across backward() calls; clear them so each step
        # uses only the current batch.
        optimizer.zero_grad()

        y_hat = model(X)
        loss = criterion(y_hat, y_true)
        loss.backward()
        optimizer.step()

        batch_size = X.size(0)
        running_loss += loss.item() * batch_size
        n_samples += batch_size

    # Divide by the samples seen rather than len(dataset): the two differ when
    # the loader drops a partial batch or samples a subset.
    epoch_loss = running_loss / n_samples if n_samples else float("nan")
    return model, optimizer, epoch_loss

In [18]:
def validation(valid_loader, model, criterion, device):
    """Compute the mean loss of ``model`` over ``valid_loader`` without training.

    Args:
        valid_loader: iterable of ``(X, y_true)`` tensor batches.
        model: module to evaluate; switched to eval mode and left in it.
        criterion: callable ``criterion(y_hat, y_true)`` returning a scalar loss.
        device: device the batches are moved to.

    Returns:
        ``(model, epoch_loss)`` where ``epoch_loss`` is the batch-size-weighted
        mean of the batch losses over the samples actually processed (NaN if
        the loader yielded nothing).
    """
    model.eval()
    running_loss = 0.0
    n_samples = 0

    # Disable autograd here rather than relying on every caller to do it;
    # nesting inside a caller's no_grad block is harmless.
    with torch.no_grad():
        for X, y_true in valid_loader:
            X, y_true = X.to(device), y_true.to(device)

            y_hat = model(X)
            loss = criterion(y_hat, y_true)

            batch_size = X.size(0)
            running_loss += loss.item() * batch_size
            n_samples += batch_size

    epoch_loss = running_loss / n_samples if n_samples else float("nan")

    return model, epoch_loss

In [19]:
def training_loop(model, criterion, optimizer, train_loader, valid_loader, epochs, device, print_every = 1):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Args:
        model: module to train.
        criterion: loss callable passed on to ``train`` and ``validation``.
        optimizer: optimizer bound to ``model``'s parameters.
        train_loader: loader used for training and for the train accuracy.
        valid_loader: loader used for the validation loss and accuracy.
        epochs: number of passes over ``train_loader``.
        device: device the batches are moved to.
        print_every: report losses and accuracies every this many epochs;
            zero or a negative value disables reporting.

    Returns:
        ``(model, optimizer, (train_losses, valid_losses))`` with one loss
        value per epoch in each list.
    """
    train_losses = []
    valid_losses = []

    for epoch in range(epochs):
        model, optimizer, train_loss = train(train_loader, model, criterion, optimizer, device)
        train_losses.append(train_loss)

        with torch.no_grad():
            model, valid_loss = validation(valid_loader, model, criterion, device)
            valid_losses.append(valid_loss)

        # The > 0 guard keeps print_every=0 from raising ZeroDivisionError in
        # the modulo below.
        if print_every > 0 and epoch % print_every == (print_every - 1):
            train_acc = get_accuracy(model, train_loader, device)
            valid_acc = get_accuracy(model, valid_loader, device)

            print(f'{datetime.now().time().replace(microsecond=0)} --- '
                  f'Epoch: {epoch}\t'
                  f'Train loss: {train_loss:.4f}\t'
                  f'Valid loss: {valid_loss:.4f}\t'
                  f'Train accuracy: {100 * train_acc:.2f}\t'
                  f'Valid accuracy: {100 * valid_acc:.2f}')

    return model, optimizer, (train_losses, valid_losses)

In [None]:
device = "cuda:0" if torch.cuda.is_available() else "cpu"

# Run settings kept in one place so they are easy to tune.
# 1e-3 is Adam's documented default step size; the previous value of 1e-1 is a
# hundred times larger and is expected to make training of this network diverge.
LEARNING_RATE = 1e-3
N_EPOCHS = 15

model = VGG16(3, 10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr = LEARNING_RATE)

# Keep the per-epoch loss history instead of discarding it into `_` (which at
# notebook top level also rebinds IPython's last-output variable).
model, optimizer, (train_losses, valid_losses) = training_loop(
    model, criterion, optimizer,
    train_dataloader, valid_dataloader, N_EPOCHS, device)