In [None]:
import numpy as np

In [None]:
# Paths to the CUB-200-2011 metadata text files, relative to the working directory.
_CUB_ROOT = "./CUB_200_2011"

images_url = _CUB_ROOT + "/images.txt"
train_test_split_url = _CUB_ROOT + "/train_test_split.txt"
classes_url = _CUB_ROOT + "/classes.txt"
image_class_labels_url = _CUB_ROOT + "/image_class_labels.txt"

Loading the `.txt` metadata files

In [None]:
def _read_columns(path):
    """Read a space-delimited text file with every field kept as a string."""
    return np.genfromtxt(path, delimiter=" ", dtype=str)


# Each table is loaded as strings; only the label table is cast to int here.
image = _read_columns(images_url)  # <image_id> <image_name>

train_test_split = _read_columns(train_test_split_url)  # <image_id> <is_training_image>

classes = _read_columns(classes_url)  # <class_id> <class_name>

image_class_labels = _read_columns(image_class_labels_url).astype(int)  # <image_id> <class_id>

<table>
    <tr>
        <td>image</td>
        <td>&lt;image_id&gt; &lt;image_name&gt;</td>
    </tr>
    <tr>
        <td>train_test_split</td>
        <td>&lt;image_id&gt; &lt;is_training_image&gt;</td>
    </tr>
    <tr>
        <td>classes</td>
        <td>&lt;class_id&gt; &lt;class_name&gt;</td>
    </tr>
    <tr>
        <td>image_class_labels</td>
        <td>&lt;image_id&gt; &lt;class_id&gt;</td>
    </tr>
</table>

In [None]:
# Turn each two-column table into a dict keyed by its integer id (first column).
# NOTE: the names are rebound from arrays to dicts, so this cell can only be
# re-run after the loading cell has been re-run first.

# image_id -> relative path of the image file
image = dict((int(entry[0]), "CUB_200_2011/images/" + entry[1]) for entry in image)

# image_id -> is_training_image flag
train_test_split = dict((int(entry[0]), int(entry[1])) for entry in train_test_split)

# class_id -> class_name
classes = dict((int(entry[0]), entry[1]) for entry in classes)

# image_id -> class_id
image_class_labels = dict((int(entry[0]), int(entry[1])) for entry in image_class_labels)

In [None]:
# row = 5
# print(row, image[row])
# print(row, train_test_split[row])
# print(row, classes[row])
# print(row, image_class_labels[row])

In [None]:
# image_train = {key: value for key, value in  train_test_split.items() if value == 1}
# image_test = {key: value for key, value in  train_test_split.items() if value == 0}
# Partition the image ids by their is_training_image flag (1 = train, 0 = test).
image_split = {
    "train": np.array([img_id for img_id, flag in train_test_split.items() if flag == 1]),
    "test": np.array([img_id for img_id, flag in train_test_split.items() if flag == 0]),
}

# Convenience aliases; they refer to the same arrays stored in image_split.
image_train = image_split["train"]
image_test = image_split["test"]

n_train, n_test, n_classes = len(image_train), len(image_test), len(classes)

print("Number of images:", len(image))
print(f"Number of training images by default: {n_train}")
print(f"Number of testing images by default: {n_test}")
print(f"Number of classes: {n_classes}\n")

<table>
    <tr>
        <td>image_train</td>
        <td>list of &lt;image_id&gt;</td>
        <td>&lt;is_training_image&gt; == 1</td>
    </tr>
    <tr>
        <td>image_test</td>
        <td>list of &lt;image_id&gt;</td>
        <td>&lt;is_training_image&gt; == 0</td>
    </tr>
</table>

In [None]:
import matplotlib.pyplot as plt

In [None]:
def showImage(image):
    """Read the image at ``image`` with matplotlib and display it without axes."""
    pixels = plt.imread(image)
    plt.imshow(pixels)
    plt.axis(False)
    plt.show()

### Model

In [None]:
import torch
import torch.nn as nn
from torchvision import models

In [None]:
# Hyperparameters and run configuration

# Training schedule
num_epoch = 10
batch_size = 4
learning_rate = 0.001

