<a href="https://colab.research.google.com/github/vainaijr/aiStartUp/blob/master/snippets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PyTorch imports

In [0]:
# Install pinned pre-release wheels of torch (1.0.0a0) and torch_xla (0.1), built for CPython 3.6 on linux x86_64.
!pip install \
  http://storage.googleapis.com/pytorch-tpu-releases/tf-1.13/torch-1.0.0a0+1d94a2b-cp36-cp36m-linux_x86_64.whl  \
  http://storage.googleapis.com/pytorch-tpu-releases/tf-1.13/torch_xla-0.1+5622d42-cp36-cp36m-linux_x86_64.whl

In [0]:
%%shell
# Pin TensorFlow to the 1.14.0rc0 release candidate.
pip install tensorflow==1.14.0rc0
pip install future
# Upgrade to the newest nightly TensorBoard build
# (presumably what torch.utils.tensorboard needs - TODO confirm).
pip install --upgrade tb-nightly
# pip install --upgrade torch
# pip install --upgrade torchvision

# Install latest Tensorflow build
# !pip install -q tf-nightly-2.0-preview
# !pip install tensorflow==2.0.0-alpha0 
# !pip install tensorboardX
# import tensorflow as tf
# from tensorflow import summary
# %load_ext tensorboard.notebook

# current_time = str(datetime.datetime.now().timestamp())
# train_log_dir = 'logs/tensorboard/train/' + current_time
# test_log_dir = 'logs/tensorboard/test/' + current_time
# train_summary_writer = summary.create_file_writer(train_log_dir)
# test_summary_writer = summary.create_file_writer(test_log_dir)

In [0]:
from __future__ import print_function, unicode_literals, division
from IPython.core.debugger import set_trace
from IPython.display import HTML
from pprint import pprint

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets
from torch.nn.utils.weight_norm import WeightNorm
from torch.optim.lr_scheduler import StepLR
from torch.optim import Adam
from torchvision.transforms import Compose, RandomHorizontalFlip, RandomResizedCrop, ToTensor, Normalize
from torchvision.transforms import CenterCrop, Resize, ColorJitter, ToPILImage
import torchvision.models as models
from torch.autograd import Variable
from torch.utils.data import DataLoader, Dataset, sampler
from torchvision.utils import make_grid, save_image
from torch.utils.tensorboard import SummaryWriter
import torch_xla
import torch_xla
import torch_xla_py.utils as xu
import torch_xla_py.xla_model as xm
import copy, json, glob, time, math, os, datetime, string, random, re, warnings
import numpy as np
from PIL import Image, ImageEnhance
import pandas as pd
from skimage import io, transform
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.animation as animation
import h5py
from io import open
from tqdm import tqdm
from abc import abstractmethod

warnings.filterwarnings("ignore")
plt.ion() # interactive mode


In [0]:
# Report whether CUDA is usable and select the matching torch device.
if torch.cuda.is_available():
  print("CUDA available: ", True)
  device = torch.device("cuda")
else:
  print("CUDA available: ", False)
  device = torch.device("cpu")

In [0]:
# TensorBoard summary writer; no log directory is passed, so the library default is used.
writer = SummaryWriter()

# LeNet model


In [0]:
class Net(nn.Module):
  """Two-convolution classifier that returns log-probabilities over 10 classes.

  ``forward`` flattens to 320 features per sample, which corresponds to
  20 channels of 4x4 when the input is a 1x28x28 image.
  """

  def __init__(self):
    super().__init__()
    # feature extractor: two 5x5 convolutions, channel dropout after the second
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5)
    self.conv2 = nn.Conv2d(in_channels=10, out_channels=20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    # classifier head
    self.fc1 = nn.Linear(in_features=320, out_features=50)
    self.fc2 = nn.Linear(in_features=50, out_features=10)

  def forward(self, x):
    """Return a (N, 10) tensor of log-probabilities for the batch ``x``."""
    pooled1 = F.max_pool2d(self.conv1(x), 2)
    act1 = F.relu(pooled1)

    dropped2 = self.conv2_drop(self.conv2(act1))
    act2 = F.relu(F.max_pool2d(dropped2, 2))

    flat = act2.view(-1, 320)
    hidden = F.relu(self.fc1(flat))
    # functional dropout must be told explicitly whether the module is training
    hidden = F.dropout(hidden, training=self.training)
    logits = self.fc2(hidden)
    return F.log_softmax(logits, dim=1)

# MNIST dataset, dataloader

In [0]:
# train dataset
train_loader = DataLoader(
    dataset=datasets.MNIST(
        root='./',
        train=True,
        download=True,
        transform=Compose([
            ToTensor(),
            Normalize(mean=(0.1307,), std=(0.3081,)),
        ]),
    ),
    batch_size=64,
    shuffle=True,
    num_workers=4,
)

# test split: one sample per batch; download=False, so the files must already be under './'
test_loader = DataLoader(
    dataset=datasets.MNIST(
        root='./',
        train=False,
        download=False,
        transform=Compose([
            ToTensor(),
            Normalize(mean=(0.1307,), std=(0.3081,)),
        ]),
    ),
    batch_size=1,
    shuffle=True,
    num_workers=4,
)

# GAN variant: images resized to `img_size` and normalised with mean 0.5 / std 0.5.
# NOTE(review): `img_size` is not defined anywhere in this notebook and `batch_size`
# is only assigned in the later "Hyperparameters" cell - define both before running.
mnist_loader = DataLoader(
    dataset=datasets.MNIST(
        root='./',
        train=True,
        download=True,
        transform=Compose([
            Resize(size=img_size),
            ToTensor(),
            Normalize(mean=[0.5], std=[0.5]),
        ]),
    ),
    batch_size=batch_size,
    shuffle=True,
)


# CUDA

In [0]:
# Report whether CUDA is usable and select the matching torch device.
if torch.cuda.is_available():
  print("CUDA available: ", True)
  device = torch.device("cuda")
else:
  print("CUDA available: ", False)
  device = torch.device("cpu")

# transforms

In [0]:
# Resize to `imsize`, then convert to a tensor.
# Built from the directly imported Compose/Resize/ToTensor: the import cell never
# imports the `transforms` module, and assigning to that same name would shadow it,
# so the previous `transforms.Compose(...)` form could not be run (or re-run).
# NOTE(review): `imsize` is not defined in this notebook - set it before running.
transforms = Compose([
    Resize(imsize),
    ToTensor()
])

# RandomCrop is not among the names imported by the notebook's import cell,
# so it is brought into scope here to keep this snippet runnable on its own.
from torchvision.transforms import RandomCrop

# Training-time augmentation: random 200-pixel crop, random horizontal flip,
# ColorJitter with its default arguments, then tensor conversion and
# per-channel normalisation with mean 0.5 / std 0.5.
train_transform = Compose([
    RandomCrop(200),
    RandomHorizontalFlip(),
    ColorJitter(),
    ToTensor(),
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])

# Per-split preprocessing pipelines keyed by 'train' / 'val'.
# Both splits are normalised with the same per-channel statistics: the training
# split previously used mean/std 0.5 while validation used the values below,
# so the model was evaluated on inputs scaled differently from its training data.
# NOTE(review): `input_size` must be an image side length here; the "Hyperparameters"
# cell sets it to 784 (a flattened length) - confirm the intended value.
data_transforms = {
  'train': Compose([
    RandomResizedCrop(input_size),
    RandomHorizontalFlip(),
    ColorJitter(),
    ToTensor(),
    Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
  ]),
  'val': Compose([
    Resize(input_size),
    CenterCrop(input_size),
    ToTensor(),
    Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
  ])
}

# unicodeToAscii

In [0]:
# turn a Unicode string to plain ASCII
def unicodeToAscii(s):
  """Strip combining marks from ``s`` and keep only characters in ``all_letters``.

  The string is NFD-normalised so accented characters split into a base
  character plus combining marks; marks (category 'Mn') are dropped, as is
  every character not contained in the module-level ``all_letters``.

  NOTE(review): ``all_letters`` is not defined in this notebook - it must be
  assigned before this function is called.
  """
  # imported locally because the notebook's import cell does not import it
  import unicodedata

  decomposed = unicodedata.normalize('NFD', s)
  return ''.join(
      c for c in decomposed
      if unicodedata.category(c) != 'Mn'
      and c in all_letters
  )

# RNN

In [0]:
class RNN(nn.Module):
  """One-step recurrent cell built from two linear layers.

  The input and the previous hidden state are concatenated; one linear layer
  produces the next hidden state, the other the log-softmax output.
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()
    self.hidden_size = hidden_size

    joint_width = input_size + hidden_size
    self.i2h = nn.Linear(joint_width, hidden_size)
    self.i2o = nn.Linear(joint_width, output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Return ``(log_probs, next_hidden)`` for a single step."""
    joined = torch.cat((input, hidden), 1)
    next_hidden = self.i2h(joined)
    log_probs = self.softmax(self.i2o(joined))
    return log_probs, next_hidden

  def initHidden(self):
    """Return an all-zero hidden state of shape (1, hidden_size)."""
    return torch.zeros(1, self.hidden_size)

# Hyperparameters

In [0]:
# Shared hyperparameters read by the snippets in this notebook.
input_size = 784  # equals 28*28, the flattened image length the training loop reshapes to
hidden_size = 500  # presumably the hidden-layer width of the model - TODO confirm against the model definition
num_classes = 10  # presumably the ten MNIST digit classes - TODO confirm
num_epochs = 5  # number of passes the training loop makes over train_loader
batch_size = 100  # batch size passed to the DataLoader constructors that reference it
learning_rate = 0.001  # lr handed to the Adam optimizer

# Lang

In [0]:
# Reserved vocabulary indices for the start- and end-of-sentence markers.
SOS_token = 0
EOS_token = 1

class Lang:
  """Vocabulary for one language: word <-> index maps plus word counts."""

  def __init__(self, name):
    self.name = name
    self.word2index = {}
    self.word2count = {}
    # indices 0 and 1 are taken by the two marker tokens
    self.index2word = {0: "SOS", 1: "EOS"}
    self.n_words = 2

  def addSentence(self, sentence):
    """Register every piece of ``sentence`` obtained by splitting on single spaces."""
    for token in sentence.split(' '):
      self.addWord(token)

  def addWord(self, word):
    """Count ``word``; assign it the next free index on first sight."""
    if word in self.word2index:
      self.word2count[word] += 1
      return

    fresh_index = self.n_words
    self.word2index[word] = fresh_index
    self.word2count[word] = 1
    self.index2word[fresh_index] = word
    self.n_words = fresh_index + 1

# normalizeString

In [0]:
# lowercase, trim, and remove non-letter characters
def normalizeString(s):
  """Lowercase and trim ``s``, ASCII-fold it, and keep only letters and ``.!?``.

  Steps:
    1. lowercase, strip surrounding whitespace, pass through ``unicodeToAscii``;
    2. put a space in front of each ``.``, ``!`` or ``?`` so it becomes its own token;
    3. replace every run of other non-letter characters with a single space.
  """
  s = unicodeToAscii(s.lower().strip())
  s = re.sub(r"([.!?])", r" \1", s)
  # `A-Z` (not `A-z`): the wider range also matched [ \ ] ^ _ ` and let them through.
  # The `+` collapses a run into ONE space, so splitting the result on ' ' does not
  # produce empty-string tokens.
  s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
  return s

# EncoderRNN

In [0]:
# the encoder of a seq2seq network is a RNN that outputs some value for every word from the input sentence.
# for every input word the encoder outputs a vector and a hidden state, and uses the hidden state for the next
# input word.
class EncoderRNN(nn.Module):
  """Embeds one input token and advances a GRU by a single step."""

  def __init__(self, input_size, hidden_size):
    super().__init__()
    self.hidden_size = hidden_size

    # embedding width equals the GRU width, so the embedding feeds the GRU directly
    self.embedding = nn.Embedding(input_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)

  def forward(self, input, hidden):
    """Return the GRU's ``(output, hidden)`` for the token ``input``."""
    # reshape the embedding to (1, 1, -1) before handing it to the GRU
    step_input = self.embedding(input).view(1, 1, -1)
    output, hidden = self.gru(step_input, hidden)
    return output, hidden

  def initHidden(self):
    """Return a zero hidden state of shape (1, 1, hidden_size) on the global ``device``."""
    return torch.zeros(1, 1, self.hidden_size, device=device)

# DecoderRNN

In [0]:
class DecoderRNN(nn.Module):
  """Single-step GRU decoder that emits log-probabilities over the output vocabulary."""

  def __init__(self, hidden_size, output_size):
    super().__init__()
    self.hidden_size = hidden_size

    self.embedding = nn.Embedding(output_size, hidden_size)
    self.gru = nn.GRU(hidden_size, hidden_size)
    self.out = nn.Linear(hidden_size, output_size)
    self.softmax = nn.LogSoftmax(dim=1)

  def forward(self, input, hidden):
    """Return ``(log_probs, hidden)`` after decoding the token ``input``."""
    step_input = F.relu(self.embedding(input).view(1, 1, -1))
    gru_out, hidden = self.gru(step_input, hidden)
    # gru_out[0] drops the leading length-1 dimension before the projection
    log_probs = self.softmax(self.out(gru_out[0]))
    return log_probs, hidden

  def initHidden(self):
    """Return a zero hidden state of shape (1, 1, hidden_size) on the global ``device``."""
    return torch.zeros(1, 1, self.hidden_size, device=device)

# AttnDecoderRNN

In [0]:
class AttnDecoderRNN(nn.Module):
  """Single-step GRU decoder with attention over the encoder outputs.

  NOTE(review): the default ``max_length`` reads the module-level ``MAX_LENGTH``
  when the class is defined; that name is not assigned in this notebook.
  """

  def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH):
    super().__init__()
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.dropout_p = dropout_p
    self.max_length = max_length

    width = self.hidden_size
    self.embedding = nn.Embedding(self.output_size, width)
    # attention scores: one per encoder position, computed from [embedding ; hidden]
    self.attn = nn.Linear(width * 2, self.max_length)
    # merges [embedding ; attention context] back down to the GRU width
    self.attn_combine = nn.Linear(width * 2, width)
    self.dropout = nn.Dropout(self.dropout_p)
    self.gru = nn.GRU(width, width)
    self.out = nn.Linear(width, self.output_size)

  def forward(self, input, hidden, encoder_outputs):
    """Return ``(log_probs, hidden, attn_weights)`` for one decoding step."""
    embedded = self.dropout(self.embedding(input).view(1, 1, -1))

    scores = self.attn(torch.cat((embedded[0], hidden[0]), 1))
    attn_weights = F.softmax(scores, dim=1)
    # weighted sum of the encoder outputs, computed as a batched matrix product
    context = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))

    merged = torch.cat((embedded[0], context[0]), 1)
    gru_input = F.relu(self.attn_combine(merged).unsqueeze(0))
    gru_out, hidden = self.gru(gru_input, hidden)

    log_probs = F.log_softmax(self.out(gru_out[0]), dim=1)
    return log_probs, hidden, attn_weights

  def initHidden(self):
    """Return a zero hidden state of shape (1, 1, hidden_size) on the global ``device``."""
    return torch.zeros(1, 1, self.hidden_size, device=device)

# timeSince

In [0]:
def asMinutes(s):
  """Format a duration of ``s`` seconds as ``'<minutes>m <seconds>s'``."""
  whole_minutes = math.floor(s / 60)
  leftover_seconds = s - whole_minutes * 60
  return '%dm %ds' % (whole_minutes, leftover_seconds)

def timeSince(since, percent):
  """Return ``'<elapsed> (- <remaining>)'`` for a task started at ``since``.

  ``percent`` is the completed fraction of the task; the total duration is
  extrapolated as elapsed / percent, so ``percent`` must be non-zero.
  """
  elapsed = time.time() - since
  projected_total = elapsed / (percent)
  remaining = projected_total - elapsed
  return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))

# plotting

In [0]:
# Use the non-interactive 'agg' backend for the figures created below.
plt.switch_backend('agg')

def showPlot(points):
  """Plot ``points`` on a fresh figure with y-axis ticks every 0.2.

  The figure is created by ``plt.subplots()`` alone; calling ``plt.figure()``
  first, as before, opened an additional empty figure on every call.
  """
  fig, ax = plt.subplots()
  loc = ticker.MultipleLocator(base=0.2)  # this locator puts ticks at regular intervals
  ax.yaxis.set_major_locator(loc)
  ax.plot(points)
  
# Show the first sample of each test set side by side in a 1x2 layout.
# NOTE(review): `test_A` and `test_B` are not defined in this notebook.
plt.figure(figsize=(10, 5))
for subplot_code, caption, sample in (
    (121, "Test dataset 'Horses'", test_A[0]),
    (122, "Test dataset 'Zebras'", test_B[0]),
):
  plt.subplot(subplot_code)
  plt.title(caption)
  plt.imshow(sample)

# BiRNN

In [0]:
# Recurrent Neural Networks (many to one)
class BiRNN(nn.Module):
  """Bidirectional multi-layer LSTM followed by a linear classifier (many-to-one).

  Args:
    input_size: number of features per time step of the input.
    hidden_size: number of features in the hidden state of each direction.
    num_layers: number of stacked LSTM layers.
    num_classes: size of the output of the final linear layer.

  The LSTM is built with ``batch_first=True``, so inputs and outputs are laid
  out as (batch, seq, feature). Its per-step output has ``2 * hidden_size``
  features (forward and backward directions concatenated), which is why the
  classifier takes ``hidden_size * 2`` inputs.
  """

  def __init__(self, input_size, hidden_size, num_layers, num_classes):
    super(BiRNN, self).__init__()
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    # long short-term memory RNN; bidirectional=True doubles the number of directions
    self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
    # linear map from the concatenated directions to the class scores
    self.fc = nn.Linear(hidden_size*2, num_classes)

  def forward(self, x):
    """Classify each sequence in ``x`` from the LSTM output at the last time step.

    Args:
      x: tensor of shape (batch, seq_len, input_size).

    Returns:
      Tensor of shape (batch, num_classes) with unnormalised class scores.
    """
    # Initial hidden and cell states, shape (num_layers * num_directions, batch, hidden_size).
    # They are created on the same device and with the same dtype as `x`: plain
    # torch.zeros(...) always yields a default-dtype CPU tensor, which fails as soon
    # as the model and its input live on a GPU or use another dtype.
    state_shape = (self.num_layers*2, x.size(0), self.hidden_size)
    h0 = torch.zeros(*state_shape, device=x.device, dtype=x.dtype)
    c0 = torch.zeros(*state_shape, device=x.device, dtype=x.dtype)

    # forward propagate the LSTM; the final (h_n, c_n) states are not needed
    out, _ = self.lstm(x, (h0, c0))

    # `out` is (batch, seq_len, 2 * hidden_size); decode the last time step only
    out = self.fc(out[:, -1, :])

    return out

# loss, optimizer and learning rate scheduler

In [0]:
# loss and optimizer

# Loss function. Alternatives in torch.nn include MSELoss, L1Loss, CTCLoss, NLLLoss,
# PoissonNLLLoss, KLDivLoss, BCELoss, BCEWithLogitsLoss, MarginRankingLoss,
# HingeEmbeddingLoss, MultiLabelMarginLoss, SmoothL1Loss, SoftMarginLoss,
# MultiLabelSoftMarginLoss, CosineEmbeddingLoss, MultiMarginLoss, TripletMarginLoss.
criterion = nn.CrossEntropyLoss()

# Optimise only the parameters that still require gradients. Alternatives in torch.optim
# include Adadelta, Adagrad, SparseAdam, Adamax, ASGD, LBFGS, RMSprop, Rprop, SGD.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = Adam(params, lr=learning_rate)

