<a href="https://colab.research.google.com/github/mamagoudou/QNN-with-dithering/blob/main/notebooks/VGG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model definition

In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

import math

import csv
import time
from tqdm.notebook import tqdm


device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

PATH_Models = '/content/drive/MyDrive/Memory/Models/VGG/'
PATH_Measures = '/content/drive/MyDrive/Memory/Measures/VGG/'

In [2]:
# adapted from 
# https://github.com/pytorch/vision/blob/master/torchvision/models/vgg.py
# paper: https://arxiv.org/pdf/1409.1556.pdf

class VGG(nn.Module):

  def __init__(self, features, classes = 10):

    super(VGG, self).__init__()
    self.features = features
    """ #GENUINE CLASSIFIER
    self.classifier = nn.Sequential(
      nn.Dropout(),
      nn.Linear(512, 512),
      nn.ReLU(True),
      nn.Dropout(),
      nn.Linear(512, 512),
      nn.ReLU(True),
      nn.Linear(512, classes),
    )
    """
    # SIMPLIFIED CLASSIFIER
    self.classifier = nn.Linear(512, 10) 

    # Initialize weights
    for m in self.modules():
      if isinstance(m, nn.Conv2d):
        n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
        m.weight.data.normal_(0, math.sqrt(2. / n))
        m.bias.data.zero_()

  def forward(self, x):

    x = self.features(x)
    x = x.view(x.size(0), -1)
    x = self.classifier(x)
    return x

In [3]:
def make_layers(cfg, batch_norm=False):

  layers = []
  in_channels = 3
  for v in cfg:
    if v == 'M': # pooling
      layers += [nn.MaxPool2d(kernel_size=2, stride=2)]

    else: # convolution
      conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)

      if batch_norm:
          layers += [conv2d, nn.BatchNorm2d(v), nn.ReLU(inplace=True)]
      else:
          layers += [conv2d, nn.ReLU(inplace=True)]

      in_channels = v
  return nn.Sequential(*layers)

In [4]:
cfg = {
    '11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    '13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 
           512, 'M'],
    '16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 
           512, 512, 512, 'M'],
    '19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 
           512, 'M', 512, 512, 512, 512, 'M'],
}

In [5]:
def VGG11():
  return VGG(make_layers(cfg['11']))

def VGG11_bn():
  return VGG(make_layers(cfg['11'], batch_norm=True))

def VGG13():
  return VGG(make_layers(cfg['13']))

def VGG13_bn():
  return VGG(make_layers(cfg['13'], batch_norm=True))

def VGG16():
  return VGG(make_layers(cfg['16']))

def VGG16_bn():
  return VGG(make_layers(cfg['16'], batch_norm=True))

def VGG19():
  return VGG(make_layers(cfg['19']))

def VGG19_bn():
  return VGG(make_layers(cfg['19'], batch_norm=True))

In [6]:
# NAME_DD_MM_TEST
PATH_Name = 'VGG16_16_02_TEST'

network = VGG16()

epoch = 0
network.to(device)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

# Dataset loading, processing and loaders

In [7]:
BATCH_SIZE = 128
NUM_WORKERS = 4

train_transform = transforms.Compose([transforms.RandomCrop(32, padding=4),
                                      transforms.RandomHorizontalFlip(p=0.5),
                                      transforms.ToTensor(),
                                      transforms.Normalize((0.4914, 0.4822, 0.4465), 
                                                           (0.2023, 0.1994, 0.2010))])

test_transform = transforms.Compose([transforms.ToTensor(),
                                     transforms.Normalize((0.4914, 0.4822, 0.4465), 
                                                          (0.2023, 0.1994, 0.2010))])

trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=train_transform)
testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=test_transform)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

trainloader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE,
                                          shuffle=True, num_workers=NUM_WORKERS)
testloader = torch.utils.data.DataLoader(testset, batch_size=BATCH_SIZE,
                                         shuffle=False, num_workers=NUM_WORKERS)

Files already downloaded and verified
Files already downloaded and verified


# Training & validation

In [8]:
LEARNING_RATE = 0.1
MOMENTUM = 0.9
NEPOCHS  = 50
TOTAL_STEPS = math.ceil(len(trainloader.dataset)/BATCH_SIZE)*NEPOCHS
CRITERION = "CrossEntropyLoss"
OPTIMIZER = "SGD"
SCHEDULER = "OneCycleLR"

