<a href="https://colab.research.google.com/github/vainaijr/aiStartUp/blob/master/snippets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyTorch imports

In [0]:
from __future__ import print_function, unicode_literals, division
from IPython.core.debugger import set_trace
from pprint import pprint
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import torchvision.transforms as transforms
from torchvision.transforms import Compose, RandomHorizontalFlip, RandomResizedCrop, ToTensor, Normalize
from torchvision.transforms import CenterCrop, Resize, ColorJitter
import torchvision.models as models
from torch.utils.data import DataLoader
from torchvision.utils import make_grid
import copy
import glob
import time
import math
import os
from io import open
import unicodedata
import string
import random
import re

# Pick the compute device once (GPU when CUDA is usable, otherwise CPU) and
# report which of the two cases applies.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("CUDA available: ", device.type == "cuda")

# LeNet model


In [0]:
class Net(nn.Module):
  """LeNet-style CNN: two conv/pool stages followed by two linear layers.

  ``forward`` flattens to 320 features per sample (20 channels * 4 * 4), which
  is what a 1x28x28 input yields after the two 5x5 convolutions and the two
  2x2 max-poolings. The output is a 10-way log-probability vector per sample.
  """

  def __init__(self):
    super().__init__()
    # feature extractor
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5)
    self.conv2 = nn.Conv2d(in_channels=10, out_channels=20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    # classifier head
    self.fc1 = nn.Linear(in_features=320, out_features=50)
    self.fc2 = nn.Linear(in_features=50, out_features=10)

  def forward(self, x):
    """Return per-class log-probabilities for the batch ``x``."""
    # stage 1: conv -> 2x2 max-pool -> ReLU
    feat = self.conv1(x)
    feat = F.relu(F.max_pool2d(feat, 2))
    # stage 2: conv -> channel dropout -> 2x2 max-pool -> ReLU
    feat = self.conv2_drop(self.conv2(feat))
    feat = F.relu(F.max_pool2d(feat, 2))
    # flatten to (batch, 320) for the linear layers
    flat = feat.view(-1, 320)
    hidden = F.relu(self.fc1(flat))
    # functional dropout has to be told explicitly whether we are training
    hidden = F.dropout(hidden, training=self.training)
    logits = self.fc2(hidden)
    return F.log_softmax(logits, dim=1)

# MNIST dataset, dataloader

In [0]:
# MNIST training split: downloaded into the current directory if missing,
# converted to tensors and normalized with mean 0.1307 / std 0.3081.
train_loader = DataLoader(
    dataset=datasets.MNIST(
        root='./',
        train=True,
        download=True,
        transform=Compose([ToTensor(), Normalize(mean=(0.1307,), std=(0.3081,))]),
    ),
    batch_size=64,
    shuffle=True,
    num_workers=4,
)

# MNIST test split: same preprocessing, one sample per batch, no download
# (relies on the files already being present under './').
test_loader = DataLoader(
    dataset=datasets.MNIST(
        root='./',
        train=False,
        download=False,
        transform=Compose([ToTensor(), Normalize(mean=(0.1307,), std=(0.3081,))]),
    ),
    batch_size=1,
    shuffle=True,
    num_workers=4,
)


# CUDA

In [0]:
# Standalone device-selection snippet: report CUDA availability and bind
# `device` to the GPU when present, otherwise to the CPU.
if torch.cuda.is_available():
  print("CUDA available: ", True)
  device = torch.device("cuda")
else:
  print("CUDA available: ", False)
  device = torch.device("cpu")

# transforms

In [0]:
# RandomCrop is used below but is not part of the notebook's import cell.
from torchvision.transforms import RandomCrop

# Resize-and-tensorize pipeline; `imsize` must be defined before this cell runs.
# NOTE: the result is bound to the name `transforms`, which shadows the
# `torchvision.transforms` module imported at the top of the notebook. The
# pipeline is therefore built from the directly imported classes so this cell
# can be re-run; code that needs the module afterwards has to re-import it.
transforms = Compose([
    Resize(imsize),
    ToTensor()
])

# Augmenting pipeline for training images: random 200-pixel crop, random
# horizontal flip and colour jitter, then tensor conversion and per-channel
# normalization with mean 0.5 / std 0.5.
train_transform = Compose([
    RandomCrop(200),
    RandomHorizontalFlip(),
    ColorJitter(),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])

# Per-split pipelines keyed by 'train' / 'val'. `input_size` must hold the
# image side length expected by the model when this cell runs.
# NOTE(review): the two splits normalize with different statistics
# (0.5/0.5 vs. the 0.485.../0.229... triple) - confirm this is intended.
data_transforms = {
    
  'train' : Compose([
    RandomResizedCrop(input_size),
    RandomHorizontalFlip(),
    ColorJitter(),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
  ]),
  'val' : Compose([
      Resize(input_size),
      CenterCrop(input_size),
      ToTensor(),
      Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
  ])    
}

# unicodeToAscii

In [0]:
# turn a Unicode string to plain ASCII
def unicodeToAscii(s):
  """Strip accents from ``s`` and drop characters not in ``all_letters``.

  The string is NFD-decomposed so that accents become separate combining
  marks (Unicode category ``Mn``), which are discarded. Of the remaining
  characters only those contained in ``all_letters`` (defined outside this
  function) are kept.
  """
  kept = []
  for ch in unicodedata.normalize('NFD', s):
    if unicodedata.category(ch) == 'Mn':
      continue  # combining mark left over from the decomposition
    if ch in all_letters:
      kept.append(ch)
  return ''.join(kept)

# RNN

In [0]:
class RNN(nn.Module):
  """Single-step recurrent cell built from two linear layers.

  Each call concatenates the current input with the previous hidden state and
  feeds the result to ``i2h`` (next hidden state) and ``i2o`` (log-softmax
  output over ``output_size`` classes).
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()

    self.hidden_size = hidden_size
    # both layers consume the concatenated [input, hidden] vector
    joint_size = input_size + hidden_size
    self.i2h = nn.Linear(joint_size, hidden_size)
    self.i2o = nn.Linear(joint_size, output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Advance one step; return ``(log_probs, next_hidden)``."""
    joint = torch.cat([input, hidden], dim=1)
    next_hidden = self.i2h(joint)
    log_probs = self.softmax(self.i2o(joint))
    return log_probs, next_hidden

  def initHidden(self):
    """Return an all-zero hidden state of shape ``(1, hidden_size)``."""
    return torch.zeros((1, self.hidden_size))

# Hyperparameters

In [0]:
# model dimensions
input_size = 28 * 28  # 784 input features (presumably a flattened 28x28 image)
hidden_size = 500
num_classes = 10

# optimization schedule
num_epochs = 5
batch_size = 100
learning_rate = 1e-3

# Lang

In [0]:
# indices reserved for the start-of-sentence / end-of-sentence markers
SOS_token = 0
EOS_token = 1

class Lang:
  """Vocabulary of one language: word <-> index maps plus word frequencies.

  Indices 0 and 1 are pre-assigned to "SOS" and "EOS" in ``index2word``, so
  the first real word receives index 2.
  """

  def __init__(self, name):
    self.name = name
    self.word2index = {}
    self.word2count = {}
    self.index2word = {0: "SOS", 1: "EOS"}
    self.n_words = 2 # SOS and EOS are already counted

  def addSentence(self, sentence):
    """Register every single-space-separated token of ``sentence``."""
    for token in sentence.split(' '):
      self.addWord(token)

  def addWord(self, word):
    """Count ``word``, assigning it the next free index on first sight."""
    if word in self.word2index:
      self.word2count[word] += 1
      return
    new_index = self.n_words
    self.word2index[word] = new_index
    self.word2count[word] = 1
    self.index2word[new_index] = word
    self.n_words = new_index + 1

# normalizeString

In [0]:
# lowercase, trim, and remove non-letter characters
def normalizeString(s):
  """Normalize raw text before tokenization.

  Lower-cases and strips ``s``, reduces it to ASCII via ``unicodeToAscii``,
  puts a space in front of each ``.``, ``!`` or ``?``, and replaces every
  remaining character that is not an ASCII letter or one of those three
  punctuation marks with a single space.
  """
  s = unicodeToAscii(s.lower().strip())
  # detach sentence punctuation from the preceding word
  s = re.sub(r"([.!?])", r" \1", s)
  # The upper-case range must be A-Z: the previous "A-z" range also covered
  # the ASCII characters between 'Z' and 'a' ([ \ ] ^ _ `), so those slipped
  # through as if they were letters.
  s = re.sub(r"[^a-zA-Z.!?]", r" ", s)
  return s

# EncoderRNN

In [0]:
# The seq2seq encoder is an RNN that is run once per input word: each call
# yields an output vector and an updated hidden state, and that hidden state
# is passed back in together with the next word.
class EncoderRNN(nn.Module):
  """Embedding + single GRU layer, processing one token per call."""

  def __init__(self, input_size, hidden_size):
    super().__init__()
    self.hidden_size = hidden_size

    # the embedding width equals the GRU width, so no projection is needed
    self.embedding = nn.Embedding(num_embeddings=input_size, embedding_dim=hidden_size)
    self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size)

  def forward(self, input, hidden):
    """Encode one token index; return ``(output, hidden)`` from the GRU."""
    # reshape the embedding to (seq_len=1, batch=1, hidden_size)
    step_input = self.embedding(input).view(1, 1, -1)
    return self.gru(step_input, hidden)

  def initHidden(self):
    """Return a zero hidden state of shape ``(1, 1, hidden_size)`` on ``device``."""
    return torch.zeros((1, 1, self.hidden_size), device=device)