# Step schedule: multiply the learning rate by 0.1 every 3 scheduler steps.
lr_scheduler = StepLR(optimizer, step_size=3, gamma=0.1)

# train

In [0]:
# train the model
total_step = len(train_loader)
for epoch in range(num_epochs):
    # `step` is 1-based so it can be printed and tested directly
    for step, (images, labels) in enumerate(train_loader, start=1):
        # flatten every image to a 28*28 vector and move the batch to the configured device
        images = images.reshape(-1, 28*28).to(device)
        labels = labels.to(device)

        # forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # backward pass: clear old gradients, back-propagate, update the parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # report every 100th step
        if step % 100 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, step, total_step, loss.item()))

# test

In [0]:

# Accuracy of `model` over `test_loader`, computed without tracking gradients.
# NOTE(review): model.eval() is not called here - confirm the model has no
# dropout/batch-norm layers, or switch it to eval mode before running this cell.
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.reshape(-1, 28*28).to(device)
        labels = labels.to(device)

        outputs = model(images)

        # torch.max along dim 1 yields (values, indices); the indices are the predicted classes
        predicted = torch.max(outputs.data, 1)[1]

        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 10000 test images: {} %'.format(100*correct / total))
    

# make_grid

In [0]:
def imshow(image, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
  """Un-normalise a channels-first image and display it without axes.

  Args:
    image: torch.Tensor or array-like laid out as (channels, height, width).
    mean: per-channel mean used to undo the normalisation.
    std: per-channel standard deviation used to undo the normalisation.

  Tensors are detached and copied to the CPU first, because ``Tensor.numpy()``
  raises for tensors that track gradients or live on another device.
  """
  if isinstance(image, torch.Tensor):
    image = image.detach().cpu().numpy()
  else:
    image = np.array(image)
  # matplotlib expects (height, width, channels)
  image = image.transpose((1, 2, 0))
  image = np.asarray(std) * image + np.asarray(mean)
  # keep the values inside the [0, 1] range accepted for float images
  image = np.clip(image, 0, 1)
  fig, ax = plt.subplots(1, 1, figsize=(15, 15))
  ax.imshow(image)
  ax.axis('off')

# Preview one training batch: take its images, tile them with make_grid (nrow=8) and display the grid.
images, _ = next(iter(train_loader))
out = make_grid(images, nrow=8)
imshow(out)

# Ignite import

In [0]:
# Ignite is a high level library to help with training neural networks in PyTorch, it comes with an Engine to setup
# a training loop, various metrics, handlers
!pip install ignite
from ignite.engine import Engine, Events 
# Engine - runs a given process_function over each batch of a dataset, emitting events as it goes
# Events - allows users to attach functions to an Engine to fire functions at a specific event, example: 
# EPOCH_COMPLETED, ITERATION_STARTED etc.

from ignite.metrics import Accuracy, Loss, RunningAverage
# Accuracy - metric to calculate accuracy over a dataset, for binary, multiclass, multilabel cases
# Loss - general metric that takes a loss function as a parameter, calculate loss over a dataset
# RunningAverage - general metric to attach to Engine during training

from ignite.handlers import ModelCheckpoint, EarlyStopping
# ModelCheckpoint - handler to checkpoint models
# EarlyStopping - handler to stop training based on a score function

from ignite.contrib.handlers import ProgressBar
# ProgressBar - handler to create a tqdm progress bar, tqdm means progress in Arabic

# TextCNN

In [0]:
# TextCNN model
# the model works for variable text lengths, we embed the words of a sentence, use convolutions, maxpooling
# and concatenation to embed the sentence as a single vector
# the single vector is passed through a fully connected layer with sigmoid to output a single value
# this value can be interpreted as the probability a sentence is positive (closer to 1) or negative (closer to 0)

# the minimum length of text expected by the model is the size of the smallest kernel size of the model

class TextCNN(nn.Module):
  """Convolutional sentence classifier over word embeddings.

  Args:
    vocab_size: number of rows in the embedding table.
    embedding_dim: width of each embedding vector.
    kernel_sizes: iterable of 1-D convolution kernel sizes, one branch per size.
    num_filters: output channels of every convolution branch.
    num_classes: output size of the final linear layer.
    d_prob: dropout probability applied before the final linear layer.
    mode: 'rand' keeps the randomly initialised embeddings; any value containing
      'static' copies ``TEXT.vocab.vectors`` into the embedding, which is then
      frozen unless the value also contains 'non' (e.g. 'nonstatic').

  Raises:
    ValueError: if ``mode`` is neither 'rand' nor contains 'static'.
  """

  def __init__(self, vocab_size, embedding_dim, kernel_sizes, num_filters, num_classes, d_prob, mode):
    super(TextCNN, self).__init__()
    self.vocab_size = vocab_size
    self.embedding_dim = embedding_dim
    self.kernel_sizes = kernel_sizes
    self.num_filters = num_filters
    self.num_classes = num_classes
    self.d_prob = d_prob
    self.mode = mode
    self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=1)
    self.load_embeddings()
    self.conv = nn.ModuleList([nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=k,
                                        stride=1) for k in kernel_sizes])
    self.dropout = nn.Dropout(d_prob)
    self.fc = nn.Linear(len(kernel_sizes)*num_filters, num_classes)

  def forward(self, x):
    """Return sigmoid scores for a (batch, sequence_length) tensor of token indices.

    The result has shape (batch,) when ``num_classes`` is 1 and
    (batch, num_classes) otherwise.
    """
    # (batch, seq, emb) -> (batch, emb, seq): Conv1d convolves over the last dimension
    x = self.embedding(x).transpose(1, 2)
    x = [F.relu(conv(x)) for conv in self.conv]
    # max-pool each branch over its whole length, leaving (batch, num_filters)
    x = [F.max_pool1d(c, c.size(-1)).squeeze(dim=-1) for c in x]
    x = torch.cat(x, dim=1)
    x = self.fc(self.dropout(x))
    # Squeeze only the class dimension: a bare squeeze() would also drop the
    # batch dimension whenever the batch holds a single sample.
    return torch.sigmoid(x).squeeze(-1)

  def load_embeddings(self):
    """Initialise the embedding table according to ``self.mode``.

    NOTE(review): the 'static' modes read the module-level ``TEXT`` object,
    which is not defined in this notebook.
    """
    if 'static' in self.mode:
      self.embedding.weight.data.copy_(TEXT.vocab.vectors)
      # The flag has to be set on the Parameter itself; assigning it on
      # `weight.data` changes a temporary tensor and never freezes the embedding.
      if 'non' not in self.mode:
        self.embedding.weight.requires_grad = False
        print('Loaded pretrained embeddings, weights are not trainable')
      else:
        self.embedding.weight.requires_grad = True
        print('Loaded pretrained embeddings, weights are trainable')

    elif self.mode == 'rand':
      print('Randoml initialized embeddings are used')
    else:
      raise ValueError('Unexpected value of mode')

# Ignite process_function

In [0]:
# training and evaluating using Ignite
# Ignite's Engine allows user to define a process_function a given batch, this is applied to all the batches of 
# the dataset, this is a general class that can be applied to train and validate models, a process_function
# has two parameters, engine and batch

# the function of the trainer
# sets model in train mode
# sets the gradient of the optimizer to zero
# generate x and y from batch
# performs a forward pass to calculate y_pred using model and x
# calculates loss using y_pred and y
# performs a backward pass using loss to calculate gradients for the model parameters
# model parameters are optimized using gradients and optimizer
# returns scalar loss

def process_function(engine, batch):
  """Run one optimisation step on ``batch`` and return the loss as a Python float.

  Uses the module-level ``model``, ``optimizer`` and ``criterion``; ``engine``
  is accepted for the Ignite process-function signature and is not used.
  """
  model.train()
  optimizer.zero_grad()

  inputs = batch.text
  targets = batch.label
  batch_loss = criterion(model(inputs), targets)

  batch_loss.backward()
  optimizer.step()
  return batch_loss.item()

# Ignite eval_function

In [0]:
# evaluator engine - process_function

# similar to the training process function, we setup a function to evaluate a single batch

# eval_function
# sets model in eval mode
# generates x and y from batch
# with torch.no_grad(), no gradients are calculated for any succeeding steps
# performs a forward pass on the model to calculate y_pred based on model and x
# returns y_pred and y

# Ignite suggests attaching metrics to evaluators and not trainers

# all metrics in Ignite require y_pred and y as outputs of the function attached to the Engine

def eval_function(engine, batch):
  """Return ``(y_pred, y)`` for ``batch`` with the global ``model`` in eval mode.

  The forward pass runs under ``torch.no_grad()``; ``engine`` is accepted for
  the Ignite process-function signature and is not used.
  """
  model.eval()
  inputs, targets = batch.text, batch.label
  with torch.no_grad():
    predictions = model(inputs)
  return predictions, targets

# CustomDataset

In [0]:
class CustomDataset(Dataset):
  """Skeleton ``Dataset`` subclass: reports length 0 and returns ``None`` for any index."""

  def __init__(self):
    super(CustomDataset, self).__init__()

  def __len__(self):
    # placeholder: no samples
    return 0

  def __getitem__(self, idx):
    # placeholder: nothing to return for any index
    return None
  
class FaceLandmarksDataset(Dataset):
  """Dataset of images paired with landmark coordinates read from a CSV file.

  Column 0 of the CSV holds the image file name (relative to ``root_dir``);
  the remaining columns hold the landmark coordinates, which are reshaped into
  rows of two values.
  """

  def __init__(self, csv_file, root_dir, transform=None):
    """
    Args:
      csv_file (string): path to the csv file with annotations.
      root_dir (string): directory with all the images.
      transform (callable, optional): optional transform to be applied on a sample.
    """
    self.landmarks_frame = pd.read_csv(csv_file)
    self.root_dir = root_dir
    self.transform = transform

  def __len__(self):
    """Return the number of annotated rows."""
    return len(self.landmarks_frame)

  def __getitem__(self, idx):
    """Return ``{'image': ..., 'landmarks': ...}`` for row ``idx``, transformed if requested."""
    # samplers may hand over tensor indices; convert them to plain Python values
    if torch.is_tensor(idx):
      idx = idx.tolist()

    img_name = os.path.join(self.root_dir, self.landmarks_frame.iloc[idx, 0])
    image = io.imread(img_name)
    # `.values` replaces Series.as_matrix(), which newer pandas releases no longer provide
    landmarks = self.landmarks_frame.iloc[idx, 1:].values
    landmarks = landmarks.astype('float').reshape(-1, 2)
    sample = {'image': image, 'landmarks': landmarks}

    if self.transform:
      sample = self.transform(sample)

    return sample

# Transfer Learning

In [0]:
# training helpers

# list(get_trainable(model.parameters()))
def get_trainable(model_params):
  """Lazily yield the parameters in ``model_params`` whose ``requires_grad`` is true."""
  return (
      param
      for param in model_params
      if param.requires_grad
  )

# list(get_frozen(model.parameters()))
def get_frozen(model_params):
  """Lazily yield the parameters in ``model_params`` whose ``requires_grad`` is false."""
  return (
      param
      for param in model_params
      if not param.requires_grad
  )

# all_trainable(model.parameters())
def all_trainable(model_params):
  """Return True when every parameter in ``model_params`` requires gradients.

  An empty iterable yields True. Iterates the ``model_params`` argument itself
  (the previous body read the unrelated ``model.params``).
  """
  return all(p.requires_grad for p in model_params)

# all_frozen(model.parameters())
def all_frozen(model_params):
  """Return True when no parameter in ``model_params`` requires gradients.

  An empty iterable yields True. Iterates the ``model_params`` argument itself
  (the previous body read the unrelated ``model.params``).
  """
  return all(not p.requires_grad for p in model_params)

def freeze_all(model_params):
  """Set ``requires_grad = False`` on every parameter in ``model_params`` (in place)."""
  for parameter in model_params:
    parameter.requires_grad = False

# pretrainedmodels

In [0]:
!pip install pretrainedmodels
import pretrainedmodels
print(pretrainedmodels.model_names)

model_name = 'nasnetalarge' # could be fbresnet152 or inceptionresnetv2
model = pretrainedmodels.__dict__[model_name](num_classes=1000, pretrained='imagenet')
model.eval()

# train_model_helper

In [0]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
  """Train ``model`` and keep the weights with the best validation accuracy.

  Args:
    model: the network to optimise; it is modified in place.
    dataloaders: mapping with 'train' and 'val' keys whose values yield
      ``(inputs, labels)`` batches and expose ``.dataset``.
    criterion: loss callable taking ``(outputs, labels)``.
    optimizer: optimizer stepping the model parameters.
    num_epochs: number of train + validation passes.
    is_inception: when True the model is expected to return
      ``(outputs, aux_outputs)`` in training mode; the auxiliary loss is
      added with weight 0.4.

  Returns:
    ``(model, val_acc_history)``: the model loaded with its best validation
    weights, and the list of per-epoch validation accuracies.

  Batches are moved to the module-level ``device``.
  """
  since = time.time()

  val_acc_history = []

  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0

  for epoch in range(num_epochs):
    print('Epoch {}/{}'.format(epoch, num_epochs - 1))
    print('-' * 10)

    # each epoch has a training and validation phase
    for phase in ['train', 'val']:
      if phase == 'train':
        model.train() # set model to training mode
      else:
        model.eval() # set model to eval mode

      running_loss = 0.0
      running_corrects = 0

      # iterate over data
      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward
        # track history if only in train
        with torch.set_grad_enabled(phase == 'train'):
          # get model outputs and calculate loss
          # special case for inception because in training it has an auxiliary output.
          # in train mode we calculate the loss by summing the final output and the auxiliary output but in testing
          # we only consider the final output.
          if is_inception and phase == 'train':
            outputs, aux_outputs = model(inputs)
            loss1 = criterion(outputs, labels)
            loss2 = criterion(aux_outputs, labels)
            loss = loss1 + 0.4*loss2
          else:
            outputs = model(inputs)
            loss = criterion(outputs, labels)

          _, preds = torch.max(outputs, 1)

          # backward + optimize only if in training phase
          if phase == 'train':
            loss.backward()
            optimizer.step()

        # statistics (loss is a batch mean, so weight it by the batch size)
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)

      # per-phase statistics: computed inside the phase loop so that both the
      # training and the validation pass are reported every epoch
      epoch_loss = running_loss / len(dataloaders[phase].dataset)
      epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

      print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

      # deep copy the model
      if phase == 'val' and epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())
      if phase == 'val':
        val_acc_history.append(epoch_acc)

    print()

  time_elapsed = time.time() - since
  print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc: {:4f}'.format(best_acc))

  # load best model weights
  model.load_state_dict(best_model_wts)
  return model, val_acc_history

# datasets and dataloaders

In [0]:
# create training and validation datasets
# One ImageFolder per split, rooted at <data_dir>/<split> and using that split's transform.
image_datasets = {
    split: datasets.ImageFolder(root=os.path.join(data_dir, split), transform=data_transforms[split])
    for split in ('train', 'val')
}

# create training and validation dataloaders
dataloaders_dict = {
    split: DataLoader(dataset=image_datasets[split], batch_size=batch_size, shuffle=True, num_workers=4)
    for split in ('train', 'val')
}

# plot_compare

In [0]:
# Compare two validation-accuracy histories over the training epochs.
# NOTE(review): `ohist` and `shist` are not defined in this notebook.
plt.title("Validation Accuracy vs. Number of Training Epochs")
plt.xlabel("Training epochs")
plt.ylabel("Validation Accuracy")
for history, legend_label in ((ohist, "Pretrained"), (shist, "Scratch")):
  plt.plot(range(1, num_epochs+1), history, label=legend_label)
plt.ylim((0,1.))
plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.show()

# STN

In [0]:
class Net(nn.Module):
  """10-class classifier with a spatial transformer in front of its convolutions.

  ``stn`` predicts a 2x3 affine matrix per sample and resamples the input with
  it; ``forward`` classifies the resampled input. The regressor's last layer
  starts with zero weights and bias [1, 0, 0, 0, 1, 0].
  """

  def __init__(self):
    super().__init__()
    # classifier
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=10, kernel_size=5)
    self.conv2 = nn.Conv2d(in_channels=10, out_channels=20, kernel_size=5)
    self.conv2_drop = nn.Dropout2d()
    self.fc1 = nn.Linear(in_features=320, out_features=50)
    self.fc2 = nn.Linear(in_features=50, out_features=10)

    # spatial transformer localization network
    self.localization = nn.Sequential(
        nn.Conv2d(1, 8, kernel_size=7),
        nn.MaxPool2d(2, stride=2),
        nn.ReLU(True),
        nn.Conv2d(8, 10, kernel_size=5),
        nn.MaxPool2d(2, stride=2),
        nn.ReLU(True),
    )

    # regressor producing the 6 entries of the 3 * 2 affine matrix
    self.fc_loc = nn.Sequential(
        nn.Linear(10 * 3 * 3, 32),
        nn.ReLU(True),
        nn.Linear(32, 3 * 2),
    )

    # initialise the last regressor layer so it outputs the identity transformation
    last_layer = self.fc_loc[2]
    last_layer.weight.data.zero_()
    last_layer.bias.data.copy_(torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float))

  def stn(self, x):
    """Resample ``x`` with the affine transform predicted from ``x`` itself."""
    features = self.localization(x).view(-1, 10 * 3 * 3)
    theta = self.fc_loc(features).view(-1, 2, 3)

    sampling_grid = F.affine_grid(theta, x.size())
    return F.grid_sample(x, sampling_grid)

  def forward(self, x):
    """Return a (N, 10) tensor of log-probabilities for the batch ``x``."""
    # transform the input first
    warped = self.stn(x)

    act1 = F.relu(F.max_pool2d(self.conv1(warped), 2))
    dropped2 = self.conv2_drop(self.conv2(act1))
    act2 = F.relu(F.max_pool2d(dropped2, 2))

    flat = act2.view(-1, 320)
    hidden = F.relu(self.fc1(flat))
    hidden = F.dropout(hidden, training=self.training)
    return F.log_softmax(self.fc2(hidden), dim=1)
  
# Instantiate the network and move it to the selected device.
model = Net().to(device)

# train function

In [0]:
# train the model
def train(epoch):
  """Run one pass over ``train_loader``, updating the global ``model``.

  ``epoch`` is only used in the progress message, which is printed for every
  500th batch (starting with batch 0). Uses the module-level ``model``,
  ``optimizer``, ``train_loader`` and ``device``.
  """
  model.train()
  for batch_idx, (images, labels) in enumerate(train_loader):
    # move the batch to the configured device
    images, labels = images.to(device), labels.to(device)

    # forward pass: negative log-likelihood of the model outputs
    loss = F.nll_loss(model(images), labels)

    # backward pass: clear old gradients, back-propagate, update the parameters
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if batch_idx % 500 != 0:
      continue
    print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
        epoch,
        batch_idx * len(images),
        len(train_loader.dataset),
        100. * batch_idx / len(train_loader),
        loss.item()))

# test function

In [0]:
def test():
  with torch.no_grad():
      model.eval()
      test_loss = 0
      correct = 0
      total = 0
      for images, labels in test_loader:

          images = images.to(device)
          labels = labels.to(device)

          outputs = model(images)

          _, predicted = torch.max(outputs.data, 1) 

          test_loss += F.nll_loss(outputs, labels, size_averag=False).item()

          pred = output.max(1, keepdim=True)[1]

          correct += pred.eq(target.view_as(pred)).sum().item()
      
      test_loss /= len(test_loader.dataset)
      
      print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct,
                                                                                  len(test_loader.dataset),
                                                                                  100. * correct / 
                                                                                  len(test_loader.dataset)))


