### Imports

In [1]:
import os
import random
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from enum import Enum
from PIL import Image
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm

import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset, random_split

from torchvision import models
from torchvision.datasets import ImageFolder
from torchvision.io import read_image, ImageReadMode
from torchvision.transforms import functional as TF
from torchvision.transforms import v2

  from .autonotebook import tqdm as notebook_tqdm


### Util Functions

In [2]:
# Device string used for every `.to(DEVICE)` call below: the GPU when CUDA is
# available, otherwise the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
# change if on kaggle
# Prefix prepended to every image directory path built in this notebook.
KAGGLE_PATH = '.'

In [4]:
class ImageSet(Enum):
    """Selects which image collection the loading helpers read and transform."""
    full = 1      # original images (train_images/, test_images/)
    resized = 2   # pre-resized images (train_resized_images/, test_resized_images/)
    cropped = 3   # images under ./crop, loaded through ImageFolder

In [5]:
random.seed(22)
def rotate_lucky_img(probability=0.2):
    """Decide at random whether the current image should be rotated.

    Args:
        probability: chance, in [0, 1], of returning True. Defaults to 0.2.

    Returns:
        bool: True with the given probability, False otherwise.
    """
    # random.random() is uniform on [0, 1), so it can be compared directly
    # against a probability. An integer draw from 1..10 is never below 0.2.
    return random.random() < probability

In [6]:
# Target size handed to v2.Resize by the full-image transforms.
IMAGE_RESIZE = (256, 256)
# Centre-crop size; not used by any active transform in this section.
CENTER_CROP = (224, 224)
INCEPTIONV3_RESIZE = (299, 299) # InceptionNetV3 requires image sizes of 299x299

def test_image_transform(image_set = ImageSet.full):
    """Build the augmentation-free pipeline used when loading test images.

    The pipeline applies the same ImageNet mean/std normalisation as the
    training transforms, so the model sees test inputs on the scale it was
    trained on. No random flips are applied.

    Args:
        image_set: ImageSet member. For ImageSet.full the images are first
            resized to IMAGE_RESIZE; other sets are not resized here.

    Returns:
        v2.Compose: the transform to apply to each image tensor.
    """
    steps = []
    if ImageSet.full == image_set:
        steps.append(v2.Resize(IMAGE_RESIZE))

    steps.extend([
        v2.ToDtype(torch.float32, scale=True),
        # Must match the statistics used by the training transforms.
        v2.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
    ])

    return v2.Compose(steps)

def crop_image_transform():
    """Return the training pipeline that resizes to IMAGE_RESIZE.

    Steps, in order: resize, random horizontal flip (p=0.2), random vertical
    flip (p=0.2), conversion to scaled float32, ImageNet mean/std
    normalisation.
    """
    resize = v2.Resize(IMAGE_RESIZE)
    # A v2.CenterCrop(CENTER_CROP) step is currently disabled.
    flips = [v2.RandomHorizontalFlip(0.2), v2.RandomVerticalFlip(0.2)]
    to_float = v2.ToDtype(torch.float32, scale=True)
    normalise = v2.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]
    )

    return v2.Compose([resize, *flips, to_float, normalise])

def random_rotate_transform(degrees=(0, 180)):
    """Return a training pipeline that rotates the image by a random angle.

    Args:
        degrees: (min, max) range the rotation angle is sampled from.
            Defaults to 0-180 degrees.

    Returns:
        v2.Compose: random rotation, conversion to scaled float32, then
        ImageNet mean/std normalisation.
    """
    img_transforms = v2.Compose([
        # The range must be passed as one `degrees` argument. The second
        # positional parameter of RandomRotation is `interpolation`, so
        # RandomRotation(0, 180) means "0 degrees" plus a bogus interpolation.
        v2.RandomRotation(degrees=degrees),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        )
    ])

    return img_transforms

