In [None]:
import torch  # PyTorch deep learning framework
import os  # Operating system interface
from torch import nn  # Neural network modules
import torchvision  # Computer vision library
from torchvision import transforms as T  # Image transformations
import torchvision.models as models  # Pre-trained models
from torchvision.transforms import ToTensor, Compose, Normalize, Resize, CenterCrop, ColorJitter  # Image transformation utilities
from torchvision.transforms import v2  # Latest version of transforms
from torch.utils.data import DataLoader, Dataset  # Data loading utilities
from torch.utils.data.dataset import random_split  # Dataset splitting utility
from PIL import Image  # Image processing library
import pathlib  # File path handling
import numpy as np  # Numerical computing
import pandas as pd  # Data manipulation library
import matplotlib.pyplot as plt  # Plotting library
from tqdm.notebook import tqdm  # Progress bar for notebooks
import torchmetrics  # Model evaluation metrics
from torchmetrics import Accuracy  # Accuracy metric
from torchmetrics.classification import MulticlassF1Score  # F1 score for multiclass
from torch.optim import Adam, AdamW  # Optimization algorithms
from torchinfo import summary  # Model summary utility

In [None]:
# --- Notebook configuration -------------------------------------------------
# Everything a reader may want to tune lives in this cell.
dataroot = "data/butterfly_data"  # Root directory for butterfly dataset
N_EPOCHS = 20  # Number of training epochs
BATCH_SIZE = 128  # Size of mini-batches for training
NUM_CLASSES = 75  # Number of butterfly classes
z_dim = 400  # Dimension of latent space
LATENT_DIM = 128  # Dimension of latent vectors
N_Latent = 10  # Number of latent samples
Img_channels = 3  # Number of image channels (RGB)
Input_Shape = [3, 128, 128]  # Input image dimensions [channels, height, width]
Hidden_dims = 64  # Number of hidden dimensions
lr = 1e-3  # Learning rate for optimizer
betas = (0.5, 0.999)  # Beta parameters for Adam optimizer

# The training cell is guarded by this flag; it must exist before that cell
# runs, otherwise a fresh "Restart & Run All" fails with a NameError.
TRAIN_CNN = True  # Set to False to skip the (slow) CNN training loop

# Seed torch's global RNG so the train/val split, weight initialisation and
# random augmentations are repeatable from one run to the next.
SEED = 42
torch.manual_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Set device to GPU if available, else CPU
print(device)  # Display which device is being used

In [None]:
# Read the training index from the configured data root rather than from a
# second hard-coded copy of the path, so the two cannot drift apart.
df = pd.read_csv(os.path.join(dataroot, "Training_set.csv"))
label = df['label']  # Class name of every training image

# Map each class name to an integer id, numbered in order of first appearance
# in the CSV. dict.fromkeys() de-duplicates while preserving that order.
class_to_label = {
    class_name: class_id
    for class_id, class_name in enumerate(dict.fromkeys(label))
}

# Training pipeline: resize, random geometric augmentation, then conversion
# and normalisation.
transform_train = v2.Compose([
    v2.Resize((128, 128)),  # Resize images to 128x128
    v2.RandomAffine(degrees=(-30, +30)),  # Random rotation-only affine in [-30, 30] degrees
    v2.RandomRotation(degrees=(-30, +30)),  # A second, independent random rotation in [-30, 30] degrees
    v2.RandomHorizontalFlip(p=.4),  # Horizontal flip with probability 0.4
    v2.ToImage(),  # Wrap as a torchvision image tensor
    # NOTE(review): CustomImageDataset already divides pixels by 255, so the
    # input here is float; scale=True is presumably a no-op then — confirm.
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.485, 0.456, 0.406],  # Normalize using ImageNet statistics
                 std=[0.229, 0.224, 0.225]),
])

# Validation pipeline: same as training but without the random augmentation.
transform_val = v2.Compose([
    v2.Resize((128, 128)),  # Resize images to 128x128
    v2.ToImage(),  # Wrap as a torchvision image tensor
    v2.ToDtype(torch.float32, scale=True),  # Ensure float32 (see note in transform_train)
    v2.Normalize(mean=[0.485, 0.456, 0.406],  # Normalize using ImageNet statistics
                 std=[0.229, 0.224, 0.225]),
])