# tensor to numpy

In [0]:
def convert_image_np(inp):
  """Convert a normalized channels-first image tensor to a plottable array.

  The tensor is moved to channels-last order, de-normalized with the
  per-channel mean/std constants below and clipped into [0, 1].
  """
  channel_mean = np.array([0.485, 0.456, 0.406])
  channel_std = np.array([0.229, 0.224, 0.225])

  # axis order (0, 1, 2) -> (1, 2, 0): channels go last
  channels_last = inp.numpy().transpose((1, 2, 0))
  return np.clip(channel_std * channels_last + channel_mean, 0, 1)

# visualize stn

In [0]:
def visualize_stn():
  """Plot one batch from ``test_loader`` next to its ``model.stn`` output.

  Uses the notebook-level ``model``, ``test_loader`` and ``device`` objects.
  """
  with torch.no_grad():
    # first element of the batch is the image tensor
    batch = next(iter(test_loader))[0].to(device)

    original = batch.cpu()
    warped = model.stn(batch).cpu()

    grids = [convert_image_np(make_grid(images)) for images in (original, warped)]
    captions = ('Dataset Images', 'Transformed Images')

    # one panel per grid, side by side
    _, panels = plt.subplots(1, 2)
    for panel, grid, caption in zip(panels, grids, captions):
      panel.imshow(grid)
      panel.set_title(caption)

# image loader

In [0]:
def image_loader(image_name):
  """Open ``image_name`` and return it as a float batch of one on ``device``.

  Uses the notebook-level ``transforms`` callable and ``device``.
  """
  pil_image = Image.open(image_name)
  # unsqueeze(0) adds the leading batch dimension the network input needs
  batched = transforms(pil_image).unsqueeze(0)
  return batched.to(device, torch.float)

# tensor to PIL show

In [0]:
# transform that turns a tensor back into a PIL image; used by imshow() below
unloader = ToPILImage() # reconvert into PIL image

# switch matplotlib to interactive mode
plt.ion()

def imshow(tensor, title=None):
  """Display an image tensor with matplotlib.

  Args:
    tensor: image tensor; a leading singleton dimension is squeezed away.
    title: optional plot title.
  """
  # work on a CPU clone so the caller's tensor is never modified
  pil_image = unloader(tensor.cpu().clone().squeeze(0))
  plt.imshow(pil_image)
  if title is not None:
    plt.title(title)
  plt.pause(0.001) # short pause so that the plot gets refreshed

# show the style image in its own figure
# NOTE(review): style_img is not defined in this part of the notebook — confirm it exists before running
plt.figure()
imshow(style_img, title='Style Image')

# show the content image in its own figure
# NOTE(review): content_img is not defined in this part of the notebook — confirm it exists before running
plt.figure()
imshow(content_img, title='Content Image')


# gram matrix

In [0]:
def gram_matrix(input):
  """Return the normalized Gram matrix of a 4-D feature tensor.

  Args:
    input: tensor of size (a, b, c, d): a = batch size, b = number of feature
      maps, (c, d) = dimensions of one feature map.

  Returns:
    An (a*b) x (a*b) tensor of feature inner products divided by the total
    number of elements a*b*c*d.
  """
  batch, maps, height, width = input.size()
  rows = batch * maps
  cols = height * width

  # one row per feature map, flattened over its spatial positions
  flat = input.view(rows, cols)

  return torch.mm(flat, flat.t()).div(rows * cols)

# Normalization

In [0]:
# create a module to normalize input image so we can easily put it in a nn.Sequential
class Normalization(nn.Module):
  """Normalize an image batch with fixed per-channel mean and std.

  Wrapping the arithmetic in a module lets it be placed inside an
  ``nn.Sequential``.
  """

  def __init__(self, mean, std):
    super(Normalization, self).__init__()
    # Reshape both statistics to [C x 1 x 1] so they broadcast against image
    # tensors of shape [B x C x H x W] (batch, channels, height, width).
    self.mean, self.std = (torch.tensor(stat).view(-1, 1, 1) for stat in (mean, std))

  def forward(self, img):
    """Return ``img`` shifted by the mean and scaled by the std."""
    centered = img - self.mean
    return centered / self.std

# FGSM attack

In [0]:
# FGSM attack code
def fgsm_attack(image, epsilon, data_grad):
  """Return ``image`` perturbed by one fast-gradient-sign step.

  Args:
    image: input tensor to perturb.
    epsilon: step size applied to every element.
    data_grad: gradient of the loss with respect to ``image``.

  Returns:
    ``image + epsilon * sign(data_grad)`` clamped to the [0, 1] range.
  """
  # only the element-wise direction of the gradient is used
  step = epsilon * data_grad.sign()
  return torch.clamp(image + step, 0, 1)

# fgsm attack test
def test(model, device, test_loader, epsilon):
  
  test_loss = 0
  correct = 0
  total = 0
  adv_examples = []
  for data, target in test_loader:

      data = data.to(device)
      target = target.to(device)

      data.requires_grad = True
      
      output = model(data)

      init_pred = output.max(1, keepdim=True)[1]
      
      if init_pred.item() != target.item(): # if the initial prediction is wrong, do not bother attacking
        continue
        
      loss = F.nll_loss(output, target)
      
      model.zero_grad()
      
      loss.backward()
      
      # collect datagrad
      data_grad = data.grad.data
      
      # call FGSM attack
      perturbed_data = fgsm_attack(data, epsilon, data_grad)
      
      # reclassify the perturbed image
      output = model(perturbed_data)
      
      # check for success
      final_pred = output.max(1, keepdim=True)[1] # get the index of the max log probability
      if final_pred.item() == target.item():
        correct += 1
        if (epsilon == 0) and (len(adv_examples) < 5):
          # special case for saving 0 epsilon examples
          adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
          adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
        else:
          # save some adv examples for visualization later
          if len(adv_examples) < 5:
            adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
            adv_examples.append((init_pred.item(), final_pred.item(), adv_ex))
  
  
  # calculate final accuracy for this epsilon
  final_acc = correct/float(len(test_loader))

  print('Epsilon: {}\t, Test Accuracy = {} / {} = {}'.format(epsilon, correct,
                                                                              len(test_loader),
                                                                              final_acc))
  # return the accuracy and an adversarial example
  return final_acc, adv_examples



# show image with landmarks

In [0]:
# show image with landmarks
def show_landmarks(image, landmarks):
  """Draw ``image`` and overlay ``landmarks`` as small red dots.

  Args:
    image: image accepted by ``plt.imshow``.
    landmarks: 2-D array; column 0 is used as x and column 1 as y.
  """
  plt.imshow(image)
  xs, ys = landmarks[:, 0], landmarks[:, 1]
  plt.scatter(xs, ys, s=10, marker='.', c='r')
  plt.pause(0.001) # short pause so that the plot gets refreshed

# open a new figure and draw the image read from 'faces/' with its landmarks
# NOTE(review): img_name and landmarks are not defined in this part of the notebook — confirm they exist before running
plt.figure()
show_landmarks(io.imread(os.path.join('faces/', img_name)), landmarks)

plt.show()

# CustomTransforms

In [0]:
class Rescale(object):
  """Rescale the image in a sample to a given size.

  Args:
    output_size (tuple or int): desired output size.
      If tuple, the output is matched to ``output_size`` as (height, width).
      If int, the smaller image edge is matched to ``output_size`` and the
      aspect ratio is kept.
  """

  def __init__(self, output_size):
    assert isinstance(output_size, (int, tuple))
    self.output_size = output_size

  def __call__(self, sample):
    """Return a new sample dict with the resized image and scaled landmarks."""
    image, landmarks = sample['image'], sample['landmarks']

    h, w = image.shape[:2]

    if isinstance(self.output_size, int):
      # scale the longer edge by the same factor as the shorter one
      if h > w:
        new_h, new_w = self.output_size * h / w, self.output_size
      else:
        new_h, new_w = self.output_size, self.output_size * w / h
    else:
      new_h, new_w = self.output_size

    # the divisions above produce floats; the resize target must be integral
    new_h, new_w = int(new_h), int(new_w)

    img = transform.resize(image, (new_h, new_w))

    # h and w are swapped for landmarks because for images, x and y axes are
    # axis 1 and 0 respectively
    landmarks = landmarks * [new_w / w, new_h / h]

    return {'image': img, 'landmarks': landmarks}

class RandomCrop(object):
  """Crop randomly the image in a sample.

  Args:
    output_size (tuple or int): desired output size as (height, width).
      If int, a square crop is made.
  """

  def __init__(self, output_size):
    assert isinstance(output_size, (int, tuple))
    if isinstance(output_size, int):
      self.output_size = (output_size, output_size)
    else:
      assert len(output_size) == 2
      self.output_size = output_size

  def __call__(self, sample):
    """Return a new sample dict with the cropped image and shifted landmarks."""
    image, landmarks = sample['image'], sample['landmarks']
    h, w = image.shape[:2]
    new_h, new_w = self.output_size

    # randint's upper bound is exclusive; the +1 keeps the last valid offset
    # reachable and makes a crop as large as the image itself legal
    top = np.random.randint(0, h - new_h + 1)
    left = np.random.randint(0, w - new_w + 1)

    image = image[top: top + new_h, left: left + new_w]

    # landmarks are (x, y), so x is shifted by left and y by top
    landmarks = landmarks - [left, top]

    return {'image': image, 'landmarks': landmarks}

class ToTensor(object):
  """Convert the ndarrays of a sample dict to tensors.

  Note: this class reuses the name ``ToTensor`` that the notebook's import
  cell also imports from ``torchvision.transforms``.
  """

  def __call__(self, sample):
    # numpy image: H x W x C, torch image: C x H x W -> move the color axis first
    channels_first = sample['image'].transpose((2, 0, 1))

    return {
        'image': torch.from_numpy(channels_first),
        'landmarks': torch.from_numpy(sample['landmarks']),
    }
    

# sparsity

In [0]:
def sparsity(cl_data_file):
  """Return the mean count of non-zero entries per sample, averaged over classes.

  Args:
    cl_data_file: mapping from class to a sequence of arrays.
  """
  per_class = [
      np.mean([np.sum(sample != 0) for sample in samples])
      for samples in cl_data_file.values()
  ]
  return np.mean(per_class)

# one_hot

In [0]:
def one_hot(y, num_class):
  """Encode class indices ``y`` as a (len(y), num_class) one-hot matrix."""
  encoded = torch.zeros((len(y), num_class))
  # scatter a 1 into the column named by each row's class index
  column_index = y.unsqueeze(1)
  return encoded.scatter_(1, column_index, 1)

# DBindex

In [0]:
def DBindex(cl_data_file):
  """Compute a Davies-Bouldin style cluster-separation index.

  For every class, the ratio (std_i + std_j) / ||mean_i - mean_j|| is
  maximised over all other classes; the result is the mean of these maxima.

  Args:
    cl_data_file: mapping from class to its samples, one feature vector per
      row (the per-class spread sums over axis 1).

  Returns:
    The mean of the per-class worst-case ratios.

  Raises:
    ValueError: if there is only one class (no other class to compare with).
  """
  class_list = list(cl_data_file.keys())
  cl_num = len(class_list)
  cl_means = []
  stds = []
  for cl in class_list:
    feats = np.asarray(cl_data_file[cl])
    center = np.mean(feats, axis=0)
    cl_means.append(center)
    # root of the mean squared distance of the samples to their class mean
    stds.append(np.sqrt(np.mean(np.sum(np.square(feats - center), axis=1))))

  means = np.array(cl_means)
  # pairwise distances between class means via broadcasting:
  # mdists[i, j] = ||mean_i - mean_j||
  mdists = np.sqrt(np.sum(np.square(means[np.newaxis, :, :] - means[:, np.newaxis, :]), axis=2))

  DBs = [
      np.max([(stds[i] + stds[j]) / mdists[i, j] for j in range(cl_num) if j != i])
      for i in range(cl_num)
  ]
  return np.mean(DBs)


# CustomTransforms

In [0]:
class TransformLoader:
  """Build composed image transforms from a list of transform names.

  Args:
    image_size: size handed to the crop transforms; the 'Scale' transform
      uses 1.15 times this value.
    normalize_param: keyword arguments for the 'Normalize' transform.
    jitter_param: parameters handed to ``add_transforms.ImageJitter``.
  """

  def __init__(self, image_size, normalize_param = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
               jitter_param=dict(Brightness=0.4, Contrast=0.4, Color=0.4)):
    self.image_size = image_size
    self.normalize_param = normalize_param
    self.jitter_param = jitter_param

  def parse_transform(self, transform_type):
    """Instantiate the transform called ``transform_type``."""
    # ImageJitter comes from add_transforms, everything else from transforms
    if transform_type == 'ImageJitter':
      return add_transforms.ImageJitter(self.jitter_param)

    factory = getattr(transforms, transform_type)
    if transform_type in ('RandomSizedCrop', 'CenterCrop'):
      return factory(self.image_size)
    if transform_type == 'Scale':
      enlarged = int(self.image_size*1.15)
      return factory([enlarged, enlarged])
    if transform_type == 'Normalize':
      return factory(**self.normalize_param)
    return factory()

  def get_composed_transform(self, aug = False):
    """Return the composed pipeline, with augmentation when ``aug`` is true."""
    if aug:
      names = ['RandomSizedCrop', 'ImageJitter', 'RandomHorizontalFlip', 'ToTensor', 'Normalize']
    else:
      names = ['Scale', 'CenterCrop', 'ToTensor', 'Normalize']

    return transforms.Compose([self.parse_transform(name) for name in names])

# SimpleDataset

In [0]:
class SimpleDataset:
  """Image dataset described by a JSON metadata file.

  The JSON file must provide the parallel lists ``image_names`` and
  ``image_labels``.

  Args:
    data_file: path of the JSON metadata file.
    transform: callable applied to every loaded RGB image.
    target_transform: callable applied to every label.
  """

  def __init__(self, data_file, transform, target_transform=identity):
    with open(data_file, 'r') as f:
      self.meta = json.load(f)
    self.transform = transform
    self.target_transform = target_transform

  def __getitem__(self, i):
    """Return ``(transformed image, transformed label)`` for sample ``i``."""
    image_path = os.path.join(self.meta['image_names'][i])
    img = Image.open(image_path).convert('RGB')
    img = self.transform(img)
    # transform the label of sample i only, not the whole label list
    target = self.target_transform(self.meta['image_labels'][i])
    return img, target

  def __len__(self):
    return len(self.meta['image_names'])

# SetDataset, SubDataset

In [0]:
class SetDataset:
  """Dataset whose item ``i`` is one shuffled batch drawn from class ``i``.

  Args:
    data_file: JSON file with parallel ``image_names`` / ``image_labels`` lists.
    batch_size: number of images per class batch.
    transform: image transform forwarded to every ``SubDataset``.
  """

  def __init__(self, data_file, batch_size, transform):
    with open(data_file, 'r') as f:
      self.meta = json.load(f)

    self.cl_list = np.unique(self.meta['image_labels']).tolist()

    # group the image names by their class label
    self.sub_meta = {cl: [] for cl in self.cl_list}
    for name, label in zip(self.meta['image_names'], self.meta['image_labels']):
      self.sub_meta[label].append(name)

    # use main thread only or may receive multiple batches
    loader_kwargs = dict(batch_size = batch_size, shuffle = True, num_workers = 0, pin_memory = False)

    # one data loader per class, in the order of cl_list
    self.sub_dataloader = [
        DataLoader(SubDataset(self.sub_meta[cl], cl, transform = transform), **loader_kwargs)
        for cl in self.cl_list
    ]

  def __getitem__(self, i):
    """Return the first batch of a fresh iteration over loader ``i``."""
    return next(iter(self.sub_dataloader[i]))

  def __len__(self):
    return len(self.cl_list)
  
class SubDataset:
  """Dataset holding the images of one single class.

  Args:
    sub_meta: sequence of image paths of the class.
    cl: class label returned (after ``target_transform``) with every image.
    transform: callable applied to every loaded RGB image.
    target_transform: callable applied to ``cl``.
  """

  def __init__(self, sub_meta, cl, transform=transforms.ToTensor(), target_transform=identity):
    self.sub_meta, self.cl = sub_meta, cl
    self.transform, self.target_transform = transform, target_transform

  def __getitem__(self, i):
    """Return ``(transformed image, transformed class label)`` for image ``i``."""
    path = os.path.join(self.sub_meta[i])
    rgb = Image.open(path).convert('RGB')
    tensor = self.transform(rgb)
    return tensor, self.target_transform(self.cl)

  def __len__(self):
    return len(self.sub_meta)

# EpisodicBatchSampler

In [0]:
class EpisodicBatchSampler(object):
  """Yield ``n_episodes`` batches, each with ``n_way`` distinct class indices.

  Args:
    n_classes: number of classes to draw from.
    n_way: number of classes per episode.
    n_episodes: number of episodes produced by one iteration.
  """

  def __init__(self, n_classes, n_way, n_episodes):
    self.n_classes, self.n_way, self.n_episodes = n_classes, n_way, n_episodes

  def __len__(self):
    return self.n_episodes

  def __iter__(self):
    for _ in range(self.n_episodes):
      # a fresh random permutation per episode; its head is the class draw
      shuffled = torch.randperm(self.n_classes)
      yield shuffled[:self.n_way]

# SimpleHDF5Dataset

In [0]:
# feature loader

class SimpleHDF5Dataset:
  """In-memory view of the features and labels stored in an HDF5-like file.

  Args:
    file_handle: open handle that provides ``all_feats``, ``all_labels`` and
      ``count`` entries; all three are read into memory immediately. With
      ``None`` an empty dataset is created.
  """

  def __init__(self, file_handle = None):
    if file_handle is None:
      self.f = ''
      self.all_feats_dset = []
      self.all_labels = []
      self.total = 0
    else:
      self.f = file_handle
      # [...] copies the stored data into memory
      self.all_feats_dset = self.f['all_feats'][...]
      self.all_labels = self.f['all_labels'][...]
      # plain int so that len() gets a native integer
      self.total = int(self.f['count'][0])

  def __getitem__(self, i):
    """Return ``(feature tensor, int label)`` for sample ``i``."""
    return torch.Tensor(self.all_feats_dset[i, :]), int(self.all_labels[i])

  def __len__(self):
    return self.total
  
def init_loader(filename):
  """Load the features of an HDF5 file and group them by class label.

  Args:
    filename: path of the HDF5 file read through ``SimpleHDF5Dataset``.

  Returns:
    Dict mapping every class label to the list of its feature vectors.
  """
  with h5py.File(filename, 'r') as handle:
    fileset = SimpleHDF5Dataset(handle)

  feats, labels = fileset.all_feats_dset, fileset.all_labels

  # drop trailing rows whose features sum to zero, together with their labels
  while np.sum(feats[-1]) == 0:
    feats = np.delete(feats, -1, axis = 0)
    labels = np.delete(labels, -1, axis = 0)

  cl_data_file = {cl: [] for cl in np.unique(np.array(labels)).tolist()}
  for ind, label in enumerate(labels):
    cl_data_file[label].append(feats[ind])

  return cl_data_file
  

# multiple models save, load


In [0]:
# Template: store several models and their optimizers in one checkpoint file.
# PATH, the model/optimizer objects and the *Class names are placeholders.