def train_image_transform():
    """Return the default training pipeline (no resizing).

    Random horizontal and vertical flips (p=0.2 each), conversion to scaled
    float32, then ImageNet mean/std normalisation.
    """
    imagenet_mean = [0.485, 0.456, 0.406]
    imagenet_std = [0.229, 0.224, 0.225]

    pipeline = [
        v2.RandomHorizontalFlip(0.2),
        v2.RandomVerticalFlip(0.2),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]
    return v2.Compose(pipeline)

def inception_image_transform():
    """Return the training pipeline for InceptionV3-sized inputs.

    Same steps as the default training pipeline, preceded by a resize to
    INCEPTIONV3_RESIZE.
    """
    augmentations = (
        v2.RandomHorizontalFlip(0.2),
        v2.RandomVerticalFlip(0.2),
    )
    scale_and_normalise = (
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]
        ),
    )

    return v2.Compose([
        v2.Resize(INCEPTIONV3_RESIZE),
        *augmentations,
        *scale_and_normalise,
    ])

In [7]:
def load_image_dataset(image_paths, image_set=ImageSet.resized, train_inception=False, run_test_image=False):
    """Read every image in `image_paths`, transform it, and stack the results.

    Args:
        image_paths: iterable of image file paths.
        image_set: ImageSet member. It selects the reader (torchvision for
            the resized set, PIL otherwise) and, when neither flag below is
            set, the transform.
        train_inception: use the 299x299 InceptionV3 training transform.
        run_test_image: use the test transform; takes precedence over
            `train_inception`.

    Returns:
        np.ndarray holding one transformed image per path. The images must
        all share one shape after the transform. An empty array is returned
        when `image_paths` is empty.
    """
    # The transform depends only on the arguments, so build it once rather
    # than once per image.
    # InceptionNet V3 requires the size of 299x299, so it gets its own
    # transformer.
    if run_test_image:
        img_transformer = test_image_transform(image_set)
    elif train_inception:
        img_transformer = inception_image_transform()
    elif ImageSet.full == image_set:
        img_transformer = crop_image_transform()
    else:
        img_transformer = train_image_transform()

    images = []

    for image_path in tqdm(image_paths):
        # for resized images, can read in using torchvision
        # for full sized images, need to use PIL as not all images are jpeg format
        if ImageSet.resized == image_set:
            image_tensor = read_image(image_path, mode=ImageReadMode.RGB)
        else:
            with Image.open(image_path) as pil_image:
                image_tensor = TF.to_tensor(pil_image.convert('RGB'))

        images.append(img_transformer(image_tensor))

    if not images:
        return np.array(images)

    # Stack once in torch, then expose the result as a numpy array.
    return torch.stack(images).numpy()

In [8]:
# using the load_dataset from the starter notebook as a template.
# converting this function to work with pytorch
def load_dataset_pil(image_paths, is_training=False, use_resized=True):
    """Load images through PIL and return them as one numpy array.

    Args:
        image_paths: iterable of image file paths.
        is_training: when True, each image goes through either the random
            rotation transform (chosen by rotate_lucky_img()) or the default
            training transform. When False, images are only converted to
            tensors.
        use_resized: when False, images are converted to RGB after opening.

    Returns:
        np.ndarray with one entry per path, each produced from the tensor
        returned by TF.to_tensor (after the optional training transform).
    """
    images = []

    for image_path in image_paths:
        # The context manager closes the file handle once the pixel data has
        # been copied into the tensor.
        with Image.open(image_path) as opened_image:
            source_image = opened_image if use_resized else opened_image.convert('RGB')
            image_tensor = TF.to_tensor(source_image)

        if is_training:
            if rotate_lucky_img():
                img_transform = random_rotate_transform()
            else:
                img_transform = train_image_transform()
            # The transforms end in v2.Normalize, which does not accept PIL
            # images, so they are applied to the tensor.
            image_tensor = img_transform(image_tensor)

        images.append(np.asarray(image_tensor))

    return np.array(images)