# Model input / output dimensions
num_out_ftrs = 200  # number of classes for our classification task
# Images are resized/cropped to image_input_size x image_input_size.
# NOTE(review): 224 is the more common input size for pretrained backbones - confirm 244 is intended.
image_input_size = 244

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
def print_model_params(model):
    """Print and return the parameter counts of ``model``.

    Args:
        model: object exposing ``parameters()`` that yields tensors
            (e.g. a ``torch.nn.Module``).

    Returns:
        Tuple ``(total_params, training_params)``: the number of elements over
        all parameters, and over those with ``requires_grad`` set.
    """
    # Gather (size, trainable) once so both totals come from the same pass.
    sizes = [(p.numel(), p.requires_grad) for p in model.parameters()]

    total_params = sum(count for count, _ in sizes)
    print(f"Total number of parameters in model: {total_params}")

    training_params = sum(count for count, trainable in sizes if trainable)
    print(f"Total number of training parameters in model: {training_params}")

    return total_params, training_params

### Loading dataset in torch dataloader

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision.utils import save_image
import torchvision.transforms as transforms
from PIL import Image
import time
import os
import copy

In [None]:
class ExpandGrayscales:
    """Callable transform that expands a tensor to 3 entries along its first dimension.

    The two trailing dimensions keep their size (``-1``), so a ``(1, H, W)``
    tensor becomes ``(3, H, W)`` and a ``(3, H, W)`` tensor is left as is.
    """

    def __call__(self, sample):
        target_shape = (3, -1, -1)
        return sample.expand(*target_shape)

# Preprocessing pipelines, keyed by the role they play in the notebook.
data_transforms = {
    # Training: resize, then take a random resized crop at the model input size.
    "train": transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.RandomResizedCrop((image_input_size, image_input_size)),
        transforms.ToTensor(),
        ExpandGrayscales(),
    ]),
    # Testing: resize, then take a center crop at the model input size.
    "test": transforms.Compose([
        transforms.Resize((300, 300)),
        transforms.CenterCrop((image_input_size, image_input_size)),
        transforms.ToTensor(),
        ExpandGrayscales(),
    ]),
    # Augmentation: colour jitter, random flip / rotation / grayscale, then a 250x250 center crop.
    "augmentation": transforms.Compose([
        transforms.Resize((300, 300)),
        transforms.ColorJitter(
            brightness=0.5,
            contrast=0.3,
            saturation=0.3,
            hue=0.05,
        ),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=30),
        transforms.RandomGrayscale(p=0.1),
        transforms.CenterCrop((250, 250)),
        transforms.ToTensor(),
        ExpandGrayscales(),
    ]),
}

# Stand-alone names for the same pipeline objects.
transform_train = data_transforms["train"]
transform_test = data_transforms["test"]
trasform_augmentation = data_transforms["augmentation"]

In [None]:
class CUBDataset(Dataset):
    """Dataset that yields ``(transformed image, class id - 1)`` pairs.

    Args:
        image_id: sequence of image ids that make up this dataset.
        id_to_url: mapping from image id to the image file path.
        image_class_labels: mapping from image id to class id.
        transform: callable applied to the opened PIL image.
    """

    def __init__(self, image_id, id_to_url, image_class_labels, transform):
        # Resolve paths and labels up front so __getitem__ only indexes lists.
        self.x = [id_to_url[img_id] for img_id in image_id]
        self.y = [image_class_labels[img_id] for img_id in image_id]
        self.n_samples = len(image_id)
        self.transform = transform

    def __getitem__(self, index):
        path = self.x[index]
        tensor = self.transform(Image.open(path))
        # Stored class ids are shifted down by one before being returned.
        return tensor, self.y[index] - 1

    def __len__(self):
        return self.n_samples

### Data Augmentation Process

In [None]:
augment_batch_size = 20

# The default training images, passed through the "augmentation" pipeline.
augment_train_dataset = CUBDataset(
    image_split["train"],
    image,
    image_class_labels,
    data_transforms["augmentation"],
)

# Loader used to produce the augmented images in fixed-size, unshuffled batches.
augment_train_dataloader = DataLoader(
    augment_train_dataset,
    batch_size=augment_batch_size,
    shuffle=False,
)

