<a href="https://colab.research.google.com/github/yakovsushenok/Thesis/blob/main/Resnet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import zipfile
import os
import pandas as pd
import math, random
import torch
import torchaudio
from torchaudio import transforms
from torch.utils.data import DataLoader, Dataset, random_split
import torch.nn.functional as F
import torch.nn as nn
from torch.nn import init
from google.colab import drive
from sklearn.model_selection import StratifiedShuffleSplit
drive.mount('/content/gdrive')
import time
import matplotlib.pyplot as plt
import numpy as np
import numpy.matlib
try:
    from scipy.fftpack import fft, ifft
except ImportError:
    from numpy.fft import fft, ifft
from scipy.signal import lfilter
import scipy.io as sio
from scipy import signal
import gc
import h5py
from torchsummary import summary
random.seed(0)
np.random.seed(0)
torch.manual_seed(0)

Mounted at /content/gdrive


<torch._C.Generator at 0x7fc2b7368ad0>

In [3]:
df_500 = pd.read_csv("/content/gdrive/MyDrive/train_metadata_more_than_500.csv")
df_500['primary_label'] = df_500['primary_label'].apply(lambda x: x - 1)
df_500 = df_500[['relative_path', 'primary_label']]
df_500_toy = pd.read_csv("/content/gdrive/MyDrive/df_500_toy.csv")
toy_ind = list(df_500_toy['Unnamed: 0'])

class H5DS(Dataset):
  def __init__(self, df, path, toy_ind):
    self.path = path
    self.data = h5py.File(self.path, 'r')['data']
    self.df = df
    self.toy_ind = toy_ind
  
  def __len__(self):
    return len(self.df)    
    
  def __getitem__(self, idx):
   
   return (self.data[toy_ind[idx]], torch.tensor(self.df['primary_label'].iloc[idx]))

device = torch.device("cuda:0" if torch.cuda.is_available() else  "cpu") #

def inference(model, val_dl):
  correct_prediction = 0
  total_prediction = 0

  # Disable gradient updates
  with torch.no_grad():
    for data in val_dl:
      # Get the input features and target labels, and put them on the GPU
      inputs, labels = data[0].to(device), data[1].to(device)

      # Normalize the inputs
      # inputs_m, inputs_s = inputs.mean(), inputs.std()
      # inputs = (7 + (inputs - inputs_m) / inputs_s)

      inputs -= inputs.min(1, keepdim=True)[0]
      inputs /= inputs.max(1, keepdim=True)[0]

      inputs = inputs[None, :, :, :]
      inputs = inputs.permute(1, 0, 2, 3)
      inputs = torch.cat([inputs[:, :, :round(inputs.shape[2]/3), :],inputs[:, :, round(inputs.shape[2]/3):round(inputs.shape[2]*2/3), :],inputs[:, :, round(inputs.shape[2]*2/3):, : ]], dim = 1)

      # Get predictions
      outputs = model(inputs.float())

      # Get the predicted class with the highest score
      _, prediction = torch.max(outputs,1)
      # Count of predictions that matched the target label
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]
    
  acc = correct_prediction/total_prediction
  print(f'Val Accuracy: {acc:.2f}')

# Resnet Model

[Link](https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html) for saving best model (how to do it)

In [None]:
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, downsample):
        super().__init__()
        if downsample:
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1)
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
            self.shortcut = nn.Sequential()

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)

    def forward(self, input):
        shortcut = self.shortcut(input)
        input = nn.ReLU()(self.bn1(self.conv1(input)))
        input = nn.ReLU()(self.bn2(self.conv2(input)))
        input = input + shortcut
        return nn.ReLU()(input)