In [None]:
class CustomImageDataset(Dataset):
    """Image dataset indexed by a CSV file located under ``root``.

    With ``train=True`` the index is ``<root>/Training_set.csv`` and images are
    read from ``<root>/train``; otherwise ``<root>/Testing_set.csv`` and
    ``<root>/test`` are used. Column 0 of the CSV holds the image file name
    and, for the training split, column 1 holds the class name.

    Args:
        root: Directory containing the CSV files and image folders.
        train: Select the training split (returns ``(img, label)``) or the
            test split (returns ``img`` only).
        transform: Optional callable applied to the image tensor.
        target_transform: Optional mapping from class name to the label value
            returned for training samples. Defaults to the notebook-level
            ``class_to_label`` dictionary.
    """

    def __init__(self, root, train=True, transform=None, target_transform=class_to_label):
        self.root = root
        self.transform = transform
        self.target_transform = target_transform
        self.train = train

        split_csv, split_dir = (
            ("Training_set.csv", "train") if self.train
            else ("Testing_set.csv", "test")
        )
        self.label_dir = os.path.join(root, split_csv)
        self.img_root = os.path.join(root, split_dir)

        # Parse the index once. Re-reading the CSV inside __len__/__getitem__
        # would repeat the whole parse for every single sample fetched.
        self.labels = pd.read_csv(self.label_dir)

    def __len__(self):
        """Return the number of rows in the split's CSV index."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(img, label)`` for the training split, ``img`` alone otherwise.
            ``img`` is the decoded image divided by 255.0, passed through
            ``transform`` when one is set.
        """
        img_path = os.path.join(self.img_root, self.labels.iloc[idx, 0])
        img = torchvision.io.read_image(img_path) / 255.0
        if self.transform:
            img = self.transform(img)

        if not self.train:
            return img

        label = self.labels.iloc[idx, 1]
        if self.target_transform:
            # Translate the class name into its label value.
            label = self.target_transform[label]
        return (img, label)

In [None]:
# random_split() returns two Subset views over ONE shared dataset object, so
# assigning `.dataset.transform` through either view changes it for both (the
# last assignment wins and training would silently run with the validation
# transform). Use two dataset objects over the same files instead, one per
# transform, and share the sampled indices between them.
train_base = CustomImageDataset(
    root=dataroot, train=True, transform=transform_train)
val_base = CustomImageDataset(
    root=dataroot, train=True, transform=transform_val)

train_dataset, val_split = random_split(train_base, [0.8, .2])
# Same held-out indices, but read through the un-augmented dataset.
val_dataset = torch.utils.data.Subset(val_base, val_split.indices)
print(len(train_dataset), len(val_dataset))

In [None]:
class Classifier(nn.Module):
    """Convolutional classifier for 3-channel 128x128 images.

    Five stages of ``Conv2d -> LeakyReLU(0.2) -> [Dropout2d(0.2)] ->
    BatchNorm2d -> MaxPool2d(2)`` double the channel count from
    ``Hidden_dims`` up to ``16 * Hidden_dims`` (the first two stages have no
    dropout). A two-layer fully connected head produces the class logits.

    Args:
        Hidden_dims: Number of output channels of the first convolution.
        num_classes: Number of output logits.

    The head assumes the feature map is 4x4 after the five poolings, i.e. a
    128x128 input.
    """

    def __init__(self, Hidden_dims=Hidden_dims, num_classes=NUM_CLASSES):
        super(Classifier, self).__init__()
        self.Hidden_dims = Hidden_dims
        self.num_classes = num_classes

        # (in_channels, out_channels, use_dropout) for each conv stage.
        stage_specs = [
            (3, Hidden_dims, False),
            (Hidden_dims, 2 * Hidden_dims, False),
            (2 * Hidden_dims, 4 * Hidden_dims, True),
            (4 * Hidden_dims, 8 * Hidden_dims, True),
            (8 * Hidden_dims, 16 * Hidden_dims, True),
        ]

        # Layers are appended in the same order as before so that the
        # nn.Sequential indices (and therefore state_dict keys) do not change.
        layers = []
        for in_ch, out_ch, use_dropout in stage_specs:
            layers.append(nn.Conv2d(in_channels=in_ch, out_channels=out_ch,
                                    kernel_size=3, stride=1, padding=1))
            layers.append(nn.LeakyReLU(0.2))
            if use_dropout:
                layers.append(nn.Dropout2d(0.2))
            layers.append(nn.BatchNorm2d(out_ch))
            layers.append(nn.MaxPool2d(kernel_size=(2, 2)))

        # Flattened size follows the channel count instead of the literal
        # 1024, which was only correct for Hidden_dims == 64.
        flat_features = 16 * Hidden_dims * 4 * 4
        layers.extend([
            nn.Flatten(),
            nn.Linear(flat_features, 512),
            nn.Dropout(p=0.5),
            nn.ReLU(),
            nn.Linear(512, num_classes),
        ])

        self.main = nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for a batch of images ``x``."""
        return self.main(x)

In [None]:
# Build the classifier with its default sizes and place it on the selected device.
model_0 = Classifier()
model_0 = model_0.to(device)
# Show the layer-by-layer summary for a single 3x128x128 input.
summary(model_0, input_size=(1, 3, 128, 128))

In [None]:
# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    num_workers=2,
    persistent_workers=True,
    shuffle=True,
)
# Validation is evaluation only: a fixed order keeps the per-batch averaged
# metrics identical between runs, so shuffling is turned off.
val_dataloader = DataLoader(
    dataset=val_dataset,
    batch_size=BATCH_SIZE,
    num_workers=2,
    persistent_workers=True,
    shuffle=False,
)

In [None]:
# Keep the model and both metrics on the same device as the batches.
model_0 = model_0.to(device)

# Loss on raw logits.
loss_fn = nn.CrossEntropyLoss()

# Metrics used by train_fn / val_fn.
acc_fn = Accuracy(num_classes=NUM_CLASSES, task="multiclass").to(device)
F1_Score = MulticlassF1Score(num_classes=NUM_CLASSES).to(device)

# AdamW over all model parameters.
opt = AdamW(params=model_0.parameters(), lr=1e-4, weight_decay=0.01)

In [None]:
def train_fn(dataloader, loss_function, acc_function, opt, model, f1_function=None):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        loss_function: Callable ``(logits, targets) -> scalar loss tensor``.
        acc_function: Callable ``(predicted_classes, targets) -> accuracy``.
        opt: Optimizer updating ``model``'s parameters.
        model: Network to train; switched to training mode.
        f1_function: Callable ``(predicted_classes, targets) -> F1 score``.
            When omitted, the notebook-level ``F1_Score`` metric is used.

    Returns:
        Tuple ``(loss, accuracy, f1)``, each the mean of the per-batch values.
    """
    if f1_function is None:
        f1_function = F1_Score  # notebook-level metric

    # Send batches to wherever the model lives; fall back to the notebook's
    # global device only for a model without parameters.
    first_param = next(model.parameters(), None)
    batch_device = first_param.device if first_param is not None else device

    model.train()
    train_loss, train_acc, train_f1score = 0, 0, 0
    for X, y in dataloader:
        X, y = X.to(batch_device), y.to(batch_device)
        y_pred = model(X)
        loss = loss_function(y_pred, y)

        # argmax over the logits gives the same class as argmax over their
        # softmax, so the softmax is skipped and the result reused.
        predicted = torch.argmax(y_pred, dim=1)
        train_acc += acc_function(predicted, y)
        train_f1score += f1_function(predicted, y)

        # Accumulate a detached copy: summing the live loss tensor would keep
        # autograd history alive for the whole epoch.
        train_loss += loss.detach()

        opt.zero_grad()
        loss.backward()
        opt.step()

    num_batches = len(dataloader)
    train_loss /= num_batches
    train_acc /= num_batches
    train_f1score /= num_batches
    return train_loss, train_acc, train_f1score