In [9]:
def generate_tensor_datasets(x_train, x_val, y_train, y_val):
    """Wrap the train/validation arrays in TensorDatasets.

    Each input is converted with torch.Tensor(...).

    Returns:
        (training dataset, validation dataset), each yielding
        (image, label) pairs.
    """
    train_pair = (torch.Tensor(x_train), torch.Tensor(y_train))
    val_pair = (torch.Tensor(x_val), torch.Tensor(y_val))

    return TensorDataset(*train_pair), TensorDataset(*val_pair)

In [10]:
def generate_test_dataloader(test_images, batch_size):
    """Return a DataLoader over the test images.

    The images are converted with torch.Tensor(...) and wrapped in a
    single-tensor TensorDataset, so every batch is a one-element sequence.
    """
    dataset = TensorDataset(torch.Tensor(test_images))
    return DataLoader(dataset, batch_size=batch_size)

In [11]:
def plot_losses(train_losses, validate_losses):
    """Plot the training and validation log-loss curves on one figure."""
    curves = (
        (train_losses, "Training"),
        (validate_losses, "Validation"),
    )
    for series, legend_name in curves:
        plt.plot(series, label=legend_name)

    plt.title("Training and Validation Log Loss")
    plt.xlabel("Epochs")
    plt.ylabel("Log Loss")

    plt.legend(loc='best')
    plt.show()

In [12]:
class EarlyStopper:
    """Signals when the validation loss has stopped improving.

    Args:
        patience: number of consecutive non-improving checks (beyond
            `min_delta`) tolerated before stopping is requested.
        min_delta: how far above the best loss seen so far a value must be
            to count as a non-improving check.
    """

    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        # Consecutive checks that were worse than best + min_delta.
        self.loss_counter = 0
        self.min_validation_loss = float('inf')

    def early_stop(self, validation_loss):
        """Record one validation loss and report whether to stop.

        Returns:
            bool: True once `patience` non-improving checks have accumulated
            since the last improvement, otherwise False.
        """
        if validation_loss < self.min_validation_loss:
            # A new best loss resets the patience counter.
            self.min_validation_loss = validation_loss
            self.loss_counter = 0
            return False

        if validation_loss > (self.min_validation_loss + self.min_delta):
            self.loss_counter += 1
            return self.loss_counter >= self.patience

        # Within min_delta of the best loss: neither an improvement nor a strike.
        return False

#### Mean Calculations

In [13]:
def mean_columnwise_log_loss(y_true, y_pred):
    """Binary log loss averaged within each label column, then across columns.

    Predictions are clipped to [1e-7, 1 - 1e-7] so neither logarithm
    receives 0.
    """
    eps = 1e-7
    probs = np.clip(y_pred, eps, 1.0 - eps)

    # Contributions of the positive and the negative class, element-wise.
    positive_term = y_true * np.log(probs)
    negative_term = (1 - y_true) * np.log(1 - probs)

    # Average over rows to get one loss per column.
    per_column = -np.mean(positive_term + negative_term, axis=0)

    return np.mean(per_column)

In [14]:
## Pytorch Mean Columnwise Log Loss
def mean_columnwise_log_loss_torch(y_true, y_pred):
    """Torch counterpart of the column-wise mean log loss.

    Predictions are clamped to [1e-7, 1 - 1e-7], the binary log loss is
    averaged over dim 0 to give one value per column, and those values are
    averaged into a single tensor.
    """
    eps = 1e-7
    probs = torch.clamp(y_pred, eps, 1.0 - eps)

    positive_term = y_true * torch.log(probs)
    negative_term = (1 - y_true) * torch.log(1 - probs)

    # One loss per label column.
    per_column = -(positive_term + negative_term).mean(dim=0)

    return per_column.mean()

### Getting Image Data

In [15]:
def get_image_labels(train_df):
    """Return every column except "filename" as a float32 numpy array."""
    label_frame = train_df.drop(columns=["filename"])
    return label_frame.to_numpy().astype(np.float32)

