In [20]:
import io
import os
import torch
import base64
import sqlite3
import numpy as np
import pandas as pd
from PIL import Image
import torch.nn as nn
from torch import optim
import matplotlib.pyplot as plt
import torchvision.models as models
import torchvision.datasets as datasets
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader, Dataset

In [11]:
CURR_PATH = os.getcwd()
PROJECT_PATH = os.path.dirname(CURR_PATH)
DATA_PATH = os.path.join(PROJECT_PATH, 'data')
TRAIN_PATH = os.path.join(DATA_PATH, 'train')
TEST_PATH = os.path.join(DATA_PATH, 'test')
RES_PATH = os.path.join(PROJECT_PATH, 'res')
MODELS_PATH = os.path.join(PROJECT_PATH, 'models')
RLHF_PATH = os.path.join(DATA_PATH, 'RLHF.db')

# Check if GPU is available, and set the device accordingly
device =  torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [12]:
# check folder is exist, if not then create folder
def check_path(folderPaths:list):
    for folderPath in folderPaths:
        if not os.path.exists(folderPath):
            os.makedirs(folderPath)

In [13]:
# check path
check_path([
    TRAIN_PATH, TEST_PATH,
])

In [14]:
# download training & test data and put in DATA_PATH
mnist_trainset = datasets.MNIST(
    root=TRAIN_PATH, 
    train=True, 
    download=True, 
    transform=ToTensor()
)

mnist_testset = datasets.MNIST(
    root=TEST_PATH, 
    train=False, 
    download=True, 
    transform=ToTensor()
)

In [15]:
# Data Prep
train_dataloader = DataLoader(
    mnist_trainset, 
    batch_size=20, 
    shuffle=True, 
    num_workers=0,
    pin_memory=True
)

test_dataloader = DataLoader(
    mnist_testset, 
    batch_size=20, 
    shuffle=False, 
    num_workers=0,
    pin_memory=True
)

In [16]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        
        # First Convolutional Layer
        self.conv1 = nn.Sequential(
            nn.Conv2d(
                in_channels=1,              
                out_channels=16,            
                kernel_size=5,              
                stride=1,                   
                padding=2,                  
            ),
            nn.ReLU(),                      
            nn.MaxPool2d(kernel_size=2),    
        )
        
        # Second Convolutional Layer
        self.conv2 = nn.Sequential(
            nn.Conv2d(16, 32, 5, 1, 2),     
            nn.ReLU(),                      
            nn.MaxPool2d(2),                
        )
        
        # Fully Connected Layer for Classification
        self.out = nn.Linear(32 * 7 * 7, 10)

    def forward(self, x):
        # Forward pass through the first convolutional layer
        x = self.conv1(x)
        
        # Forward pass through the second convolutional layer
        x = self.conv2(x)
        
        # Flatten the output for the fully connected layer
        x = x.view(x.size(0), -1)  # flattening the layer      
        
        # Forward pass through the fully connected layer for classification
        output = self.out(x)
        
        return output

In [18]:
def calculate_accuracy(model_output, target):
    # get the prediction
    predictions = torch.max(model_output, 1)[1].data.squeeze()
    
    # get the accuracy
    accuracy = (predictions == target).sum().item()/float(target.size(0))
    return accuracy

In [34]:
def train_model(model, num_epoch, loss_function, optimizer, train_dataloader):
    """
    Function to train the CNN model.

    Parameters:
    - model: The DL model to be trained.
    - num_epoch: Iteration to be train
    - loss_function: The loss function used for training.
    - optimizer: The optimization algorithm used for updating model parameters.
    - train_dataloader: DataLoader providing training data.

    Returns:
    None
    """
    
    # model training 
    model.train()
    
    for epoch in range(num_epoch):
        
        epoch_loss = 0
        epoch_accuracy = 0
        i = 0
        for i, (images, labels) in enumerate(train_dataloader):

            images, labels = images.to(device), labels.to(device)
            
            # Forward pass
            output = model(images)
            
            # Calculate loss
            loss = loss_function(output, labels)
            
            # Releasing the cache
            optimizer.zero_grad() 
            
            # Backward Pass
            loss.backward()

            # Update model parameter
            optimizer.step()

            # accummulate the loss and accuracy for each epoch
            epoch_loss += loss.item()
            epoch_accuracy += calculate_accuracy(output, labels)

        print(f"Epoch: {epoch} - Loss: {epoch_loss} - Accuracy: {epoch_accuracy/(i+1)}")


