In [None]:
# import header files
%matplotlib inline
import torch
import torch.nn as nn
import torchvision
from functools import partial
from dataclasses import dataclass
from collections import OrderedDict
import glob
import os
import random
import tensorflow as tf
from tensorflow import keras
import numpy as np
import seaborn as sn
import pandas as pd
from matplotlib import pyplot as plt
from tqdm import tqdm
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_recall_fscore_support
import time
import copy
import tqdm
import torch
import random
from PIL import Image
import torch.optim as optim
from torchvision import models
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset,DataLoader
from torch import nn
from torch.utils.data import DataLoader, ConcatDataset
from torchvision import transforms
from sklearn.model_selection import KFold

In [None]:
# load my google drive
import os
def auth_gdrive(mount_point='/content/gdrive'):
  """Mount Google Drive in Colab unless it is already available.

  Args:
    mount_point: absolute directory under which the drive is mounted.
      Defaults to '/content/gdrive', the location used throughout this
      notebook.

  Returns:
    None. The function returns early when the drive root is already present
    below `mount_point`; otherwise it calls `drive.mount(mount_point)`.
  """
  # The notebook refers to the drive root both as 'My Drive' and as 'MyDrive',
  # so either spelling counts as "already mounted". The check is made below the
  # absolute mount point (a relative 'content/...' path never exists).
  for drive_root in ('MyDrive', 'My Drive'):
    if os.path.isdir(os.path.join(mount_point, drive_root)):
      return
  # Imported only when a mount is really needed, so the early-return path does
  # not require the google.colab package.
  from google.colab import drive
  drive.mount(mount_point)
def load_gdrive_dataset():
  """Make Google Drive available so the dataset archive can be read from it.

  This function only mounts the drive (through `auth_gdrive`); it does not
  open or extract the archive ('CVPollen23E.zip'), which is unpacked by a
  separate `!unzip` cell of the notebook.
  """
  auth_gdrive()

In [None]:
# mount my google drive
# force_remount=True asks Colab to mount /content/gdrive again even when it is
# already mounted.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
# load_gdrive_dataset() goes through auth_gdrive(), which may attempt a second
# mount; the recorded output of this cell shows the resulting
# "Drive already mounted" notice.
load_gdrive_dataset()

Mounted at /content/gdrive
Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# unzip  dataset
!unzip "/content/gdrive/MyDrive/CVPollen23E.zip"

Archive:  /content/gdrive/MyDrive/CVPollen23E.zip
   creating: CVPollen23E/
   creating: CVPollen23E/test/
   creating: CVPollen23E/test/1.Anadenanthera/
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_15.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_16.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_17.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_18.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_19.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_20.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_21.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_22.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_23.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_24.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_25.jpg  
  inflating: CVPollen23E/test/1.Anadenanthera/anadenanthera_26.jpg  
  inflating: CVPol

