This Google Colab notebook checks the PyTorch v1 build and the GPU provided by the runtime, then trains a convolutional autoencoder with a sparse-coding layer on that setup.

Author:  **Tan-Sy NGUYEN**


In [115]:
# Mount Google Drive so that the dataset and checkpoint paths under
# /content/drive used later in this notebook resolve.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [116]:
# Report the PyTorch build that the runtime provides.
import torch
print(torch.__version__)

1.5.1+cu101


In [117]:
# we will verify that GPU is enabled for this notebook
# following should print: CUDA is available!  Training on GPU ...
# 
# if it prints otherwise, then you need to enable GPU: 
# from Menu > Runtime > Change Runtime Type > Hardware Accelerator > GPU

import torch
import numpy as np

# Ask PyTorch whether a CUDA device is usable and report where training will run.
train_on_gpu = torch.cuda.is_available()

print('CUDA is available!  Training on GPU ...'
      if train_on_gpu
      else 'CUDA is not available.  Training on CPU ...')

CUDA is available!  Training on GPU ...


In [118]:
# Shell escape: print the driver's report of the attached GPU (model, memory usage, utilisation).
!nvidia-smi

Wed Jul 29 10:25:09 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.51.05    Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   72C    P0    73W / 149W |    925MiB / 11441MiB |      0%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [119]:
import torch, argparse
from torch.autograd import Variable
from torchvision import datasets
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torch.utils.data.dataset import Dataset
import scipy.io as io

import numpy as np
from collections import OrderedDict

import torch, os
import torch.nn as nn
import torch.nn.functional as F

In [120]:
def loss_fn(sparsecode, z, z_c, x, recon_x,
            recon_weight=10.0, express_weight=1.0, reg_weight=1.0):
    """Weighted sum of reconstruction, self-expression and regularisation terms.

    Args:
        sparsecode: Coefficient matrix being regularised.
        z: Latent codes produced by the encoder.
        z_c: Latent codes in which part of ``z`` has been re-expressed
            through ``sparsecode``; must have the same shape as ``z``.
        x: Input images.
        recon_x: Reconstructed images; must have the same shape as ``x``.
        recon_weight: Multiplier of the image MSE term (default 10, the value
            that used to be hard-coded).
        express_weight: Multiplier of the latent MSE term (default 1).
        reg_weight: Multiplier of the Frobenius norm of ``sparsecode``
            (default 1).

    Returns:
        Scalar tensor ``recon_weight * mse(recon_x, x)
        + express_weight * mse(z, z_c) + reg_weight * ||sparsecode||_F``.
    """
    recon_loss = recon_weight * F.mse_loss(recon_x, x)
    express_loss = express_weight * F.mse_loss(z, z_c)
    reg_loss = reg_weight * torch.norm(sparsecode, p='fro')
    return recon_loss + express_loss + reg_loss

