This is a Google Colab notebook that installs a PyTorch v1 release (pinned to 1.1.0 in the first code cell) and checks that a GPU runtime is available before training.

If you come across any issues, then feel free to contact me on Slack. My username is ***avinashss***


In [None]:
# google colab does not come with torch installed. And also, in lesson we are 
# using torch v1.0 
# so following snippet of code installs the relevant version according to the 
# GPU architecture
!pip install -q torch==1.1.0 torchvision
import torch
print(torch.__version__)

In [None]:
# Sanity check: confirm that this runtime exposes a CUDA device to PyTorch.
# Expected output: CUDA is available!  Training on GPU ...
#
# Any other message means the GPU is switched off. Enable it from
# Menu > Runtime > Change Runtime Type > Hardware Accelerator > GPU

import torch
import numpy as np

# True when torch reports a usable CUDA device, False otherwise
train_on_gpu = torch.cuda.is_available()

print(
    'CUDA is available!  Training on GPU ...'
    if train_on_gpu
    else 'CUDA is not available.  Training on CPU ...'
)

In [None]:
# Shell escape: print the nvidia-smi report (GPU, driver, memory usage) for this runtime.
!nvidia-smi

In [None]:
import torch, argparse
from torchvision import datasets
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
import torchvision.transforms as transforms

import numpy as np
from collections import OrderedDict

import torch, os
import torch.nn as nn
import torch.nn.functional as F

In [None]:
def loss_fn(recon_x, x, mu, logvar):
    """Return the VAE training loss: reconstruction error plus KL penalty.

    Args:
        recon_x: Reconstruction produced by the decoder.
        x: Target the reconstruction is compared against.
        mu: Mean of the approximate posterior.
        logvar: Log-variance of the approximate posterior.

    Returns:
        A scalar tensor equal to the element-wise mean squared error between
        ``recon_x`` and ``x`` plus ``-0.5 * mean(1 + logvar - mu^2 - exp(logvar))``.
        Both terms are averaged over all elements, not summed.
    """
    # Reconstruction term (mean squared error, default 'mean' reduction).
    reconstruction = F.mse_loss(recon_x, x)
    # Closed-form Gaussian KL integrand, averaged over batch and latent dims.
    kl_integrand = 1 + logvar - mu.pow(2) - logvar.exp()
    divergence = -0.5 * torch.mean(kl_integrand)
    return reconstruction + divergence