class ResBottleneckBlock(nn.Module):
    def __init__(self, in_channels, out_channels, downsample):
        super().__init__()
        self.downsample = downsample
        self.conv1 = nn.Conv2d(in_channels, out_channels//4, kernel_size=1, stride=1)
        self.conv2 = nn.Conv2d(out_channels//4, out_channels//4, kernel_size=3, stride=2 if downsample else 1, padding=1)
        self.conv3 = nn.Conv2d(out_channels//4, out_channels, kernel_size=1, stride=1)
        self.shortcut = nn.Sequential()

        if self.downsample or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2 if self.downsample else 1),
                nn.BatchNorm2d(out_channels)
            )

        self.bn1 = nn.BatchNorm2d(out_channels//4)
        self.bn2 = nn.BatchNorm2d(out_channels//4)
        self.bn3 = nn.BatchNorm2d(out_channels)

    def forward(self, input):
        shortcut = self.shortcut(input)
        input = nn.ReLU()(self.bn1(self.conv1(input)))
        input = nn.ReLU()(self.bn2(self.conv2(input)))
        input = nn.ReLU()(self.bn3(self.conv3(input)))
        input = input + shortcut
        return nn.ReLU()(input)

class ResNet(nn.Module):
    def __init__(self, in_channels, resblock, repeat, useBottleneck=False, outputs=12):
        super().__init__()
        self.layer0 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        if useBottleneck:
            filters = [64, 256, 512, 1024, 2048]
        else:
            filters = [64, 64, 128, 256, 512]

        self.layer1 = nn.Sequential()
        self.layer1.add_module('conv2_1', resblock(filters[0], filters[1], downsample=False))
        for i in range(1, repeat[0]):
                self.layer1.add_module('conv2_%d'%(i+1,), resblock(filters[1], filters[1], downsample=False))

        self.layer2 = nn.Sequential()
        self.layer2.add_module('conv3_1', resblock(filters[1], filters[2], downsample=True))
        for i in range(1, repeat[1]):
                self.layer2.add_module('conv3_%d' % (i+1,), resblock(filters[2], filters[2], downsample=False))

        self.layer3 = nn.Sequential()
        self.layer3.add_module('conv4_1', resblock(filters[2], filters[3], downsample=True))
        for i in range(1, repeat[2]):
            self.layer3.add_module('conv2_%d' % (i+1,), resblock(filters[3], filters[3], downsample=False))

        self.layer4 = nn.Sequential()
        self.layer4.add_module('conv5_1', resblock(filters[3], filters[4], downsample=True))
        for i in range(1, repeat[3]):
            self.layer4.add_module('conv3_%d'%(i+1,), resblock(filters[4], filters[4], downsample=False))

        self.gap = torch.nn.AdaptiveAvgPool2d(1)
        self.fc = torch.nn.Linear(filters[4], outputs)

    def forward(self, input):
        input = self.layer0(input)
        input = self.layer1(input)
        input = self.layer2(input)
        input = self.layer3(input)
        input = self.layer4(input)
        input = self.gap(input)
        input = torch.flatten(input, start_dim=1)
        input = self.fc(input)

        return input
class H5DS(Dataset):
  def __init__(self, df, path):
    self.path = path
    self.data = h5py.File(self.path, 'r')['data']
    self.df = df
    self.toy_ind = toy_ind
  
  def __len__(self):
    return len(self.df)    
    
  def __getitem__(self, idx):
   
   return (self.data[idx], torch.tensor(self.df['primary_label'].iloc[idx]))

device = torch.device("cuda:0" if torch.cuda.is_available() else  "cpu") #

#myModel = ResNet(3, ResBlock, [2, 2, 2, 2], useBottleneck=False, outputs=12) # resnet18
#myModel = ResNet(3, ResBottleneckBlock, [3, 4, 6, 3], useBottleneck=True, outputs=12) # resnet50
myModel = ResNet(3, ResBottleneckBlock, [3, 8, 36, 3], useBottleneck=True, outputs=12)
device = torch.device("cuda:0" if torch.cuda.is_available() else  "cpu") #
myModel = myModel.to(device)




def training(model, train_dl, num_epochs, val_dl):
  # Loss Function, Optimizer 
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(), lr= 10**(-5))
  
  # scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.00001,
  #                                               steps_per_epoch=int(len(train_dl)),
  #                                               epochs=num_epochs,
  #                                               anneal_strategy='linear')

  # Epoch iterator
  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0
    indices = []
    # Batch iterator
    j = 0
    for i, data in enumerate(train_dl):

        inputs, labels = torch.tensor(data[0]).to(device), torch.tensor(data[1]).to(device) # Get the input features and target labels, and put them on the GPU
        if torch.isnan(torch.tensor(data[0])).any() == True:
          j += 1
          continue
        # Normalize the inputs - 1 (ignore the 7). We normalize the tensors using the mean from the whole dataset, not per-image.https://stats.stackexchange.com/questions/211436/why-normalize-images-by-subtracting-datasets-image-mean-instead-of-the-current
        # inputs_m, inputs_s = inputs.mean(), inputs.std()
        # inputs = (7 + (inputs - inputs_m) / inputs_s)
        
        # Normalize the inputs - 2
        
        inputs -= inputs.min(1, keepdim=True)[0]
        inputs /= inputs.max(1, keepdim=True)[0]

        inputs = inputs[None, :, :, :]
        inputs = inputs.permute(1, 0, 2, 3)
        inputs = torch.cat([inputs[:, :, :round(inputs.shape[2]/3), :],inputs[:, :, round(inputs.shape[2]/3):round(inputs.shape[2]*2/3), :],inputs[:, :, round(inputs.shape[2]*2/3):, : ]], dim = 1)

        optimizer.zero_grad() # Zero the parameter gradients

        # forward + backward + optimize
        outputs = model(inputs.float())
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # scheduler.step()

        running_loss += loss.item() # Keep stats for Loss and Accuracy

        _, prediction = torch.max(outputs,1) # Get the predicted class with the highest score
        #print(prediction, labels)
        correct_prediction += (prediction == labels).sum().item() # Count of predictions that matched the target label
        total_prediction += prediction.shape[0]

    
    # Print stats at the end of the epoch
    print(j)
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch + 1}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')
    print(f"TESTING:")
    inference(model, val_dl)
    print("\n")

  print('Finished Training')


path = '/content/gdrive/MyDrive/df_train_tensor_60_fs50-8k.h5' #               '/content/gdrive/MyDrive/df_train_tensor_60_500.h5'
NUM_EPOCHS = 40
# Initializing the dataset
myds = H5DS(df_500, path)
# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items*(0.80))
num_val = num_items - num_train
print(num_train,num_val)
train_ds, val_ds = random_split(myds, [num_train, num_val])
# Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=1, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=1, shuffle=False)

training(myModel, train_dl, NUM_EPOCHS, val_dl) # Training

4800 1200




0
Epoch: 1, Loss: 2.30, Accuracy: 0.20
TESTING:
Val Accuracy: 0.31


0
Epoch: 2, Loss: 1.84, Accuracy: 0.38
TESTING:
Val Accuracy: 0.48


0
Epoch: 3, Loss: 1.46, Accuracy: 0.52
TESTING:
Val Accuracy: 0.55


0
Epoch: 4, Loss: 1.14, Accuracy: 0.64
TESTING:
Val Accuracy: 0.63


0
Epoch: 5, Loss: 0.90, Accuracy: 0.72
TESTING:
Val Accuracy: 0.68


0
Epoch: 6, Loss: 0.73, Accuracy: 0.77
TESTING:
Val Accuracy: 0.71


0
Epoch: 7, Loss: 0.60, Accuracy: 0.81
TESTING:
Val Accuracy: 0.69


0
Epoch: 8, Loss: 0.49, Accuracy: 0.85
TESTING:
Val Accuracy: 0.66


0
Epoch: 9, Loss: 0.43, Accuracy: 0.87
TESTING:
Val Accuracy: 0.66


0
Epoch: 10, Loss: 0.37, Accuracy: 0.89
TESTING:
Val Accuracy: 0.72


0
Epoch: 11, Loss: 0.31, Accuracy: 0.90
TESTING:
Val Accuracy: 0.68


0
Epoch: 12, Loss: 0.30, Accuracy: 0.91
TESTING:
Val Accuracy: 0.71


0
Epoch: 13, Loss: 0.25, Accuracy: 0.92
TESTING:
Val Accuracy: 0.71




# Results:

---

Sample Length (s) : 60

lr= 10**(-5)

Model: Resnet18

Normalization: Yes

Number of classes: 12

Mini-batch size = 1

Number of Samples in training: 2000*(0.8)

`fs = [50, 8000]`

Best Val Accurary = 0.69 epoch 14/30 $|$ 0.81 epoch 17

Input shape: `(768, 6025)`


Trainable params: 11,181,000


---

Sample Length (s) : 60

lr= 10**(-5)

Model: Resnet 50

Normalization: Yes

Number of classes: 12

Mini-batch size = 1

Number of Samples in training: 2000*(0.8)

`fs = [50, 8000]`

Best Val Accurary = 0.51 epoch 11/15

Input shape: `(768, 6025)`


Trainable params: 23,559,180