def val_fn(dataloader, loss_function, acc_function, model, f1_function=None):
    """Evaluate ``model`` on ``dataloader`` without updating it.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches supporting ``len()``.
        loss_function: Callable ``(logits, targets) -> scalar loss tensor``.
        acc_function: Callable ``(predicted_classes, targets) -> accuracy``.
        model: Network to evaluate; switched to evaluation mode.
        f1_function: Callable ``(predicted_classes, targets) -> F1 score``.
            When omitted, the notebook-level ``F1_Score`` metric is used.

    Returns:
        Tuple ``(loss, accuracy, f1)``, each the mean of the per-batch values.
    """
    if f1_function is None:
        f1_function = F1_Score  # notebook-level metric

    # Send batches to wherever the model lives; fall back to the notebook's
    # global device only for a model without parameters.
    first_param = next(model.parameters(), None)
    batch_device = first_param.device if first_param is not None else device

    model.eval()
    val_loss, val_acc, val_f1_score = 0, 0, 0
    with torch.inference_mode():
        for X_test, y_test in dataloader:
            X_test, y_test = X_test.to(batch_device), y_test.to(batch_device)
            y_test_pred = model(X_test)
            # argmax over the logits equals argmax over their softmax.
            predicted = torch.argmax(y_test_pred, dim=1)

            val_loss += loss_function(y_test_pred, y_test)
            val_acc += acc_function(predicted, y_test)
            val_f1_score += f1_function(predicted, y_test)

        # Average while still inside inference mode: the accumulators were
        # created here and are updated in place.
        num_batches = len(dataloader)
        val_loss /= num_batches
        val_acc /= num_batches
        val_f1_score /= num_batches
    return val_loss, val_acc, val_f1_score

In [None]:
# Train and validate the CNN for a fixed number of epochs, recording the loss
# and accuracy of every epoch. Skipped entirely when TRAIN_CNN is falsy.
if TRAIN_CNN:
    # Take the epoch count from the configuration cell instead of repeating
    # the literal here.
    NUM_EPOCHS = N_EPOCHS
    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    for i in range(NUM_EPOCHS):
        train_loss, train_accuracy, train_f1_score = train_fn(
            train_dataloader,
            loss_fn,
            acc_fn,
            opt,
            model_0
        )
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        val_loss, val_accuracy, val_f1_score = val_fn(
            val_dataloader,
            loss_fn,
            acc_fn,
            model_0
        )
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Each f-string piece stays on one line: a line break inside a
        # replacement field is a SyntaxError before Python 3.12.
        print(f"Epoch {i+1}")
        print(
            f"Train: Loss : {train_loss:.4f} Accuracy : {train_accuracy:.4f}"
            f" | | Validation: Loss : {val_loss:.4f} Accuracy : {val_accuracy:.4f}"
        )
        print(
            f"Train: f1Score : {train_f1_score:.4f}"
            f"  | | Validation: f1Score : {val_f1_score:.4f}"
        )