In [16]:
def get_train_image_paths(image_set=ImageSet.resized):
    """Build the list of training image paths from train.csv.

    The "filename" column supplies the file stems. The directory is
    train_resized_images for ImageSet.resized and train_images otherwise,
    both under KAGGLE_PATH. The first ten paths are printed as a check.
    """
    train_df = pd.read_csv("train.csv")

    if ImageSet.resized == image_set:
        train_image_paths = [
            f"{KAGGLE_PATH}/train_resized_images/{stem}.jpg" for stem in train_df["filename"]
        ]
    else:
        train_image_paths = [
            f"{KAGGLE_PATH}/train_images/{stem}.jpg" for stem in train_df["filename"]
        ]

    print("Checking Image Paths:")
    print(f"{train_image_paths[:10]}")

    return train_image_paths

In [17]:
def get_test_images_data(image_set=ImageSet.resized):
    """Load the test images listed in sample_submission.csv.

    Returns:
        (submission dataframe, loaded test images). The images are loaded
        with load_image_dataset in test mode and the load time is printed.
    """
    my_pred_df = pd.read_csv("sample_submission.csv")
    file_stems = my_pred_df["filename"]

    if ImageSet.resized == image_set:
        test_image_paths = [f"{KAGGLE_PATH}/test_resized_images/{stem}.jpg" for stem in file_stems]
    else:
        test_image_paths = [f"{KAGGLE_PATH}/test_images/{stem}.jpg" for stem in file_stems]

    start_time = time.time()
    test_images = load_image_dataset(test_image_paths, image_set, run_test_image=True)
    elapsed = time.time() - start_time
    print(f"Test Images Runtime: {elapsed} seconds")

    return my_pred_df, test_images

In [18]:
def get_train_images_data(image_set=ImageSet.resized, train_inception=False):
    """Load the training images and labels and split them 80/20.

    Args:
        image_set: ImageSet member forwarded to the path and image loaders.
        train_inception: forwarded to load_image_dataset to select the
            InceptionV3 transform.

    Returns:
        (training TensorDataset, validation TensorDataset).
    """
    train_image_paths = get_train_image_paths(image_set)

    start_time = time.time()
    train_images = load_image_dataset(
        train_image_paths,
        image_set,
        train_inception=train_inception,
    )
    end_time = time.time()
    print(f"Training Image Runtime: {end_time - start_time} seconds")

    # Labels are read from train.csv, the same file (and row order) that
    # get_train_image_paths used to build the paths.
    labels = get_image_labels(pd.read_csv("train.csv"))

    # Fixed random_state keeps the train/validation split reproducible.
    x_train, x_val, y_train, y_val = train_test_split(
        train_images, labels,
        test_size=0.2,
        random_state=9
    )

    return generate_tensor_datasets(x_train, x_val, y_train, y_val)

#### Crop Images Data

In [19]:
def get_cropped_images_data():
    """Load the ./crop image folder and split it 80/20.

    Returns:
        (training subset, validation subset, list of class names). Targets
        produced by ImageFolder are integer class indices.
    """
    crop_images_dir = os.path.join(os.getcwd(), "crop")

    # ImageFolder yields PIL images, but the pipeline ends in v2.Normalize,
    # which does not accept PIL input. Convert to a tensor image first.
    crop_transform = v2.Compose([
        v2.ToImage(),
        crop_image_transform(),
    ])
    ds_crop = ImageFolder(crop_images_dir, transform=crop_transform)

    # Seeded generator keeps the split reproducible.
    ds_crop_train, ds_crop_val = random_split(
        ds_crop,
        [0.8, 0.2],
        generator=torch.Generator().manual_seed(22)
    )

    num_classes = len(ds_crop.classes)
    print(f"Number of classes: {num_classes}")

    return ds_crop_train, ds_crop_val, ds_crop.classes

#### Resized Images Datasets