In [None]:
def data_augmentation():
    """Generate augmented training images, or reload them if they already exist.

    If ``CUB_200_2011/aug_images`` does not exist, it is created and
    ``augment_train_dataloader`` is iterated three times; every image is saved
    as ``<class_id>_img<id>.png``. Otherwise the files already in that folder
    are registered, in sorted order, with the class id parsed from the file
    name prefix.

    Each augmented image gets a fresh integer id, counting up from the current
    Unix time in seconds, and is recorded in the module-level ``image``,
    ``image_class_labels`` and ``image_split["train"]``.

    Returns:
        None. The three global tables are updated in place / rebound.
    """
    img_num = int(time.time())
    folder_path = "CUB_200_2011/aug_images"
    # Ids are collected here and appended once at the end; calling np.append
    # per image would re-copy the whole id array every time.
    new_ids = []

    if not os.path.exists(folder_path):
        print("Data Augmentation in process")
        print("This might take few minutes\nPlease wait...")
        os.makedirs(folder_path)
        for _ in range(3):
            for images, labels in augment_train_dataloader:
                for img_tensor, label in zip(images, labels):
                    # The dataset returns class_id - 1, so add 1 to recover the class id.
                    class_id = int(label) + 1
                    file_path = folder_path + "/" + str(class_id) + "_img" + str(img_num) + ".png"
                    save_image(img_tensor, file_path)
                    image[img_num] = file_path
                    image_class_labels[img_num] = class_id
                    new_ids.append(img_num)
                    img_num += 1
        print("Data Augmentation Completed\n")

    else:
        print("Loading Augmented Images")
        # Sorted so that ids are assigned in the same relative order on every
        # machine; os.listdir gives no ordering guarantee.
        for file_name in sorted(os.listdir(folder_path)):
            try:
                class_id = int(file_name.split("_")[0])
            except ValueError:
                # Not a "<class_id>_img..." file (e.g. a hidden OS/editor entry): skip it.
                continue
            file_path = folder_path + "/" + file_name
            image[img_num] = file_path
            image_class_labels[img_num] = class_id
            new_ids.append(img_num)
            img_num += 1
        print("Augmented Images Loaded\n")

    # Guarded because appending an empty list would turn the id array into floats.
    if new_ids:
        image_split["train"] = np.append(image_split["train"], new_ids)

In [None]:
# Seed NumPy's global RNG, then shuffle the train ids followed by the test ids in place.
np.random.seed(42)
for _split_name in ("train", "test"):
    np.random.shuffle(image_split[_split_name])
del _split_name

In [None]:
# Dataset sizes once the augmented images have been registered.
# Single quotes are used for the keys inside the f-strings: reusing the outer
# double quotes is a SyntaxError on Python versions before 3.12.
print(f"Number of images after augmentation: {len(image)}")
print(f"Number of training images after augmentation: {len(image_split['train'])}")
print(f"Number of testing images: {len(image_split['test'])}\n")

### Creating Actual Train / Test Datasets

In [None]:
# One CUBDataset per split, each paired with the transform pipeline of the same name.
datasets = {
    split: CUBDataset(image_split[split], image, image_class_labels, data_transforms[split])
    for split in ("train", "test")
}

In [None]:
# One DataLoader per split. shuffle=False: batches are not re-shuffled by the loader.
dataloaders = {
    split: DataLoader(datasets[split], batch_size=batch_size, shuffle=False)
    for split in ("train", "test")
}

In [None]:
# Number of samples in each split.
dataset_sizes = {split: len(datasets[split]) for split in ("train", "test")}
dataset_sizes

In [None]:
# Number of batches in each split's DataLoader.
dataloader_sizes = {split: len(dataloaders[split]) for split in ("train", "test")}

In [None]:
def plot_acc_loss(losses, accuracies):
    """Plot per-epoch losses and accuracies side by side.

    Args:
        losses: dict with ``"train"`` and ``"test"`` sequences, one value per epoch.
        accuracies: dict with ``"train"`` and ``"test"`` sequences, one value per epoch.
    """
    # Epochs are numbered from 1; the count comes from the training losses.
    epochs = list(range(1, len(losses["train"]) + 1))

    _, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(12, 6))

    # Left panel: losses
    loss_ax.plot(epochs, losses["train"], label="Training Loss", marker="o")
    loss_ax.plot(epochs, losses["test"], label="Testing Loss", marker="o")
    loss_ax.set_title("Training and Testing Losses")
    loss_ax.set_xlabel("Epochs")
    loss_ax.set_ylabel("Loss")
    loss_ax.legend()

    # Right panel: accuracies
    acc_ax.plot(epochs, accuracies["train"], label="Training Accuracy", marker="o")
    acc_ax.plot(epochs, accuracies["test"], label="Testing Accuracy", marker="o")
    acc_ax.set_title("Training and Testing Accuracies")
    acc_ax.set_xlabel("Epochs")
    acc_ax.set_ylabel("Accuracy")
    acc_ax.legend()

    plt.show()