In [None]:
# Image pipeline: random rotation within +/-30 degrees, resize to 84x84,
# random horizontal flip, tensor conversion, per-channel normalisation.
train_transforms = torchvision.transforms.Compose(
    [
        torchvision.transforms.RandomRotation(degrees=30),
        torchvision.transforms.Resize(size=(84, 84)),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
# Build one dataset out of the archive's train/ and test/ folders.
# Both folders are read with `train_transforms`, so the random rotation and
# flip are applied to images from test/ as well.
dataset_train_part = torchvision.datasets.ImageFolder(
    root="/content/CVPollen23E/train/",
    transform=train_transforms,
)
dataset_test_part = torchvision.datasets.ImageFolder(
    root="/content/CVPollen23E/test/",
    transform=train_transforms,
)
# The two parts are concatenated (train/ first) into the dataset handed to the
# K-fold splitter.
dataset = ConcatDataset(datasets=[dataset_train_part, dataset_test_part])

In [None]:
# define the suggested attention block (VDAB)
class ChannelAttention(nn.Module):
    """Channel-attention gate.

    The input is reduced to one value per channel twice (global average pooling
    and global max pooling). Both descriptors pass through the same two 1x1
    convolutions (a bottleneck), are summed, and are squashed with a sigmoid, so
    the result holds one weight in (0, 1) per channel.

    Args:
        in_planes: number of channels of the incoming feature map.
        ratio: reduction factor of the bottleneck; the hidden layer has
            ``in_planes // ratio`` channels. Defaults to 16.
    """

    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # The bottleneck width follows `ratio` (in_planes // 16 with the default).
        hidden_planes = in_planes // ratio
        self.fc1   = nn.Conv2d(in_planes, hidden_planes, 1, bias=False)
        self.relu1 = nn.ReLU()
        self.fc2   = nn.Conv2d(hidden_planes, in_planes, 1, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the per-channel attention weights for ``x`` (spatial size 1x1)."""
        # fc1/relu1/fc2 are shared between the average- and max-pooled branches.
        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))
        out = avg_out + max_out
        return self.sigmoid(out)
class SpatialAttention(nn.Module):
    """Spatial-attention gate.

    Collapses dimension 1 (the channel axis consumed by the convolution) into a
    mean map and a max map, convolves the stacked pair down to a single channel
    and applies a sigmoid, giving one weight in (0, 1) per spatial position.

    Args:
        kernel_size: side of the square convolution kernel; only 3 or 7 are
            accepted (anything else trips the assertion).
    """

    def __init__(self, kernel_size=3):
        super().__init__()
        assert kernel_size in (3, 7), 'kernel size must be 3 or 7'
        # Padding that keeps the spatial size unchanged for the two supported kernels.
        if kernel_size == 3:
            pad = 1
        else:
            pad = 3
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=pad, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return the single-channel attention map computed from ``x``."""
        channel_mean = x.mean(dim=1, keepdim=True)
        channel_max = x.max(dim=1, keepdim=True).values
        # Mean map first, max map second: this is the input-channel order of conv1.
        pooled = torch.cat([channel_mean, channel_max], dim=1)
        return self.sigmoid(self.conv1(pooled))
class VDAB(nn.Module):
    """Attention block: channel attention followed by spatial attention.

    Args:
        in_planes: number of channels of the feature map the block receives.
    """

    def __init__(self, in_planes):
        super().__init__()
        self.ca = ChannelAttention(in_planes)
        self.sa = SpatialAttention()

    def forward(self, x):
        """Scale ``x`` by its channel weights, then by the spatial weights of the result."""
        channel_weights = self.ca(x)
        refined = x * channel_weights
        # The spatial map is computed from the channel-refined features, not from x.
        spatial_weights = self.sa(refined)
        return refined * spatial_weights

In [None]:
# define the proposed method (RCA-Net)
class BasicBlock(nn.Module):
    """Residual block with two 3x3 convolutions and a VDAB attention stage.

    Args:
        in_planes: channels of the incoming feature map.
        planes: channels produced by both convolutions.
        stride: stride of the first convolution (and of the projection shortcut).
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        if stride == 1 and in_planes == out_planes:
            # Shapes already agree: an empty Sequential acts as the identity.
            self.shortcut = nn.Sequential()
        else:
            # Project the input with a 1x1 convolution so it can be added to the output.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

        self.vdab = VDAB(planes)

    def forward(self, x):
        """conv-bn-relu, conv-bn, attention, shortcut addition, final ReLU."""
        features = F.relu(self.bn1(self.conv1(x)))
        features = self.bn2(self.conv2(features))
        # Attention is applied to the residual branch before the shortcut is added.
        features = self.vdab(features)
        features += self.shortcut(x)
        return F.relu(features)

class Bottleneck(nn.Module):
    """Residual bottleneck block (1x1, 3x3, 1x1 convolutions) with VDAB attention.

    Args:
        in_planes: channels of the incoming feature map.
        planes: channels of the two inner convolutions; the block outputs
            ``expansion * planes`` channels.
        stride: stride of the 3x3 convolution (and of the projection shortcut).
    """

    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        if stride == 1 and in_planes == out_planes:
            # Shapes already agree: an empty Sequential acts as the identity.
            self.shortcut = nn.Sequential()
        else:
            # Project the input with a 1x1 convolution so it can be added to the output.
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

        self.vdab = VDAB(out_planes)

    def forward(self, x):
        """Three conv-bn stages, attention, shortcut addition, final ReLU."""
        features = F.relu(self.bn1(self.conv1(x)))
        features = F.relu(self.bn2(self.conv2(features)))
        features = self.bn3(self.conv3(features))
        # Attention is applied to the residual branch before the shortcut is added.
        features = self.vdab(features)
        features += self.shortcut(x)
        return F.relu(features)

class ResNetVDAB(nn.Module):
    """ResNet-style classifier assembled from attention-equipped residual blocks.

    Args:
        block: residual block class. It must expose an ``expansion`` class
            attribute and be constructible as ``block(in_planes, planes, stride)``.
        num_blocks: four integers, the number of blocks in stages 1 to 4.
        num_classes: size of the output layer (23 by default).
    """

    def __init__(self, block, num_blocks, num_classes=23):
        super().__init__()
        # Running channel count; _make_layer updates it after every block.
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # (planes, stride of the first block) for layer1 .. layer4.
        stage_specs = ((64, 1), (128, 2), (256, 2), (512, 2))
        for position, (planes, first_stride) in enumerate(stage_specs):
            stage = self._make_layer(block, planes, num_blocks[position], stride=first_stride)
            setattr(self, 'layer%d' % (position + 1), stage)

        # NOTE(review): the flattened size depends on the input resolution; 2048
        # appears to match 84x84 inputs (512 channels x 2 x 2 after pooling) -
        # confirm before changing the image size.
        self.linear = nn.Linear(2048*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage: the first block uses ``stride``, the others stride 1."""
        stage_blocks = []
        for block_stride in [stride] + [1]*(num_blocks-1):
            stage_blocks.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage_blocks)

    def forward(self, x):
        """Stem, four stages, 4x4 average pooling, flatten, linear classifier."""
        feats = F.relu(self.bn1(self.conv1(x)))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            feats = stage(feats)
        pooled = F.avg_pool2d(feats, 4)
        flat = pooled.view(pooled.size(0), -1)
        return self.linear(flat)

def ResNet18VDAB():
    """Return a ResNetVDAB built from BasicBlock with two blocks in each of the four stages."""
    blocks_per_stage = [2, 2, 2, 2]
    return ResNetVDAB(BasicBlock, blocks_per_stage)

In [None]:
# Build the network, move it to the available device and display its layers
import math
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# Module.to() returns the module itself, so `model` is the moved network.
model = ResNet18VDAB().to(device)
model

ResNetVDAB(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
      (vdab): VDAB(
        (ca): ChannelAttention(
          (avg_pool): AdaptiveAvgPool2d(output_size=1)
          (max_pool): AdaptiveMaxPool2d(output_size=1)
          (fc1): Conv2d(64, 4, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (relu1): ReLU()
          (fc2): Conv2d(4, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (sigmoid)

In [None]:
# Per-layer table (output shape, parameter count) for one 3x84x84 input
from torchvision import models
from torchsummary import summary
summary(model, input_size=(3, 84, 84))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 84, 84]           1,728
       BatchNorm2d-2           [-1, 64, 84, 84]             128
            Conv2d-3           [-1, 64, 84, 84]          36,864
       BatchNorm2d-4           [-1, 64, 84, 84]             128
            Conv2d-5           [-1, 64, 84, 84]          36,864
       BatchNorm2d-6           [-1, 64, 84, 84]             128
 AdaptiveAvgPool2d-7             [-1, 64, 1, 1]               0
            Conv2d-8              [-1, 4, 1, 1]             256
              ReLU-9              [-1, 4, 1, 1]               0
           Conv2d-10             [-1, 64, 1, 1]             256
AdaptiveMaxPool2d-11             [-1, 64, 1, 1]               0
           Conv2d-12              [-1, 4, 1, 1]             256
             ReLU-13              [-1, 4, 1, 1]               0
           Conv2d-14             [-1, 6

In [None]:
if __name__ == '__main__':
  # Configuration options
  # k-fold cross-validation requires at least 2 folds
  k_folds = 5
  num_epochs = 10
  loss_function = nn.CrossEntropyLoss()
  # For fold results: fold index -> accuracy in percent
  results = {}
  # Set fixed random number seeds. Seeding PyTorch alone is not enough:
  # scikit-learn's shuffling relies on NumPy's global RNG when no random_state
  # is given, and Python's own `random` module has a separate state as well.
  random.seed(42)
  np.random.seed(42)
  torch.manual_seed(42)

In [None]:
  # Define the K-fold Cross Validator.
  # A fixed random_state makes every kfold.split(dataset) call return the same
  # partition, so a cell that splits again sees the folds used for training.
  kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
  # Start print
  print('--------------------------------')

--------------------------------


In [None]:
  # K-fold Cross Validation model evaluation
  for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):
    # Print
    print(f'Fold-{fold}')
    print('--------------------------------')
    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
    # Define data loaders for training and testing data in this fold
    trainloader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=train_subsampler)
    testloader = torch.utils.data.DataLoader(dataset, batch_size=16, sampler=test_subsampler)
    # Init the neural network
    model = ResNet18VDAB()
    # Initialize optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=5e-3)
    # Run the training loop for defined number of epochs
    # Print about training
    print('Starting training')
    for epoch in range(0, num_epochs):
      # Print epoch
      print(f'Starting epoch-{epoch+1}')
      # Set current loss value
      current_loss = 0.0
    # Evaluation for this fold
    correct, total = 0, 0
    with torch.no_grad():
      # Iterate over the test data and generate predictions
      # Iterate over the DataLoader for training data
      for i, data in enumerate(trainloader, 0):
        # Get inputs
        inputs, labels = data
        #inputs, labels = inputs.cuda(), labels.cuda() # for using data in GPU
        # Zero the gradients
        optimizer.zero_grad()
        # Perform forward pass
        outputs = model(inputs)
        # Set total and correct
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
      # Print accuracy
      print('Training accuracy for fold-%d: %.4f %%' % (fold, 100.0 * correct / total))
      print('--------------------------------')
      results[fold] = 100.0 * (correct / total)
# Process is complete.
print('Training process has finished. Saving training model.')

Fold-0
--------------------------------
Starting training
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Training accuracy for fold-0: 98.3523 %
--------------------------------
Fold-1
--------------------------------
Starting training
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Training accuracy for fold-1: 98.2014 %
--------------------------------
Fold-2
--------------------------------
Starting training
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Training accuracy for fold-2: 97.9194 %
--------------------------------
Fold-3
--------------------------------
Starting training
Starting epoch-1
Starting epoch-2
St

In [None]:
  # Print fold results
  print(f'K-fold cross validation results for {k_folds}-Folds')
  print('--------------------------------')
  # Accumulate under a dedicated name: rebinding `sum` would shadow the builtin
  # for every cell that runs afterwards.
  accuracy_total = 0.0
  for key, value in results.items():
    print(f'Training accuracy of Fold-{key}: %.4f %%' % value)
    accuracy_total += value
  # Average once, after the loop. With no recorded folds the average is
  # reported as NaN instead of failing on an undefined name.
  Average = accuracy_total / len(results) if results else float('nan')
  print('Average training accuracy: %.4f %%' % Average)

K-fold cross validation results for 5-Folds
--------------------------------
Training accuracy of Fold-0: 98.3523 %
Training accuracy of Fold-1: 98.2014 %
Training accuracy of Fold-2: 97.9194 %
Training accuracy of Fold-3: 96.2903 %
Training accuracy of Fold-4: 98.2447 %
Average training accuracy: 97.8016 %


In [None]:
  # K-fold Cross Validation model evaluation
  for fold, (train_ids, test_ids) in enumerate(kfold.split(dataset)):
    # Print
    print(f'Fold-{fold}')
    print('--------------------------------')
    # Saving the model
    save_path = f'./model-fold-{fold}.pth'
    torch.save(model.state_dict(), save_path)
    # Iterate over the test data and generate predictions
    # Run the test loop for defined number of epochs
    # Print about test
    print('Starting test')
    for epoch in range(0, num_epochs):
      # Print epoch
      print(f'Starting epoch-{epoch+1}')
      # Set current loss value
      current_loss = 0.0
    # Evaluation for this fold
    correct, total = 0, 0
    with torch.no_grad():
      # Iterate over the test data and generate predictions
      # Iterate over the DataLoader for test data
      for i, data in enumerate(testloader, 0):
        # Get inputs
        inputs, labels = data
        #inputs, labels = inputs.cuda(), labels.cuda() # for using data in GPU
        # Generate outputs
        outputs = model(inputs)
        # Set total and correct
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
      # Print accuracy
      print('Test accuracy for fold-%d: %.4f %%' % (fold, 100.0 * correct / total))
      print('--------------------------------')
      results[fold] = 100.0 * (correct / total)
# Process is complete.
print('Test process has finished. Saving test model.')

Fold-0
--------------------------------
Starting test
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Test accuracy for fold-0: 96.1436 %
--------------------------------
Fold-1
--------------------------------
Starting test
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Test accuracy for fold-1: 96.1899 %
--------------------------------
Fold-2
--------------------------------
Starting test
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoch-4
Starting epoch-5
Starting epoch-6
Starting epoch-7
Starting epoch-8
Starting epoch-9
Starting epoch-10
Test accuracy for fold-2: 96.3827 %
--------------------------------
Fold-3
--------------------------------
Starting test
Starting epoch-1
Starting epoch-2
Starting epoch-3
Starting epoc

In [None]:
  # Print fold results
  print(f'K-fold cross validation results for {k_folds}-Folds')
  print('--------------------------------')
  # Accumulate under a dedicated name: rebinding `sum` would shadow the builtin
  # for every cell that runs afterwards.
  accuracy_total = 0.0
  for key, value in results.items():
    # Label spelled "accuracy" (the previous text printed "ccuracy")
    print(f'Test accuracy of Fold-{key}: %.4f %%' % value)
    accuracy_total += value
  # Average once, after the loop. With no recorded folds the average is
  # reported as NaN instead of failing on an undefined name.
  Average = accuracy_total / len(results) if results else float('nan')
  print('Average test accuracy: %.4f %%' % Average)

K-fold cross validation results for 5-Folds
--------------------------------
Test ccuracy of Fold-0: 96.1436 %
Test ccuracy of Fold-1: 96.1899 %
Test ccuracy of Fold-2: 96.3827 %
Test ccuracy of Fold-3: 95.9905 %
Test ccuracy of Fold-4: 96.0217 %
Average test accuracy: 96.1456 %