In [None]:
class VAEGT(nn.Module):
    """Fully connected variational autoencoder conditioned on a class label.

    The encoder maps a flattened input to the mean and log-variance of a
    ``hid1_dims``-dimensional latent code. The conditioner maps a one-hot
    label to a ``hid2_dims``-dimensional embedding. The decoder consumes the
    concatenation of both and ends in a sigmoid, so every reconstruction lies
    in (0, 1).

    Args:
        in_dims: Size of the flattened input (and of the reconstruction).
        hid1_dims: Size of the latent code produced from the input.
        hid2_dims: Size of the label embedding produced by the conditioner.
        num_classes: Number of distinct labels (width of the one-hot vector).
        negative_slope: Negative slope shared by every LeakyReLU.
    """

    def __init__(self, in_dims=784, hid1_dims=100, hid2_dims=64, num_classes=10, negative_slope=0.1):
        super(VAEGT, self).__init__()
        self.in_dims = in_dims
        self.hid1_dims = hid1_dims
        self.hid2_dims = hid2_dims
        self.num_classes = num_classes
        self.negative_slope = negative_slope

        # Encoder
        self.encoder = nn.Sequential(OrderedDict([
            ('layer1', nn.Linear(in_dims, 512)),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2', nn.Linear(512, 256)),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3', nn.Linear(256, 128)),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))
        # Two heads on the shared encoder features: posterior mean and log-variance.
        self.fc_mu = nn.Linear(128, hid1_dims)
        self.fc_var = nn.Linear(128, hid1_dims)

        # Conditioner
        self.conditioner = nn.Sequential(OrderedDict([
            ('layer1', nn.Linear(num_classes, 16)),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2', nn.Linear(16, 32)),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3', nn.Linear(32, hid2_dims)),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))

        # Decoder
        self.decoder = nn.Sequential(OrderedDict([
            ('layer1', nn.Linear(hid1_dims+hid2_dims, 128)),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2', nn.Linear(128, 256)),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3', nn.Linear(256, 512)),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer4', nn.Linear(512, in_dims)),
            ('sigmoid', nn.Sigmoid()),
        ]))

        self._init_weights()

    def forward(self, x, y):
        """Reconstruct ``x`` conditioned on the labels ``y``.

        Args:
            x: Flattened inputs of shape ``(batch, in_dims)``.
            y: Labels in any form accepted by ``_onehot``.

        Returns:
            In training mode the tuple ``(reconstruction, mu, logvar)``;
            in eval mode only the reconstruction.
        """
        if self.training:
            # Encode input
            h = self.encoder(x)
            mu, logvar = self.fc_mu(h), self.fc_var(h)
            hx = self._reparameterize(mu, logvar)
            return self._decode(hx, y), mu, logvar
        return self._decode(self._represent(x), y)

    def generate(self, y):
        """Decode a fresh standard-normal latent sample for each label in ``y``."""
        hy = self.conditioner(self._onehot(y))
        hx = self._sample(hy.shape[0]).type_as(hy)
        return self.decoder(torch.cat([hx, hy], dim=1))

    def _decode(self, hx, y):
        """Embed the labels, concatenate with the latent code ``hx`` and decode."""
        hy = self.conditioner(self._onehot(y))
        return self.decoder(torch.cat([hx, hy], dim=1))

    def _represent(self, x):
        """Return a sampled latent code for ``x`` (sampling is kept in eval mode too)."""
        h = self.encoder(x)
        mu, logvar = self.fc_mu(h), self.fc_var(h)
        return self._reparameterize(mu, logvar)

    def _reparameterize(self, mu, logvar):
        """Draw ``mu + exp(0.5 * logvar) * eps`` with ``eps ~ N(0, I)``."""
        std = torch.exp(0.5 * logvar)
        # Sample directly with mu's device/dtype instead of on CPU followed by a copy.
        eps = torch.randn_like(mu)
        return mu + std * eps

    def _onehot(self, y):
        """Return ``y`` as a one-hot matrix on the model's device and dtype.

        Accepted inputs:
            * integer class indices of shape ``(batch,)`` or ``(batch, 1)``
              (any 2-D integer index tensor is scattered as-is);
            * an already one-hot encoded floating-point tensor of shape
              ``(batch, num_classes)``, which is passed through unchanged
              apart from a device/dtype move.

        Raises:
            ValueError: If a floating-point ``y`` does not have shape
                ``(batch, num_classes)``.
        """
        # Any parameter works as the reference for where the model lives.
        ref = self.fc_mu.weight
        if y.dtype.is_floating_point:
            if y.dim() != 2 or y.shape[1] != self.num_classes:
                raise ValueError(
                    "floating-point labels must be one-hot with shape "
                    "(batch, %d), got %s" % (self.num_classes, tuple(y.shape))
                )
            return y.to(device=ref.device, dtype=ref.dtype)

        index = y.view(-1, 1) if y.dim() == 1 else y
        # scatter_ needs an int64 index living on the same device as the target.
        index = index.to(device=ref.device, dtype=torch.long)
        y_onehot = torch.zeros(
            index.shape[0], self.num_classes, device=ref.device, dtype=ref.dtype
        )
        y_onehot.scatter_(1, index, 1)
        return y_onehot

    def _sample(self, num_samples):
        """Return ``num_samples`` standard-normal latent codes on the model's device."""
        ref = self.fc_mu.weight
        return torch.randn(num_samples, self.hid1_dims, device=ref.device, dtype=ref.dtype)

    def _init_weights(self):
        """Initialise Linear weights from N(0, 0.01) and zero every bias."""
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, mean=0.0, std=0.01)
                nn.init.constant_(m.bias, 0.0)
            elif isinstance(m, nn.BatchNorm2d):
                # __init__ registers no BatchNorm2d layers; this branch only
                # matters if such layers are added later.
                nn.init.constant_(m.weight, 1.0)
                nn.init.constant_(m.bias, 0.0)