def test_model(model, test_dataloader):
    """
    Function to test the CNN model on a given dataset.

    Parameters:
    - model: The trained DL model to be tested.
    - test_dataloader: DataLoader providing test data.

    Returns:
    None
    """

    model.eval()

    accuracy = 0
    i = 0
    for i, (images, labels) in enumerate(test_dataloader):

        images, labels = images.to(device), labels.to(device)

        # Forward pass
        output = model(images)

        # Accumulate accuracy
        accuracy += calculate_accuracy(output, labels)

    # Print test accuracy
    print(f"Test Accuracy: {accuracy / (i + 1)}")

In [17]:
# Instantiate the CNN model and move it to the specified device (GPU or CPU)
cnn_model = CNN().to(device)

# Define the loss function (cross-entropy) - for classification problem
loss_function = nn.CrossEntropyLoss()

# Define the optimizer (Adam) for updating the model parameters during training
optimizer = optim.Adam(cnn_model.parameters(), lr=0.005)

In [19]:
# model training 
cnn_model.train()

for epoch in range(10):
    
    epoch_loss = 0
    epoch_accuracy = 0
    i = 0
    for i, (images, labels) in enumerate(train_dataloader):

        images, labels = images.to(device), labels.to(device)
        
        # Forward pass
        output = cnn_model(images)
        
        # Calculate loss
        loss = loss_function(output, labels)
        
        # Releasing the cache
        optimizer.zero_grad() 
        
        # Backward Pass
        loss.backward()

        # Update model parameter
        optimizer.step()

        # accummulate the loss and accuracy for each epoch
        epoch_loss += loss.item()
        epoch_accuracy += calculate_accuracy(output, labels)

    print(f"Epoch: {epoch} - Loss: {epoch_loss} - Accuracy: {epoch_accuracy/(i+1)}")

Epoch: 0 - Loss: 317.98804757976177 - Accuracy: 0.9680333333333294
Epoch: 1 - Loss: 174.56941176936016 - Accuracy: 0.9830333333333268
Epoch: 2 - Loss: 153.98018257774373 - Accuracy: 0.9848999999999947
Epoch: 3 - Loss: 140.38584096728073 - Accuracy: 0.9868166666666593
Epoch: 4 - Loss: 121.21884261796752 - Accuracy: 0.9883833333333288
Epoch: 5 - Loss: 122.27417995003461 - Accuracy: 0.9884166666666605
Epoch: 6 - Loss: 114.99268381551488 - Accuracy: 0.9895999999999945
Epoch: 7 - Loss: 115.35596555538473 - Accuracy: 0.9905666666666617
Epoch: 8 - Loss: 108.80548466048455 - Accuracy: 0.9907333333333285
Epoch: 9 - Loss: 114.09904385531509 - Accuracy: 0.990866666666663


In [35]:
test_model(cnn_model, test_dataloader)

Test Accuracy: 0.9842999999999986


In [28]:
from torchvision.models import resnet18, ResNet18_Weights

In [30]:
# Define the ResNet-based image classifier
class ResNetImageClassifier(nn.Module):
    def __init__(self, num_classes, pretrained=True):
        super(ResNetImageClassifier, self).__init__()

        self.gray_img = nn.Conv2d(
            in_channels=1,
            out_channels=3,
            kernel_size=7
        )
        
        # Load the pre-trained ResNet model
        weights = ResNet18_Weights.DEFAULT
        self.resnet = resnet18(weights=weights)
        
        # Modify the last layer to match the number of classes
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_features, num_classes)
        
    def forward(self, x):
        # Pass the input through the ResNet model
        x = self.gray_img(x)
        x = self.resnet(x)
        return x

In [31]:
# Initialize the model
model = ResNetImageClassifier(num_classes=len(mnist_trainset.classes)).to(device)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [37]:
test_model(model, test_dataloader)

Test Accuracy: 0.9922999999999992