In [None]:
def train_model(model, 
                criterion = nn.CrossEntropyLoss(),
                learning_rate=learning_rate,
                optimizer = None, 
                schedular=None, 
                num_epoch=num_epoch,
                save_checkpoint=False):
    """Train ``model`` and evaluate it on the test split after every epoch.

    Reads the module-level ``device``, ``dataloaders``, ``dataset_sizes`` and
    ``dataloader_sizes``. Each epoch runs a "train" phase followed by a "test"
    phase; the weights with the highest test accuracy are restored at the end.

    Args:
        model: network to train; it is moved to ``device``.
        criterion: loss called as ``criterion(outputs, labels)``. The default
            instance is created once, when the function is defined.
        learning_rate: learning rate for the default Adam optimizer (only used
            when ``optimizer`` is None).
        optimizer: optimizer to use; Adam over ``model.parameters()`` if None.
        schedular: optional LR scheduler, stepped once after each train phase.
        num_epoch: number of epochs to run.
        save_checkpoint: when True, write the checkpoint dict to a ``.pth``
            file named after the model class, best accuracy, epoch count,
            optimizer and criterion.

    Returns:
        ``(model, checkpoint)``: the model holding the best weights, and a dict
        with the epoch count, criterion, best model state, final optimizer
        state and the per-epoch loss / accuracy lists.
    """
    # Batches are moved to `device` below, so the model has to live there too.
    model = model.to(device)

    if optimizer is None:
        optimizer = torch.optim.Adam(
            params=model.parameters(),
            lr=learning_rate
        )

    print(f"Training Started on {device}")
    train_time = 0

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    epoch_loss_list = {
        "train": [],
        "test": []
    }

    epoch_acc_list = {
        "train": [],
        "test": []
    }

    for epoch in range(num_epoch):
        print(f"Epoch {epoch+1}/{num_epoch}", end="")

        # each epoch has a training and a validation phase
        for phase in ["train", "test"]:
            time_start = time.time()

            if phase == "train":
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Print a progress tick about ten times per training epoch. Clamped
            # to 1 so loaders with fewer than 10 batches do not divide by zero.
            tick_every = max(1, dataloader_sizes[phase] // 10)

            for i, (images, labels) in enumerate(dataloaders[phase]):
                images = images.to(device)
                labels = labels.to(device)

                # Gradients are only tracked during the training phase.
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(images)
                    predictions = torch.argmax(outputs, dim=1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training loop
                    if phase == "train":
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                if phase == "train" and (i+1) % tick_every == 0:
                    print("-", end=" ")

                # Weight the batch loss by the batch size so the epoch loss is
                # a per-sample average even when the last batch is smaller.
                running_loss += loss.item() * images.size(0)
                # Accumulate as a plain int so the accuracy is a Python float.
                running_corrects += int(torch.sum(predictions == labels))

            if schedular is not None and phase == "train":
                schedular.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = 100.0 * running_corrects / dataset_sizes[phase]

            epoch_time = time.time() - time_start
            if phase == "train":
                train_time += epoch_time

            print(f"\n{phase} Loss: {epoch_loss:.2f} Acc: {epoch_acc:.2f}%", f"Time_Taken: {epoch_time//60:.0f}m {epoch_time%60:.0f}s", end="")

            epoch_loss_list[phase].append(float(epoch_loss))
            epoch_acc_list[phase].append(float(epoch_acc))

            # deep copy the best model
            if phase == "test" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print("\n")

    # Built once and reused for both the log line and the checkpoint file name.
    checkpoint_name = model.__class__.__name__ + "_checkpoint_best_acc_" + str(f"{best_acc:.4f}") + "_" + "_epoch_" + str(num_epoch) + "_optim_" + optimizer.__class__.__name__ + "_criterion_" + criterion.__class__.__name__

    print(f"Training Finished in {train_time//60:.0f}m {train_time%60:.0f}s")
    print(f"Best Test Accuracy: {best_acc:.4f}")
    print(checkpoint_name)

    # Restore the best weights before building the checkpoint and returning.
    model.load_state_dict(best_model_wts)

    checkpoint = {
        "epoch": num_epoch,
        "criterion": criterion,
        "model_state": model.state_dict(),
        "optim_state": optimizer.state_dict(),
        "epoch_losses": epoch_loss_list,
        "epoch_accuracies": epoch_acc_list
    }
    if save_checkpoint:
        file_name = checkpoint_name + ".pth"
        torch.save(checkpoint, file_name)

    plot_acc_loss(epoch_loss_list, epoch_acc_list)

    return model, checkpoint