<a href="https://colab.research.google.com/github/tansyab1/MMSP-2020/blob/main/CAESRWC_(4_subbands).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **CAESRWC-extend**: Convolutional Autoencoder Sparse Representation Wavelet classification (CAESRWC) is a transductive classification model based on sparse representations. 

# Author: **Tan-Sy NGUYEN**


Connect **Google Colab** to **Google Drive**

In [None]:
# Mount Google Drive at /content/drive; the dataset path and the checkpoint
# path used later in this notebook both live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Print the installed PyTorch version for reference.
import torch
print(torch.__version__)

1.6.0+cu101


We will verify that GPU is enabled for this notebook

The following cell should print: ***CUDA is available!  Training on GPU ...***
 
If it prints otherwise, then you need to enable GPU: 

From **Menu** > **Runtime** > **Change Runtime Type** > **Hardware Accelerator** > **GPU**

In [None]:
import torch
import numpy as np

# Probe for a CUDA device once and keep the answer in `train_on_gpu`;
# the message below only reports which device will be used.
train_on_gpu = torch.cuda.is_available()

if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')

CUDA is available!  Training on GPU ...


In [None]:
# Report the GPU attached to this runtime (model, driver, memory usage).
!nvidia-smi

Sun Oct 11 07:41:16 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 455.23.05    Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   73C    P0    74W / 149W |   2659MiB / 11441MiB |      0%      Default |
|                               |                      |                 ERR! |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:

from torch.autograd import Variable
from torchvision import datasets
from torchvision.utils import make_grid
from torch.utils.data import DataLoader
from torch.utils.data.dataset import Dataset
from torch.distributions import Categorical
import torchvision.transforms as transforms
import argparse
import os
import torch.nn as nn
import torch.nn.functional as F
import scipy.io as io
import math
import numbers
import pywt
from tqdm import tqdm
from matplotlib import pyplot
from ipywidgets import interact
from PIL import Image
from scipy.stats import entropy
from collections import OrderedDict
from skimage.feature import greycomatrix, greycoprops
# Install pytorch_wavelets from a source checkout. When the cell is re-run the
# clone reports that the destination already exists and the existing checkout
# is installed again.
! git clone https://github.com/fbcotter/pytorch_wavelets
# NOTE(review): this `cd` does not appear to affect the following command --
# the install below still addresses the checkout as ./pytorch_wavelets/.
! cd ./pytorch_wavelets/
! pip install ./pytorch_wavelets/
from pytorch_wavelets import DWTForward, DWTInverse # (or import DWT, IDWT)