In [20]:
def generate_datasets(x_train, x_val, y_train, y_val, test_images):
    """Wrap train, validation and test arrays in TensorDatasets.

    Every input is converted with torch.Tensor(...).

    Returns:
        (training dataset, validation dataset, test dataset). The test
        dataset holds images only.
    """
    as_tensor = torch.Tensor

    # Training and validation splits: (images, labels) pairs.
    ds_train_images = TensorDataset(as_tensor(x_train), as_tensor(y_train))
    ds_val_images = TensorDataset(as_tensor(x_val), as_tensor(y_val))

    # Test split: images only.
    ds_test_images = TensorDataset(as_tensor(test_images))

    return ds_train_images, ds_val_images, ds_test_images

In [21]:
def train_model(dataloader, model, loss_fn, optimizer, use_cropped):
    """Run one training epoch and return the mean of the sampled log losses.

    Every batch is used for an optimisation step. The column-wise log loss
    is printed and recorded on every tenth batch only. `use_cropped` is
    accepted but not read.
    """
    model.train()
    sampled_losses = []

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)

        outputs = model(inputs)
        batch_loss = loss_fn(outputs, targets)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Only every tenth batch contributes to the reported metric.
        if step % 10 != 0:
            continue

        training_loss = mean_columnwise_log_loss_torch(targets, outputs)
        print(f"Training {training_loss=:.5f}")
        sampled_losses.append(training_loss.cpu().detach())

    return np.mean(sampled_losses)

In [22]:
def validate_model(dataloader, model, loss_fn, use_cropped):
    """Evaluate the model and return the mean column-wise log loss.

    Runs in eval mode with gradients disabled and prints the loss of every
    batch. `loss_fn` and `use_cropped` are accepted but not read.
    """
    model.eval()
    batch_losses = []

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(DEVICE)
            targets = targets.to(DEVICE)

            outputs = model(inputs)

            validate_loss = mean_columnwise_log_loss_torch(targets, outputs)
            batch_losses.append(validate_loss.cpu().detach())
            print(f"Validate {validate_loss=:.5f}")

    return np.mean(batch_losses)

In [23]:
def run_train_test(
        epochs,
        batch_size,
        model,
        loss_fn,
        optimizer,
        image_set=ImageSet.resized,
        train_on_cropped=False,
        train_inception_net=False
    ):
    """Train and validate `model` for `epochs` epochs.

    The cropped image folder is used for ImageSet.cropped; otherwise the
    images are loaded from train.csv.

    Returns:
        (training losses, validation losses), one value per epoch.
    """
    if ImageSet.cropped == image_set:
        train_split, val_split, _class_list = get_cropped_images_data()
    else:
        train_split, val_split = get_train_images_data(image_set, train_inception_net)

    train_dl = DataLoader(train_split, batch_size=batch_size)
    test_dl = DataLoader(val_split, batch_size=batch_size)

    train_log_loss, valid_log_loss = [], []
    for epoch_number in range(1, epochs + 1):
        print(f"---- Epoch: {epoch_number} ----")

        epoch_train_loss = train_model(train_dl, model, loss_fn, optimizer, train_on_cropped)
        train_log_loss.append(epoch_train_loss)

        epoch_valid_loss = validate_model(test_dl, model, loss_fn, train_on_cropped)
        valid_log_loss.append(epoch_valid_loss)

    return train_log_loss, valid_log_loss

In [24]:
def run_submission(nn_model, model_name, image_set, batch_size):
    """Predict on the test images and write the submission CSV.

    Args:
        nn_model: trained model; it is switched to eval mode.
        model_name: used in the output filename.
        image_set: ImageSet member selecting which test images are loaded.
        batch_size: batch size for the test DataLoader.

    Writes submissions/torch_<model_name>.csv and creates the directory
    when it does not exist.
    """
    my_pred_df, test_images = get_test_images_data(image_set)

    test_images_dl = generate_test_dataloader(test_images, batch_size)

    nn_model.eval()
    batch_predictions = []
    with torch.no_grad():
        # Each batch from the single-tensor dataset is a one-element sequence.
        for (batch_images,) in test_images_dl:
            pred_y = nn_model(batch_images.to(DEVICE))
            batch_predictions.append(pred_y.cpu().detach().numpy())

    # Concatenate once at the end instead of re-copying on every batch.
    test_predictions = np.concatenate(batch_predictions, axis=0) if batch_predictions else None

    # Every column after the first (filename) receives the predictions.
    my_pred_df.iloc[:, 1:] = test_predictions

    os.makedirs("submissions", exist_ok=True)
    my_pred_df.to_csv(f"submissions/torch_{model_name}.csv", index=False)
