In [1]:
import os
import random
import shutil

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

In [79]:
torch.__version__

'2.0.1'

# 1. Get data

In [4]:

dataset_dir = "covid-dataset"
class_names = ['COVID', 'Normal', 'Viral Pneumonia']
train_dir = os.path.join(dataset_dir, 'train')
test_dir = os.path.join(dataset_dir, "test")

# Make directories with each class name
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)
for class_name in class_names:
    os.makedirs(os.path.join(train_dir, class_name), exist_ok=True)
    os.makedirs(os.path.join(test_dir, class_name), exist_ok=True)

# Train and test ratio
split_ratio = 0.8
covid_train_split = int(len(os.listdir("covid-dataset/COVID/images/")) * split_ratio)
normal_train_split = int(len(os.listdir("covid-dataset/Normal/images/")) * split_ratio)
pneumonia_train_split = int(len(os.listdir("covid-dataset/Viral Pneumonia/images/")) * split_ratio)

# Shuffle directories
for class_name in class_names:
    random.shuffle(os.listdir(dataset_dir + "/" + class_name + "/images"))

# Move images into train and test
covid_src_images = os.listdir("covid-dataset/COVID/images/")
train_covid_images = covid_src_images[:covid_train_split]
test_covid_images = covid_src_images[covid_train_split:]
for image in train_covid_images:
    image_path = os.path.join("covid-dataset/COVID/images/", image)
    shutil.move(image_path, "covid-dataset/train/COVID")
for image in test_covid_images:
    image_path = os.path.join("covid-dataset/COVID/images/", image)
    shutil.move(image_path, "covid-dataset/test/COVID")

normal_src_images = os.listdir("covid-dataset/Normal/images/")
train_normal_images = normal_src_images[:normal_train_split]
test_normal_images = normal_src_images[normal_train_split:]
for image in train_normal_images:
    image_path = os.path.join("covid-dataset/Normal/images/", image)
    shutil.move(image_path, "covid-dataset/train/Normal")
for image in test_normal_images:
    image_path = os.path.join("covid-dataset/Normal/images/", image)
    shutil.move(image_path, "covid-dataset/test/Normal")

pneumonia_src_images = os.listdir("covid-dataset/Viral Pneumonia/images/")
train_pneumonia_images = pneumonia_src_images[:pneumonia_train_split]
test_pneumonia_images = pneumonia_src_images[pneumonia_train_split:]
for image in train_pneumonia_images:
    image_path = os.path.join("covid-dataset/Viral Pneumonia/images/", image)
    shutil.move(image_path, "covid-dataset/train/Viral Pneumonia")
for image in test_pneumonia_images:
    image_path = os.path.join("covid-dataset/Viral Pneumonia/images/", image)
    shutil.move(image_path, "covid-dataset/test/Viral Pneumonia")

print("Files moved successfully!")



FileNotFoundError: [Errno 2] No such file or directory: 'covid-dataset/COVID/images/'

# 2. Dataset and DataLoader

In [5]:
# HYPERPARAMETERS
BATCH_SIZE=32
NUM_WORKERS=os.cpu_count()
LEARNING_RATE=0.01
EPOCHS = 1

In [6]:
data_transforms = transforms.Compose([
    transforms.Resize(256),
    transforms.ToTensor()
])

train_data = datasets.ImageFolder(root=train_dir, transform=data_transforms)
test_data = datasets.ImageFolder(root=test_dir, transform=data_transforms)
class_names = train_data.classes
print(class_names)
print(f"Train data: {train_data} \n Test data: {test_data}")

['COVID', 'Normal', 'Viral Pneumonia']
Train data: Dataset ImageFolder
    Number of datapoints: 12121
    Root location: covid-dataset/train
    StandardTransform
Transform: Compose(
               Resize(size=256, interpolation=bilinear, max_size=None, antialias=warn)
               ToTensor()
           ) 
 Test data: Dataset ImageFolder
    Number of datapoints: 3032
    Root location: covid-dataset/test
    StandardTransform
Transform: Compose(
               Resize(size=256, interpolation=bilinear, max_size=None, antialias=warn)
               ToTensor()
           )


In [7]:
train_dataloader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True)
test_dataloader = DataLoader(dataset=test_data,batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True)

# 2.1 Turn it into a script

In [None]:
os.makedirs("scripts", exist_ok=True)

In [None]:
%%writefile scripts/data_setup.py
"""
Putting data into Imagefolder and Dataloader
"""

import os
from torch.utils.data import DataLoader
from torchvision import datasets

def create_dataloaders(train_dir, test_dir, transform, batch_size, num_workers):
    train_data = datasets.ImageFolder(root=train_dir, transform=transform)
    test_data = datasets.ImageFolder(root=test_dir, transform=transform)

    train_dataloader = DataLoader(dataset=train_data, batch_size=batch_size, num_workers=num_workers, shuffle=True)
    test_dataloader = DataLoader(dataset=test_data,batch_size=batch_size, num_workers=num_workers)

    class_names = train_data.classes

    return train_dataloader, test_dataloader, class_names

# 3.1 Model (CovidAid)

In [50]:

class CovidAidModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.covid_aid_1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(8),
            nn.LeakyReLU()
        )
        self.covid_aid_2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.LeakyReLU()
        )
        self.covid_aid_3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU()
        )
        self.covid_aid_4 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=256, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU()
        )
        self.covid_aid_5 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU()
        )
        self.covid_aid_6 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU()
        )
        self.covid_aid_7 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=3, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(3),
            nn.LeakyReLU()
        )
        
        self.covid_aid_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=32, out_channels=16, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),
        )
        self.covid_aid_block_2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=64, out_channels=32, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),
        )
        self.covid_aid_block_3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=128, out_channels=64, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),
        )

        self.covid_aid_block_4 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(),
        )

        self.maxpool_1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.maxpool_2 = nn.MaxPool2d(kernel_size=1, stride=1)
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(363, 3)
        self.softmax = nn.Softmax()

    def forward(self, x):
        x = self.covid_aid_1(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_2(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_1(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_2(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_3(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_4(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_3(x)
        x = self.covid_aid_4(x)
        x = self.covid_aid_5(x)
        x = self.covid_aid_6(x)
        x = self.covid_aid_7(x) # [1, 3, 11, 11]
        x = self.flatten(x) # [1, 363]
        x = self.linear(x) # [1,3]
        x = self.softmax(x)
        return x

In [41]:
# Dummy forward pass to test if it works
# batches of images and its label
img_batch, label_batch = next(iter(train_dataloader))

# testibg model to see if it works
img_single, label_single = img_batch[0].unsqueeze(dim=0), label_batch[0]
print(f"Single image.shape: {img_single.shape}")

Single image.shape: torch.Size([1, 3, 256, 256])


In [51]:
# test_img = torch.randn(32,3,256,256)
# print(test_img.shape)
model = CovidAidModel()
print(f"Output shape: {model(img_single).shape}")
print(model(img_single))

Output shape: torch.Size([1, 3])
tensor([[0.4324, 0.2526, 0.3150]], grad_fn=<SoftmaxBackward0>)


  x = self.softmax(x)


# 3.1.1 CovidAid Script mode

In [None]:
%%writefile scripts/covid_aid.py
"""
Contains code about CovidAid Model. Original paper: https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=9418407
"""
import torch
from torch import nn

class CovidAidModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.covid_aid_1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(8),
            nn.LeakyReLU()
        )
        self.covid_aid_2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.LeakyReLU()
        )
        self.covid_aid_3 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.LeakyReLU()
        )
        self.covid_aid_4 = nn.Sequential(
            nn.Conv2d(in_channels=512, out_channels=256, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU()
        )
        self.covid_aid_5 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU()
        )
        self.covid_aid_6 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU()
        )
        self.covid_aid_7 = nn.Sequential(
            nn.Conv2d(in_channels=256, out_channels=3, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(3),
            nn.LeakyReLU()
        )
        
        self.covid_aid_block_1 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=32, out_channels=16, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),
        )
        self.covid_aid_block_2 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=64, out_channels=32, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),
        )
        self.covid_aid_block_3 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=128, out_channels=64, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),
        )

        self.covid_aid_block_4 = nn.Sequential(
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=256, out_channels=128, kernel_size=1, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(),

            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.LeakyReLU(),
        )

        self.maxpool_1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.maxpool_2 = nn.MaxPool2d(kernel_size=1, stride=1)
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(363, 3)
        self.softmax = nn.Softmax()

    def forward(self, x):
        x = self.covid_aid_1(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_2(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_1(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_2(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_3(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_block_4(x)
        x = self.maxpool_1(x)
        x = self.covid_aid_3(x)
        x = self.covid_aid_4(x)
        x = self.covid_aid_5(x)
        x = self.covid_aid_6(x)
        x = self.covid_aid_7(x)
        x = self.flatten(x)
        x = self.linear(x) # [32,3]
        x = self.softmax(x)
        return x

# 3.2 Squeeze Net

In [80]:
""""
Squeeze Net takes input of 224 instead of 256, so have to scale it accordingly
"""
import torch
import torch.nn as nn

class FireModule(nn.Module):
    """
    s1x1: number of filters in squeeze layer (all 1x1)
    e1x1: number of 1x1 filters in expand layer
    e3x3: number of 3x3 filters in expand layer
    s1x1 > (e1x1 + e3x3)
    """
    def __init__(self, in_channels, s1x1,e1x1,e3x3,):
        super().__init__()
        self.squeeze = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=s1x1, kernel_size=1, stride=1, padding=0),
            nn.ReLU(inplace=True),
        )
        self.expand_e1x1 = nn.Sequential(
            nn.Conv2d(in_channels=s1x1, out_channels=e1x1, kernel_size=1, stride=1, padding=0),
            nn.ReLU(inplace=True),
        )
        self.expand_e3x3 = nn.Sequential(
            nn.Conv2d(in_channels=s1x1, out_channels=e3x3, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.squeeze(x)
        y = self.expand_e1x1(x) # 64
        z = self.expand_e3x3(x) # 64
        return torch.cat((y, z),dim=1)


class SqueezeNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=7, stride=2, padding=0)
        self.conv_10 = nn.Conv2d(in_channels=512, out_channels=3, kernel_size=1, stride=1, padding=2)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool_1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.maxpool_4 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.maxpool_8 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.fireModule_2 = FireModule(96, 16, 64, 64)
        self.fireModule_3 = FireModule(128, 16, 64, 64)
        self.fireModule_4 = FireModule(128, 32, 128, 128)
        self.fireModule_5 = FireModule(256, 32, 128, 128)
        self.fireModule_6 = FireModule(256, 48, 192, 192)
        self.fireModule_7 = FireModule(384, 48, 192, 192)   
        self.fireModule_8 = FireModule(384, 64, 256, 256)
        self.fireModule_9 = FireModule(512, 64, 256, 256)
        self.dropout = nn.Dropout(p=0.5)
        self.avgpool_10 = nn.AvgPool2d((1,1))
        self.softmax = nn.Softmax()
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(867, 3)



    def forward(self, x):
        x = self.conv_1(x) # [32, 96, 109, 109]
        x = self.relu(x)
        x = self.maxpool_1(x) # [32, 96, 54, 54]
        x = self.fireModule_2(x) # [32, 128,55,55]
        x = self.fireModule_3(x) # [128,55,55]
        x = self.fireModule_4(x) # [256,55,55]
        x = self.maxpool_4(x) # [256,27,27]
        x = self.fireModule_5(x) # [256,27,27]
        x = self.fireModule_6(x) # [384, 27, 27]
        x = self.fireModule_7(x) # [384, 27, 27]
        x = self.fireModule_8(x) # [512, 27, 27]
        x = self.maxpool_8(x) # [512, 13, 13]
        x = self.fireModule_9(x) # [512, 13, 13]
        x = self.dropout(x)
        x = self.conv_10(x)
        x = self.relu(x)
        x = self.avgpool_10(x) # [3,1,1] 
        x = self.flatten(x)
        x = self.linear(x)
        x = self.softmax(x)
        return x

In [78]:
# model = SqueezeNet()
# print(f"Output shape: {model(img_single).shape}")
# print(model(img_single))

# Below throws an error for some reason
test_img = torch.rand(32,3, 224, 224)

model = SqueezeNet()
print(f"Output shape: {model(test_img).shape}")
# model(test_img)

RuntimeError: NNPACK SpatialConvolution_updateOutput failed

# 3.2.1 SqueezeNet Script Mode

In [81]:
%%writefile scripts/squeeze_net.py
""""
Squeeze Net takes input of 224 instead of 256, so have to scale it accordingly
"""
import torch
import torch.nn as nn

class FireModule(nn.Module):
    """
    s1x1: number of filters in squeeze layer (all 1x1)
    e1x1: number of 1x1 filters in expand layer
    e3x3: number of 3x3 filters in expand layer
    s1x1 > (e1x1 + e3x3)
    """
    def __init__(self, in_channels, s1x1,e1x1,e3x3,):
        super().__init__()
        self.squeeze = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=s1x1, kernel_size=1, stride=1, padding=0),
            nn.ReLU(inplace=True),
        )
        self.expand_e1x1 = nn.Sequential(
            nn.Conv2d(in_channels=s1x1, out_channels=e1x1, kernel_size=1, stride=1, padding=0),
            nn.ReLU(inplace=True),
        )
        self.expand_e3x3 = nn.Sequential(
            nn.Conv2d(in_channels=s1x1, out_channels=e3x3, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        x = self.squeeze(x)
        y = self.expand_e1x1(x) # 64
        z = self.expand_e3x3(x) # 64
        return torch.cat((y, z),dim=1)


class SqueezeNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=7, stride=2, padding=0)
        self.conv_10 = nn.Conv2d(in_channels=512, out_channels=3, kernel_size=1, stride=1, padding=2)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool_1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.maxpool_4 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.maxpool_8 = nn.MaxPool2d(kernel_size=3, stride=2, padding=0, ceil_mode=True)
        self.fireModule_2 = FireModule(96, 16, 64, 64)
        self.fireModule_3 = FireModule(128, 16, 64, 64)
        self.fireModule_4 = FireModule(128, 32, 128, 128)
        self.fireModule_5 = FireModule(256, 32, 128, 128)
        self.fireModule_6 = FireModule(256, 48, 192, 192)
        self.fireModule_7 = FireModule(384, 48, 192, 192)   
        self.fireModule_8 = FireModule(384, 64, 256, 256)
        self.fireModule_9 = FireModule(512, 64, 256, 256)
        self.dropout = nn.Dropout(p=0.5)
        self.avgpool_10 = nn.AvgPool2d((1,1))
        self.softmax = nn.Softmax()
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(867, 3)



    def forward(self, x):
        x = self.conv_1(x) # [32, 96, 109, 109]
        x = self.relu(x)
        x = self.maxpool_1(x) # [32, 96, 54, 54]
        x = self.fireModule_2(x) # [32, 128,55,55]
        x = self.fireModule_3(x) # [128,55,55]
        x = self.fireModule_4(x) # [256,55,55]
        x = self.maxpool_4(x) # [256,27,27]
        x = self.fireModule_5(x) # [256,27,27]
        x = self.fireModule_6(x) # [384, 27, 27]
        x = self.fireModule_7(x) # [384, 27, 27]
        x = self.fireModule_8(x) # [512, 27, 27]
        x = self.maxpool_8(x) # [512, 13, 13]
        x = self.fireModule_9(x) # [512, 13, 13]
        x = self.dropout(x)
        x = self.conv_10(x)
        x = self.relu(x)
        x = self.avgpool_10(x) # [3,1,1] 
        x = self.flatten(x)
        x = self.linear(x)
        x = self.softmax(x)
        return x

Overwriting scripts/squeeze_net.py


# 3.3 Deep GRU-CNN

In [71]:
"""
Deep GRU-CNN input 224 x 224
https://ieeexplore.ieee.org/abstract/document/9423965
https://discuss.pytorch.org/t/input-shape-to-gru-layer/171318
"""

import torch
import torch.nn as nn

class GRUCNN(nn.Module):
    def __init__(self, hidden_size=64, num_classes=3):
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(3, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(3, 64, 1, 1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(64, 128, 1,1),
            nn.ReLU(),
            nn.Conv2d(128, 128, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2)
        )
        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(128, 256, 1,1),
            nn.ReLU(),
            nn.Conv2d(256, 256, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(2)
        )

        self.conv_block_4 = nn.Sequential(
            nn.Conv2d(256, 512, 1,1),
            nn.ReLU(),
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2)
        )

        self.conv_block_5 = nn.Sequential(
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2)
        )
        self.gru_block = nn.Sequential(
          nn.GRUCell(512*7*7, hidden_size),
          nn.ReLU(),
          nn.BatchNorm1d(64),
          nn.Dropout(p=0.5)
        )

        self.fc= nn.Sequential(
            nn.Linear(64, num_classes), # 512*7*7*BATCH_SIZE
            nn.ReLU(),
            nn.BatchNorm1d(3),
            nn.Dropout(p=0.5),
        )
        
        self.gru = nn.GRUCell(512*7*7, 64)

    def forward(self, x):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        x = self.conv_block_4(x)
        x = self.conv_block_5(x) # torch.Size([32, 512, 7, 7])
        
        x = x.view(x.size(0), -1) # [32, 512 * 7 * 7]
        x = self.gru_block(x)
        x = self.fc(x)
        
        return x

In [72]:
model = GRUCNN()
print(f"Output shape: {model(img_single).shape}")
print(model(img_single))

# img_single = torch.randn([32, 3, 224, 224])
# model = GRUCNN()
# print(f"Output shape: {model(img_single).shape}")
# model(img_single)

Output shape: torch.Size([1, 3])
tensor([[0.3570, 0.3078, 0.3353]], grad_fn=<SoftmaxBackward0>)


  input = module(input)


# 3.3.1 GRU_CNN Script Mode


In [None]:

%%writefile scripts/gru_cnn.py
"""
Deep GRU-CNN input 224 x 224
https://ieeexplore.ieee.org/abstract/document/9423965
https://discuss.pytorch.org/t/input-shape-to-gru-layer/171318
"""

import torch
import torch.nn as nn

class GRUCNN(nn.Module):
    def __init__(self, hidden_size=64, num_classes=3):
        super().__init__()
        self.conv_block_1 = nn.Sequential(
            nn.Conv2d(3, 3, 1, 1),
            nn.ReLU(),
            nn.Conv2d(3, 64, 1, 1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(2)
        )
        self.conv_block_2 = nn.Sequential(
            nn.Conv2d(64, 128, 1,1),
            nn.ReLU(),
            nn.Conv2d(128, 128, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(2)
        )
        self.conv_block_3 = nn.Sequential(
            nn.Conv2d(128, 256, 1,1),
            nn.ReLU(),
            nn.Conv2d(256, 256, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(256),
            nn.MaxPool2d(2)
        )

        self.conv_block_4 = nn.Sequential(
            nn.Conv2d(256, 512, 1,1),
            nn.ReLU(),
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2)
        )

        self.conv_block_5 = nn.Sequential(
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.Conv2d(512, 512, 1,1),
            nn.ReLU(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2)
        )
        self.gru_block = nn.Sequential(
          nn.GRUCell(512*7*7, hidden_size),
          nn.ReLU(),
          nn.BatchNorm1d(64),
          nn.Dropout(p=0.5)
        )

        self.fc= nn.Sequential(
            nn.Linear(64, num_classes), # 512*7*7*BATCH_SIZE
            nn.ReLU(),
            nn.BatchNorm1d(3),
            nn.Dropout(p=0.5),
        )
        
        self.gru = nn.GRUCell(512*7*7, 64)

    def forward(self, x):
        x = self.conv_block_1(x)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        x = self.conv_block_4(x)
        x = self.conv_block_5(x) # torch.Size([32, 512, 7, 7])
        
        x = x.view(x.size(0), -1) # [32, 512 * 7 * 7]
        x = self.gru_block(x)
        x = self.fc(x)
        
        return x

# 3.4 Efficient CNN

In [None]:
"""
Paper: https://www.sciencedirect.com/science/article/pii/S1568494622007050#fig3
Efficient_CNN: https://www.hindawi.com/journals/complexity/2021/6621607/fig4/

"""
import torch
import torch.nn as nn

class EFFICIENT_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.block_1 = nn.Sequential(
            nn.Conv2d(3, 64, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_2 = nn.Sequential(
            nn.Conv2d(64, 64, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_3 = nn.Sequential(
            nn.Conv2d(64, 128, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_4 = nn.Sequential(
            nn.Conv2d(128, 128, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_5 = nn.Sequential(
            nn.Conv2d(128, 256, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_6 = nn.Sequential(
            nn.Conv2d(256, 256, 1),
            nn.MaxPool2d(1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        
        self.dense_1 = nn.Sequential(
            nn.Linear(1024, 512),
            nn.ReLU()
        )
        self.dense_2 = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU()
        )
        self.dense_3 = nn.Sequential(
            nn.Linear(256, 3),
            nn.Softmax()
        )

    def forward(self, x):
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.block_3(x)
        x = self.block_4(x)
        x = self.block_5(x)
        x = self.block_6(x) # [32, 256, 2, 2]
        x = x.view(x.size(0), -1) #[32, 1024]

        x = self.dense_1(x)
        x = self.dense_2(x)
        x = self.dense_3(x)
        return x

In [None]:
test_img = torch.randn(32,3,150, 150)

model = EFFICIENT_CNN()
print(f"Output shape: {model(test_img).shape}")
print(model(test_img))


# 3.4.1 Efficient_Cnn Script Mode



In [89]:
%%writefile scripts/efficient_cnn.py
"""
Paper: https://www.sciencedirect.com/science/article/pii/S1568494622007050#fig3
Efficient_CNN: https://www.hindawi.com/journals/complexity/2021/6621607/fig4/

"""
import torch
import torch.nn as nn

class EFFICIENT_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.block_1 = nn.Sequential(
            nn.Conv2d(3, 64, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_2 = nn.Sequential(
            nn.Conv2d(64, 64, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_3 = nn.Sequential(
            nn.Conv2d(64, 128, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_4 = nn.Sequential(
            nn.Conv2d(128, 128, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_5 = nn.Sequential(
            nn.Conv2d(128, 256, 3),
            nn.MaxPool2d(2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        self.block_6 = nn.Sequential(
            nn.Conv2d(256, 256, 1),
            nn.MaxPool2d(1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.2)
            )
        
        self.dense_1 = nn.Sequential(
            nn.Linear(1024, 512),
            nn.ReLU()
        )
        self.dense_2 = nn.Sequential(
            nn.Linear(512, 256),
            nn.ReLU()
        )
        self.dense_3 = nn.Sequential(
            nn.Linear(256, 3),
            nn.Softmax()
        )

    def forward(self, x):
        x = self.block_1(x)
        x = self.block_2(x)
        x = self.block_3(x)
        x = self.block_4(x)
        x = self.block_5(x)
        x = self.block_6(x)
        x = x.view(x.size(0), -1)

        x = self.dense_1(x)
        x = self.dense_2(x)
        x = self.dense_3(x)
        return x

Overwriting scripts/efficient_cnn.py


# 3.5 CodnNet


In [86]:
import torch
import torch.nn as nn

class Focus(nn.Module): 
    # Focus wh information into c-space 
    def __init__(self, c1, c2, k=1, s=1, p=0, g=1):  # ch_in, ch_out, kernel, stride, padding, groups 
        super(Focus, self).__init__() 
        self.conv = nn.Conv2d(c1 * 4, c2, k, s, p, groups=g) 
        # self.contract = Contract(gain=2) 

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2) 
        return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))

class CodnNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.focus = Focus(3, 16)
        self.conv_1 = nn.Sequential(
            nn.Conv2d(3, 16, 3, 2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )
        self.depthwise_conv_1 = nn.Sequential(
            nn.Conv2d(32, 28, 2, 2, groups=4), # kernel= 3x3?
            nn.Conv2d(28, 28, 1),
            nn.BatchNorm2d(28),
            nn.ReLU()
        )
        self.depthwise_conv_2 = nn.Sequential(
            nn.BatchNorm2d(44),
            nn.Hardswish(),
            nn.Conv2d(44, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_3 = nn.Sequential(
            nn.BatchNorm2d(60),
            nn.Hardswish(),
            nn.Conv2d(60, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_4 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_5 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 1, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=2)
        )
        self.pool_block = nn.Sequential(
            nn.BatchNorm2d(96),
            nn.Hardswish(),
            nn.AvgPool2d(8)
        )
        self.fc1 = nn.Sequential(
            nn.Linear(96, 512),
            nn.Hardswish(),
            nn.Dropout(p=0.5)
        )
        self.fc2 = nn.Linear(512, 3)
        self.maxpool = nn.MaxPool2d(2)
        self.avgpool = nn.AvgPool2d(2)
    
    def forward(self, x):
        # Initial Conv
        x_cv = self.conv_1(x) # [32, 16, 128, 128

        # Focus
        x_f= self.focus(x) # [32, 16, 128, 628]

        # Max Pool
        x_mp = self.maxpool(x_cv) # [32, 16, 64, 64]

        # Concat initial Conv with Focus
        x_cc1 = torch.concat((x_cv, x_f), dim=1) #[32, 32, 128, 128]

        # Depthwise conv 1
        x_dcv1= self.depthwise_conv_1(x_cc1) # [32, 28, 64, 64]

        # Avg Pool 1
        x_ap1 = self.avgpool(x_dcv1) # [32, 28, 32, 32]

        # Concat depwise conv with maxpool
        x_cc2 = torch.concat((x_dcv1, x_mp), dim=1) # [ 32, 44, 64, 64]
        
        # Depthwise conv 2
        x_dcv2 = self.depthwise_conv_2(x_cc2) #[32, 32, 32, 32]

        # Avg Pool 2
        x_ap2 = self.avgpool(x_dcv2) # [32, 32, 16, 16]

        # Concat depthwise conv 2 with avgpool 1
        x_cc3 = torch.concat((x_dcv2, x_ap1), dim=1) # [32, 60, 32, 32]

        # Deptwise conv 3
        x_dcv3 = self.depthwise_conv_3(x_cc3) # [32, 32, 16, 16]

        # Avg Pool 3
        x_ap3 = self.avgpool(x_dcv3) # [32,32,8,8]

        # Concat depthwise conv3 with avgpool 2
        x_cc4 = torch.concat((x_dcv3, x_ap2), dim=1) # [32, 64, 16, 16]

        # Depthwise conv 4
        x_dcv4 = self.depthwise_conv_4(x_cc4) # [32,32,8,8]

        # Concat depwise conv4 with avgpool 3
        x_cc5 = torch.concat((x_dcv4, x_ap3), dim=1) # [32, 64, 8, 8]

        # Depwise conv 5
        x_dcv5 = self.depthwise_conv_5(x_cc5) # [32, 32, 8, 8]

        # Concat Depthwise conv4, AvgPool 3 and x_cc5
        x = torch.cat((x_dcv4, x_ap3, x_dcv5), dim=1) # [32, 96, 8, 8]

        x = self.pool_block(x) # [32, 96, 1, 1]
        x = x.view(x.size(0), -1)

        x = self.fc1(x)
        x = self.fc2(x)
        return x


In [88]:
%%writefile scripts/codnnet.py
"""
Paper: https://www.sciencedirect.com/science/article/pii/S1568494622007050
"""
import torch
import torch.nn as nn

class Focus(nn.Module): 
    # Focus wh information into c-space 
    def __init__(self, c1, c2, k=1, s=1, p=0, g=1):  # ch_in, ch_out, kernel, stride, padding, groups 
        super(Focus, self).__init__() 
        self.conv = nn.Conv2d(c1 * 4, c2, k, s, p, groups=g) 
        # self.contract = Contract(gain=2) 

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2) 
        return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))

class CodnNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.focus = Focus(3, 16)
        self.conv_1 = nn.Sequential(
            nn.Conv2d(3, 16, 3, 2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )
        self.depthwise_conv_1 = nn.Sequential(
            nn.Conv2d(32, 28, 2, 2, groups=4), # kernel= 3x3?
            nn.Conv2d(28, 28, 1),
            nn.BatchNorm2d(28),
            nn.ReLU()
        )
        self.depthwise_conv_2 = nn.Sequential(
            nn.BatchNorm2d(44),
            nn.Hardswish(),
            nn.Conv2d(44, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_3 = nn.Sequential(
            nn.BatchNorm2d(60),
            nn.Hardswish(),
            nn.Conv2d(60, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_4 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_5 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 1, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=2)
        )
        self.pool_block = nn.Sequential(
            nn.BatchNorm2d(96),
            nn.Hardswish(),
            nn.AvgPool2d(8)
        )
        self.fc1 = nn.Sequential(
            nn.Linear(96, 512),
            nn.Hardswish(),
            nn.Dropout(p=0.5)
        )
        self.fc2 = nn.Linear(512, 3)
        self.maxpool = nn.MaxPool2d(2)
        self.avgpool = nn.AvgPool2d(2)
    
    def forward(self, x):
        # Initial Conv
        x_cv = self.conv_1(x) # [32, 16, 128, 128

        # Focus
        x_f= self.focus(x) # [32, 16, 128, 628]

        # Max Pool
        x_mp = self.maxpool(x_cv) # [32, 16, 64, 64]

        # Concat initial Conv with Focus
        x_cc1 = torch.concat((x_cv, x_f), dim=1) #[32, 32, 128, 128]

        # Depthwise conv 1
        x_dcv1= self.depthwise_conv_1(x_cc1) # [32, 28, 64, 64]

        # Avg Pool 1
        x_ap1 = self.avgpool(x_dcv1) # [32, 28, 32, 32]

        # Concat depwise conv with maxpool
        x_cc2 = torch.concat((x_dcv1, x_mp), dim=1) # [ 32, 44, 64, 64]
        
        # Depthwise conv 2
        x_dcv2 = self.depthwise_conv_2(x_cc2) #[32, 32, 32, 32]

        # Avg Pool 2
        x_ap2 = self.avgpool(x_dcv2) # [32, 32, 16, 16]

        # Concat depthwise conv 2 with avgpool 1
        x_cc3 = torch.concat((x_dcv2, x_ap1), dim=1) # [32, 60, 32, 32]

        # Deptwise conv 3
        x_dcv3 = self.depthwise_conv_3(x_cc3) # [32, 32, 16, 16]

        # Avg Pool 3
        x_ap3 = self.avgpool(x_dcv3) # [32,32,8,8]

        # Concat depthwise conv3 with avgpool 2
        x_cc4 = torch.concat((x_dcv3, x_ap2), dim=1) # [32, 64, 16, 16]

        # Depthwise conv 4
        x_dcv4 = self.depthwise_conv_4(x_cc4) # [32,32,8,8]

        # Concat depwise conv4 with avgpool 3
        x_cc5 = torch.concat((x_dcv4, x_ap3), dim=1) # [32, 64, 8, 8]

        # Depwise conv 5
        x_dcv5 = self.depthwise_conv_5(x_cc5) # [32, 32, 8, 8]

        # Concat Depthwise conv4, AvgPool 3 and x_cc5
        x = torch.cat((x_dcv4, x_ap3, x_dcv5), dim=1) # [32, 96, 8, 8]

        x = self.pool_block(x) # [32, 96, 1, 1]
        x = x.view(x.size(0), -1)

        x = self.fc1(x)
        x = self.fc2(x)
        return x

Writing scripts/codnnet.py


# 4. Create Train and Test step

In [None]:
import torch
import torch.nn as nn

class Focus(nn.Module): 
    # Focus wh information into c-space 
    def __init__(self, c1, c2, k=1, s=1, p=0, g=1):  # ch_in, ch_out, kernel, stride, padding, groups 
        super(Focus, self).__init__() 
        self.conv = nn.Conv2d(c1 * 4, c2, k, s, p, groups=g) 
        # self.contract = Contract(gain=2) 

    def forward(self, x):  # x(b,c,w,h) -> y(b,4c,w/2,h/2) 
        return self.conv(torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1))

class CodnNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.focus = Focus(3, 16)
        self.conv_1 = nn.Sequential(
            nn.Conv2d(3, 16, 3, 2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU()
        )
        self.depthwise_conv_1 = nn.Sequential(
            nn.Conv2d(32, 28, 2, 2, groups=4), # kernel= 3x3?
            nn.Conv2d(28, 28, 1),
            nn.BatchNorm2d(28),
            nn.ReLU()
        )
        self.depthwise_conv_2 = nn.Sequential(
            nn.BatchNorm2d(44),
            nn.Hardswish(),
            nn.Conv2d(44, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_3 = nn.Sequential(
            nn.BatchNorm2d(60),
            nn.Hardswish(),
            nn.Conv2d(60, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_4 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 2, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=1)
        )
        self.depthwise_conv_5 = nn.Sequential(
            nn.BatchNorm2d(64),
            nn.Hardswish(),
            nn.Conv2d(64, 128, 1),
            nn.Conv2d(128, 128, 5, 1, groups=4), # kernel=7x7?
            nn.Conv2d(128, 32, 1, padding=2)
        )
        self.pool_block = nn.Sequential(
            nn.BatchNorm2d(96),
            nn.Hardswish(),
            nn.AvgPool2d(8)
        )
        self.fc1 = nn.Sequential(
            nn.Linear(96, 512),
            nn.Hardswish(),
            nn.Dropout(p=0.5)
        )
        self.fc2 = nn.Linear(512, 3)
        self.maxpool = nn.MaxPool2d(2)
        self.avgpool = nn.AvgPool2d(2)
    
    def forward(self, x):
        # Initial Conv
        x_cv = self.conv_1(x) # [32, 16, 128, 128

        # Focus
        x_f= self.focus(x) # [32, 16, 128, 628]

        # Max Pool
        x_mp = self.maxpool(x_cv) # [32, 16, 64, 64]

        # Concat initial Conv with Focus
        x_cc1 = torch.concat((x_cv, x_f), dim=1) #[32, 32, 128, 128]

        # Depthwise conv 1
        x_dcv1= self.depthwise_conv_1(x_cc1) # [32, 28, 64, 64]

        # Avg Pool 1
        x_ap1 = self.avgpool(x_dcv1) # [32, 28, 32, 32]

        # Concat depwise conv with maxpool
        x_cc2 = torch.concat((x_dcv1, x_mp), dim=1) # [ 32, 44, 64, 64]
        
        # Depthwise conv 2
        x_dcv2 = self.depthwise_conv_2(x_cc2) #[32, 32, 32, 32]

        # Avg Pool 2
        x_ap2 = self.avgpool(x_dcv2) # [32, 32, 16, 16]

        # Concat depthwise conv 2 with avgpool 1
        x_cc3 = torch.concat((x_dcv2, x_ap1), dim=1) # [32, 60, 32, 32]

        # Deptwise conv 3
        x_dcv3 = self.depthwise_conv_3(x_cc3) # [32, 32, 16, 16]

        # Avg Pool 3
        x_ap3 = self.avgpool(x_dcv3) # [32,32,8,8]

        # Concat depthwise conv3 with avgpool 2
        x_cc4 = torch.concat((x_dcv3, x_ap2), dim=1) # [32, 64, 16, 16]

        # Depthwise conv 4
        x_dcv4 = self.depthwise_conv_4(x_cc4) # [32,32,8,8]

        # Concat depwise conv4 with avgpool 3
        x_cc5 = torch.concat((x_dcv4, x_ap3), dim=1) # [32, 64, 8, 8]

        # Depwise conv 5
        x_dcv5 = self.depthwise_conv_5(x_cc5) # [32, 32, 8, 8]

        # Concat Depthwise conv4, AvgPool 3 and x_cc5
        x = torch.cat((x_dcv4, x_ap3, x_dcv5), dim=1) # [32, 96, 8, 8]

        x = self.pool_block(x) # [32, 96, 1, 1]
        x = x.view(x.size(0), -1)

        x = self.fc1(x)
        x = self.fc2(x)
        return x


In [None]:
import torch
import torchmetrics
accuracy_fn = torchmetrics.Accuracy(task="multiclass", num_classes=3).to(device)

def train_step(model, dataloader, loss_fn, optimizer, device):
    model.train()

    train_loss, train_acc = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # Forward
        y_pred = model(X) # returns shape [32,3]

        # Loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()
        
        # optimizer
        optimizer.zero_grad()

        # backward
        loss.backward()

        # step
        optimizer.step()

        # accuracy across batch
        y_pred_class = torch.argmax(y_pred,dim=1)
        train_acc += accuracy_fn(y_pred_class, y)
        break

    # get average loss and acc per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc


In [None]:
train_step(model_0, train_dataloader, loss_fn, optimizer, device)

In [None]:
def test_step(model, dataloader, loss_fn, device):
    model.eval()

    test_loss, test_acc = 0, 0 
    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            # forward
            y_pred = model(X)

            # loss
            loss = loss_fn(y_pred, y)
            test_loss += loss.item()

            # accuracy across batch
            test_pred_label = torch.argmax(y_pred,dim=1)
            test_acc += accuracy_fn(test_pred_label, y)


    # get average loss and acc per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
def train(model, optimizer, loss_fn, epochs, device):
    # Create empty results dictionary
    results = {"train_loss": [],
      "train_acc": [],
      "test_loss": [],
      "test_acc": []
    }

    # Training loop
    for epoch in range(epochs):
        train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer, device)
        test_loss, test_acc = test_step(model, test_dataloader, loss_fn, device)

        print(f"Epoch {epoch} | Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.2f} | Test Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.2f}")

        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    return results

# 4.1 Convert to a script

In [None]:
%%writefile scripts/training.py
import torch
import tor  chmetrics
device = 'cuda' if torch.cuda.is_available() else 'cpu'
accuracy_fn = torchmetrics.Accuracy(task="multiclass", num_classes=3).to(device)

def train_step(model, dataloader, loss_fn, optimizer, device):
    model.train()

    train_loss, train_acc = 0, 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # Forward
        y_pred = model(X) # returns shape [32,3]

        # Loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()
        
        # optimizer
        optimizer.zero_grad()

        # backward
        loss.backward()

        # step
        optimizer.step()

        # accuracy across batch
        y_pred_class = torch.argmax(y_pred,dim=1)
        train_acc += accuracy_fn(y_pred_class, y)

    # get average loss and acc per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

def test_step(model, dataloader, loss_fn, device):
    model.eval()

    test_loss, test_acc = 0, 0 
    with torch.inference_mode():
        for batch, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            # forward
            y_pred = model(X)

            # loss
            loss = loss_fn(y_pred, y)
            test_loss += loss.item()

            # accuracy across batch
            test_pred_label = torch.argmax(y_pred,dim=1)
            test_acc += accuracy_fn(test_pred_label, y)


    # get average loss and acc per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

def train(model, train_dataloader, test_dataloader, optimizer, loss_fn, epochs, device):
    # Create empty results dictionary
    results = {"train_loss": [],
      "train_acc": [],
      "test_loss": [],
      "test_acc": []
    }

    # Training loop
    for epoch in range(epochs):
        train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer, device)
        test_loss, test_acc = test_step(model, test_dataloader, loss_fn, device)

        print(f"Epoch {epoch} | Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.2f} | Test Loss: {test_loss:.4f} | Test Accuracy: {test_acc:.2f}")

        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["test_loss"].append(test_loss)
        results["test_acc"].append(test_acc)

    return results

# 5. Save the model

In [None]:
import os
import torch

def save_model(model, target_dir, model_name):
    # Create directory to save models
    os.makedirs(target_dir, exist_ok=True)

    # Create model save path
    assert model_name.endswith(".pth")
    model_saved_path = target_dir + '/' + model_name

    # save model
    print(f"Saved model to: {model_saved_path}")
    torch.save(model.state_dict(), model_saved_path)
    


# 5.1 Convert to script

In [None]:
%%writefile scripts/save_model.py
import os
import torch

def save_model(model, target_dir, model_name):
    # Create directory to save models
    target_dir = os.path.join("../Covid-Classificaton", target_dir)
    os.makedirs(target_dir, exist_ok=True)

    # Create model save path
    assert model_name.endswith(".pt")
    model_saved_path = target_dir + '/' + model_name

    # save model
    print(f"Saved model to: {model_saved_path}")
    torch.save(model.state_dict(), model_saved_path)

# 6. Train, Evaluate & Save model

In [None]:
import sys
# Run this before running below code
sys.path.append("/Users/zhengyaosiah/Documents/Code/Classification-Covid/scripts")

In [None]:
data_transforms = {"CovidAid":transforms.Compose([transforms.Resize(256),
                                                  transforms.ToTensor()]),
                    "SqueezeNet": transforms.Compose([transforms.Resize(224),
                                                    transforms.ToTensor()])}

list(data_transforms.items())[1]

In [None]:
from timeit import default_timer as timer
import matplotlib.pyplot as plt
import os
import torch.nn as nn
import torch
import torchvision.transforms as transforms
import data_setup, save_model, training
import covid_aid, squeeze_net, efficient_cnn, gru_cnn

# HYPERPARAMETERS
SEED=42
BATCH_SIZE=32
NUM_WORKERS=4 #os.cpu_count()
LEARNING_RATE={
    "GRUCNN": 0.0001,
               "EfficientCNN": 0.001,
               "CovidAid": 0.001, 
               "SqueezeNet": 0.0001}
EPOCHS = 15

# Instantiniate seeds
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

# Setup device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Varaibles
train_dir = "../covid-dataset/train/"
test_dir = "../covid-dataset/test/"
data_transforms = {
                    "GRUCNN": transforms.Compose([transforms.Resize(224),
                                                  transforms.ToTensor()]),
                    "EfficientCNN": transforms.Compose([transforms.Resize(150),
                                                  transforms.ToTensor()]),
                    "CovidAid":transforms.Compose([transforms.Resize(256),
                                                  transforms.ToTensor()]),
                    "SqueezeNet": transforms.Compose([transforms.Resize(224),
                                                    transforms.ToTensor()])}
# Models
models = {
    "GRUCNN": gru_cnn.GRUCNN().to(device), 
          "EfficientCNN": efficient_cnn.EFFICIENT_CNN().to(device),
            "CovidAid": covid_aid.CovidAidModel().to(device), 
            "SqueezeNet": squeeze_net.SqueezeNet().to(device)}

optimizers =  {
    "GRUCNN": torch.optim.Adam(models["GRUCNN"].parameters(),lr=LEARNING_RATE["GRUCNN"]),
    "EfficientCNN": torch.optim.Adam(models["EfficientCNN"].parameters(), lr=LEARNING_RATE["EfficientCNN"]),
    "CovidAid": torch.optim.SGD(models["CovidAid"].parameters(), lr=LEARNING_RATE["CovidAid"]),
    "SqueezeNet": torch.optim.Adam(models["SqueezeNet"].parameters(), lr=LEARNING_RATE["SqueezeNet"])
}


def training_loop(data_transforms, models):
    for model_name in models:
        print(f"Model Name: {model_name}")
        # DATA
        train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(train_dir, test_dir, data_transforms[model_name], BATCH_SIZE, NUM_WORKERS)

        # Loss Function and Optimizer
        loss_fn = nn.CrossEntropyLoss()
        # optimizer = torch.optim.SGD(models[model_name].parameters(), lr=LEARNING_RATE[model_name])
        optimizer = optimizers[model_name]

        # Start timer
        start_time = timer()

        # Train models
        model_results = training.train(models[model_name], train_dataloader, test_dataloader, optimizer, loss_fn, EPOCHS, device)

        # End timer
        end_time = timer()

        # Misc
        print(f"{model_name} training time: {end_time-start_time:.2f} seconds")
        pytorch_total_params = sum(p.numel() for p in models[model_name].parameters())
        print(f"Number of parameters: {pytorch_total_params}")

        # Save Model
        save_model.save_model(models[model_name], target_dir='models', model_name=f'{model_name}.pt')

        # plot graph
        fig, ax = plt.subplots()
        x_axis = [i for i in range(EPOCHS)]
        y_axis = model_results["train_loss"]
        ax.plot(x_axis, y_axis)
        ax.set(xlabel="Epochs", ylabel="Train Loss", title=model_name)
        fig.savefig(f"testloss_{model_name}")

training_loop(data_transforms, models)


# 6.1 Convert to Script

In [None]:
%%writefile scripts/executable.py
from timeit import default_timer as timer
import matplotlib.pyplot as plt
import os
import torch.nn as nn
import torch
import torchvision.transforms as transforms
import data_setup, save_model, training
import covid_aid, squeeze_net, efficient_cnn, gru_cnn

# HYPERPARAMETERS
SEED=42
BATCH_SIZE=32
NUM_WORKERS=4 #os.cpu_count()
LEARNING_RATE={
    "GRUCNN": 0.0001,
               "EfficientCNN": 0.001,
               "CovidAid": 0.001, 
               "SqueezeNet": 0.0001}
EPOCHS = 15

# Instantiniate seeds
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)

# Setup device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Varaibles
train_dir = "../covid-dataset/train/"
test_dir = "../covid-dataset/test/"
data_transforms = {
                    "GRUCNN": transforms.Compose([transforms.Resize(224),
                                                  transforms.ToTensor()]),
                    "EfficientCNN": transforms.Compose([transforms.Resize(150),
                                                  transforms.ToTensor()]),
                    "CovidAid":transforms.Compose([transforms.Resize(256),
                                                  transforms.ToTensor()]),
                    "SqueezeNet": transforms.Compose([transforms.Resize(224),
                                                    transforms.ToTensor()])}
# Models
models = {
    "GRUCNN": gru_cnn.GRUCNN().to(device), 
          "EfficientCNN": efficient_cnn.EFFICIENT_CNN().to(device),
            "CovidAid": covid_aid.CovidAidModel().to(device), 
            "SqueezeNet": squeeze_net.SqueezeNet().to(device)}

optimizers =  {
    "GRUCNN": torch.optim.Adam(models["GRUCNN"].parameters(),lr=LEARNING_RATE["GRUCNN"]),
    "EfficientCNN": torch.optim.Adam(models["EfficientCNN"].parameters(), lr=LEARNING_RATE["EfficientCNN"]),
    "CovidAid": torch.optim.SGD(models["CovidAid"].parameters(), lr=LEARNING_RATE["CovidAid"]),
    "SqueezeNet": torch.optim.Adam(models["SqueezeNet"].parameters(), lr=LEARNING_RATE["SqueezeNet"])
}


def training_loop(data_transforms, models):
    for model_name in models:
        print(f"Model Name: {model_name}")
        # DATA
        train_dataloader, test_dataloader, class_names = data_setup.create_dataloaders(train_dir, test_dir, data_transforms[model_name], BATCH_SIZE, NUM_WORKERS)

        # Loss Function and Optimizer
        loss_fn = nn.CrossEntropyLoss()
        # optimizer = torch.optim.SGD(models[model_name].parameters(), lr=LEARNING_RATE[model_name])
        optimizer = optimizers[model_name]

        # Start timer
        start_time = timer()

        # Train models
        model_results = training.train(models[model_name], train_dataloader, test_dataloader, optimizer, loss_fn, EPOCHS, device)

        # End timer
        end_time = timer()

        # Misc
        print(f"{model_name} training time: {end_time-start_time:.2f} seconds")
        pytorch_total_params = sum(p.numel() for p in models[model_name].parameters())
        print(f"Number of parameters: {pytorch_total_params}")

        # Save Model
        save_model.save_model(models[model_name], target_dir='models', model_name=f'{model_name}.pt')

        # plot graph
        fig, ax = plt.subplots()
        x_axis = [i for i in range(EPOCHS)]
        y_axis = model_results["train_loss"]
        ax.plot(x_axis, y_axis)
        ax.set(xlabel="Epochs", ylabel="Train Loss", title=model_name)
        fig.savefig(f"testloss_{model_name}")

training_loop(data_transforms, models)