fatal: destination path 'pytorch_wavelets' already exists and is not an empty directory.
Processing ./pytorch_wavelets
Building wheels for collected packages: pytorch-wavelets
  Building wheel for pytorch-wavelets (setup.py) ... [?25l[?25hdone
  Created wheel for pytorch-wavelets: filename=pytorch_wavelets-1.2.3-cp36-none-any.whl size=52839 sha256=855a43055a13170a05373888318e8fc5608756ece9b157758a9b0262b7823323
  Stored in directory: /tmp/pip-ephem-wheel-cache-l91dnwti/wheels/82/e1/5b/16a4e6ccfc0bb2ce14bd2796adb13e6e9b718cb7d2e1ae3643
Successfully built pytorch-wavelets
Installing collected packages: pytorch-wavelets
  Found existing installation: pytorch-wavelets 1.2.3
    Uninstalling pytorch-wavelets-1.2.3:
      Successfully uninstalled pytorch-wavelets-1.2.3
Successfully installed pytorch-wavelets-1.2.3


In [None]:
# Record the versions of all installed packages in requirements.txt.
! pip freeze > requirements.txt

## 1. Define the **Loss function** including 3 main parts:

1.   Reconstruction Loss between the original image and reconstructed image
2.   Expression Loss based on the SRC equation
3.   Regularization Loss: norm of the sparse code (the implementation below uses the Frobenius norm)


In [None]:
def loss_fn(sparsecodeLL, z_LL, z_LL_c,x_LL,x_LL_c,sparsecodeLH,z_LH, z_LH_c, x_LH,x_LH_c,sparsecodeHL,z_HL, z_HL_c,x_HL,x_HL_c,sparsecodeHH,z_HH, z_HH_c,x_HH,x_HH_c):
    """Total training loss summed over the four wavelet sub-bands.

    The arguments come in four groups of five, one group per sub-band
    (LL, LH, HL, HH), each ordered ``(sparsecode, z, z_c, x, x_c)``.
    For every sub-band three terms are computed:

    * reconstruction: ``10 * mse(x_c, x)``
    * expression:     ``mse(z, z_c)``
    * regularisation: Frobenius norm of ``sparsecode``

    NOTE(review): the notebook text calls the regulariser an "L1 loss", but
    the code uses ``p='fro'``; confirm which one is intended.

    Returns:
        Scalar tensor: the sum of all twelve terms.
    """
    subbands = (
        (sparsecodeLL, z_LL, z_LL_c, x_LL, x_LL_c),
        (sparsecodeLH, z_LH, z_LH_c, x_LH, x_LH_c),
        (sparsecodeHL, z_HL, z_HL_c, x_HL, x_HL_c),
        (sparsecodeHH, z_HH, z_HH_c, x_HH, x_HH_c),
    )

    recon_terms = [10 * F.mse_loss(x_c, x) for _, _, _, x, x_c in subbands]
    express_terms = [F.mse_loss(z, z_c) for _, z, z_c, _, _ in subbands]
    reg_terms = [torch.norm(code, p='fro') for code, _, _, _, _ in subbands]

    # Accumulate left to right: reconstruction terms, then expression terms,
    # then regularisation terms.
    total = recon_terms[0]
    for term in recon_terms[1:] + express_terms + reg_terms:
        total = total + term
    return total

## 2. Define the **model**

In [None]:
class VAEGT(nn.Module):
    """Four-branch convolutional autoencoder with a learnable sparse-code layer.

    One encoder/decoder pair and one sparse-code matrix is kept for each of the
    four sub-band inputs (LL, LH, HL, HH). In every branch the test embeddings
    are re-expressed as ``sparsecode @ train_embeddings`` before decoding, so
    each sparse-code matrix has shape ``(test_size, train_size)``.

    Args:
        split_ratio: fraction of ``batch_size`` that forms the training part.
        batch_size: total number of samples (train + test) per forward pass.
        num_classes: width of the one-hot vectors produced by ``_onehot``.
        negative_slope: slope of every LeakyReLU in the network.
        imageshape: only used to derive ``emb_outshape``.

    Device handling: parameters are created on the default device and follow
    the module when it is moved (``model.cuda()`` / ``model.to(...)``); inputs
    are moved to the device of the sparse codes inside ``forward``.
    """

    def __init__(self, split_ratio, batch_size=1000, num_classes=10, negative_slope=0.1, imageshape=32):
        super(VAEGT, self).__init__()
        self.batch_size = batch_size
        self.split_ratio = split_ratio
        # NOTE(review): `math.ceil` is applied before the division, so this is
        # simply `imageshape / 8` as a float. The attribute is not read anywhere
        # in this class; left unchanged -- confirm the intended formula.
        self.emb_outshape = math.ceil(imageshape)/8

        self.train_size = int(self.split_ratio*self.batch_size)
        self.test_size = batch_size - self.train_size

        self.num_classes = num_classes
        self.negative_slope = negative_slope

        # Registration order (encoders, sparse codes, decoders) and the layer
        # names are kept stable so parameter order and state_dict keys do not
        # change for existing checkpoints.
        self.encoderLL = self._make_encoder(negative_slope)
        self.encoderLH = self._make_encoder(negative_slope)
        self.encoderHL = self._make_encoder(negative_slope)
        self.encoderHH = self._make_encoder(negative_slope)

        # Sparse layer declaration: one (test_size, train_size) matrix per
        # sub-band, initialised to a small constant. No explicit `.cuda()` here;
        # the parameters move together with the module.
        self.sparsecodeLL = self._make_sparsecode()
        self.sparsecodeLH = self._make_sparsecode()
        self.sparsecodeHL = self._make_sparsecode()
        self.sparsecodeHH = self._make_sparsecode()

        self.decoderLL = self._make_decoder(negative_slope)
        self.decoderLH = self._make_decoder(negative_slope)
        self.decoderHL = self._make_decoder(negative_slope)
        self.decoderHH = self._make_decoder(negative_slope)

        self._init_weights()

    @staticmethod
    def _make_encoder(negative_slope):
        """Build one sub-band encoder: 1 -> 8 -> 16 -> 32 -> 32 channels, two stride-2 convs."""
        return nn.Sequential(OrderedDict([
            ('layer1', nn.Conv2d(in_channels=1, out_channels=8, kernel_size=5, stride=2, padding=(2,2))),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2', nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=2, padding=(1,1))),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer3', nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=(1,1))),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer4', nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=(1,1))),
            ('relu4', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))

    @staticmethod
    def _make_decoder(negative_slope):
        """Build one sub-band decoder: 32 -> 16 -> 8 -> 1 channels, two stride-2 transposed convs."""
        return nn.Sequential(OrderedDict([
            ('layer1',  nn.ConvTranspose2d(in_channels=32, out_channels=16, kernel_size=3, stride=1, padding=(1,1))),
            ('relu1', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            ('layer2',  nn.ConvTranspose2d(in_channels=16, out_channels=8, kernel_size=3, stride=2, padding=(1,1))),
            ('relu2', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
            # The asymmetric padding (1,2) is kept exactly as in the original
            # architecture; it determines the decoded height/width.
            ('layer3',  nn.ConvTranspose2d(in_channels=8, out_channels=1, kernel_size=5, stride=2, padding=(1,2),output_padding=[1,1])),
            ('relu3', nn.LeakyReLU(negative_slope=negative_slope, inplace=True)),
        ]))

    def _make_sparsecode(self):
        """Create one trainable (test_size, train_size) sparse-code matrix filled with 1e-4."""
        return torch.nn.Parameter(1.0e-4 * torch.ones(self.test_size, self.train_size), requires_grad=True)

    def _forward_subband(self, encoder, decoder, sparsecode, x_train, x_test):
        """Run one sub-band branch and return ``(sparsecode, z, z_c, x, x_c)``.

        ``z`` stacks the flattened train and test embeddings, ``z_c`` stacks the
        train embeddings with the sparse-coded test embeddings, ``x`` stacks the
        inputs and ``x_c`` is the decoder output for ``z_c``.
        """
        device = sparsecode.device
        x_train = x_train.to(device)
        x_test = x_test.to(device)
        x = torch.cat([x_train, x_test], dim=0)

        # Encode, then flatten every sample to one row.
        z_train_map = encoder(x_train)
        z_test_map = encoder(x_test)
        z_train = torch.reshape(z_train_map, [self.train_size, -1])
        z_test = torch.reshape(z_test_map, [self.test_size, -1])
        z = torch.cat([z_train, z_test], dim=0)

        # Sparse coding: express each test embedding as a combination of the
        # training embeddings.
        z_test_c = torch.matmul(sparsecode, z_train)
        z_c = torch.cat([z_train, z_test_c], dim=0)

        # Restore the (channels, height, width) layout of the encoder output
        # before decoding.
        z_c_map = torch.reshape(z_c, [self.batch_size] + list(z_test_map.shape[1:]))
        x_c = decoder(z_c_map)
        return sparsecode, z, z_c, x, x_c

    def forward(self,x_train_LL,x_train_LH,x_train_HL,x_train_HH,  y_train,x_test_LL,x_test_LH,x_test_HL,x_test_HH,  y_test):
        """Encode, sparse-code and decode all four sub-bands.

        ``y_train`` and ``y_test`` are accepted for interface compatibility but
        are not used: the previous implementation one-hot encoded them and
        discarded the result.

        The same computation is now performed in both train and eval mode (the
        network has no mode-dependent layers); previously eval mode returned
        ``None``.

        Returns:
            A 20-tuple: for LL, LH, HL, HH in that order,
            ``(sparsecode, z, z_c, x, x_c)`` -- the argument order of ``loss_fn``.
        """
        branches = (
            (self.encoderLL, self.decoderLL, self.sparsecodeLL, x_train_LL, x_test_LL),
            (self.encoderLH, self.decoderLH, self.sparsecodeLH, x_train_LH, x_test_LH),
            (self.encoderHL, self.decoderHL, self.sparsecodeHL, x_train_HL, x_test_HL),
            (self.encoderHH, self.decoderHH, self.sparsecodeHH, x_train_HH, x_test_HH),
        )
        outputs = []
        for encoder, decoder, sparsecode, x_train, x_test in branches:
            outputs.extend(self._forward_subband(encoder, decoder, sparsecode, x_train, x_test))
        return tuple(outputs)

    def _onehot(self, y):
        """One-hot encode a column of class indices.

        Args:
            y: 2-D tensor of class indices (one column); cast to long.
        Returns:
            Float tensor of shape ``(y.shape[0], num_classes)`` on ``y``'s device.
        """
        y_onehot = torch.zeros(y.shape[0], self.num_classes, dtype=torch.float, device=y.device)
        y_onehot.scatter_(1, y.long(), 1)
        return y_onehot

    def _init_weights(self):
        """Custom init for Linear and BatchNorm2d sub-modules.

        The network built in ``__init__`` contains neither layer type, so the
        convolutions keep PyTorch's default initialisation.
        """
        for m in self.modules():
            if isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

## 3. Define the **improve checker** to save the best checkpoint

In [None]:
# Accuracy checker: mode "min" for loss, mode "max" for accuracy
class ImproveChecker():
    """Remembers the best value seen for a monitored metric.

    Args:
        mode: 'min' when smaller is better, 'max' when larger is better.
        best_val: optional starting value; when omitted it is ``np.inf`` in
            'min' mode and ``0.0`` in 'max' mode.
    """

    def __init__(self, mode='min', best_val=None):
        assert mode in ['min', 'max']
        self.mode = mode
        if best_val is None:
            best_val = np.inf if self.mode == 'min' else 0.0
        self.best_val = best_val

    def _check(self, val):
        """Compare ``val`` with the stored best, print the outcome, and
        return True (storing ``val``) only on a strict improvement."""
        if self.mode == 'min':
            improved = val < self.best_val
        else:
            improved = val > self.best_val

        tag = self.__class__.__name__
        if not improved:
            print("[%s] Not improved from %.4f" % (tag, self.best_val))
            return False

        print("[%s] Improved from %.4f to %.4f" % (tag, self.best_val, val))
        self.best_val = val
        return True

## 4. Process the **data**

### 4.3. Apply the Wavelet transform
    Arguments:
        image batch (sample, channel, width, height): batch of images to be transformed.
        level (int): level of wavelet transform.

In [None]:
# Load SVHN dataset (Gray scale)

class MySVHNDataset(datasets.SVHN):
    """SVHN dataset whose samples are converted to grayscale ('L' mode) PIL
    images before the optional transform is applied."""

    def __init__(self,  root, split='train',
                 transform=None, target_transform=None, download=False):
        # All arguments are forwarded unchanged to the torchvision SVHN class.
        super().__init__(root, split, transform, target_transform, download)

    def __getitem__(self, index):
        """Fetch one sample.

        Args:
            index (int): position of the sample.
        Returns:
            tuple: (image, target), where image is the grayscale sample (after
            ``transform`` if one was given) and target is the class index
            (after ``target_transform`` if one was given).
        """
        raw = self.data[index]
        target = int(self.labels[index])

        # Move the first axis of the stored array to the end so that PIL can
        # build an image from it, then drop the colour information.
        gray = Image.fromarray(np.transpose(raw, (1, 2, 0))).convert('L')

        if self.transform is not None:
            gray = self.transform(gray)
        if self.target_transform is not None:
            target = self.target_transform(target)
        return gray, target

In [None]:

 # Process CUSTOM data from .mat file
class MyDataset(Dataset):
    """Dataset read from a .mat file and decomposed with a one-level Haar DWT.

    The file must contain a 'features' array and a 'Label' array. After the
    transpose each row of 'features' is treated as one sample and reshaped to
    ``(width, height)``.

    Args:
        mat_path: path of the .mat file.
        split_ratio: fraction of the samples placed in the training split.
        width, height: shape each flattened sample is reshaped to.
        max_samples: number of leading samples to keep (default 2000, the value
            that used to be hard-coded); ``None`` keeps every sample.

    Attributes set here:
        images: float tensor ``(n, width, height)`` of the raw samples.
        Yll, Ylh, Yhl, Yhh: the four DWT sub-bands of ``images``.
        target: long tensor of labels.
        data: list of ``(Yll, Ylh, Yhl, Yhh, target)`` tuples, one per sample;
            this list (not ``__getitem__``) is what ``_generate`` splits.
    """

    def __init__(self, mat_path,split_ratio,width,height, max_samples=2000):
        self.width = width
        self.height = height
        self.split_ratio=split_ratio

        # Parse the file once and read both arrays from the same result.
        mat = io.loadmat(mat_path,squeeze_me=True)
        feature = mat['features'][:, 0:max_samples]
        Label = mat['Label'][0:max_samples]

        self.images = torch.from_numpy(np.transpose(feature)).type(torch.float)
        self.images = torch.reshape(self.images,[len(self.images), self.width, self.height])

        # Add a channel axis, then take a single-level Haar wavelet transform.
        self.init_wt=  torch.unsqueeze(self.images, dim=1)
        xfm = DWTForward(J=1, wave='haar')
        self.Yll, self.Yh = xfm(self.init_wt)
        # The three detail sub-bands are stacked along dim 2 of Yh[0].
        self.Ylh,self.Yhl,self.Yhh = torch.unbind(self.Yh[0], dim= 2)

        self.target = torch.from_numpy(np.transpose(Label)).type(torch.long)
        self.data=list(zip(self.Yll,self.Ylh,self.Yhl,self.Yhh, self.target))

        self.train_size = int(self.split_ratio*len(self.data))
        self.test_size = len(self.data) - self.train_size

    def _generate(self):
        """Randomly split ``self.data`` into train/test subsets of
        ``train_size`` / ``test_size`` samples and return both."""
        self.train_dataset, self.test_dataset = torch.utils.data.random_split(self.data, [self.train_size, self.test_size])
        return self.train_dataset, self.test_dataset

    def __getitem__(self, index):
        """Return the raw (untransformed) image and its label.

        Note that this differs from the entries of ``self.data``, which hold
        the wavelet sub-bands.
        """
        images=self.images[index]
        target=self.target[index]
        return images, target

    def __len__(self):
        return len(self.data)



### 4.1. Declare the file path and the arguments used to configure the **CUSTOM dataset**


In [None]:
# Dataset location and experiment settings shared by the cells below.
mat_path = '/content/drive/My Drive/Colab Notebooks/Master Internship/dataset/YaleB/YaleB_192x168.mat'
split_ratio = 0.9    # fraction of the samples placed in the training split
num_classes = 40     # width of the one-hot label vectors built later
size = [96, 84]      # [width, height] handed to MyDataset for reshaping

# One flag per data source: [SVHN-RGB, SVHN-Gray, .mat file]; .mat is the default.
_use_data = [False, False, True]

_data_chose = input("Choose data to be observed (SVHN-RGB: 1, SVHN-Gray: 2, .mat File: 3):\n")

# Map the typed answer to its flag triple; any other answer keeps the default
# and only prints a warning.
_flags_by_choice = {
    "1": [True, False, False],
    "2": [False, True, False],
    "3": [False, False, True],
}
if _data_chose in _flags_by_choice:
  _use_data = _flags_by_choice[_data_chose]
else:
  print("Wrong argument ...: SVHNRGB or SVHNGRAY or MAT but (%s)" %(_data_chose))
  

# Pytorch data
# Configure data loader
if _use_data[0]:
  print("===> Use dataset from Pytorch (RGB)")
  # Convert each image to a tensor and normalise it with mean 0.5 / std 0.5.
  dataset_train_SVHN_RGB = datasets.SVHN(root='./SVHN_RGB/',
                                split='train',
                                download=True,
                                transform=transforms.Compose([transforms.ToTensor(),
                                                              transforms.Normalize((0.5,),
                                                                                  (0.5, ))]))
  dataset_test_SVHN_RGB = datasets.SVHN(root='./SVHN_RGB/',
                      split='test',
                      download=True,
                      transform=transforms.Compose([transforms.ToTensor(),
                                                    transforms.Normalize((0.5,),
                                                                        (0.5,)),]))
  # Full-batch loaders: the batch size is the number of samples in each split.
  # (This previously referenced the undefined names `dataset_train` /
  # `dataset_test` and raised NameError.)
  dataloader_train = torch.utils.data.DataLoader( dataset_train_SVHN_RGB,
                                                  batch_size=dataset_train_SVHN_RGB.data.shape[0],
                                                  num_workers=4,
                                                  pin_memory=True,)
  dataloader_test = torch.utils.data.DataLoader(dataset_test_SVHN_RGB,
                                                batch_size=dataset_test_SVHN_RGB.data.shape[0],
                                                num_workers=4,
                                                pin_memory=True,)

  if dataloader_train.dataset:
    print("\n Load Pytorch dataset (SVHN-RGB) successfully ... \n", dataloader_train.dataset)
    print("\n Training set: %d samples, Testing set: %d samples " % (len(dataloader_train.dataset),len(dataloader_test.dataset)))

if _use_data[1]:
  print("===> Use dataset from Pytorch (Gray scale)")
  # Grayscale SVHN; each image is converted to a tensor and normalised with
  # mean 0.5 / std 0.5.
  dataset_train_SVHN_Gray = MySVHNDataset(root='./SVHN_Gray/',
                                          split='train',
                                          download=True,
                                          transform=transforms.Compose([transforms.ToTensor(),
                                                              transforms.Normalize((0.5,),
                                                                                  (0.5,))]))
  dataset_test_SVHN_Gray = MySVHNDataset(root='./SVHN_Gray/',
                                         split='test',
                                         download=True,
                                         transform=transforms.Compose([transforms.ToTensor(),
                                                              transforms.Normalize((0.5,),
                                                                                  (0.5,))]))
  # Full-batch loaders: the batch size is the number of samples in each split.
  # (This previously referenced the undefined names `dataset_train` /
  # `dataset_test` and raised NameError.)
  dataloader_train = torch.utils.data.DataLoader( dataset_train_SVHN_Gray,
                                                  batch_size=dataset_train_SVHN_Gray.data.shape[0],
                                                  num_workers=4,
                                                  pin_memory=True,)
  dataloader_test = torch.utils.data.DataLoader(dataset_test_SVHN_Gray,
                                                batch_size=dataset_test_SVHN_Gray.data.shape[0],
                                                num_workers=4,
                                                pin_memory=True,)
  if dataloader_train.dataset:
    print("\n Load Pytorch dataset (SVHN-Gray) successfully ... \n", dataloader_train.dataset)
    print("\n Training set: %d samples, Testing set: %d samples " % (len(dataloader_train.dataset),len(dataloader_test.dataset)))

if _use_data[2]:
  print("===> Use default dataset from .mat file \n [path]: %s" %(mat_path))
  custom_data = MyDataset(mat_path,split_ratio=split_ratio, width = size[0], height=size[1])
  data_train, data_test = custom_data._generate()

  # Full-batch loaders: each split is delivered as a single batch. The batch
  # sizes are taken from the split itself (1800 / 200 for 2000 samples at a
  # 0.9 ratio) so they stay correct if the ratio or sample count changes.
  dataloader_train = torch.utils.data.DataLoader(dataset=data_train,
                                            num_workers=4,
                                            batch_size=len(data_train),
                                            pin_memory=True)

  dataloader_test = torch.utils.data.DataLoader(dataset=data_test,
                                            num_workers=4,
                                            batch_size=len(data_test),
                                            pin_memory=True)
  if dataloader_train.dataset:
    print("\n Dataset change: Load CUSTOM data from .mat file successfully .... \n", dataloader_train.dataset)
    print("\n Training set: %d samples, Testing set: %d samples " % (len(dataloader_train.dataset),len(dataloader_test.dataset)))

Choose data to be observed (SVHN-RGB: 1, SVHN-Gray: 2, .mat File: 3):
3
===> Use default dataset from .mat file 
 [path]: /content/drive/My Drive/Colab Notebooks/Master Internship/dataset/YaleB/YaleB_192x168.mat


RuntimeError: ignored

### 8.1. Preparing the input

In [None]:
# Prepare input
#
# Each loop leaves the tensors of the LAST batch in the notebook namespace.
# The one-hot labels are built on the CPU: the `.cuda()` that used to be
# chained onto `scatter_` returned a copy that was discarded, so it has been
# dropped rather than silently doing nothing.

for i, (inputs_train_LL,inputs_train_LH,inputs_train_HL,inputs_train_HH, labels_train) in enumerate(dataloader_train): # Dataloader for training set
  labels_train = labels_train.view(-1, 1)   # column vector of class indices
  y_onehot_train = torch.zeros(inputs_train_LL.shape[0], num_classes)
  y_onehot_train.scatter_(1, labels_train, 1)

for j , (inputs_test_LL,inputs_test_LH,inputs_test_HL,inputs_test_HH, labels_test) in enumerate(dataloader_test): # Dataloader for testing set
  labels_test = labels_test.view(-1, 1)     # column vector of class indices
  y_onehot_test = torch.zeros(inputs_test_LL.shape[0], num_classes)
  y_onehot_test.scatter_(1, labels_test, 1)


inputs_train_LL.shape



## 5. Init the **model**, **optimizer** and **improvechecker**



In [None]:
# Initialize VAE
# `num_classes` is the value declared with the dataset configuration (a
# literal 50 was passed here before, which disagreed with it).
model = VAEGT(split_ratio=split_ratio, batch_size=len(custom_data), num_classes=num_classes, imageshape=inputs_train_LL.shape[2])
model.cuda()

# Optimizers
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

# ImproveChecker (lower loss is better)
improvechecker = ImproveChecker(mode='min')

num_epoch = 10000 # number of training epochs
cal_epoch = 1000  # accuracy is evaluated every `cal_epoch` epochs

## 6. Class for retrieving the model **parameters** (sparse codes, **weights** and **biases**)

In [None]:
 # Get the parameter (sparse code, weights, biases)
 class Getparam():
     """Print a named parameter of a model.

     Args:
         param_name: name used by ``_getparam`` when it is called without an
             argument.
         model: module whose ``named_parameters()`` are searched; defaults to
             the notebook-level ``model`` that exists when this class is defined.
     """

     def __init__(self, param_name = 'sparsecode', model = model):
         self.param_name = param_name
         self.model = model
         self.bool_get= False

     def _getparam(self, param_name=None):
         """Print the data of the parameter called ``param_name``.

         Falls back to the name given to the constructor when ``param_name``
         is omitted. Prints an error line when no parameter has that name.
         Always returns True, as before.
         """
         if param_name is None:
             param_name = self.param_name

         # Reset for every lookup: the flag used to stay True after the first
         # hit, which suppressed the error message for later missing names.
         self.bool_get = False
         for param_name_get, param in self.model.named_parameters():
             if param_name_get == param_name:
                 self.bool_get = True
                 print("Sucessfully: [%s]" %(param_name), param.data)
         if not self.bool_get:
             print("Error: [%s] parameter is not defined..." % param_name)
         return True


## 7. Class that computes the evaluation **metrics**, containing:


1.   Error calculation function
2.   Applying the threshold to the coefficients



In [None]:
# Calculate the accuracy
class Evaluate():
    """Classify the test samples from the four sparse-code matrices.

    Each coefficient matrix has one row per test sample and one column per
    training sample. For every test sample the thresholded coefficient
    magnitudes are summed per training class, the four sub-band scores are
    averaged, and the class with the largest score is predicted.

    Labels are expected to start at 1; classes ``1 .. max(test_labels)`` are
    scored.

    Args:
        coefLL, coefLH, coefHL, coefHH: sparse-code matrices; default to the
            parameters of the notebook-level ``model``.
        train_labels: labels of the training samples (required by ``_eval``).
        test_labels: labels of the test samples (required).
    """

    def __init__(self, coefLL= model.sparsecodeLL, coefLH= model.sparsecodeLH,coefHL= model.sparsecodeHL,coefHH= model.sparsecodeHH, train_labels = None, test_labels= None):
        self.coefLL= coefLL
        self.coefLH= coefLH
        self.coefHL= coefHL
        self.coefHH= coefHH

        self.train_labels= train_labels
        self.test_labels= test_labels

        # Per-class score buffers, one slot per class 1..max(test_labels).
        self.class_LL = torch.zeros(int(torch.max(self.test_labels)))
        self.class_LH = torch.zeros(int(torch.max(self.test_labels)))
        self.class_HL = torch.zeros(int(torch.max(self.test_labels)))
        self.class_HH = torch.zeros(int(torch.max(self.test_labels)))

        self.prediction = torch.zeros(len((self.test_labels)))

    def _eval(self):
        """Predict a class for every test sample and return the accuracy."""
        n_test = len(self.test_labels)
        n_train = len(self.train_labels)
        n_classes = int(torch.max(self.test_labels))

        # Class membership of the training samples does not depend on the test
        # sample, so it is built once: membership[j, l-1] == 1 iff training
        # sample j has label l.
        train_labels = self.train_labels.reshape(-1).cpu()
        class_ids = torch.arange(1, n_classes + 1)
        membership = (train_labels.unsqueeze(1) == class_ids.unsqueeze(0)).to(torch.float)

        # scores[b][atom, l-1] = sum of |coefficient| over the training samples
        # of class l, for sub-band b.
        scores = []
        for coef in (self.coefLL, self.coefLH, self.coefHL, self.coefHH):
            # detach(): evaluation must not extend the autograd graph.
            magnitude = torch.abs(self._get_threshold(coef.detach().cpu()))
            scores.append(torch.matmul(magnitude[:n_test, :n_train], membership))
        score_LL, score_LH, score_HL, score_HH = scores

        combined = 1/4*score_LL + 1/4*score_LH + 1/4*score_HL + 1/4*score_HH
        # +1 converts the 0-based argmax back to the 1-based labels.
        self.prediction = (torch.argmax(combined, dim=1) + 1).to(torch.float)

        # Keep the buffers in the state the per-sample loop used to leave them:
        # the class scores of the last test sample.
        self.class_LL, self.class_LH = score_LL[-1], score_LH[-1]
        self.class_HL, self.class_HH = score_HL[-1], score_HH[-1]

        missrate = self._error_cal(self.test_labels, self.prediction)
        accuracy = 1 - missrate
        return accuracy

    def _error_cal(self, ground_truth, predicted_label):
        """Return the fraction of samples whose prediction differs from the truth.

        ``ground_truth`` may be a column vector ``(n, 1)`` or a flat vector.
        """
        ground_truth = ground_truth.reshape(-1)
        _error_value = (ground_truth != predicted_label).sum().item()
        missrate = _error_value/ (ground_truth.shape[0])
        return missrate

    def _get_threshold(self, coef, ro=0.1):
        """Keep only the largest-magnitude entries of every column.

        For each column the entries are visited in order of decreasing
        magnitude and kept until their cumulative magnitude exceeds ``ro``
        times the column's total magnitude; all other entries become zero.

        NOTE(review): the selection runs along dim 0, i.e. per column
        (per training sample) -- confirm that per-column rather than per-row
        thresholding is intended.

        Args:
            coef: 2-D coefficient tensor.
            ro: fraction of the column's total magnitude to retain; when
                ``ro >= 1`` the input is returned unchanged.
        Returns:
            A new float tensor of the same shape (or ``coef`` itself when
            ``ro >= 1``).
        """
        if not ro < 1:
            return coef

        kept = torch.zeros((coef.shape[0], coef.shape[1]))
        # One sort yields both the ordered values and their row indices.
        neg_sorted, order = torch.sort(-torch.abs(coef), 0)
        magnitudes = torch.abs(neg_sorted)
        for i in range(coef.shape[1]):
            budget = ro * torch.sum(magnitudes[:, i], dtype = torch.float)
            csum = 0
            # Bounded loop: an all-zero column never exceeds its budget and is
            # simply left as zeros (this used to index past the last row).
            for t in range(coef.shape[0]):
                csum = csum + magnitudes[t, i]
                if csum > budget:
                    rows = order[0:t + 1, i]
                    kept[rows, i] = coef[rows, i]
                    break
        return kept

## 8. Training process

In [None]:
# Training process
#
# Full-batch training: every epoch is a single optimisation step on the
# tensors prepared earlier. Every `cal_epoch` epochs the accuracy is evaluated
# and, if the loss improved since the last check, a checkpoint is written.

model.train()
xim = DWTInverse(wave='haar').cuda()

checkpoint_dir = '/content/drive/My Drive/Colab Notebooks/Variational-Autoencoder/checkpoints/'

# `+ 1` so that epoch `num_epoch` itself is trained and evaluated; the loop
# used to stop one epoch early and skipped the final evaluation/checkpoint.
for epoch in range(1, num_epoch + 1):
    # Training
    optimizer.zero_grad()
    outputs = model(
        inputs_train_LL,inputs_train_LH,inputs_train_HL,inputs_train_HH,  y_onehot_train,inputs_test_LL,inputs_test_LH,inputs_test_HL,inputs_test_HH,  y_onehot_test)
    # Unpacked as well so the individual tensors stay available in the
    # notebook namespace after training.
    sparsecodeLL, z_LL, z_LL_c,x_LL,x_LL_c,sparsecodeLH,z_LH, z_LH_c, x_LH,x_LH_c,sparsecodeHL,z_HL, z_HL_c,x_HL,x_HL_c,sparsecodeHH,z_HH, z_HH_c,x_HH,x_HH_c = outputs
    loss = loss_fn(*outputs)
    loss.backward()
    optimizer.step()

    print("\n[EPOCH %.3d] Loss: %.6f" % (epoch, loss.item()))
    if (epoch % cal_epoch == 0):
        # Evaluation does not need gradients.
        with torch.no_grad():
            accuracy= Evaluate( coefLL= model.sparsecodeLL, coefLH= model.sparsecodeLH,coefHL= model.sparsecodeHL,coefHH= model.sparsecodeHH, train_labels=labels_train, test_labels=labels_test)._eval()
        print("=================>_Accuracy: %.6f" % (accuracy))

        # Save checkpoint (only when the loss is the best seen so far)
        if improvechecker._check(loss.item()):
            checkpoint = dict(
                epoch=epoch,
                loss=loss.item(),
                state_dict=model.state_dict(),
                optimizer=optimizer.state_dict(),
            )
            # Make sure the target directory exists before writing.
            os.makedirs(checkpoint_dir, exist_ok=True)
            save_file = os.path.join(checkpoint_dir, "CAE.pth")
            torch.save(checkpoint, save_file)
            print("Best checkpoint is saved at %s" % (save_file))