### Defining Models

In [25]:
# writing my own AlexNet with a sigmoid layer
class AlexNet5(nn.Module):
    """AlexNet-style network with five conv blocks and a sigmoid output.

    Args:
        num_classes: number of output units.
        linear_in: input width of the first linear layer. The adaptive
            pooling produces 256 x 6 x 6 = 9216 features.
        dropout_1: dropout probability applied after the first conv block.
        dropout_2: dropout probability applied after the second conv block.
    """

    def __init__(self, num_classes, linear_in, dropout_1=0.1, dropout_2=0.2):
        super(AlexNet5, self).__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=0),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.BatchNorm2d(64)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.BatchNorm2d(192)
        )
        self.conv_block3 = nn.Sequential(
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        self.conv_block4 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(),
        )
        self.conv_block5 = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.BatchNorm2d(256)
        )

        # Fixes the spatial size at 6x6 regardless of the input resolution.
        self.avgpool = nn.AdaptiveAvgPool2d((6,6))

        self.drop_layer1 = nn.Dropout(p=dropout_1)
        self.drop_layer2 = nn.Dropout(p=dropout_2)

        self.linear_block = nn.Sequential(
            nn.Linear(linear_in, 4096),
            nn.ReLU(),
            nn.Linear(4096, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes)
        )

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-class sigmoid outputs for a batch of images."""
        out = self.conv_block1(x)
        # Each conv block gets its own dropout layer, so `dropout_1` takes effect.
        out = self.drop_layer1(out)

        out = self.conv_block2(out)
        out = self.drop_layer2(out)

        out = self.conv_block3(out)

        out = self.conv_block4(out)

        out = self.conv_block5(out)

        out = self.avgpool(out)
        out = torch.flatten(out, 1)

        out = self.linear_block(out)
        out = self.sigmoid(out)

        return out

In [26]:
class CNN7(nn.Module):
    """Seven-convolution CNN with a two-layer classifier and sigmoid output.

    Args:
        num_classes: number of output units.
        linear1_in: flattened feature count reaching the first linear layer
            (512 channels times the final spatial size).
    """

    def __init__(self, num_classes, linear1_in) -> None:
        super(CNN7, self).__init__()

        # Four stages: three with two convolutions, the last with one.
        self.conv_block1 = self._conv_stage(3, 64, conv_count=2)
        self.conv_block2 = self._conv_stage(64, 128, conv_count=2)
        self.conv_block3 = self._conv_stage(128, 256, conv_count=2)
        self.conv_block4 = self._conv_stage(256, 512, conv_count=1)

        self.linear_layer1 = nn.Linear(linear1_in, 512)
        self.linear_layer2 = nn.Linear(512, num_classes)

        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    @staticmethod
    def _conv_stage(in_channels, out_channels, conv_count):
        """Stack `conv_count` 3x3 conv+ReLU pairs, then max-pool and batch-norm."""
        layers = []
        channels = in_channels
        for _ in range(conv_count):
            layers.append(nn.Conv2d(channels, out_channels, kernel_size=3, padding=1))
            layers.append(nn.ReLU())
            channels = out_channels
        layers.append(nn.MaxPool2d(kernel_size=3, stride=2))
        layers.append(nn.BatchNorm2d(out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return per-class sigmoid outputs for a batch of images."""
        features = x
        for stage in (self.conv_block1, self.conv_block2, self.conv_block3, self.conv_block4):
            features = stage(features)

        flat = torch.flatten(features, 1)

        hidden = self.relu(self.linear_layer1(flat))
        return self.sigmoid(self.linear_layer2(hidden))