import torch.optim as optim

criterion = nn.CrossEntropyLoss()

optimizer = optim.SGD(network.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM)
lr_scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.1,
                                             total_steps=TOTAL_STEPS,
                                             anneal_strategy='linear')

In [None]:
TrainLoss = []
TrainAcc = []
Traintime = []
TestLoss = []
TestAcc = []

for epoch in tqdm(range(epoch, NEPOCHS), position=0, desc="Epoch"):

  print("Epoch: %d" %(epoch))
  # TRAINING
  network.train()
  start_time = time.time()
  train_loss = 0
  correct = 0
  total = 0
  for i, data in tqdm(enumerate(trainloader, 0), position=1, desc="Training", 
                      total=len(trainloader.dataset)/BATCH_SIZE, leave=False):
    
    inputs, labels = data[0].to(device), data[1].to(device)
    optimizer.zero_grad()
    outputs = network(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

    train_loss += loss.item()
    _, predicted = outputs.max(1)
    total += labels.size(0)
    correct += predicted.eq(labels).sum().item()
    end_time = time.time()

  lr_scheduler.step()
  TrainLoss.append(train_loss/(i+1))
  TrainAcc.append(100.*correct/total)
  Traintime.append(end_time-start_time)
  print('TrainLoss: %.3f | TrainAcc: %.3f%% (%d/%d) | Time Elapsed %.3f sec' 
        % (TrainLoss[-1], TrainAcc[-1], correct, total, Traintime[-1]))
  
  # TESTING
  network.eval()
  test_loss = 0
  correct = 0
  total = 0
  with torch.no_grad():
    for i, data in tqdm(enumerate(testloader, 0), position=2, desc="Testing", 
                        total=len(testloader.dataset)/BATCH_SIZE, leave=False):
      inputs, labels = data[0].to(device), data[1].to(device)
      outputs = network(inputs)
      loss = criterion(outputs, labels)

      test_loss += loss.item()
      _, predicted = outputs.max(1)
      total += labels.size(0)
      correct += predicted.eq(labels).sum().item()

    TestLoss.append(test_loss/(i+1))
    TestAcc.append(100.*correct/total)
    print('TestLoss: %.3f | TestAcc: %.3f%% (%d/%d)' 
          % (TestLoss[-1], TestAcc[-1], correct, total))
    print('-' * 75)
  # SAVE MODEL IF BEST
  if TestAcc[-1] == max(TestAcc):
    torch.save({
      'optimizer': optimizer.state_dict(),
      'scheduler': lr_scheduler.state_dict(),
      'network': network.state_dict(),
      'epoch': epoch
    }, PATH_Models + PATH_Name + '.pth')


# WRITE INFOS & STATS IN CSV
stats = {"TrainLoss": TrainLoss, "TrainAcc": TrainAcc, "Traintime": Traintime,
         "TestLoss": TestLoss, "TestAcc": TestAcc}

with open(PATH_Measures + PATH_Name + ".csv", "w") as f:
  writer = csv.writer(f)
  writer.writerow(stats.keys())
  writer.writerows(zip(*stats.values()))

infos = {
  "PATH_Name":PATH_Name,
  "BATCH_SIZE":BATCH_SIZE,
  "NUM_WORKERS":NUM_WORKERS,
  "LEARNING_RATE":LEARNING_RATE,
  "MOMENTUM":MOMENTUM,
  "NEPOCHS":NEPOCHS,
  "TOTAL_STEPS":TOTAL_STEPS,
  "CRITERION":CRITERION,
  "OPTIMIZER":OPTIMIZER,
  "SCHEDULER":SCHEDULER,
  "optimizer":optimizer.state_dict(),
  "lr_scheduler":lr_scheduler.state_dict(),
  "epoch":epoch,
}

with open(PATH_Measures + PATH_Name + "_infos.csv", "w") as f:
  writer = csv.DictWriter(f, fieldnames=infos.keys())
  writer.writeheader()
  writer.writerow(infos)

HBox(children=(FloatProgress(value=0.0, description='Epoch', max=50.0, style=ProgressStyle(description_width='…

Epoch: 0


HBox(children=(FloatProgress(value=0.0, description='Training', max=390.625, style=ProgressStyle(description_w…

TrainLoss: 1.922 | TrainAcc: 27.222% (13611/50000) | Time Elapsed 28.601 sec


HBox(children=(FloatProgress(value=0.0, description='Testing', max=78.125, style=ProgressStyle(description_wid…

TestLoss: 1.619 | TestAcc: 39.300% (3930/10000)
---------------------------------------------------------------------------
Epoch: 1


HBox(children=(FloatProgress(value=0.0, description='Training', max=390.625, style=ProgressStyle(description_w…

TrainLoss: 1.459 | TrainAcc: 45.846% (22923/50000) | Time Elapsed 28.962 sec


HBox(children=(FloatProgress(value=0.0, description='Testing', max=78.125, style=ProgressStyle(description_wid…

TestLoss: 1.260 | TestAcc: 53.830% (5383/10000)
---------------------------------------------------------------------------
Epoch: 2


HBox(children=(FloatProgress(value=0.0, description='Training', max=390.625, style=ProgressStyle(description_w…

In [None]:
# WRITE INFOS & STATS IN CSV
stats = {"TrainLoss": TrainLoss, "TrainAcc": TrainAcc, "Traintime": Traintime,
         "TestLoss": TestLoss, "TestAcc": TestAcc}

with open(PATH_Measures + PATH_Name + ".csv", "w") as f:
  writer = csv.writer(f)
  writer.writerow(stats.keys())
  writer.writerows(zip(*stats.values()))

infos = {
  "PATH_Name":PATH_Name,
  "BATCH_SIZE":BATCH_SIZE,
  "NUM_WORKERS":NUM_WORKERS,
  "LEARNING_RATE":LEARNING_RATE,
  "MOMENTUM":MOMENTUM,
  "NEPOCHS":NEPOCHS,
  "TOTAL_STEPS":TOTAL_STEPS,
  "CRITERION":CRITERION,
  "OPTIMIZER":OPTIMIZER,
  "SCHEDULER":SCHEDULER,
  "optimizer":optimizer.state_dict(),
  "lr_scheduler":lr_scheduler.state_dict(),
  "epoch":epoch,
}

with open(PATH_Measures + PATH_Name + "_infos.csv", "w") as f:
  writer = csv.DictWriter(f, fieldnames=infos.keys())
  writer.writeheader()
  writer.writerow(infos)

# Drafts

In [None]:
from tqdm.notebook import tqdm


network.train()
for epoch in tqdm(range(epoch, MAX_EPOCHS), position=0, desc="Epoch"):
  for i, data in tqdm(enumerate(trainloader, 0), position=1, desc="Batch", 
                      total=len(trainloader.dataset)/BATCH_SIZE, leave=True):
  
    # get the inputs; data is a list of [inputs, labels]
    inputs, labels = data[0].to(device), data[1].to(device)

    # zero the parameter gradients
    optimizer.zero_grad()

    # forward + backward + optimize
    outputs = network(inputs)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

epoch += 1


PATH = '/content/drive/MyDrive/Colab Notebooks/Memory/VGG11_bn.pth'
# Save the state of the training
torch.save({
    'optimizer': optimizer.state_dict(),
    'network': network.state_dict(),
    'epoch': epoch,
}, PATH)

In [None]:
network.eval()

correct = 0
total = 0
with torch.no_grad():
  for data in testloader:
    inputs, labels = data[0].to(device), data[1].to(device)
    outputs = network(inputs)
    _, predicted = torch.max(outputs.data, 1)
    total += labels.size(0)
    correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: %d %%' % (
      100 * correct / total))

In [None]:
class_correct = list(0. for i in range(10))
class_total = list(0. for i in range(10))
with torch.no_grad():
  for data in testloader:
    inputs, labels = data[0].to(device), data[1].to(device)
    outputs = network(inputs)
    _, predicted = torch.max(outputs, 1)
    c = (predicted == labels).squeeze()
    for i in range(4):
      label = labels[i]
      class_correct[label] += c[i].item()
      class_total[label] += 1


for i in range(10):
  print('Accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))