In [121]:
class VAEGT(nn.Module):
    """Convolutional autoencoder with a learnable sparse-coding matrix.

    The encoder maps single-channel images to feature maps.  Each flattened
    test code is then re-expressed as a linear combination of the flattened
    training codes through ``self.sparsecode`` (shape
    ``(test_size, train_size)``), and the decoder reconstructs images from the
    training codes followed by the re-expressed test codes.  No sampling step
    is performed anywhere in this class, so the network is deterministic.

    Args:
        split_ratio: Fraction of ``batch_size`` treated as training samples;
            ``train_size = int(split_ratio * batch_size)``.
        batch_size: Total number of samples (train + test) per forward pass.
        num_classes: Width of the one-hot vectors built by ``_onehot``.
        negative_slope: Slope used by every LeakyReLU in the network.

    Note:
        All tensors are created on the CPU; call ``model.cuda()`` or
        ``model.to(device)`` to move the network *and* ``sparsecode`` (it is
        a registered parameter) to another device.
    """

    def __init__(self, split_ratio, batch_size=1000, num_classes=10, negative_slope=0.1):
        super(VAEGT, self).__init__()
        self.batch_size = batch_size
        self.split_ratio = split_ratio

        self.train_size = int(self.split_ratio*self.batch_size)
        self.test_size = batch_size - self.train_size

        self.num_classes = num_classes
        self.negative_slope = negative_slope

        # Encoder declaration
        self.encoder = nn.Sequential(OrderedDict([
            ('layer1', nn.Conv2d(in_channels=1, out_channels=8, kernel_size=5, stride=2, padding=(1,1))),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2', nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=2, padding=(1,1))),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3', nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=(1,1))),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer4', nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=(1,1))),
            ('relu4', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))

        # Sparse layer declaration.  Created on the CPU on purpose: as a
        # registered parameter it follows the module on .cuda()/.to(), so the
        # class no longer requires a GPU just to be instantiated.
        self.sparsecode = torch.nn.Parameter(
            1.0e-4 * torch.ones(self.test_size, self.train_size), requires_grad=True)

        # Decoder declaration
        self.decoder = nn.Sequential(OrderedDict([
            ('layer1',  nn.ConvTranspose2d(in_channels=32, out_channels=16, kernel_size=3, stride=1, padding=(1,1))),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2',  nn.ConvTranspose2d(in_channels=16, out_channels=8, kernel_size=3, stride=2, padding=(1,1))),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3',  nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=5, stride=2, padding=(1,1),output_padding=1)),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))

        self._init_weights()

    def forward(self, x_train, y_train, x_test, y_test):
        """Encode both splits, re-express the test codes, decode everything.

        Args:
            x_train: Training images, ``(train_size, 1, H, W)``.
            y_train: Training labels.  Accepted for interface compatibility;
                no returned tensor depends on them.
            x_test: Test images, ``(test_size, 1, H, W)``.
            y_test: Test labels.  Accepted for interface compatibility; no
                returned tensor depends on them.

        Returns:
            Tuple ``(sparsecode, z, z_c, x, x_c)``:
            the coefficient parameter, the flattened encoder codes of
            train+test, the same codes with the test rows replaced by
            ``sparsecode @ z_train``, the concatenated inputs, and the
            decoder output for ``z_c``.

        The same computation runs in training and evaluation mode (the
        network has no mode-dependent layers), so ``model.eval()`` no longer
        makes the call return ``None``.
        """
        x = torch.cat([x_train, x_test], dim=0)

        # Encode input.  The encoder output already lives on the module's
        # device, so no explicit device transfer is needed.
        feat_train = self.encoder(x_train)
        feat_test = self.encoder(x_test)
        z_train = feat_train.reshape(feat_train.shape[0], -1)
        z_test = feat_test.reshape(feat_test.shape[0], -1)
        z = torch.cat([z_train, z_test], dim=0)

        # Sparse coding: every test code becomes a combination of train codes.
        z_test_c = torch.matmul(self.sparsecode, z_train)
        z_c = torch.cat([z_train, z_test_c], dim=0)

        # Decode after restoring the (C, H, W) layout of the encoder output.
        x_c = self.decoder(z_c.reshape(-1, *feat_test.shape[1:]))
        return self.sparsecode, z, z_c, x, x_c

    def _onehot(self, y):
        """Return a float one-hot matrix ``(len(y), num_classes)`` on ``y``'s device.

        ``y`` holds class indices in ``[0, num_classes)`` and may be shaped
        ``(N,)`` or ``(N, 1)``.
        """
        idx = y.long()
        if idx.dim() == 1:
            idx = idx.unsqueeze(1)
        y_onehot = torch.zeros(idx.shape[0], self.num_classes,
                               dtype=torch.float, device=idx.device)
        y_onehot.scatter_(1, idx, 1)
        return y_onehot

    def _init_weights(self):
        """Re-initialise Linear and BatchNorm2d sub-modules.

        The encoder/decoder declared in ``__init__`` contain neither layer
        type, so for the current architecture this leaves the default
        convolution initialisation untouched.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

In [122]:
# Accuracy checker: mode "min" for loss, mode "max" for accuracy
class ImproveChecker():
    """Remember the best value of a monitored metric and report improvements.

    Args:
        mode: ``'min'`` when smaller is better (e.g. a loss), ``'max'`` when
            larger is better (e.g. an accuracy).
        best_val: Optional starting best value.  When omitted the checker
            starts from the worst possible value (``+inf`` for ``'min'``,
            ``-inf`` for ``'max'``) so the first observation always counts
            as an improvement, whatever its sign.
    """

    def __init__(self, mode='min', best_val=None):
        assert mode in ['min', 'max']
        self.mode = mode
        if best_val is not None:
            self.best_val = best_val
        else:
            self.best_val = np.inf if self.mode == 'min' else -np.inf

    def _check(self, val):
        """Compare ``val`` with the stored best and update it when better.

        Prints a one-line report either way.

        Returns:
            ``True`` if ``val`` strictly improved on the stored best (which
            is then replaced by ``val``), ``False`` otherwise.
        """
        if self.mode == 'min':
            improved = bool(val < self.best_val)
        else:
            improved = bool(val > self.best_val)

        tag = self.__class__.__name__
        if improved:
            print("[%s] Improved from %.4f to %.4f" % (tag, self.best_val, val))
            self.best_val = val
        else:
            print("[%s] Not improved from %.4f" % (tag, self.best_val))
        return improved

In [123]:
 # Process data
 class MyDataset(Dataset):
     """In-memory image dataset read from a MATLAB ``.mat`` file.

     The file must contain a ``'features'`` array and a ``'Label'`` array.
     ``'features'`` is transposed and reshaped to
     ``(n_samples, width, height)`` float tensors, ``'Label'`` is transposed
     and converted to a long tensor.
     NOTE(review): this presumably expects ``'features'`` to be stored as
     ``(width * height, n_samples)`` -- confirm against the data file.

     Args:
         mat_path: Path of the ``.mat`` file.
         split_ratio: Fraction of the samples assigned to the training
             subset by ``_generate``.
         width: First spatial dimension of every image.
         height: Second spatial dimension of every image.
     """

     def __init__(self, mat_path,split_ratio,width,height):
         self.width = width
         self.height = height
         self.split_ratio=split_ratio

         # Parse the file once; both arrays live in the same container.
         mat = io.loadmat(mat_path,squeeze_me=True)
         feature = mat['features']
         Label = mat['Label']

         self.images = torch.from_numpy(np.transpose(feature)).type(torch.float)
         self.images = torch.reshape(self.images,[len(self.images), self.width, self.height])
         self.target = torch.from_numpy(np.transpose(Label)).type(torch.long)

         # zip() would silently drop the surplus entries of the longer array.
         if len(self.images) != len(self.target):
             raise ValueError(
                 "'features' holds %d samples but 'Label' holds %d"
                 % (len(self.images), len(self.target)))
         self.data=list(zip(self.images, self.target))

         self.train_size = int(self.split_ratio*len(self.data))
         self.test_size = len(self.data) - self.train_size

     def _generate(self):
         """Randomly split the (image, label) pairs into train/test subsets.

         Returns:
             ``(train_dataset, test_dataset)`` with ``train_size`` and
             ``test_size`` items; both are also stored on the instance.
             The split uses PyTorch's global random state.
         """
         self.train_dataset, self.test_dataset = torch.utils.data.random_split(self.data, [self.train_size, self.test_size])
         return self.train_dataset, self.test_dataset

     def __getitem__(self, index):
         """Return the ``(image, label)`` tensors stored at ``index``."""
         images=self.images[index]
         target=self.target[index]
         return images, target

     def __len__(self):
         """Return the number of samples in the whole (unsplit) dataset."""
         return len(self.data)



In [124]:
# Experiment settings
mat_path = '/content/drive/My Drive/Colab Notebooks/Internship/dataset/umd.mat'
split_ratio = 0.9
num_classes = 50
size = [32, 32]  # size of input image

# Read the whole dataset, then split it into train/test subsets
custom_data = MyDataset(mat_path, split_ratio=split_ratio, width=size[0], height=size[1])
data_train, data_test = custom_data._generate()

# Each loader hands out its complete subset as one single batch
dataloader_train = torch.utils.data.DataLoader(
    data_train,
    batch_size=len(data_train),
    num_workers=4,
    pin_memory=True,
)
dataloader_test = torch.utils.data.DataLoader(
    data_test,
    batch_size=len(data_test),
    num_workers=4,
    pin_memory=True,
)

In [125]:
# Initialize the autoencoder.  Its batch size is the full dataset because the
# train and test loaders each deliver their whole subset in a single batch.
# num_classes comes from the settings cell instead of a repeated literal.
model = VAEGT(split_ratio=split_ratio, batch_size=len(custom_data), num_classes=num_classes)

# Place the model on the GPU when one is visible, otherwise stay on the CPU
# rather than raising.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimizers
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# ImproveChecker (tracks the lowest loss seen so far)
improvechecker = ImproveChecker(mode='min')

num_epoch = 10000 # number of training epochs
cal_epoch = 1000 # evaluate accuracy / checkpoint every cal_epoch epochs

In [126]:
 # Get the parameter (sparse code, weights, biases)
 class Getparam():
     """Look up a named parameter (sparse code, weights, biases) of a model and print it.

     Args:
         param_name: Name used by ``_getparam`` when it is called without an
             argument.
         model: Module whose ``named_parameters()`` are searched.  When
             omitted, the notebook-level ``model`` is looked up at
             construction time, so a re-created model is picked up instead
             of the one that existed when this class was defined.
     """

     def __init__(self, param_name = 'sparsecode', model = None):
         if model is None:
             try:
                 model = globals()['model']
             except KeyError:
                 raise NameError("name 'model' is not defined; pass model= explicitly")
         self.param_name = param_name
         self.model = model
         self.bool_get = False

     def _getparam(self, param_name=None):
         """Print the data of the parameter called ``param_name``.

         Args:
             param_name: Name as reported by ``model.named_parameters()``;
                 defaults to the name given to the constructor.

         Returns:
             ``True`` if a parameter with that name exists, ``False``
             otherwise (an error line is printed in that case).  The result
             of the latest call is also kept in ``self.bool_get``.
         """
         if param_name is None:
             param_name = self.param_name

         # Reset per call so an earlier hit cannot mask a later miss.
         self.bool_get = False
         for param_name_get, param in self.model.named_parameters():
             if param_name_get == param_name:
                 self.bool_get = True
                 print("Sucessfully: [%s]" %(param_name), param.data)
         if not self.bool_get:
             print("Error: [%s] parameter is not defined..." % param_name)
         return self.bool_get


In [127]:
# Calculate the accuracy
class Evaluate():
    """Use a sparse-coding coefficient matrix as a nearest-class classifier.

    Row ``r`` of ``coef`` holds the coefficients that express test sample
    ``r`` through the training samples (one column per training sample).
    After thresholding, a test sample is assigned the class whose training
    samples carry the largest total absolute coefficient.  Class ids
    ``1 .. max(test_labels)`` are scored and predictions are reported in
    that same 1-based range.

    Args:
        coef: Coefficient tensor of shape ``(n_test, n_train)``.  When
            omitted, ``model.sparsecode`` of the notebook-level ``model`` is
            looked up at construction time.
        train_labels: Integer labels of the training samples, shaped
            ``(n_train,)`` or ``(n_train, 1)``.
        test_labels: Integer labels of the test samples, shaped
            ``(n_test,)`` or ``(n_test, 1)``.

    Raises:
        TypeError: If either label tensor is missing.
    """

    def __init__(self, coef=None, train_labels=None, test_labels=None):
        if coef is None:
            try:
                coef = globals()['model'].sparsecode
            except KeyError:
                raise NameError("name 'model' is not defined; pass coef= explicitly")
        if train_labels is None or test_labels is None:
            raise TypeError("train_labels and test_labels are both required")

        self.coef = coef
        self.train_labels = train_labels
        self.test_labels = test_labels
        self.class_ = torch.zeros(int(torch.max(self.test_labels)))
        self.prediction = torch.zeros(len(self.test_labels))

    def _eval(self):
        """Classify every test sample and return the accuracy in ``[0, 1]``.

        Side effects: ``self.prediction`` receives the predicted class ids
        and ``self.class_`` the per-class scores of the last test sample.
        """
        # Evaluation needs values only; detaching keeps autograd out of it.
        Coef = torch.abs(self._get_threshold(self.coef.detach().cpu()))

        n_test = len(self.test_labels)
        n_classes = int(torch.max(self.test_labels))

        # membership[j, c] == 1 when training sample j belongs to class c + 1.
        # Built once instead of once per (test sample, class) pair.
        train = self.train_labels.detach().cpu().reshape(-1)
        class_ids = torch.arange(1, n_classes + 1, dtype=train.dtype)
        membership = (train.unsqueeze(1) == class_ids.unsqueeze(0)).to(Coef.dtype)

        # scores[r, c] = summed |coefficient| of test sample r over class c + 1.
        scores = torch.matmul(Coef[:n_test], membership)
        if n_test > 0:
            self.class_ = scores[-1].clone()
        self.prediction = (torch.argmax(scores, dim=1) + 1).to(self.prediction.dtype)

        missrate = self._error_cal(self.test_labels, self.prediction)
        accuracy = 1 - missrate
        return accuracy

    def _error_cal(self, ground_truth, predicted_label):
        """Return the fraction of entries where the two label vectors differ.

        ``ground_truth`` may be shaped ``(N,)`` or ``(N, 1)``.
        """
        ground_truth = ground_truth.reshape(-1)
        predicted_label = predicted_label.reshape(-1).to(ground_truth.device)
        _error_value = (ground_truth != predicted_label).sum().item()
        missrate = _error_value / (ground_truth.shape[0])
        return missrate

    def _get_threshold(self, coef, ro=0.1):
        """Keep only the dominant entries of every column of ``coef``.

        For each column, entries are visited by decreasing magnitude and
        kept until their cumulative magnitude first exceeds ``ro`` times the
        column's L1 norm; all remaining entries are set to zero.

        Args:
            coef: 2-D coefficient tensor.
            ro: Fraction of each column's L1 norm to retain.  With
                ``ro >= 1`` the input is returned unchanged.

        Returns:
            A new tensor with the shape of ``coef`` (or ``coef`` itself when
            ``ro >= 1``).
        """
        if ro >= 1:
            return coef

        n_rows, n_cols = coef.shape[0], coef.shape[1]
        Cp = torch.zeros((n_rows, n_cols))

        # One descending sort by magnitude yields both the sorted values and
        # the row order they came from.
        neg_sorted, order = torch.sort(-torch.abs(coef), 0)
        S = torch.abs(neg_sorted)

        for i in range(n_cols):
            cL1 = torch.sum(S[:, i], dtype=torch.float)
            csum = 0
            # Bounded scan: an all-zero column never exceeds the threshold
            # and simply stays zero instead of running past the last row.
            for t in range(n_rows):
                csum = csum + S[t, i]
                if csum > ro * cL1:
                    keep = order[0:t + 1, i]
                    Cp[keep, i] = coef[keep, i]
                    break

        return Cp

In [None]:
# Training process
model.train()

# Inputs follow whatever device the model was placed on.
train_device = next(model.parameters()).device

# Both loaders deliver their whole subset as one un-shuffled batch, so the
# tensors are identical in every epoch: fetch them once instead of spinning
# up the loader workers again on each epoch.
imgs_train, labels_train = next(iter(dataloader_train))
imgs_test, labels_test = next(iter(dataloader_test))

inputs_train = imgs_train.unsqueeze(1).to(train_device)  # add the channel axis
inputs_test = imgs_test.unsqueeze(1).to(train_device)
labels_train = labels_train.view(-1, 1)
labels_test = labels_test.view(-1, 1)

# One-hot labels stay on the CPU: none of the tensors returned by the model
# depends on them.
y_onehot_train = torch.zeros(imgs_train.shape[0], num_classes).scatter_(1, labels_train, 1)
y_onehot_test = torch.zeros(imgs_test.shape[0], num_classes).scatter_(1, labels_test, 1)

checkpoint_dir = '/content/drive/My Drive/Colab Notebooks/Variational-Autoencoder/checkpoints/'

# Inclusive upper bound: run exactly num_epoch epochs so the last one is also
# evaluated when num_epoch is a multiple of cal_epoch.
for epoch in range(1, num_epoch + 1):
    # Training
    optimizer.zero_grad()
    sparsecode, z, z_c, x, x_c = model(inputs_train, y_onehot_train, inputs_test, y_onehot_test)
    loss = loss_fn(sparsecode, z, z_c, x, x_c)
    loss.backward()
    optimizer.step()

    if (epoch % cal_epoch == 0):
        # ImproveChecker (check the best result)
        print("\n[EPOCH %.3d] Loss: %.6f" % (epoch, loss.item()))
        with torch.no_grad():  # evaluation must not extend the autograd graph
            accuracy = Evaluate(coef=sparsecode, train_labels=labels_train, test_labels=labels_test)._eval()
        print("=================>_Accuracy: %.6f" % (accuracy))

        # Save checkpoint
        if improvechecker._check(loss.item()):
            checkpoint = dict(
                epoch=epoch,
                loss=loss.item(),
                state_dict=model.state_dict(),
                optimizer=optimizer.state_dict(),
            )
            os.makedirs(checkpoint_dir, exist_ok=True)  # first save on a fresh Drive
            save_file = os.path.join(checkpoint_dir, "CAE.pth")
            torch.save(checkpoint, save_file)
            print("Best checkpoint is saved at %s" % (save_file))




[EPOCH 1000] Loss: 8095.990723
[ImproveChecker] Improved from inf to 8095.9907
Best checkpoint is saved at /content/drive/My Drive/Colab Notebooks/Variational-Autoencoder/checkpoints/CAE.pth