In [None]:
class ImproveChecker():
    """Track the best value of a metric and report whether a new value beats it.

    Args:
        mode: ``'min'`` if smaller values are better, ``'max'`` if larger
            values are better.
        best_val: Optional starting value to beat. When omitted, the first
            finite value passed to ``check`` always counts as an improvement
            (the baseline starts at ``+inf`` for ``'min'`` and ``-inf`` for
            ``'max'``).
    """

    def __init__(self, mode='min', best_val=None):
        assert mode in ['min', 'max']
        self.mode = mode
        if best_val is not None:
            self.best_val = best_val
        elif self.mode == 'min':
            self.best_val = float('inf')
        else:
            # Start at -inf rather than 0.0 so that negative or zero metrics
            # can still register as an improvement.
            self.best_val = float('-inf')

    def check(self, val):
        """Compare ``val`` with the best value so far and log the outcome.

        Args:
            val: Newly observed metric value.

        Returns:
            True if ``val`` is strictly better than the stored best value
            (which is then updated), False otherwise.
        """
        if self.mode == 'min':
            improved = bool(val < self.best_val)
        else:
            improved = bool(val > self.best_val)

        tag = self.__class__.__name__
        if improved:
            print("[%s] Improved from %.4f to %.4f" % (tag, self.best_val, val))
            self.best_val = val
        else:
            print("[%s] Not improved from %.4f" % (tag, self.best_val))
        return improved

In [None]:
# Use the GPU when one is present and fall back to CPU otherwise, instead of
# calling .cuda() unconditionally (which raises on a CPU-only runtime).
use_cuda = torch.cuda.is_available()
device = torch.device('cuda' if use_cuda else 'cpu')

# Initialize VAE
model = VAEGT(in_dims=784, num_classes=10)
model.to(device)

# Configure data loader
# ToTensor already scales pixels to [0, 1], the range the decoder's final
# Sigmoid can produce. Normalize((0.1307,), (0.3081,)) was dropped because it
# shifts the targets to roughly [-0.42, 2.82], which a sigmoid output cannot
# match under the MSE reconstruction loss.
dataset = datasets.MNIST(
    root='.', train=True, download=True,
    transform=transforms.ToTensor(),
)
dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=128,
    num_workers=4, shuffle=True,
    # Pinned host memory only helps host-to-GPU copies.
    pin_memory=use_cuda,
)

# Optimizers
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# ImproveChecker: lower loss is better
improvechecker = ImproveChecker(mode='min')

In [None]:
model.train()
# Follow whatever device the model lives on rather than assuming CUDA.
device = next(model.parameters()).device

for epoch in range(1, 301):
    # Sample-weighted running sum, so the epoch loss is a mean over all
    # samples even when the last batch is smaller.
    loss_sum = 0.0
    num_seen = 0

    for imgs, labels in dataloader:
        # Prepare input: flatten each image and one-hot encode its label
        batch_size = imgs.shape[0]
        inputs = imgs.view(batch_size, -1).to(device)
        labels = labels.view(-1, 1).to(device)
        y_onehot = torch.zeros(batch_size, model.num_classes, device=device)
        y_onehot.scatter_(1, labels, 1)

        # Train
        # NOTE(review): VAEGT.forward passes its second argument through
        # _onehot; confirm it accepts this pre-encoded float one-hot tensor.
        optimizer.zero_grad()
        outputs, mu, logvar = model(inputs, y_onehot)
        loss = loss_fn(outputs, inputs, mu, logvar)
        loss.backward()
        optimizer.step()

        loss_sum += loss.item() * batch_size
        num_seen += batch_size

    # ImproveChecker
    # Judge the epoch by its mean loss; the loss of the final mini-batch alone
    # is too noisy to decide which checkpoint is best.
    epoch_loss = loss_sum / max(num_seen, 1)
    print("[EPOCH %.3d] Loss: %.6f" % (epoch, epoch_loss))
    if improvechecker.check(epoch_loss):
        checkpoint = dict(
            epoch=epoch,
            loss=epoch_loss,
            state_dict=model.state_dict(),
            optimizer=optimizer.state_dict(),
        )
        save_file = os.path.join('.', "vaegt.pth")
        torch.save(checkpoint, save_file)
        print("Best checkpoint is saved at %s" % (save_file))