# save
# every state_dict is stored under its own key of a single dictionary
torch.save({
    'modelA_state_dict': modelA.state_dict(),
    'modelB_state_dict': modelB.state_dict(),
    'optimizerA_state_dict': optimizerA.state_dict(),
    'optimizerB_state_dict': optimizerB.state_dict(),
    ...
}, PATH)

# load
# the objects must be constructed first; their saved state is restored afterwards
modelA = TheModelAClass(*args, **kwargs)
modelB = TheModelBClass(*args, **kwargs)
optimizerA = TheOptimizerAClass(*args, **kwargs)
optimizerB = TheOptimizerBClass(*args, **kwargs)

# NOTE: torch.load unpickles the file, so only load checkpoints from trusted sources
checkpoint = torch.load(PATH)
modelA.load_state_dict(checkpoint['modelA_state_dict'])
modelB.load_state_dict(checkpoint['modelB_state_dict'])
optimizerA.load_state_dict(checkpoint['optimizerA_state_dict'])
optimizerB.load_state_dict(checkpoint['optimizerB_state_dict'])

# evaluation mode for inference
modelA.eval()
modelB.eval()
# or
# training mode to resume training
modelA.train()
modelB.train()


# saving and loading a general checkpoint for inference and/or resuming training


In [0]:
# Template: checkpoint with everything needed for inference or for resuming training.
# PATH, model, optimizer, epoch, loss and the *Class names are placeholders.

# save:
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss,
    ...

}, PATH)

# load:
# the objects must be constructed first; their saved state is restored afterwards
model = TheModelClass(*args, **kwargs)
optimizer = TheOptimizerClass(*args, **kwargs)

# NOTE: torch.load unpickles the file, so only load checkpoints from trusted sources
checkpoint = torch.load(PATH)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

# evaluation mode for inference
model.eval()
# or
# training mode to resume training
model.train()

# save/load entire model

In [0]:
# Template: pickle the whole model object instead of only its state_dict.
# save
torch.save(model, PATH)
  
# load
# model class must be defined somewhere
# NOTE: torch.load unpickles the file, so only load files from trusted sources
model = torch.load(PATH)
# evaluation mode for inference
model.eval()

# saving and loading model state_dict for inference

In [0]:
# Template: store only the model parameters (state_dict) for later inference.
# save
torch.save(model.state_dict(), PATH)

# load:
# the model must be constructed first; the saved parameters are loaded into it
# NOTE: torch.load unpickles the file, so only load files from trusted sources
model = TheModelClass(*args, **kwargs)
model.load_state_dict(torch.load(PATH))
# evaluation mode for inference
model.eval()

# warmstarting model using parameters from a different model



In [0]:
# Template: initialise modelB from the parameters saved for modelA.
# save
torch.save(modelA.state_dict(), PATH)
  
# load
# strict=False lets load_state_dict ignore keys that do not match between the two models
# NOTE: torch.load unpickles the file, so only load files from trusted sources
modelB = TheModelBClass(*args, **kwargs)
modelB.load_state_dict(torch.load(PATH), strict=False)

# CNNEncoder

In [0]:
class CNNEncoder(nn.Module):
  """Four-stage convolutional encoder for single-channel input.

  Every stage is Conv2d(3x3, 64 filters) -> BatchNorm2d -> ReLU. The first
  two stages use no padding and end with 2x2 max pooling; the last two use
  padding 1 and no pooling.
  """

  @staticmethod
  def _stage(in_channels, padding, pool):
    """Build one conv/batch-norm/ReLU stage, optionally followed by pooling."""
    modules = [
        nn.Conv2d(in_channels, 64, kernel_size=3, padding=padding),
        nn.BatchNorm2d(64, momentum=1, affine=True),
        nn.ReLU(),
    ]
    if pool:
      modules.append(nn.MaxPool2d(2))
    return nn.Sequential(*modules)

  def __init__(self):
    super(CNNEncoder, self).__init__()
    self.layer1 = self._stage(1, padding=0, pool=True)
    self.layer2 = self._stage(64, padding=0, pool=True)
    self.layer3 = self._stage(64, padding=1, pool=False)
    self.layer4 = self._stage(64, padding=1, pool=False)

  def forward(self, x):
    """Run ``x`` through the four stages in order."""
    out = x
    for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
      out = stage(out)
    return out

# RelationNetwork

In [0]:
class RelationNetwork(nn.Module):
  """Score a 128-channel feature map with a value in (0, 1).

  Two conv/batch-norm/ReLU/max-pool stages are followed by two fully
  connected layers and a sigmoid.

  Args:
    input_size: number of features left after flattening the output of the
      second stage; it must match the spatial size of the input maps.
    hidden_size: width of the hidden fully connected layer.
  """

  def __init__(self, input_size, hidden_size):
    super(RelationNetwork, self).__init__()
    self.layer1 = nn.Sequential(nn.Conv2d(128, 64, kernel_size=3, padding=1),
                               nn.BatchNorm2d(64, momentum=1, affine=True),
                               nn.ReLU(),
                               nn.MaxPool2d(2))
    self.layer2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=3, padding=1),
                               nn.BatchNorm2d(64, momentum=1, affine=True),
                               nn.ReLU(),
                               nn.MaxPool2d(2))
    self.fc1 = nn.Linear(input_size, hidden_size)
    self.fc2 = nn.Linear(hidden_size, 1)

  def forward(self, x):
    """Return one score per batch element, shape (batch, 1)."""
    out = self.layer1(x)
    # layer2 expects the 64-channel output of layer1, not the raw input
    out = self.layer2(out)
    out = out.view(out.size(0), -1)
    out = F.relu(self.fc1(out))
    # torch.sigmoid replaces the deprecated F.sigmoid
    out = torch.sigmoid(self.fc2(out))
    return out

def weights_init(m):
  """Initialise the parameters of module ``m`` according to its class name.

  Intended to be used with ``model.apply(weights_init)``.

  * class name contains 'Conv': weights ~ N(0, sqrt(2 / n)) with
    n = kernel height * kernel width * out_channels, bias set to 0.
  * class name contains 'BatchNorm': weight set to 1, bias set to 0.
  * class name contains 'Linear': weights ~ N(0, 0.01), bias set to 1.

  Modules of any other class are left untouched; missing (None) weights or
  biases are skipped.
  """
  classname = m.__class__.__name__
  if classname.find('Conv') != -1:
    n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
    m.weight.data.normal_(0, math.sqrt(2. / n))
    if m.bias is not None:
      m.bias.data.zero_()
  elif classname.find('BatchNorm') != -1:
    # non-affine batch norm layers have neither weight nor bias
    if m.weight is not None:
      m.weight.data.fill_(1)
    if m.bias is not None:
      m.bias.data.zero_()
  elif classname.find('Linear') != -1:
    m.weight.data.normal_(0, 0.01)
    if m.bias is not None:
      # fill in place so the bias keeps its device and dtype
      m.bias.data.fill_(1)


# FullyContextualEmbedding

In [0]:
class FullyContextualEmbedding(nn.Module):
  """Refine query features by attending over a set of support features.

  Args:
    feat_dim: width of the feature vectors in ``f`` and ``G``.
  """

  def __init__(self, feat_dim):
    super(FullyContextualEmbedding, self).__init__()
    # the cell input is cat(f, r): two feat_dim-wide vectors side by side
    self.lstmcell = nn.LSTMCell(feat_dim * 2, feat_dim)
    # attention weights are normalised over the support samples (dim 1)
    self.softmax = nn.Softmax(dim=1)
    # initial cell state; a plain tensor, so cuda() below moves it by hand
    self.c_0 = torch.zeros(1, feat_dim)
    self.feat_dim = feat_dim

  def forward(self, f, G):
    """Return the refined embedding of ``f``.

    Args:
      f: features to refine, shape (N, feat_dim).
      G: support features, shape (K, feat_dim).

    Returns:
      Tensor of the same shape as ``f``.
    """
    h = f
    c = self.c_0.expand_as(f)
    G_T = G.transpose(0, 1)
    K = G.size(0)
    # one attention + LSTM step per support sample
    for k in range(K):
      logit_a = h.mm(G_T)
      a = self.softmax(logit_a)
      r = a.mm(G)
      x = torch.cat((f, r), 1)

      h, c = self.lstmcell(x, (h, c))
      # skip connection back to the original features
      h = h + f
    return h

  def cuda(self, device=None):
    """Move the module and its initial cell state to the GPU."""
    super(FullyContextualEmbedding, self).cuda(device)
    self.c_0 = self.c_0.cuda(device)
    return self

# init seed

In [0]:
def init_seed(opt):
  """Seed numpy and torch and disable cuDNN to maximize reproducibility.

  Args:
    opt: object with a ``manual_seed`` attribute.
  """
  # torch.backends.cudnn.enabled is the real switch; assigning to
  # torch.cuda.cudnn_enabled only creates an unused attribute
  torch.backends.cudnn.enabled = False
  np.random.seed(opt.manual_seed)
  torch.manual_seed(opt.manual_seed)
  torch.cuda.manual_seed(opt.manual_seed)

# show sixteen

In [0]:
def show_sixteen(images, titles=0):
    """Show sixteen images on a 4 x 4 grid without gaps between the cells.

    Args:
        images: indexable collection holding at least sixteen images.
        titles: optional indexable collection of per-image titles; a falsy
            value (the default) draws no titles.
    """
    fig, _ = plt.subplots(4, 4, figsize=(15, 15), gridspec_kw={"wspace": 0, "hspace": 0})
    for position, cell in enumerate(fig.axes):
        cell.imshow(images[position])
        cell.axis("off")
        if not titles:
            continue
        cell.set_title(titles[position])
    plt.show()

# show enhanced and original

In [0]:
def show_enhanced_and_original(enhanced_img, titles=0):
    """Show the notebook-level image ``img`` next to ``enhanced_img``.

    Args:
        enhanced_img: image drawn in the right panel.
        titles: optional pair of titles (left, right); a falsy value (the
            default) draws no titles.
    """
    f, axarr = plt.subplots(1, 2, figsize=(10, 6))
    # left panel: original ``img`` from the notebook scope, right panel: enhanced
    for idx, picture in enumerate((img, enhanced_img)):
        axarr[idx].imshow(picture)
        axarr[idx].axis("off")
        # the default titles=0 is not indexable, so only index when titles are given
        if titles:
            axarr[idx].set_title(titles[idx])
    plt.show()

# show three magnitudes and original

In [0]:
def show_three_magnitutes_and_original(images, titles=0):
    """Show the notebook-level image ``img`` followed by three ``images``.

    Args:
        images: indexable collection with at least three images.
        titles: optional indexable collection of four titles; a falsy value
            (the default) draws no titles.
    """
    _, panels = plt.subplots(1, 4, figsize=(20, 10))
    for position, panel in enumerate(panels):
        # panel 0 holds the original, panels 1..3 hold images[0..2]
        picture = images[position - 1] if position else img
        panel.imshow(picture)
        panel.axis("off")
        if titles:
            panel.set_title(titles[position])
    plt.show()

# rotate with fill

In [0]:
def rotate_with_fill(img, magnitude):
    """Rotate ``img`` by ``magnitude`` and fill the uncovered area with grey.

    The result is converted back to the mode of ``img``.
    """
    rotated = img.convert("RGBA").rotate(magnitude)
    backdrop = Image.new("RGBA", rotated.size, (128,) * 4)
    # the rotated image also acts as the mask of the composite
    return Image.composite(rotated, backdrop, rotated).convert(img.mode)

# auto augment 

In [0]:
from PIL import Image, ImageEnhance, ImageOps
import numpy as np
import random


class ImageNetPolicy(object):
    """ Apply one randomly chosen sub-policy from the ImageNet AutoAugment list.

        The list below has 25 entries, some of which appear more than once.

        Example:
        >>> policy = ImageNetPolicy()
        >>> transformed = policy(image)
        Example as a PyTorch Transform:
        >>> transform=transforms.Compose([
        >>>     transforms.Resize(256),
        >>>     ImageNetPolicy(),
        >>>     transforms.ToTensor()])
    """
    def __init__(self, fillcolor=(128, 128, 128)):
        # each row: (p1, operation1, magnitude_idx1, p2, operation2, magnitude_idx2)
        specs = (
            (0.4, "posterize", 8, 0.6, "rotate", 9),
            (0.6, "solarize", 5, 0.6, "autocontrast", 5),
            (0.8, "equalize", 8, 0.6, "equalize", 3),
            (0.6, "posterize", 7, 0.6, "posterize", 6),
            (0.4, "equalize", 7, 0.2, "solarize", 4),

            (0.4, "equalize", 4, 0.8, "rotate", 8),
            (0.6, "solarize", 3, 0.6, "equalize", 7),
            (0.8, "posterize", 5, 1.0, "equalize", 2),
            (0.2, "rotate", 3, 0.6, "solarize", 8),
            (0.6, "equalize", 8, 0.4, "posterize", 6),

            (0.8, "rotate", 8, 0.4, "color", 0),
            (0.4, "rotate", 9, 0.6, "equalize", 2),
            (0.0, "equalize", 7, 0.8, "equalize", 8),
            (0.6, "invert", 4, 1.0, "equalize", 8),
            (0.6, "color", 4, 1.0, "contrast", 8),

            (0.8, "rotate", 8, 1.0, "color", 2),
            (0.8, "color", 8, 0.8, "solarize", 7),
            (0.4, "sharpness", 7, 0.6, "invert", 8),
            (0.6, "shearX", 5, 1.0, "equalize", 9),
            (0.4, "color", 0, 0.6, "equalize", 3),

            (0.4, "equalize", 7, 0.2, "solarize", 4),
            (0.6, "solarize", 5, 0.6, "autocontrast", 5),
            (0.6, "invert", 4, 1.0, "equalize", 8),
            (0.6, "color", 4, 1.0, "contrast", 8),
            (0.8, "equalize", 8, 0.6, "equalize", 3),
        )
        self.policies = [SubPolicy(*(spec + (fillcolor,))) for spec in specs]

    def __call__(self, img):
        # draw one sub-policy uniformly at random and apply it to the image
        chosen = self.policies[random.randint(0, len(self.policies) - 1)]
        return chosen(img)

    def __repr__(self):
        return "AutoAugment ImageNet Policy"


class CIFAR10Policy(object):
    """ Apply one randomly chosen sub-policy from the 25 CIFAR10 AutoAugment sub-policies.

        Example:
        >>> policy = CIFAR10Policy()
        >>> transformed = policy(image)
        Example as a PyTorch Transform:
        >>> transform=transforms.Compose([
        >>>     transforms.Resize(256),
        >>>     CIFAR10Policy(),
        >>>     transforms.ToTensor()])
    """
    def __init__(self, fillcolor=(128, 128, 128)):
        # each row: (p1, operation1, magnitude_idx1, p2, operation2, magnitude_idx2)
        specs = (
            (0.1, "invert", 7, 0.2, "contrast", 6),
            (0.7, "rotate", 2, 0.3, "translateX", 9),
            (0.8, "sharpness", 1, 0.9, "sharpness", 3),
            (0.5, "shearY", 8, 0.7, "translateY", 9),
            (0.5, "autocontrast", 8, 0.9, "equalize", 2),

            (0.2, "shearY", 7, 0.3, "posterize", 7),
            (0.4, "color", 3, 0.6, "brightness", 7),
            (0.3, "sharpness", 9, 0.7, "brightness", 9),
            (0.6, "equalize", 5, 0.5, "equalize", 1),
            (0.6, "contrast", 7, 0.6, "sharpness", 5),

            (0.7, "color", 7, 0.5, "translateX", 8),
            (0.3, "equalize", 7, 0.4, "autocontrast", 8),
            (0.4, "translateY", 3, 0.2, "sharpness", 6),
            (0.9, "brightness", 6, 0.2, "color", 8),
            (0.5, "solarize", 2, 0.0, "invert", 3),

            (0.2, "equalize", 0, 0.6, "autocontrast", 0),
            (0.2, "equalize", 8, 0.8, "equalize", 4),
            (0.9, "color", 9, 0.6, "equalize", 6),
            (0.8, "autocontrast", 4, 0.2, "solarize", 8),
            (0.1, "brightness", 3, 0.7, "color", 0),

            (0.4, "solarize", 5, 0.9, "autocontrast", 3),
            (0.9, "translateY", 9, 0.7, "translateY", 9),
            (0.9, "autocontrast", 2, 0.8, "solarize", 3),
            (0.8, "equalize", 8, 0.1, "invert", 3),
            (0.7, "translateY", 9, 0.9, "autocontrast", 1),
        )
        self.policies = [SubPolicy(*(spec + (fillcolor,))) for spec in specs]

    def __call__(self, img):
        # draw one sub-policy uniformly at random and apply it to the image
        chosen = self.policies[random.randint(0, len(self.policies) - 1)]
        return chosen(img)

    def __repr__(self):
        return "AutoAugment CIFAR10 Policy"


class SVHNPolicy(object):
    """ Apply one randomly chosen sub-policy from the 25 SVHN AutoAugment sub-policies.

        Example:
        >>> policy = SVHNPolicy()
        >>> transformed = policy(image)
        Example as a PyTorch Transform:
        >>> transform=transforms.Compose([
        >>>     transforms.Resize(256),
        >>>     SVHNPolicy(),
        >>>     transforms.ToTensor()])
    """
    def __init__(self, fillcolor=(128, 128, 128)):
        # each row: (p1, operation1, magnitude_idx1, p2, operation2, magnitude_idx2)
        specs = (
            (0.9, "shearX", 4, 0.2, "invert", 3),
            (0.9, "shearY", 8, 0.7, "invert", 5),
            (0.6, "equalize", 5, 0.6, "solarize", 6),
            (0.9, "invert", 3, 0.6, "equalize", 3),
            (0.6, "equalize", 1, 0.9, "rotate", 3),

            (0.9, "shearX", 4, 0.8, "autocontrast", 3),
            (0.9, "shearY", 8, 0.4, "invert", 5),
            (0.9, "shearY", 5, 0.2, "solarize", 6),
            (0.9, "invert", 6, 0.8, "autocontrast", 1),
            (0.6, "equalize", 3, 0.9, "rotate", 3),

            (0.9, "shearX", 4, 0.3, "solarize", 3),
            (0.8, "shearY", 8, 0.7, "invert", 4),
            (0.9, "equalize", 5, 0.6, "translateY", 6),
            (0.9, "invert", 4, 0.6, "equalize", 7),
            (0.3, "contrast", 3, 0.8, "rotate", 4),

            (0.8, "invert", 5, 0.0, "translateY", 2),
            (0.7, "shearY", 6, 0.4, "solarize", 8),
            (0.6, "invert", 4, 0.8, "rotate", 4),
            (0.3, "shearY", 7, 0.9, "translateX", 3),
            (0.1, "shearX", 6, 0.6, "invert", 5),

            (0.7, "solarize", 2, 0.6, "translateY", 7),
            (0.8, "shearY", 4, 0.8, "invert", 8),
            (0.7, "shearX", 9, 0.8, "translateY", 3),
            (0.8, "shearY", 5, 0.7, "autocontrast", 3),
            (0.7, "shearX", 2, 0.1, "invert", 5),
        )
        self.policies = [SubPolicy(*(spec + (fillcolor,))) for spec in specs]

    def __call__(self, img):
        # draw one sub-policy uniformly at random and apply it to the image
        chosen = self.policies[random.randint(0, len(self.policies) - 1)]
        return chosen(img)

    def __repr__(self):
        return "AutoAugment SVHN Policy"