### Training & Testing

#### Init Model & Hyperparameters

In [None]:
# Hyperparameters
LR = 0.001
DECAY_RATE = 0.1

BATCH_SIZE = 64
EPOCHS = 50

# we know this to be 15, we can use dataloader on cropped image folder to verify
_NUM_CLASSES = 15

IMAGE_SET = ImageSet.resized

# 256 channels * 6 * 6 after AlexNet5's adaptive average pooling
ALEXNET_IMG200_IN = 9216

# Flattened CNN7 feature counts: W * L * 512
# 61952 for 200X200
# 86528 for 224X224
# 100352 for 244X244
# 115200 for 256X256
CNN_IMG200_IN = 61952
CNN_IMG224_IN = 86528
CNN_IMG244_IN = 100352
CNN_IMG256_IN = 115200

# Model iteration
VERSION = "1"
# nn_model = AlexNet5(num_classes=_NUM_CLASSES, linear_in=ALEXNET_IMG200_IN).to(DEVICE)
# nn_model = CNN7(num_classes=_NUM_CLASSES, linear1_in=CNN_IMG200_IN).to(DEVICE)

# The pretrained weights ship with a 1000-class head, so they cannot be
# combined with num_classes=15 in the constructor. Load the pretrained
# network first, then swap the final linear layer for a 15-way head.
nn_model = models.efficientnet_b4(weights=models.EfficientNet_B4_Weights.DEFAULT, progress=True)
head_in_features = nn_model.classifier[1].in_features
nn_model.classifier = nn.Sequential(
    nn_model.classifier[0],                       # keep the pretrained dropout layer
    nn.Linear(head_in_features, _NUM_CLASSES),
    # nn.BCELoss and the clamp-based log-loss metric both expect
    # probabilities in [0, 1], as the custom models above produce.
    nn.Sigmoid(),
)
nn_model = nn_model.to(DEVICE)

In [None]:
# Train and validate for EPOCHS epochs. Each returned list holds one mean
# log loss per epoch. `train_on_cropped` is forwarded to train_model and
# validate_model as `use_cropped`, which neither of them reads.
# nn.BCELoss requires the model outputs to already lie in [0, 1].
training_log_losses, validation_log_losses = run_train_test(
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    model=nn_model,
    loss_fn=nn.BCELoss(),
    optimizer=optim.RMSprop(nn_model.parameters(), lr=LR, weight_decay=DECAY_RATE, momentum=0.8),
    image_set=IMAGE_SET,
    train_on_cropped=False,
    train_inception_net=False
)

In [None]:
# Plot the per-epoch training and validation log-loss curves.
plot_losses(training_log_losses, validation_log_losses)

### Save/Load Torch Models

In [None]:
# The resized image set is recorded as size 200; any other set is recorded
# with the IMAGE_RESIZE edge length.
image_size = 200 if ImageSet.resized == IMAGE_SET else IMAGE_RESIZE[0]

# The file name encodes the architecture and the hyperparameters of this run.
architecture = nn_model.__class__.__name__
model_name = f"{architecture}_v{VERSION}_epoch{EPOCHS}_batch{BATCH_SIZE}_lr{LR}_decay{DECAY_RATE}_resize{image_size}_transform_rotate"
model_path = os.path.join("models", f"{model_name}.pt")
print(model_name)

In [None]:
# torch.save does not create missing directories, so make sure the target
# folder exists before writing the weights.
model_dir = os.path.dirname(model_path)
if model_dir:
    os.makedirs(model_dir, exist_ok=True)
torch.save(nn_model.state_dict(), model_path)

In [None]:
# map_location remaps the stored tensors onto the current device, so a
# checkpoint saved on a GPU also loads on a CPU-only machine.
nn_model.load_state_dict(torch.load(model_path, map_location=DEVICE))

### Submission

In [None]:
# Predict on the test images and write submissions/torch_<model_name>.csv.
run_submission(nn_model, model_name, IMAGE_SET, BATCH_SIZE)