# DecoderRNN

In [0]:
class DecoderRNN(nn.Module):
  """Embedding + ReLU + single GRU layer + linear/log-softmax output.

  One call consumes one token index and the previous hidden state and returns
  log-probabilities over ``output_size`` entries plus the new hidden state.
  """

  def __init__(self, hidden_size, output_size):
    super().__init__()
    self.hidden_size = hidden_size

    self.embedding = nn.Embedding(num_embeddings=output_size, embedding_dim=hidden_size)
    self.gru = nn.GRU(input_size=hidden_size, hidden_size=hidden_size)
    self.out = nn.Linear(in_features=hidden_size, out_features=output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Decode one step; return ``(log_probs, hidden)``."""
    # embed the token and reshape to (seq_len=1, batch=1, hidden_size)
    step_input = F.relu(self.embedding(input).view(1, 1, -1))
    gru_out, hidden = self.gru(step_input, hidden)
    # gru_out[0] drops the length-1 sequence axis before the projection
    log_probs = self.softmax(self.out(gru_out[0]))
    return log_probs, hidden

  def initHidden(self):
    """Return a zero hidden state of shape ``(1, 1, hidden_size)`` on ``device``."""
    return torch.zeros((1, 1, self.hidden_size), device=device)

# AttnDecoderRNN

In [0]:
class AttnDecoderRNN(nn.Module):
  """GRU decoder that attends over a fixed number of encoder outputs.

  Attention weights are computed from the embedded input token and the
  current hidden state, and are ``max_length`` wide, so ``encoder_outputs``
  must provide ``max_length`` rows.
  """

  def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
    super().__init__()
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.dropout_p = dropout_p
    self.max_length = max_length

    double_width = self.hidden_size * 2
    self.embedding = nn.Embedding(self.output_size, self.hidden_size)
    # [embedded ; hidden] -> one score per encoder position
    self.attn = nn.Linear(double_width, self.max_length)
    # [embedded ; attended context] -> GRU input
    self.attn_combine = nn.Linear(double_width, self.hidden_size)
    self.dropout = nn.Dropout(self.dropout_p)
    self.gru = nn.GRU(self.hidden_size, self.hidden_size)
    self.out = nn.Linear(self.hidden_size, self.output_size)

  def forward(self, input, hidden, encoder_outputs):
    """Decode one step; return ``(log_probs, hidden, attn_weights)``."""
    embedded = self.dropout(self.embedding(input).view(1, 1, -1))

    # attention distribution over the encoder positions
    query = torch.cat((embedded[0], hidden[0]), 1)
    attn_weights = F.softmax(self.attn(query), dim=1)
    # weighted sum of the encoder outputs (batched matmul with batch size 1)
    context = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))

    merged = torch.cat((embedded[0], context[0]), 1)
    gru_input = F.relu(self.attn_combine(merged).unsqueeze(0))
    gru_out, hidden = self.gru(gru_input, hidden)

    log_probs = F.log_softmax(self.out(gru_out[0]), dim=1)
    return log_probs, hidden, attn_weights

  def initHidden(self):
    """Return a zero hidden state of shape ``(1, 1, hidden_size)`` on ``device``."""
    return torch.zeros((1, 1, self.hidden_size), device=device)

# timeSince

In [0]:
def asMinutes(s):
  """Format a duration of ``s`` seconds as ``'<minutes>m <seconds>s'``.

  Both numbers are rendered with ``%d``, so fractional seconds are truncated.
  """
  whole_minutes = math.floor(s / 60)
  leftover_seconds = s - whole_minutes * 60
  return '%dm %ds' % (whole_minutes, leftover_seconds)

def timeSince(since, percent):
  """Return ``'<elapsed> (- <remaining>)'`` for a task started at ``since``.

  ``since`` is a ``time.time()`` timestamp and ``percent`` the fraction of
  the work already completed; the remaining time is extrapolated linearly
  from the elapsed time. ``percent`` must be non-zero.
  """
  elapsed = time.time() - since
  # projected total duration minus what has already passed
  remaining = elapsed / (percent) - elapsed
  return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

# plotting

In [0]:
# use the non-interactive Agg backend so plotting works without a display
plt.switch_backend('agg')

def showPlot(points):
  """Plot ``points`` as a line with y-axis ticks every 0.2.

  A single figure is created via ``plt.subplots()``; the previous extra
  ``plt.figure()`` call opened a second, empty figure that was never used.
  """
  fig, ax = plt.subplots()
  loc = ticker.MultipleLocator(base=0.2) # this locator puts ticks at regular intervals
  ax.yaxis.set_major_locator(loc)
  ax.plot(points)
  
# Show the first sample of each test set side by side in a 1x2 grid;
# `test_A` and `test_B` must already be defined when this cell runs.
plt.figure(figsize=(10, 5))

# left panel
plt.subplot(1, 2, 1)
plt.title("Test dataset 'Horses'")
plt.imshow(test_A[0])

# right panel
plt.subplot(1, 2, 2)
plt.title("Test dataset 'Zebras'")
plt.imshow(test_B[0])

# BiRNN

In [0]:
# Recurrent Neural Networks (many to one)
class BiRNN(nn.Module):
  """Bidirectional multi-layer LSTM followed by a linear classifier.

  Parameters
    input_size  - number of expected features per time step of the input
    hidden_size - number of features in the hidden state (per direction)
    num_layers  - number of stacked LSTM layers
    num_classes - size of the output produced by the final linear layer

  Because the LSTM is created with ``batch_first=True``, ``forward`` expects
  ``x`` of shape (batch, seq_len, input_size) and returns a tensor of shape
  (batch, num_classes).
  """

  def __init__(self, input_size, hidden_size, num_layers, num_classes):
    super(BiRNN, self).__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers

    # torch.nn.LSTM applies a multi-layer long short-term memory RNN to an
    # input sequence. Options used here:
    #   batch_first=True   - input/output tensors are (batch, seq, feature)
    #   bidirectional=True - the sequence is processed in both directions, so
    #                        the output has 2 * hidden_size features and the
    #                        states have num_layers * 2 leading entries
    # Options left at their defaults: bias=True, dropout=0.
    self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)

    # torch.nn.Linear applies a linear transformation; its input width is
    # hidden_size*2 because forward and backward features are concatenated.
    self.fc = nn.Linear(hidden_size*2, num_classes)

  def forward(self, x):
    """Classify each sequence in ``x`` from the LSTM output at the last step."""
    # Initial hidden and cell states, shape
    # (num_layers * num_directions, batch, hidden_size). They are created on
    # the device and with the dtype of ``x``; allocating them with the global
    # defaults (CPU, float32) fails as soon as the model runs on a GPU or in
    # another precision.
    state_shape = (self.num_layers*2, x.size(0), self.hidden_size)
    h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
    c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)

    # forward propagate LSTM
    # out: (batch, seq_len, num_directions * hidden_size), i.e. the features
    # h_t of the last layer for every time step t; the final (h_n, c_n)
    # states are not needed here.
    out, _ = self.lstm(x, (h0, c0))

    # decode the hidden state of the last time step
    out = self.fc(out[:, -1, :])

    return out

# loss and optimizer

In [0]:
# Loss function. Alternatives offered by torch.nn include MSELoss, L1Loss,
# CTCLoss, NLLLoss, PoissonNLLLoss, KLDivLoss, BCELoss, BCEWithLogitsLoss,
# MarginRankingLoss, HingeEmbeddingLoss, MultiLabelMarginLoss, SmoothL1Loss,
# SoftMarginLoss, MultiLabelSoftMarginLoss, CosineEmbeddingLoss,
# MultiMarginLoss and TripletMarginLoss.
criterion = nn.CrossEntropyLoss()

# Optimizer over all parameters of `model`. Alternatives offered by
# torch.optim include Adadelta, Adagrad, SparseAdam, Adamax, ASGD, LBFGS,
# RMSprop, Rprop and SGD.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# train

In [0]:
# Train `model` for `num_epochs` passes over `train_loader`, logging the loss
# of every 100th step.
total_step = len(train_loader)
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        # flatten every sample to a 784-element row (28*28) and move the
        # batch to the configured device
        images = images.reshape(-1, 784).to(device)
        labels = labels.to(device)

        # clear the gradients of the previous step, then run the forward pass
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)

        # backward pass and parameter update
        loss.backward()
        optimizer.step()

        if (i+1) % 100 != 0:
            continue
        print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, i+1, total_step, loss.item()))

# test

In [0]:

# Evaluate `model` on `test_loader` without tracking gradients and print the
# share of correctly classified samples.
with torch.no_grad():
    correct, total = 0, 0
    for images, labels in test_loader:
        # same flattening as in training: one 784-element row per sample
        images = images.reshape(-1, 784).to(device)
        labels = labels.to(device)

        outputs = model(images)

        # torch.max(input, dim) returns (values, indices); the indices along
        # dim 1 are the argmax of each row, i.e. the predicted class
        predicted = torch.max(outputs.data, 1)[1]

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Accuracy of the network on the 10000 test images: {} %'.format(100*correct / total))
    

# make_grid

In [0]:
# Show the first 64 images of real_batch['A'] as one image grid;
# `real_batch` must already be defined when this cell runs.
plt.figure(figsize=(16, 8))
plt.axis("off")
plt.title("Training Images from A")
# make_grid is imported directly from torchvision.utils in the import cell;
# the `vutils` alias used previously is never defined in this notebook.
# transpose(1, 2, 0) moves the channel axis last, as imshow expects.
plt.imshow(make_grid(real_batch['A'][:64], padding=2, normalize=True).cpu().numpy().transpose(1, 2, 0))

# Ignite import

In [0]:
# Ignite is a high level library to help with training neural networks in PyTorch, it comes with an Engine to setup
# a training loop, various metrics, handlers
!pip install ignite
from ignite.engine import Engine, Events 
# Engine - runs a given process_function over each batch of a dataset, emitting events as it goes
# Events - allows users to attach functions to an Engine to fire functions at a specific event, example: 
# EPOCH_COMPLETED, ITERATION_STARTED etc.

from ignite.metrics import Accuracy, Loss, RunningAverage
# Accuracy - metric to calculate accuracy over a dataset, for binary, multiclass, multilabel cases
# Loss - general metric that takes a loss function as a parameter, calculate loss over a dataset
# RunningAverage - general metric to attach to Engine during training

from ignite.handlers import ModelCheckpoint, EarlyStopping
# ModelCheckpoint - handler to checkpoint models
# EarlyStopping - handler to stop training based on a score function

from ignite.contrib.handlers import ProgressBar
# ProgressBar - handler to create a tqdm progress bar, tqdm means progress in Arabic

# TextCNN

In [0]:
# TextCNN model
# the model works for variable text lengths, we embed the words of a sentence, use convolutions, maxpooling
# and concatenation to embed the sentence as a single vector
# the single vector is passed through a fully connected layer with sigmoid to output a single value
# this value can be interpreted as the probability a sentence is positive (closer to 1) or negative (closer to 0)

# the minimum length of text expected by the model is the size of the largest kernel in kernel_sizes,
# because every convolution needs at least kernel_size positions

class TextCNN(nn.Module):
  """Convolutional sentence classifier over word embeddings.

  Parameters
    vocab_size    - number of rows of the embedding table
    embedding_dim - width of each word vector
    kernel_sizes  - iterable of Conv1d kernel widths, one conv branch each
    num_filters   - output channels of every conv branch
    num_classes   - width of the final linear layer
    d_prob        - dropout probability applied before the linear layer
    mode          - 'rand' keeps the random embedding initialization; a value
                    containing 'static' copies ``TEXT.vocab.vectors`` into the
                    table, frozen unless the value also contains 'non'

  Raises ValueError for any other ``mode``.
  """

  def __init__(self, vocab_size, embedding_dim, kernel_sizes, num_filters, num_classes, d_prob, mode):
    super(TextCNN, self).__init__()
    self.vocab_size = vocab_size
    self.embedding_dim = embedding_dim
    self.kernel_sizes = kernel_sizes
    self.num_filters = num_filters
    self.num_classes = num_classes
    self.d_prob = d_prob
    self.mode = mode
    # index 1 is treated as the padding token (its vector receives no gradient)
    self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=1)
    self.load_embeddings()
    self.conv = nn.ModuleList([nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=k,
                                        stride=1) for k in kernel_sizes])
    self.dropout = nn.Dropout(d_prob)
    self.fc = nn.Linear(len(kernel_sizes)*num_filters, num_classes)
  
  def forward(self, x):
    """Map a (batch, sequence_length) tensor of token indices to sigmoid scores."""
    # unpacking doubles as a check that x is exactly two-dimensional
    batch_size, sequence_length = x.shape
    # Conv1d wants channels first: (batch, embedding_dim, sequence_length)
    x = self.embedding(x).transpose(1, 2)
    x = [F.relu(conv(x)) for conv in self.conv]
    # global max-pool over the time axis of every branch -> (batch, num_filters)
    x = [F.max_pool1d(c, c.size(-1)).squeeze(dim=-1) for c in x]
    x = torch.cat(x, dim=1)
    x = self.fc(self.dropout(x))
    # Only drop the trailing class axis (when num_classes == 1). A bare
    # squeeze() would also remove the batch axis for a batch of one sample
    # and return a 0-d tensor that no longer lines up with the labels.
    return torch.sigmoid(x).squeeze(dim=-1)
  
  def load_embeddings(self):
    """Initialize the embedding table according to ``self.mode``."""
    if 'static' in self.mode:
      self.embedding.weight.data.copy_(TEXT.vocab.vectors)
      # requires_grad has to be set on the Parameter itself; setting it on
      # `.data` only touches a detached view and leaves the weights trainable
      if 'non' not in self.mode:
        self.embedding.weight.requires_grad = False
        print('Loaded pretrained embeddings, weights are not trainable')
      else:
        self.embedding.weight.requires_grad = True
        print('Loaded pretrained embeddings, weights are trainable')
      
    elif self.mode == 'rand':
      print('Randoml initialized embeddings are used')
    else:
      raise ValueError('Unexpected value of mode')

# Ignite process_function

In [0]:
# training and evaluating using Ignite
# An Ignite Engine is built around a user-supplied process_function that is
# called once per batch of the dataset; the same mechanism serves for both
# training and validation. A process_function takes two parameters: the
# engine and the batch.

def process_function(engine, batch):
  """Run one optimization step on ``batch`` and return the scalar loss.

  Steps: put ``model`` in train mode, reset the optimizer's gradients, read
  ``x``/``y`` from ``batch.text``/``batch.label``, run the forward pass,
  compute the loss with ``criterion``, back-propagate and update the
  parameters. ``engine`` is not used.
  """
  model.train()
  optimizer.zero_grad()

  inputs = batch.text
  targets = batch.label

  loss = criterion(model(inputs), targets)
  loss.backward()
  optimizer.step()

  return loss.item()

# Ignite eval_function

In [0]:
# evaluator engine - process_function

# Counterpart of the training process function: it evaluates a single batch.
# Ignite suggests attaching metrics to evaluators rather than trainers, and
# all Ignite metrics require (y_pred, y) as the output of the function
# attached to the Engine - which is exactly what is returned here.

def eval_function(engine, batch):
  """Return ``(y_pred, y)`` for ``batch`` without computing gradients.

  ``model`` is switched to eval mode and the forward pass on ``batch.text``
  runs under ``torch.no_grad()``; ``y`` is ``batch.label``. ``engine`` is not
  used.
  """
  model.eval()
  inputs, targets = batch.text, batch.label
  with torch.no_grad():
    predictions = model(inputs)
  return predictions, targets

# CustomDataset

In [0]:
# Dataset is not part of the notebook's import cell (only DataLoader is),
# so it is imported here to keep this snippet runnable.
from torch.utils.data import Dataset


class CustomDataset(Dataset):
  """Skeleton dataset: reports zero samples and returns None for any index.

  Meant as a template - fill in ``__init__``, ``__len__`` and ``__getitem__``.
  """

  def __init__(self):
    super().__init__()
  
  def __len__(self):
    # number of samples in the dataset
    return 0
  
  def __getitem__(self, idx):
    # sample stored at position idx
    return None

# Transfer Learning

In [0]:
# training helpers

# list(get_trainable(model.parameters()))
def get_trainable(model_params):
  """Lazily yield the parameters whose ``requires_grad`` flag is set."""
  trainable = (param for param in model_params if param.requires_grad)
  return trainable

# list(get_frozen(model.parameters()))
def get_frozen(model_params):
  """Lazily yield the parameters whose ``requires_grad`` flag is not set."""
  frozen = (param for param in model_params if not param.requires_grad)
  return frozen

# all_trainable(model.parameters())
def all_trainable(model_params):
  """Return True if every parameter in ``model_params`` requires gradients.

  Iterates the argument itself; the previous version looked at the attribute
  ``params`` of a global ``model`` and ignored what the caller passed in.
  An empty iterable yields True.
  """
  return all(p.requires_grad for p in model_params)

# all_frozen(model.parameters())
def all_frozen(model_params):
  """Return True if no parameter in ``model_params`` requires gradients.

  Iterates the argument itself; the previous version looked at the attribute
  ``params`` of a global ``model`` and ignored what the caller passed in.
  An empty iterable yields True.
  """
  return all(not p.requires_grad for p in model_params)

def freeze_all(model_params):
  """Clear ``requires_grad`` on every parameter in ``model_params`` (in place)."""
  for frozen_param in model_params:
    frozen_param.requires_grad = False

# pretrainedmodels

In [0]:
!pip install pretrainedmodels
import pretrainedmodels
# list every architecture the package can build
print(pretrainedmodels.model_names)

# Look the constructor up by name and build the network with its ImageNet
# weights; other names such as fbresnet152 or inceptionresnetv2 work too.
model_name = 'nasnetalarge'
model = getattr(pretrainedmodels, model_name)(num_classes=1000, pretrained='imagenet')
# switch to inference mode
model.eval()

# train_model_helper

In [0]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
  """Train ``model`` and keep the weights with the best validation accuracy.

  Parameters
    model        - network to train; it is moved between train/eval mode here
    dataloaders  - dict with the keys 'train' and 'val', each a loader that
                   yields (inputs, labels) and exposes ``.dataset``
    criterion    - loss function called as criterion(outputs, labels)
    optimizer    - optimizer stepping the model's parameters
    num_epochs   - number of passes over both phases
    is_inception - if True, the model is expected to return
                   (outputs, aux_outputs) in train mode; the auxiliary loss
                   is added with weight 0.4

  Returns ``(model, val_acc_history)``: the model with the best-performing
  weights loaded and the validation accuracy recorded after every epoch.
  Batches are moved to the module-level ``device``.
  """
  since = time.time()
  
  val_acc_history = []
  
  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0
  
  for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    print('-' * 10)

    # each epoch has a training and validation phase
    for phase in ['train', 'val']:
      if phase == 'train':
        model.train() # set model to training mode
      else:
        model.eval() # set model to eval mode

      running_loss = 0.0
      running_corrects = 0

      # iterate over data
      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward
        # track history if only in train
        with torch.set_grad_enabled(phase == 'train'):
          # get model outputs and calculate loss
          # special case for inception because in training it has an auxiliary output.
          # in train mode we calculate the loss by summing the final output and the auxiliary output but in testing
          # we only consider the final output.
          if is_inception and phase == 'train':
            outputs, aux_outputs = model(inputs)
            loss1 = criterion(outputs, labels)
            loss2 = criterion(aux_outputs, labels)
            loss = loss1 + 0.4*loss2
          else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)

          _, preds = torch.max(outputs, 1)

          # backward + optimize only if in training phase
          if phase == 'train':
            loss.backward()
            optimizer.step()

        # statistics (loss is a batch mean, so weight it by the batch size)
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)

      # Per-phase summary. This has to run inside the phase loop so that both
      # the 'train' and the 'val' figures of every epoch are reported.
      epoch_loss = running_loss / len(dataloaders[phase].dataset)
      epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

      print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

      # deep copy the model
      if phase == 'val' and epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())
      if phase == 'val':
        val_acc_history.append(epoch_acc)

    # blank line between epochs
    print()
  
  time_elapsed = time.time() - since
  print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:4f}'.format(best_acc))
  
  # load best model weights
  model.load_state_dict(best_model_wts)
  return model, val_acc_history

# datasets and dataloaders

In [0]:
# One ImageFolder per split, read from <data_dir>/train and <data_dir>/val
# and preprocessed with the matching entry of `data_transforms`.
image_datasets = {
    split: datasets.ImageFolder(root=os.path.join(data_dir, split), transform=data_transforms[split])
    for split in ('train', 'val')
}

# One shuffling DataLoader per split, keyed the same way as the datasets.
dataloaders_dict = {
    split: DataLoader(dataset=image_datasets[split], batch_size=batch_size, shuffle=True, num_workers=4)
    for split in ('train', 'val')
}

# plot_compare

In [0]:
# Plot the two validation-accuracy histories over the epochs 1..num_epochs:
# `ohist` is drawn as "Pretrained", `shist` as "Scratch".
plt.plot(range(1, num_epochs+1), ohist, label="Pretrained")
plt.plot(range(1, num_epochs+1), shist, label="Scratch")

# axis decoration: accuracy range 0..1, one tick per epoch
plt.title("Validation Accuracy vs. Number of Training Epochs")
plt.xlabel("Training epochs")
plt.ylabel("Validation Accuracy")
plt.ylim(0, 1.)
plt.xticks(np.arange(1, num_epochs+1, 1.0))

plt.legend()
plt.show()

# STN

In [0]:
class Net(nn.Module):
  """LeNet-style classifier preceded by a spatial transformer network (STN).

  The STN predicts one 2x3 affine matrix per sample, resamples the input with
  it and hands the result to the classifier. The hard-coded flatten sizes
  (10 * 3 * 3 in the localization branch, 320 in the classifier) correspond
  to 1x28x28 inputs.
  """

  def __init__(self):
    super().__init__()
    # classifier
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5)
    self.conv2 = nn.Conv2d(in_channels=10, out_channels=20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    self.fc1 = nn.Linear(in_features=320, out_features=50)
    self.fc2 = nn.Linear(in_features=50, out_features=10)

    # Spatial transformer localization-network
    self.localization = nn.Sequential(
        nn.Conv2d(1, 8, kernel_size=7),
        nn.MaxPool2d(2, stride=2),
        nn.ReLU(inplace=True),
        nn.Conv2d(8, 10, kernel_size=5),
        nn.MaxPool2d(2, stride=2),
        nn.ReLU(inplace=True)
    )

    # regressor for the 3 * 2 affine matrix
    self.fc_loc = nn.Sequential(
        nn.Linear(10 * 3 * 3, 32),
        nn.ReLU(inplace=True),
        nn.Linear(32, 3 * 2)
    )

    # Start from the identity transformation: zero weights plus a bias that
    # spells out the matrix [[1, 0, 0], [0, 1, 0]].
    regressor_head = self.fc_loc[2]
    regressor_head.weight.data.zero_()
    regressor_head.bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

  # spatial transformer network forward function
  def stn(self, x):
    """Resample ``x`` with the affine transform predicted from ``x`` itself."""
    features = self.localization(x).view(-1, 10 * 3 * 3)
    # one 2x3 affine matrix per sample
    theta = self.fc_loc(features).view(-1, 2, 3)

    sampling_grid = F.affine_grid(theta, x.size())
    return F.grid_sample(x, sampling_grid)

  def forward(self, x):
    """Return per-class log-probabilities for the batch ``x``."""
    # transform the input
    warped = self.stn(x)

    # perform forward pass
    feat = F.relu(F.max_pool2d(self.conv1(warped), 2))
    feat = self.conv2_drop(self.conv2(feat))
    feat = F.relu(F.max_pool2d(feat, 2))
    flat = feat.view(-1, 320)
    hidden = F.relu(self.fc1(flat))
    hidden = F.dropout(hidden, training=self.training)
    logits = self.fc2(hidden)
    return F.log_softmax(logits, dim=1)
  
# build the network, then move its parameters and buffers to `device`
model = Net()
model = model.to(device)

# train function

In [0]:
# train the model
def train(epoch):
  """Run one training pass over ``train_loader``.

  Uses the module-level ``model``, ``optimizer`` and ``device``; the loss is
  ``F.nll_loss``, i.e. the model is expected to output log-probabilities.
  ``epoch`` only appears in the progress line printed for every 500th batch
  (starting with batch 0).
  """
  model.train()
  for batch_idx, (images, labels) in enumerate(train_loader):
    # move the batch to the configured device
    images, labels = images.to(device), labels.to(device)

    # clear the gradients of the previous step, then run the forward pass
    optimizer.zero_grad()
    outputs = model(images)
    loss = F.nll_loss(outputs, labels)

    # backward pass and parameter update
    loss.backward()
    optimizer.step()

    if batch_idx % 500 != 0:
      continue
    seen = batch_idx * len(images)
    progress = 100. * batch_idx / len(train_loader)
    print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
        epoch, seen, len(train_loader.dataset), progress, loss.item()))

# test function

In [0]:
def test():
  """Evaluate ``model`` on ``test_loader`` and print mean loss and accuracy.

  Uses the module-level ``model``, ``test_loader`` and ``device``. The loss is
  ``F.nll_loss``, i.e. the model is expected to output log-probabilities.
  """
  model.eval()
  test_loss = 0
  correct = 0
  with torch.no_grad():
    for images, labels in test_loader:
      images = images.to(device)
      labels = labels.to(device)

      outputs = model(images)

      # Sum (rather than average) the per-sample losses of the batch so that
      # dividing by the dataset size below gives the true mean.
      # reduction='sum' replaces the deprecated size_average=False.
      test_loss += F.nll_loss(outputs, labels, reduction='sum').item()

      # index of the largest log-probability = predicted class
      pred = outputs.max(1, keepdim=True)[1]

      correct += pred.eq(labels.view_as(pred)).sum().item()

  test_loss /= len(test_loader.dataset)

  print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct,
                                                                              len(test_loader.dataset),
                                                                              100. * correct /
                                                                              len(test_loader.dataset)))


# tensor to numpy

In [0]:
def convert_image_np(inp):
  """Convert a channel-first tensor to a channel-last NumPy image in [0, 1].

  The axes are permuted with (1, 2, 0), the normalization with
  mean (0.485, 0.456, 0.406) / std (0.229, 0.224, 0.225) is undone, and the
  result is clipped to the range [0, 1].
  """
  channel_mean = np.array([0.485, 0.456, 0.406])
  channel_std = np.array([0.229, 0.224, 0.225])
  # channel axis moves last so the per-channel statistics broadcast over it
  channels_last = np.transpose(inp.numpy(), (1, 2, 0))
  restored = channel_std * channels_last + channel_mean
  return np.clip(restored, 0, 1)

# visualize stn

In [0]:
def visualize_stn():
  """Plot one batch of ``test_loader`` next to its STN-transformed version.

  Uses the module-level ``model``, ``test_loader`` and ``device``; no
  gradients are tracked.
  """
  with torch.no_grad():
    # first batch of the test loader (inputs only), on the model's device
    batch = next(iter(test_loader))[0].to(device)

    # image grids of the raw and of the spatially transformed batch
    in_grid = convert_image_np(make_grid(batch.cpu()))
    out_grid = convert_image_np(make_grid(model.stn(batch).cpu()))

    # plot the results side by side
    _, (left_ax, right_ax) = plt.subplots(1, 2)
    left_ax.imshow(in_grid)
    left_ax.set_title('Dataset Images')

    right_ax.imshow(out_grid)
    right_ax.set_title('Transformed Images')