class SubPolicy(object):
    """Two image operations, each applied with its own probability.

    Args:
        p1: probability of applying ``operation1``.
        operation1: name of the first operation (a key of ``func`` below).
        magnitude_idx1: index (0-9) into the magnitude range of ``operation1``.
        p2: probability of applying ``operation2``.
        operation2: name of the second operation.
        magnitude_idx2: index (0-9) into the magnitude range of ``operation2``.
        fillcolor: fill color handed to the shear and translate transforms.
    """
    def __init__(self, p1, operation1, magnitude_idx1, p2, operation2, magnitude_idx2, fillcolor=(128, 128, 128)):
        # ten magnitude levels per operation; the last three operations take no magnitude
        ranges = {
            "shearX": np.linspace(0, 0.3, 10),
            "shearY": np.linspace(0, 0.3, 10),
            "translateX": np.linspace(0, 150 / 331, 10),
            "translateY": np.linspace(0, 150 / 331, 10),
            "rotate": np.linspace(0, 30, 10),
            "color": np.linspace(0.0, 0.9, 10),
            # the np.int alias was removed from NumPy; the builtin int is equivalent
            "posterize": np.round(np.linspace(8, 4, 10), 0).astype(int),
            "solarize": np.linspace(256, 0, 10),
            "contrast": np.linspace(0.0, 0.9, 10),
            "sharpness": np.linspace(0.0, 0.9, 10),
            "brightness": np.linspace(0.0, 0.9, 10),
            "autocontrast": [0] * 10,
            "equalize": [0] * 10,
            "invert": [0] * 10
        }

        # from https://stackoverflow.com/questions/5252170/specify-image-filling-color-when-rotating-in-python-with-pil-and-setting-expand
        # NOTE(review): the fill here is always (128,) * 4 and ignores ``fillcolor`` — confirm this is intended
        def rotate_with_fill(img, magnitude):
            rot = img.convert("RGBA").rotate(magnitude)
            return Image.composite(rot, Image.new("RGBA", rot.size, (128,) * 4), rot).convert(img.mode)

        # operation name -> callable(img, magnitude); the geometric and
        # enhancement operations pick the sign of the magnitude at random
        func = {
            "shearX": lambda img, magnitude: img.transform(
                img.size, Image.AFFINE, (1, magnitude * random.choice([-1, 1]), 0, 0, 1, 0),
                Image.BICUBIC, fillcolor=fillcolor),
            "shearY": lambda img, magnitude: img.transform(
                img.size, Image.AFFINE, (1, 0, 0, magnitude * random.choice([-1, 1]), 1, 0),
                Image.BICUBIC, fillcolor=fillcolor),
            "translateX": lambda img, magnitude: img.transform(
                img.size, Image.AFFINE, (1, 0, magnitude * img.size[0] * random.choice([-1, 1]), 0, 1, 0),
                fillcolor=fillcolor),
            "translateY": lambda img, magnitude: img.transform(
                img.size, Image.AFFINE, (1, 0, 0, 0, 1, magnitude * img.size[1] * random.choice([-1, 1])),
                fillcolor=fillcolor),
            "rotate": lambda img, magnitude: rotate_with_fill(img, magnitude),
            # "rotate": lambda img, magnitude: img.rotate(magnitude * random.choice([-1, 1])),
            "color": lambda img, magnitude: ImageEnhance.Color(img).enhance(1 + magnitude * random.choice([-1, 1])),
            "posterize": lambda img, magnitude: ImageOps.posterize(img, magnitude),
            "solarize": lambda img, magnitude: ImageOps.solarize(img, magnitude),
            "contrast": lambda img, magnitude: ImageEnhance.Contrast(img).enhance(
                1 + magnitude * random.choice([-1, 1])),
            "sharpness": lambda img, magnitude: ImageEnhance.Sharpness(img).enhance(
                1 + magnitude * random.choice([-1, 1])),
            "brightness": lambda img, magnitude: ImageEnhance.Brightness(img).enhance(
                1 + magnitude * random.choice([-1, 1])),
            "autocontrast": lambda img, magnitude: ImageOps.autocontrast(img),
            "equalize": lambda img, magnitude: ImageOps.equalize(img),
            "invert": lambda img, magnitude: ImageOps.invert(img)
        }

        # self.name = "{}_{:.2f}_and_{}_{:.2f}".format(
        #     operation1, ranges[operation1][magnitude_idx1],
        #     operation2, ranges[operation2][magnitude_idx2])
        self.p1 = p1
        self.operation1 = func[operation1]
        self.magnitude1 = ranges[operation1][magnitude_idx1]
        self.p2 = p2
        self.operation2 = func[operation2]
        self.magnitude2 = ranges[operation2][magnitude_idx2]


    def __call__(self, img):
        """Apply each operation to ``img`` with its probability and return the result."""
        if random.random() < self.p1: img = self.operation1(img, self.magnitude1)
        if random.random() < self.p2: img = self.operation2(img, self.magnitude2)
        return img

# tensorboard

In [0]:
with test_summary_writer.as_default():
          summary.scalar('loss', test_loss, step=self.globaliter)
          summary.scalar('accuracy', accuracy, step=self.globaliter)

with train_summary_writer.as_default():
          tf.summary.scalar('loss', loss.item(), step=globaliter)

@tf.function
def my_func(step, loss):
  """Write a loss scalar, a weight histogram and an input image summary.

  Args:
    step: value passed as the third positional argument of ``tf.summary.scalar``.
    loss: object exposing ``.item()``; the result is what gets logged.

  `w`, `x_image` and `train_summary_writer` are read from the enclosing scope
  and are not defined in this cell.
  """
  with train_summary_writer.as_default():
    tf.summary.scalar("loss", loss.item(), step)
    # NOTE(review): no step is passed here, unlike the scalar call above — confirm.
    tf.summary.histogram("weights", w)
    # NOTE(review): the third positional argument is the literal 3, in the position
    # where the scalar call passes `step` — confirm whether an image count was meant.
    tf.summary.image('input', x_image, 3)
# call this function during training
my_func(globaliter, loss)

    
# Notebook magic: open TensorBoard on the log directory.
%tensorboard --logdir logs/tensorboard


# check runtime

In [0]:
# Display the devices TensorFlow reports for this runtime; the list returned by
# the last expression is rendered as the cell output.
from tensorflow.python.client import device_lib
device_lib.list_local_devices()

# weights_init_normal

In [0]:
def weights_init_normal(m):
  """Re-initialise a module in place according to its class name.

  Intended for ``model.apply(weights_init_normal)``.

  * class name contains "Conv": weight drawn from N(0.0, 0.02).
  * otherwise, class name contains "BatchNorm2d": weight drawn from
    N(1.0, 0.02) and bias set to 0.0.
  * any other module is left untouched.

  Args:
    m: the module to (possibly) re-initialise.
  """
  layer_kind = m.__class__.__name__
  if "Conv" in layer_kind:
    torch.nn.init.normal_(m.weight.data, 0.0, 0.02)
    return
  if "BatchNorm2d" in layer_kind:
    torch.nn.init.normal_(m.weight.data, 1.0, 0.02)
    torch.nn.init.constant_(m.bias.data, 0.0)

# ResidualBlock

In [0]:
class ResidualBlock(nn.Module):
  # Residual block: forward() returns x + conv_block(x).
  # NOTE(review): this is a snippet menu, not runnable as written. Two alternative
  # __init__ headers (# 1 / # 2) are stacked and the first has no body; keep one
  # header and the matching conv_block definition, and delete the other.
  # 1
  def __init__(self, in_features):
  # 2
  def __init__(self, in_features, norm="in"):
    super(ResidualBlock, self).__init__()
    
    # Variant 1 stack: zero-padded 3x3 convs without bias, each followed by an
    # affine InstanceNorm2d that tracks running statistics.
    conv_block = [
        nn.Conv2d(in_features, in_features, 3, stride=1, padding=1, bias=False),
        nn.InstanceNorm2d(in_features, affine=True, track_running_stats=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(in_features, in_features, 3, stride=1, padding=1, bias=False),
        nn.InstanceNorm2d(in_features, affine=True, track_running_stats=True)
    ]
    
    # 2
    # Variant 2 selects the normalisation class from `norm`: "adain" picks
    # AdaptiveInstanceNorm2d, anything else picks nn.InstanceNorm2d.
    norm_layer = AdaptiveInstanceNorm2d if norm == "adain" else nn.InstanceNorm2d
    
    # Variant 2 stack: reflection padding + 3x3 convs. If both stacks are left in
    # place, this assignment replaces the variant 1 list above.
    conv_block = [
        nn.ReflectionPad2d(1),
        nn.Conv2d(in_features, in_features, 3),
        norm_layer(in_features),
        nn.ReLU(inplace=True),
        nn.ReflectionPad2d(1),
        nn.Conv2d(in_features, in_features, 3),
        norm_layer(in_features),        
    ]
    
    self.conv_block = nn.Sequential(*conv_block)
    
    
    
  
  def forward(self, x):
    # Skip connection: add the block input to the conv_block output.
    return x + self.conv_block(x)

# GeneratorResNet

In [0]:
class GeneratorResNet(nn.Module):
  """ResNet-style generator conditioned on a label vector.

  The label vector is broadcast over the spatial dimensions and concatenated
  to the image channels before the first convolution. The network then
  downsamples twice, applies `res_blocks` residual blocks, upsamples twice and
  maps back to `channels` output channels through a Tanh.

  Args:
    img_shape: (channels, height, width) of the input image; only `channels`
      is used by the layers.
    res_blocks: number of ResidualBlock modules in the bottleneck.
    c_dim: length of the conditioning vector.
  """

  def __init__(self, img_shape=(3, 128, 128), res_blocks=9, c_dim=5):
    super(GeneratorResNet, self).__init__()
    channels, img_size, _ = img_shape

    # initial convolutional block (image channels + one channel per label entry)
    model = [
        nn.Conv2d(channels + c_dim, 64, 7, stride=1, padding=3, bias=False),
        nn.InstanceNorm2d(64, affine=True, track_running_stats=True),
        nn.ReLU(inplace=True)
    ]

    # downsampling: each step halves the resolution and doubles the channels
    curr_dim = 64
    for _ in range(2):
      model += [
          nn.Conv2d(curr_dim, curr_dim * 2, 4, stride=2, padding=1, bias=False),
          # Fixed: was `nn.InstanceNorm2d(curr_dim=2, ...)`, an invalid keyword;
          # the norm must match the conv output width.
          nn.InstanceNorm2d(curr_dim * 2, affine=True, track_running_stats=True),
          nn.ReLU(inplace=True)
      ]
      curr_dim *= 2

    # residual blocks
    for _ in range(res_blocks):
      model += [ResidualBlock(curr_dim)]

    # upsampling: mirror of the downsampling path
    for _ in range(2):
      # Fixed: was `mode += [...]`, a NameError.
      model += [
          nn.ConvTranspose2d(curr_dim, curr_dim // 2, 4, stride=2, padding=1, bias=False),
          nn.InstanceNorm2d(curr_dim // 2, affine=True, track_running_stats=True),
          nn.ReLU(inplace=True)
      ]
      curr_dim = curr_dim // 2

    # output layer
    model += [nn.Conv2d(curr_dim, channels, 7, stride=1, padding=3), nn.Tanh()]

    self.model = nn.Sequential(*model)

  def forward(self, x, c):
    """Generate an image from `x` conditioned on `c`.

    Args:
      x: image batch; concatenated with the tiled labels along dim 1.
      c: label batch, reshaped here to (batch, c.size(1), 1, 1) and tiled to
        the spatial size of `x`.

    Returns:
      The output of the final Tanh layer.
    """
    c = c.view(c.size(0), c.size(1), 1, 1)
    c = c.repeat(1, 1, x.size(2), x.size(3))
    x = torch.cat((x, c), 1)
    return self.model(x)

# Discriminator

In [0]:
class Discriminator(nn.Module):
  # 1
  def __init__(self, img_shape=(3, 128, 128), c_dim=5, n_strided=6):
  # 2
  def __init__(self, input_shape):
  # 3
  def __init__(self):
  # 4
  def __init__(self, in_channels=3):
    super(Discriminator, self).__init__()
    # 1
    channels, img_size, _ = img_shape
    
    # 2
    channels, height, width = input_shape
    
    # calculate output of image discriminator (PatchGAN)
    # 1
    self.output_shape = (1, height // 2 ** 3, width // 2 ** 3)
    # 2
    self.output_shape = (1, height // 2 ** 4, width // 2 ** 4)
    
    # 1
    def discriminator_block(in_filters, out_filters):
      """
      Returns downsampling layers of each discriminator block
      """
      layers = [nn.Conv2d(in_filters, out_filters, 4, stride=2, padding=1), nn.LeakyReLU(0.01)]
      return layers
    
    # 2
    def discriminator_block(in_filters, out_filters, normalization=True):
      """
      Returns downsampling layers of each discriminator block
      """
      # 1
      layers = [nn.Conv2d(in_filters, out_filters, 4, stride=2, padding=1)]
      # 2
      layers = [nn.Conv2d(in_filters, out_filters, 3, stride=2, padding=1), nn.LeakyReLU(0.2, inplace=True),
               nn.Dropout2d(0.25)]
      if normalization:
        # 1
        layers.append(nn.InstanceNorm2d(out_filters))
        # 2
        layers.append(nn.BatchNorm2d(out_filters, 0.8))
      layers.append(nn.LeakyReLU(0.2, inplace=True))
      return layers
    # 3
    def discriminator_block(in_filters, out_filters, first_block=False):
      layers = []
      layers.append(nn.Conv2d(in_filters, out_filters, kernel_size=3, stride=1, padding=1))
      if not first_block:
        layers.append(nn.BatchNorm2d(out_filters))
      layers.append(nn.LeakyReLU(0.2, inplace=True))
      layers.append(nn.Conv2d(out_filters, out_filters, kernel_size=3, stride=2, padding=1))
      layers.append(nn.BatchNorm2d(out_filters))
      layers.append(nn.LeakyReLU(0.2, inplace=True))
      return layers
    
    
    
    # 1
    layers = discriminator_block(channels, 64)
    curr_dim = 64
    for _ in range(n_strided - 1):
      layers.extend(discriminator_block(curr_dim, curr_dim * 2))
      curr_dim *= 2
    
    self.model = nn.Sequentail(*layers)
    
    # output 1: PatchGAN
    self.out1 = nn.Conv2d(curr_dim, 1, 3, padding=1, bias=False)
    
    # output 2: class prediction
    kernel_size = img_size // 2 ** n_strided
    self.out2 = nn.Conv2d(curr_dim, c_dim, kernel_size, bias=False)
  
    # 2
    self.model = nn.Sequential(
        # 1
        *discriminator_block(channels, 64, normalization=False),
        # 2
        *discriminator_block(channels * 2, 64, normalization=False),
        *discriminator_block(64, 128),
        # 1
        *discriminator_block(128, 128),
        # 2
        *discriminator_block(128, 256),
        *discriminator_block(256, 512),
        
        nn.ZeroPad2d((1, 0, 1, 0)),
        # 1
        nn.Conv2d(256, 1, 4, padding=1)
        # 2
        nn.Conv2d(256, 1, 4)
        # 3
        nn.Conv2d(512, 1, 4, padding=1, bias=False)
       
    )
    
    # 3
    self.model = nn.Sequential(
        # 1
        nn.Linear(opt.img_size ** 2, 512),
        # 2
        nn.Linear(int(np.prod(img_shape)), 512),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Linear(512, 256),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Linear(256, 1),
        # 1/2
        nn.Sigmoid(),
    )
  
    # 4
    
    layers = []
    in_filters = in_channels
    for i, out_filters in enumerate([64, 128, 256, 512]):
      layers.extend(discriminator_block(in_filters, out_filters, first_block=(i == 0)))
      in_filters = out_filters
    
    layers.append(nn.Conv2d(out_filters, 1, kernel_size=3, stride=1, padding=1))
    
    self.model = nn.Sequential(*layers)
    
    # 5
    self.model = nn.Sequential(
        *discriminator_block(channels, 16, normalization=False),
        *discriminator_block(16, 32),
        *discriminator_block(32, 64),
        *discriminator_block(64, 128),
    )
    
    # the height and width of downsampled image
    ds_size = img_size // 2 ** 4
    self.adv_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, 1), 
                                   # 1/2
                                   nn.Sigmoid())
    # 1/2
    self.aux_layer = nn.Sequential(nn.Linear(128 * ds_size ** 2, n_classes), nn.Softmax())
    
    # 6
    # upsampling
    self.down = nn.Sequential(nn.Conv2d(channels, 64, 3, 2, 1), nn.ReLU())
    # fully connected layers
    self.down_size = img_size // 2
    down_dim = 64 * (img_size // 2) ** 2
    
    self.embedding = nn.Linear(down_dim, 32)
    
    self.fc = nn.Sequential(
        nn.BatchNorm1d(32, 0.8),
        nn.ReLU(inplace=True),
        nn.Linear(32, down_dim),
        nn.BatchNorm1d(down_dim),
        nn.ReLU(inplace=True),
    )
    
    # upsampling
    self.up = nn.Sequential(nn.Upsample(scale_factor=2), nn.Conv2d(64, channels, 3, 1, 1))
  
  # 1
  def forward(self, img):
    feature_repr = self.model(img)
    out_adv = self.out1(feature_repr)
    out_cls = self.out2(feature_repr)
    return out_adv, out_cls.view(out_cls.size(0), -1)
  
  # 2
  def forward(self, img_A, img_B):
    # concatenate image and condition image by channels to produce input
    img_input = torch.cat((img_A, img_B), 1)
    return self.model((img_input))
  
  # 3
  def forward(self, img):
    img_flat = img.view(img.shape[0], -1)
    validity = self.model(img_flat)
    return validity
  
  # 4
  def forward(self, img):
    out = self.model(img)
    out = out.view(out.shape[0], -1)
    validity = self.adv_layer(out)
    return validity
  
  # 5
  def forward(self, img):
    out = self.down(img)
    embedding = self.embedding(out.view(out.size(0), -1))
    out = self.fc(embedding)
    out = self.up(out.view(out.size(0), 64, self.down_size, self.down_size))
    return out, embedding
  
  # 6
  def forward(self, img):
    out = self.model(img)
    out = out.view(out.shape[0], -1)
    validity = self.adv_layer(out)
    label = self.aux_layer(out)
    return validity, label

# FeatureExtractor

In [0]:
class FeatureExtractor(nn.Module):
  """Feature extractor built from the leading layers of a pretrained VGG19.

  Construction calls ``models.vgg19(pretrained=True)`` and keeps the first 18
  children of its ``features`` module, wrapped in an ``nn.Sequential``.
  """

  def __init__(self):
    super(FeatureExtractor, self).__init__()
    backbone = models.vgg19(pretrained=True)
    leading_layers = list(backbone.features.children())[:18]
    self.feature_extractor = nn.Sequential(*leading_layers)

  def forward(self, img):
    """Return the truncated-VGG19 output for `img`."""
    features = self.feature_extractor(img)
    return features

# UNetDown

In [0]:
# U-NET

class UNetDown(nn.Module):
  """Downsampling stage of a U-Net: stride-2 conv, optional norm, optional dropout, LeakyReLU.

  NOTE(review): this is a snippet menu. The lines tagged `# 1`, `# 2`, `# 3`
  are alternatives; as written, the second `layers = ...` assignment replaces
  the first, and all three normalisation layers are appended one after another
  when `normalize` is true. Keep one alternative per group.

  Args:
    in_size: input channels of the conv.
    out_size: output channels of the conv.
    normalize: whether to append the normalisation layer(s).
    dropout: dropout probability; a falsy value adds no Dropout layer.
  """
  def __init__(self, in_size, out_size, normalize=True, dropout=0.0):
    super(UNetDown, self).__init__()
    # 1
    layers = [nn.Conv2d(in_size, out_size, 3, stride=2, padding=1, bias=False)]
    # 2
    layers = [nn.Conv2d(in_size, out_size, 4, stride=2, padding=1, bias=False)]
    if normalize:
      # 1
      # NOTE(review): the second positional argument of nn.BatchNorm2d is `eps`,
      # so 0.8 here sets eps rather than momentum — confirm intent.
      layers.append(nn.BatchNorm2d(out_size, 0.8))
      # 2
      layers.append(nn.InstanceNorm2d(out_size))
      # 3
      layers.append(nn.InstanceNorm2d(out_size, affine=True))
    if dropout:
      layers.append(nn.Dropout(dropout))
    layers.append(nn.LeakyReLU(0.2))
    self.model = nn.Sequential(*layers)
  
  def forward(self, x):
    # Apply the assembled layer stack.
    return self.model(x)

# UNetUp

In [0]:
class UNetUp(nn.Module):
  # Upsampling stage of a U-Net: upsample, then concatenate the skip input
  # along the channel dimension.
  # NOTE(review): snippet menu, not runnable as written. Two __init__ headers are
  # stacked (the first has no body). The `if dropout:` branch below only has a
  # `dropout` name under header # 1.
  # 1
  def __init__(self, in_size, out_size, dropout=0.0):
  # 2
  def __init__(self, in_size, out_size):
    super(UNetUp, self).__init__()
    # 1
    # Variant 1: nearest-neighbour upsample followed by a 3x3 conv.
    self.model = nn.Sequential(
        nn.Upsample(scale_factor=2),
        nn.Conv2d(in_size, out_size, 3, stride=1, padding=1, bias=False),
        nn.BatchNorm2d(out_size, 0.8),
        nn.ReLU(inplace=True),
    )
    
    # 2
    # Variant 2: transposed conv. The `# 1` / `# 2` entries inside the list are
    # alternatives; as written all of them end up in the list. If both variants
    # are kept, the `self.model` assignment at the end replaces the one above.
    layers = [# 1
              nn.ConvTranspose2d(in_size, out_size, 4, 2, 1), 
              # 2
              nn.ConvTranspose2d(in_size, out_size, 4, 2, 1, bias=False), 
              # 1
              nn.InstanceNorm2d(out_size), 
              # 2
              nn.BatchNorm2d(out_size, 0.8),
              nn.ReLU(inplace=True)]
    if dropout:
      layers.append(nn.Dropout(dropout))
    self.model = nn.Sequential(*layers)
    
  def forward(self, x, skip_input):
    # Upsample x, then append the encoder feature map along dim 1.
    x = self.model(x)
    x = torch.cat((x, skip_input), 1)
    return x
  

# Encoder

In [0]:
# 1
class Encoder(nn.Module):
  """Image encoder that outputs the mean and log-variance of a latent code.

  A ResNet-18 (built with ``pretrained=False``) with its last three children
  removed extracts features; these are average-pooled with an 8x8 window,
  flattened and passed through two independent linear heads.

  Args:
    latent_dim: size of each output vector.
    input_shape: accepted but not used by this implementation.
  """

  def __init__(self, latent_dim, input_shape):
    super(Encoder, self).__init__()

    backbone = resnet18(pretrained=False)
    trunk_layers = list(backbone.children())[:-3]
    self.feature_extractor = nn.Sequential(*trunk_layers)
    self.pooling = nn.AvgPool2d(kernel_size=8, stride=8, padding=0)
    # outputs are mu and log(var) for the reparameterization trick used in VAEs
    self.fc_mu = nn.Linear(256, latent_dim)
    self.fc_logvar = nn.Linear(256, latent_dim)

  def forward(self, img):
    """Return ``(mu, logvar)`` for the batch `img`."""
    pooled = self.pooling(self.feature_extractor(img))
    flat = pooled.view(pooled.size(0), -1)
    return self.fc_mu(flat), self.fc_logvar(flat)

# 2
class Encoder(nn.Module):
  """Pairs a content encoder with a style encoder and runs both on the same image.

  Args:
    in_channels: channels of the input image.
    dim: base channel width passed to both sub-encoders.
    n_residual: number of residual blocks in the content encoder.
    n_downsample: number of downsampling steps passed to both sub-encoders.
    style_dim: size of the style code produced by the style encoder.
  """

  def __init__(self, in_channels=3, dim=64, n_residual=3, n_downsample=2, style_dim=8):
    super(Encoder, self).__init__()

    self.content_encoder = ContentEncoder(in_channels, dim, n_residual, n_downsample)
    self.style_encoder = StyleEncoder(in_channels, dim, n_downsample, style_dim)

  def forward(self, img):
    """Return ``(content_code, style_code)`` for `img`."""
    # Fixed: the body previously referenced an undefined name `x` instead of
    # the `img` parameter.
    content_code = self.content_encoder(img)
    style_code = self.style_encoder(img)
    return content_code, style_code

# GeneratorUNet

In [0]:
class GeneratorUNet(nn.Module):
  # 1
  def __init__(self, latent_dim, img_shape):
  # 2
  def __init__(self, channels=3):
  # 3
  def __init__(self, in_channels=3, out_channels=3):
    super(GeneratorUNet, self).__init__()
    channels, self.h, self.w = img_shape
    
    # 1
    self.fc = nn.Linear(latent_dim, self.h * self.w)
    
    # 1
    self.down1 = UNetDown(channels + 1, 64, normalize=False)
    # 2
    self.down1 = UNetDown(channels, 64, normalize=False)
    self.down2 = UNetDown(64, 128)
    # 1
    self.down3 = UNetDown(128, 256)
    # 2
    self.down3 = UNetDown(128 + channels, 256, dropout=0.5)
    # 3
    self.down3 = UNetDown(128, 256, dropout=0.5)
    # 1
    self.down4 = UNetDown(256, 512)
    self.down5 = UNetDown(512, 512)
    self.down6 = UNetDown(512, 512)
    self.down7 = UNetDown(512, 512, normalize=False)
    # 2
    self.down4 = UNetDown(256, 512, dropout=0.5)
    self.down5 = UNetDown(512, 512, dropout=0.5)
    self.down6 = UNetDown(512, 512, dropout=0.5)
    self.down7 = UNetDown(512, 512, normalize=False, dropout=0.5)
    # 1
    self.up1 = UNetUp(512, 512)
    self.up2 = UNetUp(1024, 512)
    self.up3 = UNetUp(1024, 512)
    # 2
    self.up1 = UNetUp(512, 512, dropout=0.5)
    self.up2 = UNetUp(1024, 512, dropout=0.5)
    self.up3 = UNetUp(1024, 512, dropout=0.5)
    # 1/2
    self.up4 = UNetUp(1024, 512, dropout=0.5)
    # 1
    self.up4 = UNetUp(1024, 256)
    self.up5 = UNetUp(512, 128)
    self.up6 = UNetUp(256, 64)
    # 2
    self.up5 = UNetUp(1024, 256)
    self.up6 = UNetUp(512, 128)
    self.up7 = UNetUp(256, 64)
  
    # 1
    self.final = nn.Sequential(
        nn.Upsample(scale_factor),
        nn.Conv2d(128, channels, 3, stride=1, padding=1),
        nn.Tanh()
    )
    
    # 2
    channels, _, _ = input_shape
    
    self.down1 = UNetDown(channels, 64, normalize=False)
    self.down2 = UNetDown(64, 128)
    self.down3 = UNetDown(128, 256, dropout=0.5)
    self.down4 = UNetDown(256, 512, dropout=0.5)
    self.down5 = UNetDown(512, 512, dropout=0.5)
    # 1
    self.down6 = UNetDown(512, 512, dropout=0.5, normalize=False)
    # 2
    self.down6 = UNetDown(512, 512, dropout=0.5)
    
    self.up1 = UNetUp(512, 512, dropout=0.5)
    self.up2 = UNetUp(1024, 512, dropout=0.5)
    self.up3 = UNetUp(1024, 256)
    self.up4 = UNetUp(512, 128)
    self.up5 = UNetUp(256, 64)
    
    # 2
    self.final = nn.Sequential(
        nn.Upsample(scale_factor=2),
        nn.ZeroPad2d((1, 0, 1, 0)),
        # 1
        nn.Conv2d(128, channels, 4, padding=1),
        # 2
        nn.Conv2d(128, out_channels, 4, padding=1),
        nn.Tanh()
    )
  
    # 3
    self.final = nn.Sequential(
        nn.ConvTranspose2d(128, channels, 4, stride=2, padding=1),
        nn.Tanh()
    )
  
  
  def forward(self, x, c):
    # 1
    # propagate noise through fc layer and reshape to img shape
    z = self.fc(z).view(z.size(0), 1, self.h, self.w)
    d1 = self.down1(torch.cat((x, z), 1))
    d2 = self.down2(d1)
    d3 = self.down3(d2)
    d4 = self.down4(d3)
    d5 = self.down5(d4)
    d6 = self.down6(d5)
    d7 = self.down7(d6)
    u1 = self.up1(d7, d6)
    u2 = self.up2(u1, d5)
    u3 = self.up3(u2, u4)
    u4 = self.up4(u3, d3)
    u5 = self.up5(u4, d2)
    u6 = self.up6(u5, d1)
    return self.final(u6)
  
    # 2
    # U-Net generator with skip connections from encoder to decoder
    d1 = self.down1(x)
    d2 = self.down2(d1)
    d2 = torch.cat((d2, x_lr), 1)
    d3 = self.down3(d2)
    d4 = self.down4(d3)
    d5 = self.down5(d4)
    d6 = self.down6(d5)
    u1 = self.up1(d6, d5)
    u2 = self.up2(u1, d4)
    u3 = self.up3(u2, d3)
    u4 = self.up4(u3, d2)
    u5 = self.up5(u4, d1)
    return self.final(u5)
  
    # 3
    # U-Net generator with skip connections from encoder to decoder
    d1 = self.down1(x)
    d2 = self.down2(d1)
    # d2 = torch.cat((d2, x_lr), 1)
    d3 = self.down3(d2)
    d4 = self.down4(d3)
    d5 = self.down5(d4)
    d6 = self.down6(d5)
    d7 = self.down7(d6)
    d8 = self.down8(d7)
    u1 = self.up1(d8, d7)
    u2 = self.up2(u1, d6)
    u3 = self.up3(u2, d5)
    u4 = self.up4(u3, d4)
    u5 = self.up5(u4, d3)
    u6 = self.up6(u5, d2)
    u7 = self.up7(u6, d1)
    return self.final(u7)

# Generator

In [0]:
class Generator(nn.Module):
  """Snippet menu of GAN generator variants (fully connected and convolutional).

  NOTE(review): the parts tagged `# 1`, `# 2`, `# 3` are alternatives. As
  written, both first-layer alternatives are unpacked into the same
  nn.Sequential, and only the last `forward` definition takes effect. The
  names `latent_dim`, `n_classes`, `img_shape`, `img_size`, `channels` and
  `np` are read from the enclosing scope.
  """
  def __init__(self):
    super(Generator, self).__init__()
    
    # 1
    def block(in_feat, out_feat, normalize=True):
      # Linear layer, optional BatchNorm1d, then LeakyReLU(0.2).
      layers = [nn.Linear(in_feat, out_feat)]
      if normalize:
        layers.append(nn.BatchNorm1d(out_feat, 0.8))
      layers.append(nn.LeakyReLU(0.2, inplace=True))
      return layers
    
    # Fully connected generator: the final Linear produces prod(img_shape) values.
    self.model = nn.Sequential(
        # 1
        *block(latent_dim, 128, normalize=False),
        # 2
        *block(latent_dim + n_classes, 128, normalize=False),
        *block(128, 256),
        *block(256, 512),
        *block(512, 1024),
        nn.Linear(1024, int(np.prod(img_shape))),
        nn.Tanh()
    )
    
    # 2
    
    # 1/2
    # Label embedding with the same width as the noise vector (see forward # 3).
    self.label_emb = nn.Embedding(n_classes, latent_dim)
    
    # Convolutional generator: start at img_size // 4 and upsample by 2 twice.
    self.init_size = img_size // 4
    self.l1 = nn.Sequential(nn.Linear(latent_dim, 128 * self.init_size ** 2))
    
    self.conv_blocks = nn.Sequential(
        nn.BatchNorm2d(128),
        nn.Upsample(scale_factor=2),
        nn.Conv2d(128, 128, 3, stride=1, padding=1),
        nn.BatchNorm2d(128, 0.8),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Upsample(scale_factor=2),
        nn.Conv2d(128, 64, 3, stride=1, padding=1),
        nn.BatchNorm2d(64, 0.8),
        nn.LeakyReLU(0.2, inplace=True),
        nn.Conv2d(64, channels, 3, stride=1, padding=1),
        nn.Tanh(),
    )
  # 1  
  def forward(self, z):
    # Fully connected path: reshape the flat output to img_shape.
    img = self.model(z)
    img = img.view(img.shape[0], *img_shape)
    return img
  # 2
  def forward(self, noise):
    # Convolutional path: project, reshape to a 128-channel map, then upsample.
    out = self.l1(noise)
    out = out.view(out.shape[0], 128, self.init_size, self.init_size)
    img = self.conv_blocks(out)
    return img
  # 3
  def forward(self, noise, labels):
    # Conditional path: multiply the label embedding element-wise with the noise.
    gen_input = torch.mul(self.label_emb(labels), noise)
    out = self.l1(gen_input)
    out = out.view(out.shape[0], 128, self.init_size, self.init_size)
    img = self.conv_blocks(out)
    return img

# GAN optimizer

In [0]:
# optimizer, example, Adadelta, Adagrad, SparseAdam, Adamax, ASGD, LBFGS, RMSprop, Rprop, SGD
# One Adam optimizer per network, sharing the learning rate and the (b1, b2) betas.
# `generator`, `discriminator`, `learning_rate`, `b1` and `b2` must already be
# defined when this cell runs.
optimizer_G = optim.Adam(generator.parameters(), lr=learning_rate, betas=(b1, b2))
optimizer_D = optim.Adam(discriminator.parameters(), lr=learning_rate, betas=(b1, b2))

# gradient penalty

In [0]:
def compute_gradient_penalty(D, X):
  """
  calculates the gradient penalty loss for DRAGAN

  Points are sampled between `X` and a noisy copy of `X`, the discriminator is
  evaluated on them, and the deviation of the gradient norm from 1 is penalised.

  Args:
    D: callable discriminator; its output must match the shape (X.shape[0], 1)
      used for `grad_outputs`.
    X: batch of real samples (floating-point tensor).

  Returns:
    Scalar tensor ``lambda_gp * mean((||grad||_2 - 1) ** 2)``, attached to the
    graph (``create_graph=True``) so it can be backpropagated.

  Reads `Tensor`, `np`, `autograd`, `Variable` and `lambda_gp` from the
  enclosing scope.
  """
  # random weight term for interpolation
  alpha = Tensor(np.random.random(size=X.shape))

  # Perturb X with uniform noise scaled by half its standard deviation.
  # rand_like keeps the noise on X's device and dtype (torch.rand(X.size())
  # always allocated with the default device/dtype).
  noisy_X = X + 0.5 * X.std() * torch.rand_like(X)
  interpolates = alpha * X + ((1 - alpha) * noisy_X)
  # Fixed: the original passed the misspelled name `interpoates` (NameError).
  interpolates = Variable(interpolates, requires_grad=True)

  d_interpolates = D(interpolates)
  # All-ones grad_outputs: differentiate the sum of the discriminator outputs.
  fake = Variable(Tensor(X.shape[0], 1).fill_(1.0), requires_grad=False)

  # get gradient with respect to interpolates
  gradients = autograd.grad(
      outputs=d_interpolates,
      inputs=interpolates,
      grad_outputs=fake,
      create_graph=True,
      retain_graph=True,
      only_inputs=True,
  )[0]

  # NOTE(review): the norm is taken over dim 1 only; for inputs with more than
  # two dimensions that is not a per-sample norm — confirm this is intended.
  gradient_penalty = lambda_gp * ((gradients.norm(2, dim=1) - 1) ** 2).mean()
  return gradient_penalty

# initialize generator and discriminator

In [0]:
# initialize generator and discriminator
# Build both networks with their default constructor arguments and move them to
# `device` (which must already be defined when this cell runs).
generator = Generator().to(device)
discriminator = Discriminator().to(device)

# GAN optimizers

In [0]:
# Same optimizer setup as the `optim.Adam` cell, using the directly imported
# `Adam` class. Running both cells rebinds optimizer_G / optimizer_D.
optimizer_G = Adam(generator.parameters(), lr=learning_rate, betas=(b1, b2))
optimizer_D = Adam(discriminator.parameters(), lr=learning_rate, betas=(b1, b2))

# GAN hyperparameters

In [0]:
# GAN hyperparameters

num_epochs =  200 # number of passes over the training loader
batch_size = 64
learning_rate = 0.0002 # learning rate passed to both Adam optimizers
b1 = 0.5 # adam: decay of first order momentum of gradient
b2 = 0.999 # adam: decay of second order momentum of gradient
latent_dim = 100 # dimensionality of the latent space
img_size = 28 # size of each image dimension
n_classes = 10 # number of classes for dataset
channels = 1 # number of image channels
sample_interval = 400 # interval between image samples
img_shape = (channels, img_size, img_size) # (channels, height, width)

# boundary seeking loss

In [0]:
def boundary_seeking_loss(y_pred, y_true):
  """Boundary-seeking generator loss.

  Computes ``0.5 * mean((log(y_pred) - log(1 - y_pred)) ** 2)``.

  Args:
    y_pred: tensor of discriminator outputs; the logs are only finite for
      values strictly between 0 and 1.
    y_true: accepted for signature compatibility with other loss callables;
      not used.

  Returns:
    Scalar tensor with the loss.
  """
  log_odds = torch.log(y_pred) - torch.log(1 - y_pred)
  squared = log_odds ** 2
  return 0.5 * torch.mean(squared)

# GAN train generator, discriminator

In [0]:
# GAN training loop (snippet menu).
# NOTE: the sections tagged `# 1` ... `# 5` inside the generator and
# discriminator steps are alternative loss formulations. As written they all
# execute in sequence and the last assignment wins; keep one per step. Names
# such as `Tensor`, `LongTensor`, `writer`, `rel_avg_gan`, `margin`, `lambda_pt`,
# `labels`, `recon_images`, `img_embeddings`, `discriminator_loss`,
# `pullaway_loss` and `sample_images` are not defined in this cell.

# Resume both networks and both optimizers from a previous checkpoint, if any.
if os.path.isfile('./model.tar'):
  checkpoint = torch.load('model.tar')
  generator.load_state_dict(checkpoint['modelA_state_dict'])
  discriminator.load_state_dict(checkpoint['modelB_state_dict'])
  optimizer_G.load_state_dict(checkpoint['optimizerA_state_dict'])
  optimizer_D.load_state_dict(checkpoint['optimizerB_state_dict'])

prev_time = time.time()
iters = 0
for epoch in range(num_epochs):
  for i, (imgs, _) in enumerate(mnist_loader):
    
    # adversarial ground truths
    valid = Variable(Tensor(imgs.shape[0], 1).fill_(1.0), requires_grad=False)
    fake = Variable(Tensor(imgs.shape[0], 1).fill_(0.0), requires_grad=False)
    
    # configure input
    real_imgs = Variable(imgs.type(Tensor))
    
    # TRAIN GENERATOR
    
    optimizer_G.zero_grad()
    
    # sample noise as generator input
    z = Variable(Tensor(np.random.normal(0, 1, (imgs.shape[0], latent_dim))))
    
    # generate a batch of images
    gen_imgs = generator(z)
    gen_labels = Variable(LongTensor(np.random.randint(0, n_classes, imgs.shape[0])))
    
    
    # 1
    # loss measures generator's ability to fool the discriminator
    g_loss = boundary_seeking_loss(discriminator(gen_imgs), valid)
    
    # 2
    # loss measures generator's ability to fool the discriminator
    g_loss = adversarial_loss(discriminator(gen_imgs), valid)
    
    # 3
    # loss measures generator's ability to fool the discriminator
    # Fixed: the closing parenthesis of pixelwise_loss was misplaced, so the
    # pull-away term was added to the target images instead of to the loss.
    g_loss = pixelwise_loss(recon_images, gen_imgs.detach()) + lambda_pt * pullaway_loss(img_embeddings)
    
    # 4
    # 1
    
    # Relativistic losses: real predictions are detached for the generator step.
    real_pred = discriminator(real_imgs).detach()
    fake_pred = discriminator(gen_imgs)
    
    if rel_avg_gan:
      g_loss = adversarial_loss(fake_pred - real_pred.mean(0, keepdim=True), valid)
    else:
      g_loss = adversarial_loss(fake_pred - real_pred, valid)
    
    # 2
    # loss measures generator's ability to fool the discriminator
    g_loss = adversarial_loss(discriminator(gen_imgs), valid)
    
    # 5
    validity, pred_label = discriminator(gen_imgs)
    g_loss = 0.5 * adversarial_loss(validity, valid) + auxiliary_loss(pred_label, gen_labels)
    
    g_loss.backward()
    optimizer_G.step()
    
    # NOTE(review): `pred_fake` and `loss_GAN` are not assigned anywhere in this
    # cell — confirm which tensors should be logged.
    writer.add_images('pred_fake', pred_fake, iters)
    writer.add_scalar('loss_GAN', loss_GAN, iters)
    
    # TRAIN DISCRIMINATOR
    
    optimizer_D.zero_grad()
    
    # 1
    # measure discriminator's ability to classify real from generated samples
    real_loss = discriminator_loss(discriminator(real_imgs), valid)
    fake_loss = discriminator_loss(discriminator(gen_imgs.detach()), fake)
    
    # 2
    # measure discriminator's ability to classify real from generated samples
    real_loss = adversarial_loss(discriminator(real_imgs), valid)
    fake_loss = adversarial_loss(discriminator(gen_imgs.detach()), fake)
    
    d_loss = (real_loss + fake_loss) / 2
    
    # 3
    # measure discriminator's ability to classify real from generated samples
    real_recon, _ = discriminator(real_imgs)
    fake_recon, _ = discriminator(gen_imgs.detach())
    
    d_loss_real = pixelwise_loss(real_recon, real_imgs)
    d_loss_fake = pixelwise_loss(fake_recon, gen_imgs.detach())
    
    # Hinge on the fake reconstruction error: only add the margin term while
    # the fake loss is still below the margin.
    d_loss = d_loss_real
    if (margin - d_loss_fake.data).item() > 0:
      d_loss += margin - d_loss_fake
    
    # 4
    # measure discriminator's ability to classify real from generated samples
    if rel_avg_gan:
      real_loss = adversarial_loss(real_pred - fake_pred.mean(0, keepdim=True), valid)
      # Fixed: was `real_pred - real_pred.mean(...)`; the fake term must compare
      # fake predictions against the mean real prediction, mirroring the line above.
      fake_loss = adversarial_loss(fake_pred - real_pred.mean(0, keepdim=True), fake)
    else:
      real_loss = adversarial_loss(real_pred - fake_pred, valid)
      fake_loss = adversarial_loss(fake_pred - real_pred, fake)
    
    # 5
    # loss for real images
    real_pred, real_aux = discriminator(real_imgs)
    d_real_loss = (adversarial_loss(real_pred, valid) + auxiliary_loss(real_aux, labels)) / 2
    
    # loss for fake images
    fake_pred, fake_aux = discriminator(gen_imgs.detach())
    d_fake_loss = (adversarial_loss(fake_pred, fake) + auxiliary_loss(fake_aux, gen_labels)) / 2
    
    # Fixed: was `(real_loss + fake_loss) / 2`, which ignored the two losses
    # computed just above for this variant.
    d_loss = (d_real_loss + d_fake_loss) / 2

    
    # call this function during training
    # my_func(i, d_loss, g_loss)
    
    d_loss.backward()
    optimizer_D.step()
    
    # Fixed: progress is counted against the loader actually being iterated
    # (`mnist_loader`); the original used `len(dataloader)` here.
    batches_done = epoch * len(mnist_loader) + i
    batches_left = num_epochs * len(mnist_loader) - batches_done
    
    # ETA = remaining batches x duration of the batch that just finished.
    time_left = datetime.timedelta(seconds=batches_left * (time.time() - prev_time))
    prev_time = time.time()
    
    print("[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f] ETA: %s"% (epoch, num_epochs, i, len(mnist_loader),
                                                                    d_loss.item(), g_loss.item(),  time_left,
                                                                   ))
    
    if batches_done % sample_interval == 0:
      # 1
      sample_images(n_row=10, batches_done=batches_done)
      # 2
      save_image(gen_imgs.data[:25], "./%d.png" % batches_done, nrow=5, normalize=True)
    
    # Check how the generator is doing by saving G's output on fixed_noise
    # if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(mnist_loader)-1)):
    #     with torch.no_grad():
    #         fake = generator(fixed_noise).detach().cpu()
    #    img_list.append(make_grid(fake, padding=2, normalize=True))

    
    # Checkpoint after every batch, using the same keys the resume block reads.
    # NOTE(review): saving every batch is expensive — consider saving per epoch.
    torch.save({
        'modelA_state_dict': generator.state_dict(),
        'modelB_state_dict': discriminator.state_dict(),
        'optimizerA_state_dict': optimizer_G.state_dict(),
        'optimizerB_state_dict': optimizer_D.state_dict()
    }, 'model.tar')
    
    iters += 1
writer.close()

# apply weight init normal

In [0]:
# `Module.apply` calls weights_init_normal on every submodule of each network.
# NOTE(review): in this notebook the cell appears after the training loop; to
# affect training it has to run before it — confirm the intended order.
generator.apply(weights_init_normal)
discriminator.apply(weights_init_normal)

# GAN loss

In [0]:
# reconstruction loss of AE
# reconstruction loss of AE
pixelwise_loss = nn.MSELoss()

# The three `adversarial_loss` assignments below are alternatives: if the cell is
# run as-is, each one replaces the previous and the BCEWithLogitsLoss wins.
adversarial_loss = torch.nn.BCELoss()

# minimize MSE instead of BCE
adversarial_loss = torch.nn.MSELoss()

# Alternative that is moved to `device` (must already be defined).
adversarial_loss = torch.nn.BCEWithLogitsLoss().to(device)

# Classification loss used for the auxiliary class output.
auxiliary_loss = nn.CrossEntropyLoss()

# GAN tensorboard

In [0]:

@tf.function
def my_func(step, d_loss, g_loss):
  """Write the discriminator and generator losses as scalar summaries.

  Args:
    step: value passed as the third positional argument of ``tf.summary.scalar``.
    d_loss: discriminator loss; logged via ``d_loss.item()`` under "d_loss".
    g_loss: generator loss; logged via ``g_loss.item()`` under "g_loss".

  Uses `train_summary_writer` from the enclosing scope.
  """
  with train_summary_writer.as_default():
    tf.summary.scalar("d_loss", d_loss.item(), step)
    # Fixed: the "g_loss" scalar previously logged d_loss.item(), so the
    # g_loss argument was never used.
    tf.summary.scalar("g_loss", g_loss.item(), step)
    
    

# LambdaLR

In [0]:
class LambdaLR:
  """Linear learning-rate decay factor.

  ``step(epoch)`` returns 1.0 until ``epoch + offset`` reaches
  ``decay_start_epoch``, then decreases linearly, reaching 0.0 when
  ``epoch + offset`` equals ``num_epochs``.

  Args:
    num_epochs: total number of training epochs.
    offset: value added to `epoch` in `step` (e.g. when resuming).
    decay_start_epoch: epoch at which the decay starts; must be smaller than
      `num_epochs`.

  Raises:
    AssertionError: if ``num_epochs - decay_start_epoch`` is not positive.
  """

  def __init__(self, num_epochs, offset, decay_start_epoch):
    assert (num_epochs - decay_start_epoch) > 0, "Decay must start befor the training session ends!"
    # Fixed: the right-hand side was the misspelled name `num_epocsh` (NameError).
    self.num_epochs = num_epochs
    self.offset = offset
    self.decay_start_epoch = decay_start_epoch

  def step(self, epoch):
    """Return the multiplicative learning-rate factor for `epoch`."""
    # max(0, ...) keeps the factor at 1.0 before the decay starts.
    decayed_epochs = max(0, epoch + self.offset - self.decay_start_epoch)
    return 1.0 - decayed_epochs / (self.num_epochs - self.decay_start_epoch)

# Decoder

In [0]:
class Decoder(nn.Module):
  """Decoder that turns a content code into an image, modulated by a style code.

  The model is a stack of AdaIN residual blocks, followed by upsampling
  stages and an output convolution with Tanh. An MLP maps the style code to
  the weights and biases of every AdaptiveInstanceNorm2d layer in the model.

  Args:
    in_channels: number of channels of the produced image.
    dim: base channel width; the decoder starts at ``dim * 2 ** n_downsample``.
    n_residual: number of AdaIN residual blocks.
    n_downsample: number of upsampling stages (named after the matching
      encoder downsampling steps).
    style_dim: size of the style code fed to the MLP.
  """

  def __init__(self, in_channels=3, dim=64, n_residual=3, n_downsample=2, style_dim=8):
    super(Decoder, self).__init__()

    # Fixed: the body used `n_upsample` and `out_channels`, which are not
    # parameters of this signature. They are bound to the existing parameters
    # so the public signature stays unchanged.
    n_upsample = n_downsample
    out_channels = in_channels

    layers = []
    dim = dim * 2 ** n_upsample

    # residual blocks
    for _ in range(n_residual):
      layers += [ResidualBlock(dim, norm="adain")]

    # upsampling: each stage doubles the resolution and halves the channels
    for _ in range(n_upsample):
      layers += [
          nn.Upsample(scale_factor=2),
          nn.Conv2d(dim, dim // 2, 5, stride=1, padding=2),
          LayerNorm(dim // 2),
          nn.ReLU(inplace=True),
      ]
      dim = dim // 2

    # output layer
    layers += [nn.ReflectionPad2d(3), nn.Conv2d(dim, out_channels, 7), nn.Tanh()]

    self.model = nn.Sequential(*layers)

    # initiate mlp (predicts AdaIN parameters)
    num_adain_params = self.get_num_adain_params()
    self.mlp = MLP(style_dim, num_adain_params)

  def get_num_adain_params(self):
    """
    return the number of AdaIN parameters needed by the model
    (two values, one bias and one weight, per feature of every AdaIN layer)
    """
    num_adain_params = 0
    for m in self.modules():
      if m.__class__.__name__ == "AdaptiveInstanceNorm2d":
        num_adain_params += 2 * m.num_features
    return num_adain_params

  def assign_adain_params(self, adain_params):
    """
    assign the adain_params to the AdaIN layers in model

    `adain_params` is consumed left to right along dim 1: for each AdaIN layer
    the first `num_features` columns become the bias and the next
    `num_features` columns become the weight, flattened over the batch.
    """
    for m in self.modules():
      if m.__class__.__name__ == "AdaptiveInstanceNorm2d":
        # extract mean and std predictions
        mean = adain_params[:, : m.num_features]
        std = adain_params[:, m.num_features : 2 * m.num_features]
        # update bias and weight
        m.bias = mean.contiguous().view(-1)
        m.weight = std.contiguous().view(-1)
        # move pointer
        if adain_params.size(1) > 2 * m.num_features:
          adain_params = adain_params[:, 2 * m.num_features :]

  def forward(self, content_code, style_code):
    """Decode `content_code` into an image using the style in `style_code`."""
    # update AdaIN parameters by MLP prediction based off style code
    self.assign_adain_params(self.mlp(style_code))
    img = self.model(content_code)
    return img

# ContentEncoder

In [0]:
class ContentEncoder(nn.Module):
  """Convolutional encoder producing a spatial content code.

  Layout: a reflection-padded 7x7 stem, `n_downsample` stride-2 stages that
  double the channel width, then `n_residual` instance-norm residual blocks.

  Args:
    in_channels: channels of the input image.
    dim: channel width after the stem.
    n_residual: number of ResidualBlock modules appended at the end.
    n_downsample: number of stride-2 stages.
  """

  def __init__(self, in_channels=3, dim=64, n_residual=3, n_downsample=2):
    super(ContentEncoder, self).__init__()

    width = dim

    # stem: 7x7 conv on a reflection-padded input
    stem = [
        nn.ReflectionPad2d(3),
        nn.Conv2d(in_channels, width, 7),
        nn.InstanceNorm2d(width),
        nn.ReLU(inplace=True),
    ]

    # stride-2 stages, doubling the width each time
    reducers = []
    for _ in range(n_downsample):
      reducers.extend([
          nn.Conv2d(width, width * 2, 4, stride=2, padding=1),
          nn.InstanceNorm2d(width * 2),
          nn.ReLU(inplace=True),
      ])
      width *= 2

    # residual blocks at the final width
    residuals = [ResidualBlock(width, norm="in") for _ in range(n_residual)]

    self.model = nn.Sequential(*stem, *reducers, *residuals)

  def forward(self, x):
    """Return the content code for `x`."""
    content_code = self.model(x)
    return content_code

# StyleEncoder

In [0]:
class StyleEncoder(nn.Module):
  """Convolutional encoder producing a global style code of shape (N, style_dim, 1, 1).

  Layout: a reflection-padded 7x7 stem, two stride-2 convs that double the
  channel width, ``n_downsample - 2`` further stride-2 convs at constant
  width, global average pooling and a 1x1 conv to `style_dim` channels.

  Args:
    in_channels: channels of the input image.
    dim: channel width after the stem.
    n_downsample: total number of stride-2 convs (the first two widen).
    style_dim: number of output channels.
      NOTE(review): the default of 0 yields a zero-channel output conv; the
      Encoder and Decoder in this notebook default to 8 — confirm and pass
      `style_dim` explicitly.
  """

  def __init__(self, in_channels=3, dim=64, n_downsample=2, style_dim=0):
    super(StyleEncoder, self).__init__()

    # initial conv block
    layers = [nn.ReflectionPad2d(3), nn.Conv2d(in_channels, dim, 7), nn.ReLU(inplace=True)]

    # downsampling
    for _ in range(2):
      layers += [nn.Conv2d(dim, dim * 2, 4, stride=2, padding=1), nn.ReLU(inplace=True)]
      # Fixed: track the doubled width. Without this the second conv expected
      # `dim` input channels while receiving `dim * 2`, and the later layers
      # were built for the wrong width.
      dim *= 2

    # downsampling with constant depth
    for _ in range(n_downsample - 2):
      layers += [nn.Conv2d(dim, dim, 4, stride=2, padding=1), nn.ReLU(inplace=True)]

    # global pooling + 1x1 conv to the style code
    layers += [nn.AdaptiveAvgPool2d(1), nn.Conv2d(dim, style_dim, 1, 1, 0)]

    self.model = nn.Sequential(*layers)

  def forward(self, x):
    """Return the style code for `x`."""
    return self.model(x)

# MLP

In [0]:
# MLP (predicts AdaIn parameters)

class MLP(nn.Module):
  """Fully connected network with ReLU activations between layers.

  Layout: ``Linear(input_dim, dim)``, then ``n_blk - 2`` hidden
  ``Linear(dim, dim)`` layers, then ``Linear(dim, output_dim)``; every layer
  except the last is followed by a ReLU.

  Args:
    input_dim: size of the flattened input.
    output_dim: size of the output vector.
    dim: hidden width.
    n_blk: total number of linear layers (values below 2 still give two).
    activ: accepted but not used; the activation is always ReLU.
  """

  def __init__(self, input_dim, output_dim, dim=256, n_blk=3, activ="relu"):
    super(MLP, self).__init__()
    stack = [nn.Linear(input_dim, dim), nn.ReLU(inplace=True)]
    for _ in range(n_blk - 2):
      stack.append(nn.Linear(dim, dim))
      stack.append(nn.ReLU(inplace=True))
    stack.append(nn.Linear(dim, output_dim))
    self.model = nn.Sequential(*stack)

  def forward(self, x):
    """Flatten `x` per sample and run it through the network."""
    flat = x.view(x.size(0), -1)
    return self.model(flat)
  

# AdaptiveInstanceNorm2d

In [0]:
class AdaptiveInstanceNorm2d(nn.Module):
  """Instance normalisation whose affine parameters are supplied from outside.

  ``weight`` and ``bias`` start as ``None`` and must be assigned before calling
  ``forward``. Because the input is reshaped to ``(1, b * c, h, w)`` before
  normalisation, both need ``b * c`` elements.

  Args:
    num_features: number of channels ``c`` of the expected input.
    eps: value added to the variance for numerical stability.
    momentum: momentum forwarded to ``F.batch_norm``.
  """

  def __init__(self, num_features, eps=1e-5, momentum=0.1):
    super(AdaptiveInstanceNorm2d, self).__init__()
    self.num_features = num_features
    self.eps = eps
    self.momentum = momentum
    # weight and bias are dynamically assigned
    self.weight = None
    self.bias = None
    # forward() reads these; without them it fails with an AttributeError
    self.register_buffer("running_mean", torch.zeros(num_features))
    self.register_buffer("running_var", torch.ones(num_features))

  def forward(self, x):
    """Normalise every (sample, channel) plane of ``x`` and apply ``weight``/``bias``.

    Raises:
      AssertionError: if ``weight`` or ``bias`` has not been assigned yet.
    """
    assert (
      self.weight is not None and self.bias is not None
    ), "please assign weight and bias before calling AdaIN"
    b, c, h, w = x.size()
    # one running statistic per (sample, channel) pair; the repeated copies are
    # temporaries, so the registered buffers themselves are not updated
    running_mean = self.running_mean.repeat(b)
    running_var = self.running_var.repeat(b)

    # apply instance norm: fold the batch into the channel axis so batch_norm in
    # training mode computes statistics per (sample, channel) plane
    x_reshaped = x.contiguous().view(1, b * c, h, w)

    out = F.batch_norm(
        x_reshaped, running_mean, running_var, self.weight, self.bias, True, self.momentum, self.eps
    )

    return out.view(b, c, h, w)

  def __repr__(self):
    return self.__class__.__name__ + "(" + str(self.num_features) + ")"

# LayerNorm

In [0]:
class LayerNorm(nn.Module):
  """Normalise each sample over all of its non-batch elements.

  Every sample is shifted by its mean and divided by ``std + eps``. With
  ``affine=True`` a per-channel scale ``gamma`` (initialised uniformly in
  ``[0, 1)``) and shift ``beta`` (initialised to zero) are applied along
  dimension 1.

  Args:
    num_features: size of dimension 1 of the expected input.
    eps: value added to the standard deviation before dividing.
    affine: whether to learn ``gamma`` and ``beta``.
  """

  def __init__(self, num_features, eps=1e-5, affine=True):
    super(LayerNorm, self).__init__()
    self.num_features = num_features
    self.affine = affine
    self.eps = eps

    if self.affine:
      self.gamma = nn.Parameter(torch.Tensor(num_features).uniform_())
      self.beta = nn.Parameter(torch.zeros(num_features))

  def forward(self, x):
    """Return the normalised tensor, with the same shape as ``x``."""
    shape = [-1] + [1] * (x.dim() - 1)
    # reshape (not view) so non-contiguous inputs such as permuted tensors work too
    flat = x.reshape(x.size(0), -1)
    mean = flat.mean(1).view(*shape)
    std = flat.std(1).view(*shape)
    x = (x - mean) / (std + self.eps)

    if self.affine:
      # broadcast the per-channel parameters along dimension 1
      shape = [1, -1] + [1] * (x.dim() - 2)
      x = x * self.gamma.view(*shape) + self.beta.view(*shape)
    return x

# plot train

In [0]:
# Plot some training images
# Pull a single batch from the dataloader; element 0 of the batch holds the images.
real_batch = next(iter(dataloader))
plt.figure(figsize=(8,8))
plt.axis("off")
plt.title("Training Images")
# Tile at most 64 images into one normalized grid, bring it back to the CPU and move
# axis 0 of the grid to the end with (1,2,0) before handing it to imshow.
plt.imshow(np.transpose(make_grid(real_batch[0].to(device)[:64], padding=2, normalize=True).cpu(),(1,2,0)))

# visualize GAN

In [0]:
# Fixed batch of 64 latent vectors, reused to visualize how the generator
# progresses over training
fixed_noise = torch.randn(64, nz, 1, 1, device=device)

# Label convention for real and fake samples during training
real_label, fake_label = 1, 0

#%%capture
fig = plt.figure(figsize=(8,8))
plt.axis("off")
# one single-artist frame per stored image grid, with axis 0 moved to the end
ims = [
    [plt.imshow(np.transpose(grid, (1, 2, 0)), animated=True)]
    for grid in img_list
]
ani = animation.ArtistAnimation(
    fig, ims, interval=1000, repeat_delay=1000, blit=True
)

HTML(ani.to_jshtml())

# real images vs fake images

In [0]:
# Grab a batch of real images from the dataloader
real_batch = next(iter(dataloader))

# Plot the real images
# Left panel of a 1x2 figure: a normalized grid of at most 64 real images.
plt.figure(figsize=(15,15))
plt.subplot(1,2,1)
plt.axis("off")
plt.title("Real Images")
plt.imshow(np.transpose(vutils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(),(1,2,0)))

# Plot the fake images from the last epoch
# Right panel: the last entry of img_list, with axis 0 moved to the end for imshow.
# NOTE(review): img_list is presumably filled by the training loop - confirm.
plt.subplot(1,2,2)
plt.axis("off")
plt.title("Fake Images")
plt.imshow(np.transpose(img_list[-1],(1,2,0)))
plt.show()

# torchvision repo

In [0]:
%%shell

# Replace the installed torchvision with the pinned 0.3.0 wheel
pip uninstall -y torchvision

pip install https://download.pytorch.org/whl/cu100/torchvision-0.3.0-cp36-cp36m-linux_x86_64.whl

# Fetch the torchvision sources at the matching tag so the helper files
# under references/detection can be reused
git clone https://github.com/pytorch/vision.git
cd vision
git checkout v0.3.0

# Copy the detection helpers into the parent directory
for helper in utils.py transforms.py coco_eval.py engine.py coco_utils.py; do
  cp "references/detection/${helper}" ../
done

# pick one image and predict

In [0]:
# pick one image from the test set
# (the second element of the sample is discarded)
img, _ = dataset_test[0]
# put the model in evaluation mode
model.eval()
# run the forward pass without gradient tracking; the model is called with a
# one-element list holding the image moved to `device`
with torch.no_grad():
  prediction = model([img.to(device)])



# train, evaluate using helper functions

In [0]:
# let us train it for 10 epochs
num_epochs = 10

# NOTE(review): train_one_epoch and evaluate are presumably the helpers from the
# torchvision references/detection/engine.py file - confirm they are imported.
for epoch in range(num_epochs):
  # train for one epoch, printing every 10 iterations
  train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
  # update the learning rate
  lr_scheduler.step()
  # evaluate on the test dataset
  evaluate(model, data_loader_test, device=device)

# transforms helper function

In [0]:
import transforms as T

def get_transform(train):
  """Build the transform pipeline for a dataset split.

  Args:
    train: when truthy, a random horizontal flip with probability 0.5 is
      appended after the tensor conversion.

  Returns:
    A ``T.Compose`` of the selected transforms.
  """
  steps = [T.ToTensor()]
  if train:
    steps.append(T.RandomHorizontalFlip(0.5))
  return T.Compose(steps)

# instance segmentation

In [0]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

def get_instance_segmentation_model(num_classes):
  """Return a pre-trained Mask R-CNN whose box and mask heads predict ``num_classes``.

  Args:
    num_classes: number of output classes for both replaced heads.

  Returns:
    The ``maskrcnn_resnet50_fpn`` model with new box and mask predictors.
  """
  # start from the instance segmentation model pre-trained on COCO
  model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)
  heads = model.roi_heads

  # box head: keep the classifier's input width, swap in a fresh predictor
  box_in = heads.box_predictor.cls_score.in_features
  heads.box_predictor = FastRCNNPredictor(box_in, num_classes)

  # mask head: same idea, with a hidden layer of 256 channels
  mask_in = heads.mask_predictor.conv5_mask.in_channels
  hidden_layer = 256
  heads.mask_predictor = MaskRCNNPredictor(mask_in, hidden_layer, num_classes)

  return model

# modify model to add a different backbone

In [0]:
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

# load a pre-trained model for classification and return only the features
backbone = torchvision.models.mobilenet_v2(pretrained=True).features

# FasterRCNN needs to know the number of output channels in a backbone.
# for mobilenet_v2, it is 1280 so we need to add it here
backbone.out_channels = 1280

# let us make the RPN generate 5 x 3 anchors per spatial location, with 5 different sizes and 3 different
# aspect ratios.
# we have a Tuple[Tuple[int]] because each feature map could potentially have different sizes and aspect ratios
anchor_generator = AnchorGenerator(sizes=((32, 64, 128, 256, 512),), aspect_ratios=((0.5, 1.0, 2.0),))

# let us define what are the feature maps that we will use to perform the region of interest cropping, as well
# as the size of the crop after rescaling.
# if our backbone returns a Tensor, featmap_names is expected to be [0].
# more generally, the backbone should return an OrderedDict[Tensor], and in featmap_names we can choose which
# feature maps to use.
roi_pooler = torchvision.ops.MultiScaleRoIAlign(featmap_names=[0], output_size=7, sampling_ratio=2)

# put the pieces together inside a FasterRCNN model
# (num_classes=2 here; the finetuning snippet in this notebook uses the same value
# for one object class plus background)
model = FasterRCNN(backbone, num_classes=2, rpn_anchor_generator=anchor_generator, box_roi_pool=roi_pooler)

# finetune from a pretrained model

In [0]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch.nn as nn

# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# replace the classifier with a new one, that has num_classes which is user-defined
num_classes = 2 # 1 class (person) + background

# get number of input features for the classifier
# (read from the existing head so the new predictor matches it)
in_features = model.roi_heads.box_predictor.cls_score.in_features

# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# object detection custom dataset

In [0]:
import os
import numpy as np
import torch
from PIL import Image

class PennFudanDataset(object):
  """Detection/segmentation dataset read from ``PNGImages`` and ``PedMasks`` under ``root``.

  Images and masks are paired by position in the sorted directory listings.
  Each item is ``(img, target)`` where ``target`` is a dict with the keys
  ``boxes`` (N x 4 float32, ``[xmin, ymin, xmax, ymax]``), ``labels`` (N int64,
  all ones), ``masks`` (N x H x W uint8), ``image_id``, ``area`` and ``iscrowd``.
  A mask that contains only background yields N = 0 with correctly shaped
  empty tensors.

  Args:
    root: dataset directory containing the two sub-directories.
    transforms: optional callable ``(img, target) -> (img, target)``; ``None``
      disables it.
  """

  def __init__(self, root, transforms):
    self.root = root
    self.transforms = transforms
    # we load all image files, sorting them to ensure that they are aligned
    self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
    self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

  def __getitem__(self, idx):
    """Load sample ``idx`` and build its target dictionary."""
    # load images and masks
    img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
    mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])

    img = Image.open(img_path).convert("RGB")
    # we have not converted the mask to RGB, because each color corresponds to a different instance with
    # 0 being background

    mask = Image.open(mask_path)

    # convert the PIL Image into a numpy array
    mask = np.array(mask)

    # instances are encoded as different colors
    obj_ids = np.unique(mask)

    # first id is the background, so remove it
    obj_ids = obj_ids[1:]

    # split the color-encoded mask into a set of binary masks
    masks = mask == obj_ids[:, None, None]

    # get bounding box coordinates for each mask
    num_objs = len(obj_ids)
    boxes = []
    for i in range(num_objs):
      pos = np.where(masks[i])
      xmin = np.min(pos[1])
      xmax = np.max(pos[1])
      ymin = np.min(pos[0])
      ymax = np.max(pos[0])
      boxes.append([xmin, ymin, xmax, ymax])

    # convert everything into a torch.Tensor
    # reshape keeps an (N, 4) layout even when there are no instances; without it
    # an empty list becomes a 1-D tensor and the column indexing below fails
    boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)

    # there is only one class
    labels = torch.ones((num_objs,), dtype=torch.int64)
    masks = torch.as_tensor(masks, dtype=torch.uint8)

    image_id = torch.tensor([idx])
    area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

    # suppose all instances are not crowd
    iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

    target = {}
    target["boxes"] = boxes
    target["labels"] = labels
    target["masks"] = masks
    target["image_id"] = image_id
    target["area"] = area
    target["iscrowd"] = iscrowd

    if self.transforms is not None:
      img, target = self.transforms(img, target)

    return img, target

  def __len__(self):
    """Number of images found in ``PNGImages``."""
    return len(self.imgs)

# PyTorch version

In [0]:
# Show which PyTorch build the runtime is using.
print(torch.__version__)

# ngrok

In [0]:
# Download the ngrok client archive for 64-bit Linux and unpack it into the
# current working directory.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

In [0]:
# Directory TensorBoard reads event files from.
LOG_DIR = './runs'
# Start TensorBoard as a background process (trailing '&') listening on all
# interfaces at port 6006.
get_ipython().system_raw(
    'tensorboard --logdir {} --host 0.0.0.0 --port 6006 &'
    .format(LOG_DIR)
)

In [0]:
# Start an ngrok HTTP tunnel to local port 6006 in the background.
get_ipython().system_raw('./ngrok http 6006 &')
# Ask ngrok's local API (port 4040) for its tunnels and print the public URL of
# the first one.
! curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"


http://e8279752.ngrok.io


# kaggle

In [0]:
# Run this cell and select the kaggle.json file downloaded
# from the Kaggle account settings page.
# files.upload() opens Colab's upload dialog; the later cells expect the file
# to be named kaggle.json in the working directory.
from google.colab import files
files.upload()

In [0]:
# Let's make sure the kaggle.json file is present
# (long listing with human-readable sizes).
!ls -lha kaggle.json

-rw-r--r-- 1 root root 64 Jun  1 20:46 kaggle.json


In [0]:
# Next, install the Kaggle API client (-q keeps pip's output quiet).
!pip install -q kaggle

In [0]:
# The Kaggle API client expects this file to be in ~/.kaggle,
# so copy it there (the uploaded file stays in the working directory).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/

# This permissions change avoids a warning on Kaggle tool startup:
# mode 600 makes the credentials readable and writable by the owner only.
!chmod 600 ~/.kaggle/kaggle.json

In [0]:
# List available datasets; also a quick check that the credentials work.
!kaggle datasets list

ref                                                             title                                     size  lastUpdated          downloadCount  
--------------------------------------------------------------  ---------------------------------------  -----  -------------------  -------------  
stackoverflow/stack-overflow-2018-developer-survey              Stack Overflow 2018 Developer Survey      20MB  2018-05-15 16:59:54            376  
ruslankl/mice-protein-expression                                Mice Protein Expression                  987KB  2018-05-06 15:09:39            311  
jameslko/gun-violence-data                                      Gun Violence Data                         34MB  2018-04-15 06:18:09           5438  
jessicali9530/honey-production                                  Honey Production In The USA (1998-2012)   80KB  2018-04-09 23:31:19           2384  
donorschoose/io                                                 Data Science For Good: DonorsChoose.

In [0]:
# Copy the stackoverflow data set locally.
# -d selects the dataset by its <owner>/<name> reference from the listing.
!kaggle datasets download -d stackoverflow/stack-overflow-2018-developer-survey

stack-overflow-2018-developer-survey.zip: Downloaded 20MB of 20MB to /content/.kaggle/datasets/stackoverflow/stack-overflow-2018-developer-survey


In [0]:
# Preview the first lines of the downloaded survey CSV.
!head ~/.kaggle/datasets/stackoverflow/stack-overflow-2018-developer-survey/survey_results_public.csv

Respondent,Hobby,OpenSource,Country,Student,Employment,FormalEducation,UndergradMajor,CompanySize,DevType,YearsCoding,YearsCodingProf,JobSatisfaction,CareerSatisfaction,HopeFiveYears,JobSearchStatus,LastNewJob,AssessJob1,AssessJob2,AssessJob3,AssessJob4,AssessJob5,AssessJob6,AssessJob7,AssessJob8,AssessJob9,AssessJob10,AssessBenefits1,AssessBenefits2,AssessBenefits3,AssessBenefits4,AssessBenefits5,AssessBenefits6,AssessBenefits7,AssessBenefits8,AssessBenefits9,AssessBenefits10,AssessBenefits11,JobContactPriorities1,JobContactPriorities2,JobContactPriorities3,JobContactPriorities4,JobContactPriorities5,JobEmailPriorities1,JobEmailPriorities2,JobEmailPriorities3,JobEmailPriorities4,JobEmailPriorities5,JobEmailPriorities6,JobEmailPriorities7,UpdateCV,Currency,Salary,SalaryType,ConvertedSalary,CurrencySymbol,CommunicationTools,TimeFullyProductive,EducationTypes,SelfTaughtTypes,TimeAfterBootcamp,HackathonReasons,AgreeDisagree1,AgreeDisagree2,AgreeDisagree3,LanguageWorkedWith,LanguageDesireN

# TPU

In [0]:
# trace the model
# dummy batch and target with the shapes used for tracing
inputs = torch.zeros(batch_size, 1, 28, 28)
target = torch.zeros(batch_size, dtype=torch.int64)
# one device string per core: ':0', ':1', ...
devices = [':{}'.format(core) for core in range(num_cores)]
xla_model = xm.XlaModel(
    model,
    [inputs],
    loss_fn=F.nll_loss,
    target=target,
    num_cores=num_cores,
    devices=devices,
)
optimizer = optim.SGD(xla_model.parameters_list(), lr=lr, momentum=momentum)

In [0]:
# Logging callback handed to both the train and the test call below.
log_fn = xm.get_log_fn()
# Make newly created tensors default to CPU float tensors.
torch.set_default_tensor_type('torch.FloatTensor')
# Epochs are numbered from 1; each epoch trains once and then evaluates on the
# test loader with an NLL-loss based evaluation function.
for epoch in range(1, num_epochs + 1):
  xla_model.train(train_loader, optimizer, batch_size, log_interval=log_interval, metrics_debug=False,
                  log_fn=log_fn)
  accuracy = xla_model.test(test_loader, xm.category_eval_fn(F.nll_loss), batch_size, log_fn=log_fn)

# ImageDataset

In [0]:
class ImageDataset(Dataset):
  """Paired-image dataset in which every file holds two images side by side.

  The left half of each file is returned as ``"A"`` and the right half as
  ``"B"``. With probability 0.5 both halves are flipped horizontally together.

  Args:
    root: directory containing one sub-directory per mode.
    transforms_: list of transforms applied to each half; ``None`` means no
      transform.
    mode: sub-directory to read. For ``"train"`` the files of the ``"test"``
      sub-directory are appended as well.
  """

  def __init__(self, root, transforms_=None, mode="train"):
    # Compose(None) would fail on first use, so treat None as an empty pipeline
    self.transform = Compose(transforms_ or [])

    self.files = sorted(glob.glob(os.path.join(root, mode) + "/*.*"))
    if mode == "train":
      self.files.extend(sorted(glob.glob(os.path.join(root, "test") + "/*.*")))

  def __getitem__(self, index):
    """Return ``{"A": left_half, "B": right_half}`` for the file at ``index``."""
    # the index wraps around the number of files
    img = Image.open(self.files[index % len(self.files)])
    w, h = img.size
    img_A = img.crop((0, 0, w / 2, h))
    img_B = img.crop((w / 2, 0, w, h))

    # random horizontal flip, applied to both halves so they stay aligned
    if np.random.random() < 0.5:
      img_A = Image.fromarray(np.array(img_A)[:, ::-1, :], "RGB")
      img_B = Image.fromarray(np.array(img_B)[:, ::-1, :], "RGB")

    img_A = self.transform(img_A)
    img_B = self.transform(img_B)

    return {"A": img_A, "B": img_B}

  def __len__(self):
    """Number of image files in the dataset."""
    return len(self.files)

# FloatTensor, LongTensor

In [0]:
# Tensor constructors used by the sampling code: the CUDA types when a GPU is
# available, otherwise the CPU equivalents so the snippets also run on a
# CPU-only runtime.
FloatTensor = torch.cuda.FloatTensor if torch.cuda.is_available() else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if torch.cuda.is_available() else torch.LongTensor

# Sample Images

In [0]:
# 1
# Variant 1 (image-to-image): only the signature and docstring are given here; the
# statements marked "# 1" inside the next definition are the matching body.
def sample_images(batches_done):
  """
  saves a generated sample from the validation set
  """
# 2
# Variant 2 (class-conditional generator).
# NOTE(review): this second definition reuses the name, so it replaces variant 1
# when the cell runs, and its body contains the statements of BOTH variants. The
# snippet is presumably meant as a template from which one variant is kept - confirm.
def sample_images(n_row, batches_done):
  """
  saves a grid of generated digits ranging from 0 to n_classes
  """
  # 1
  # take one validation batch; note that "B" feeds real_A and "A" feeds real_B
  imgs = next(iter(val_dataloader))
  real_A = Variable(imgs["B"].type(Tensor))
  real_B = Variable(imgs["A"].type(Tensor))
  fake_B = generator(real_A)
  # stack input, generated and target images along the second-to-last dimension
  img_sample = torch.cat((real_A.data, fake_B.data, real_B.data), -2)
  save_image(img_sample, "./%s/%s.png" % (dataset_name, batches_done), nrow=5, normalize=True)
  # 2
  # sample noise
  # (n_row ** 2 latent vectors drawn from a standard normal distribution)
  z = Variable(FloatTensor(np.random.normal(0, 1, (n_row ** 2, latent_dim))))
  # get labels ranging from 0 to n_classes for n rows
  # (the sequence 0..n_row-1 repeated n_row times)
  labels = np.array([num for _ in range(n_row) for num in range(n_row)])
  labels = Variable(LongTensor(labels))
  # NOTE(review): generator is called with (z, labels) here but with a single
  # argument in variant 1 above; only one of the two call forms can match it.
  gen_imgs = generator(z, labels)
  save_image(gen_imgs.data, "./%d.png" % (batches_done), nrow=n_row, normalize=True)