In [1]:
import pickle as pkl
from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchtune
import torchvision
from matplotlib import pyplot as plt
from torchmetrics.classification import MulticlassAccuracy, MulticlassPrecision, MulticlassRecall, MulticlassF1Score
from torchsummary import summary
from torchvision.transforms import v2
from tqdm import tqdm



In [2]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cuda


# Data Loader

In [3]:
# Side length (in pixels) every image is resized to by the loader transform.
IMG_DIM = 128
# Number of input channels passed to the models.
NUM_CHANNELS = 3
# Default batch size used by the dataloaders and the model summaries.
BATCH_SIZE = 512
# Per-channel mean / std handed to v2.Normalize in get_loader.
# NOTE(review): these look like the usual ImageNet statistics — confirm they suit this dataset.
NORMALIZE_MEAN = (0.485,0.456,0.406)
NORMALIZE_STD = (0.229,0.224,0.225)
# Number of classes; drives index generation, the model output size and the metrics.
NUM_CLASSES = 90
# Images per class used when computing subset indices.
# NOTE(review): get_subset_indices assumes every class holds exactly this many
# images, stored contiguously in class order — confirm against the folders.
NUM_REAL_IMG_PER_CLASS = 60
NUM_AI_IMG_PER_CLASS = 30
# Fractions of each real class used for training (taken from the start of the
# class) and testing (taken from the end of the class) in get_loader.
REAL_IMG_TRAIN_PERCENTAGE = 0.5
REAL_IMG_TEST_PERCENTAGE = 0.5

In [4]:
def get_subset_indices(
    num_img_per_class: int,
    percent: float = 1.0,
    side: Literal["left", "right"] = "left",
) -> np.ndarray:
    """
    Build flat dataset indices that select a fraction of every class.

    The dataset is treated as NUM_CLASSES consecutive runs of
    ``num_img_per_class`` items. From each run, ``int(num_img_per_class * percent)``
    consecutive indices are taken, either from the start of the run
    (``side="left"``) or from its end (any other value of ``side``).

    Returns:
        A 1-D ``int32`` array of indices, ordered class by class.
    """
    per_class = int(num_img_per_class * percent)
    if side == "left":
        offset = 0
    else:
        offset = num_img_per_class - per_class

    # First selected index of each class, as a column vector ...
    class_starts = np.arange(NUM_CLASSES) * num_img_per_class + offset
    # ... broadcast against the within-class positions to get one row per class.
    within_class = np.arange(per_class)
    grid = class_starts[:, None] + within_class[None, :]
    return grid.ravel().astype(np.int32)

In [5]:
def get_loader(
    real_img_dir: str = "./real_animals",
    ai_img_dir: str = "./ai_animals",
    real_img_percent: float = 1.0,
    ai_img_percent: float = 0.0,
    batch_size: int = BATCH_SIZE,
    num_workers: int = 0,
    shuffle: bool = True,
) -> tuple[torch.utils.data.DataLoader, torch.utils.data.DataLoader]:
    """
    Get train/test dataloaders for real and AI-generated images.

    The training set takes, from the start of every real class,
    ``REAL_IMG_TRAIN_PERCENTAGE * real_img_percent`` of the images, optionally
    extended with ``ai_img_percent`` of every AI class. The test set always
    takes ``REAL_IMG_TEST_PERCENTAGE`` of every real class, from the end of
    the class, and never contains AI images.

    Args:
        real_img_dir: Root folder of the real images (one sub-folder per class).
        ai_img_dir: Root folder of the AI-generated images. Only read when
            ``ai_img_percent > 0``.
        real_img_percent: Fraction of the real training split to use.
        ai_img_percent: Fraction of each AI class to add to the training set.
        batch_size: Batch size for both loaders.
        num_workers: Worker processes for both loaders.
        shuffle: Whether both loaders shuffle their samples.

    Returns:
        ``(train_dataloader, test_dataloader)``.
    """
    # NOTE(review): the index arithmetic in get_subset_indices presumes that each
    # class folder holds exactly NUM_*_IMG_PER_CLASS images — confirm on disk.
    transform = v2.Compose([
        v2.Resize((IMG_DIM, IMG_DIM)),
        v2.ToImage(),
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
    ])

    # Scan the real-image folder once; both subsets index into the same dataset.
    real_img_dataset = torchvision.datasets.ImageFolder(
        root=real_img_dir,
        transform=transform,
        allow_empty=True,
    )
    train_real_img_subset = torch.utils.data.Subset(
        real_img_dataset,
        indices=get_subset_indices(
            NUM_REAL_IMG_PER_CLASS,
            percent=REAL_IMG_TRAIN_PERCENTAGE * real_img_percent,
            side="left",
        ),
    )
    test_real_img_subset = torch.utils.data.Subset(
        real_img_dataset,
        indices=get_subset_indices(
            NUM_REAL_IMG_PER_CLASS,
            percent=REAL_IMG_TEST_PERCENTAGE,
            side="right",
        ),
    )

    if ai_img_percent > 0.0:
        # Only touch the AI folder when it is actually requested, so a missing
        # or empty AI directory does not break real-only runs.
        ai_img_subset = torch.utils.data.Subset(
            torchvision.datasets.ImageFolder(
                root=ai_img_dir,
                transform=transform,
                allow_empty=True,
            ),
            indices=get_subset_indices(NUM_AI_IMG_PER_CLASS, percent=ai_img_percent, side="left"),
        )
        train_dataset = torch.utils.data.ConcatDataset([train_real_img_subset, ai_img_subset])
    else:
        train_dataset = train_real_img_subset

    train_img_dataloader = torch.utils.data.DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )
    # The test loader keeps honouring `shuffle` to preserve existing behaviour.
    test_img_dataloader = torch.utils.data.DataLoader(
        dataset=test_real_img_subset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )

    return train_img_dataloader, test_img_dataloader

# Models

In [10]:
class CNN(nn.Module):
    """
    Small five-stage convolutional classifier.

    Each stage is ``Conv2d(3x3, padding=1) -> leaky_relu -> Dropout(0.1) ->
    MaxPool2d(2)``, followed by two fully connected layers with
    ``Dropout(0.5)`` in between. The fully connected input size is fixed for a
    final feature map of ``final_layer_channels x FINAL_LAYER_SIZE x
    FINAL_LAYER_SIZE`` (40 x 4 x 4), i.e. a 128 x 128 input halved five times.

    Args:
        input_channels: Number of channels of the input images.
        n_classes: Number of output logits.
    """

    def __init__(self, input_channels, n_classes):
        super().__init__()

        # set metadata
        self.input_channels = input_channels
        self.n_classes = n_classes
        self.FINAL_LAYER_SIZE = 4
        self.final_layer_channels = 40
        self.flatten_layer_size = self.final_layer_channels * self.FINAL_LAYER_SIZE * self.FINAL_LAYER_SIZE

        # dropout layers (stateless, so one instance is shared by several stages)
        self.dropout50 = nn.Dropout(p=0.5)
        self.dropout10 = nn.Dropout(p=0.1)

        # set up layers
        self.conv1 = nn.Conv2d(in_channels=input_channels, out_channels=8, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv3 = nn.Conv2d(in_channels=16, out_channels=24, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv4 = nn.Conv2d(in_channels=24, out_channels=32, kernel_size=3, padding=1)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv5 = nn.Conv2d(in_channels=32, out_channels=self.final_layer_channels, kernel_size=3, padding=1)
        self.pool5 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(self.flatten_layer_size, 128)
        self.fc2 = nn.Linear(128, n_classes)

    def forward(self, x):
        """
        Compute class logits for a batch of images.

        Raises:
            ValueError: If the feature map after the last pooling stage is not
                ``final_layer_channels x FINAL_LAYER_SIZE x FINAL_LAYER_SIZE``
                (i.e. the input was not 128 x 128).
        """
        stages = (
            (self.conv1, self.pool1),
            (self.conv2, self.pool2),
            (self.conv3, self.pool3),
            (self.conv4, self.pool4),
            (self.conv5, self.pool5),
        )
        # every stage: conv -> leaky_relu -> dropout -> pool
        for conv, pool in stages:
            x = conv(x)
            x = self.dropout10(torch.nn.functional.leaky_relu(x))
            x = pool(x)

        # Guard the flatten below: view(-1, ...) would otherwise silently merge
        # samples of a wrongly sized batch into fewer rows instead of failing.
        expected = (self.final_layer_channels, self.FINAL_LAYER_SIZE, self.FINAL_LAYER_SIZE)
        if tuple(x.shape[-3:]) != expected:
            raise ValueError(
                f"Unexpected feature map shape {tuple(x.shape[-3:])}, expected {expected}; "
                f"inputs must be {self.FINAL_LAYER_SIZE * 2 ** len(stages)}x{self.FINAL_LAYER_SIZE * 2 ** len(stages)} pixels."
            )

        # flatten the features (the first dimension is batch size)
        x = x.view(-1, self.flatten_layer_size)

        # fc layers
        x = self.dropout50(torch.nn.functional.leaky_relu(self.fc1(x)))
        x = self.fc2(x)
        return x

In [11]:
# Print a per-layer overview (output shapes and parameter counts) of the CNN
# for one batch of BATCH_SIZE images of size 3 x IMG_DIM x IMG_DIM.
summary(CNN(input_channels=3, n_classes=NUM_CLASSES).to(device), (3, IMG_DIM, IMG_DIM), batch_size=BATCH_SIZE, device=device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [512, 8, 128, 128]             224
           Dropout-2         [512, 8, 128, 128]               0
         MaxPool2d-3           [512, 8, 64, 64]               0
            Conv2d-4          [512, 16, 64, 64]           1,168
           Dropout-5          [512, 16, 64, 64]               0
         MaxPool2d-6          [512, 16, 32, 32]               0
            Conv2d-7          [512, 24, 32, 32]           3,480
           Dropout-8          [512, 24, 32, 32]               0
         MaxPool2d-9          [512, 24, 16, 16]               0
           Conv2d-10          [512, 32, 16, 16]           6,944
          Dropout-11          [512, 32, 16, 16]               0
        MaxPool2d-12            [512, 32, 8, 8]               0
           Conv2d-13            [512, 40, 8, 8]          11,560
          Dropout-14            [512, 4

In [None]:
class CombinedResNet50(nn.Module):
    """
    Placeholder for a ResNet50-based model.

    No layers are defined yet: ``forward`` returns its input unchanged.

    Args:
        input_channels: Number of channels of the input images.
        n_classes: Number of target classes (stored, not used yet).
    """

    def __init__(self, input_channels, n_classes):
        # Zero-argument super() avoids naming the wrong class here.
        super().__init__()

        # set metadata
        self.input_channels = input_channels
        self.n_classes = n_classes

    def forward(self, x):
        """Identity pass-through until the real architecture is implemented."""
        return x

In [None]:
# Print a per-layer overview of CombinedResNet50 for one batch of inputs.
# NOTE(review): this cell has no recorded execution, and the model defines no
# layers yet — re-check the output once the architecture is implemented.
summary(CombinedResNet50(input_channels=3, n_classes=NUM_CLASSES).to(device), (3, IMG_DIM, IMG_DIM), batch_size=BATCH_SIZE, device=device.type)

# Hyperparameter Tuning

In [18]:
@dataclass
class Result:
    """
    Metrics collected by train_and_test_model for a single training run.

    List fields hold one entry per epoch; array fields hold one entry per
    class and are computed on the test set in the final epoch only.
    """
    # Mean training batch loss, one value per epoch.
    train_losses: list[float]
    # Training accuracy averaged over classes, one value per epoch.
    avg_train_accuracies: list[float]
    # Mean test batch loss, one value per epoch.
    test_losses: list[float]
    # Per-class test accuracy from the final epoch.
    test_accuracies: np.ndarray
    # Test accuracy averaged over classes, one value per epoch.
    avg_test_accuracies: list[float]
    # Per-class test precision / recall / F1 score from the final epoch.
    test_precision: np.ndarray
    test_recall: np.ndarray
    test_f1score: np.ndarray
    # Means over classes of the three per-class arrays above.
    avg_test_precision: float
    avg_test_recall: float
    avg_test_f1score: float

In [19]:
def print_basic_results(result: Result):
    """
    Print a short text report for one run.

    Shows the last-epoch train/test loss and accuracy, followed by the
    class-averaged test precision, recall and F1 score (all rates in percent).
    """
    final_train_loss = result.train_losses[-1]
    print(f"Train Loss: {final_train_loss}")
    final_train_acc_pct = result.avg_train_accuracies[-1] * 100
    print(f"Train Accuracy: {final_train_acc_pct}%\n")

    final_test_loss = result.test_losses[-1]
    print(f"Test Loss: {final_test_loss}")
    final_test_acc_pct = result.avg_test_accuracies[-1] * 100
    print(f"Test Accuracy: {final_test_acc_pct}%\n")

    precision_pct = result.avg_test_precision * 100
    print(f"Test Precision: {precision_pct}%")
    recall_pct = result.avg_test_recall * 100
    print(f"Test Recall: {recall_pct}%")
    f1_pct = result.avg_test_f1score * 100
    print(f"Test F1 Score: {f1_pct}%")

In [20]:
def plot_loss_accuracy(result: Result) -> None:
    """
    Draw the per-epoch learning curves of a run.

    The left panel shows train/test loss, the right panel shows train/test
    accuracy in percent. Train curves are blue, test curves are red.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(nrows=1, ncols=2, figsize=(16, 4))
    fig.subplots_adjust(wspace=0.4)

    # (axis, title, y label, train label, train series, test label, test series)
    panels = (
        (
            loss_ax, "Loss", "Cross Entropy Loss",
            "Train Loss", result.train_losses,
            "Test Loss", result.test_losses,
        ),
        (
            acc_ax, "Accuracy", "Accuracy (%)",
            "Train Accuracy", np.array(result.avg_train_accuracies)*100,
            "Test Accuracy", np.array(result.avg_test_accuracies)*100,
        ),
    )

    for axis, title, y_label, train_label, train_series, test_label, test_series in panels:
        axis.set_title(title)
        axis.set_xlabel("Epoch")
        axis.plot(train_series, label=train_label, color="blue")
        axis.plot(test_series, label=test_label, color="red")
        axis.legend()
        axis.set_ylabel(y_label)
        axis.grid(axis="both", linestyle="--", alpha=0.7)

In [21]:
def plot_improvement_by_class(
    result_left: Result,
    result_right: Result,
    metric: str,
    classes: list[str],
    title: str = "Changes by Class",
    multiply_by: float = 1.0,
) -> None:
    """
    Bar chart of the per-class change in a metric between two runs.

    Plots ``(result_right.<metric> - result_left.<metric>) * multiply_by`` for
    every class, sorted ascending. Decreases are drawn in red, the rest in green.

    Args:
        result_left: Baseline run.
        result_right: Run compared against the baseline.
        metric: Name of a per-class Result field: "test_accuracies",
            "test_precision", "test_recall" or "test_f1score".
        classes: Class names, in the same order as the metric arrays.
        title: Figure title.
        multiply_by: Scale factor applied to the differences (e.g. 100 for percent).
    """
    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16, 4))

    right_values = getattr(result_right, metric)
    left_values = getattr(result_left, metric)
    deltas = np.array(right_values - left_values)*multiply_by

    # Sort classes by their change so the bars run from worst to best.
    order = np.argsort(deltas)
    deltas = deltas[order]
    ordered_classes = np.array(classes)[order]

    # Negative bars first, then non-negative ones, each with its own colour.
    for selector, colour in ((deltas < 0, "red"), (deltas >= 0, "green")):
        bars = ax.bar(ordered_classes[selector], deltas[selector], color=colour)
        ax.bar_label(bars, np.round(deltas[selector], decimals=1), label_type="edge", rotation=90)

    ax.tick_params(axis='x', labelrotation=90)
    ax.set_title(title)
    ax.set_xlabel("Class")
    ax.set_ylabel(f"Change in {metric.replace('_', ' ').title()}")

    # Pad the y-range by 20% of the spread so the bar labels stay inside the axes.
    lowest = min(deltas)
    highest = max(deltas)
    spread = highest - lowest
    ax.set_ylim(bottom=lowest - 0.2*spread, top=highest + 0.2*spread)
    ax.axhline(0, color="black", lw=1, ls="-")

In [22]:
def plot_diff_barchart(results: dict[str, Result]) -> None:
    """
    Grouped bar chart comparing several runs across the headline metrics.

    For every metric (final train/test loss and accuracy, class-averaged test
    precision, recall and F1 score) one bar per entry of ``results`` is drawn,
    labelled with the entry's dict key.

    Args:
        results: Mapping from a display name to the Result of that run.

    Raises:
        ValueError: If ``results`` is empty.
    """
    if not results:
        raise ValueError("results must contain at least one entry to plot")

    # Display name -> Result attribute and how to turn it into a single number.
    metrics = {
        "Train Loss": {
            "key": "train_losses",
            "get_value": lambda x: x[-1],
        },
        "Train Accuracy (%)": {
            "key": "avg_train_accuracies",
            "get_value": lambda x: x[-1]*100.0,
        },
        "Test Loss": {
            "key": "test_losses",
            "get_value": lambda x: x[-1],
        },
        "Test Accuracy (%)": {
            "key": "avg_test_accuracies",
            "get_value": lambda x: x[-1]*100.0,
        },
        "Test Precision (%)": {
            "key": "avg_test_precision",
            "get_value": lambda x: x*100.0,
        },
        "Test Recall (%)": {
            "key": "avg_test_recall",
            "get_value": lambda x: x*100.0,
        },
        "Test F1 Score (%)": {
            "key": "avg_test_f1score",
            "get_value": lambda x: x*100.0,
        }
    }

    metric_names = list(metrics.keys())
    x = np.arange(len(metrics))
    total_width = 0.5
    n_results = len(results)
    width = total_width / n_results
    if n_results == 1:
        # linspace would return its start point (-total_width/2) and shift the
        # only bar off its tick; keep a single series centred instead.
        offsets = np.zeros(1)
    else:
        offsets = np.linspace(-total_width/2, total_width/2, n_results)

    # One list of metric values per run, in the order of `metrics`.
    values = {
        result_name: [
            metric["get_value"](getattr(result, metric["key"]))
            for metric in metrics.values()
        ]
        for result_name, result in results.items()
    }
    max_value = max(0.0, max(value for series in values.values() for value in series))

    fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16, 4))
    for i, result_name in enumerate(values):
        bars = ax.bar(x + offsets[i], values[result_name], width, label=result_name)
        ax.bar_label(bars, np.round(values[result_name], decimals=1), label_type="edge", rotation=90)

    ax.tick_params(axis='x', labelrotation=45)
    ax.set_title("Comparison of Results by Metric")
    ax.set_xlabel("Metric")
    ax.set_ylabel("Percent or Loss Depending on Metric")
    ax.set_xticks(x, metric_names)
    ax.legend()
    # Leave 20% headroom for the bar labels; fall back to 1.0 if every value is
    # zero, since identical bottom/top limits are degenerate.
    top = max_value + 0.2*max_value if max_value > 0 else 1.0
    ax.set_ylim(bottom=0, top=top)

In [23]:
def train_and_test_model(
        model: nn.Module,
        optimizer: optim.Optimizer,
        train_loader: torch.utils.data.DataLoader,
        test_loader: torch.utils.data.DataLoader,
        E: int,
        verbose: Literal["none", "prints", "epoch_tqdm", "loader_tqdm"] = "epoch_tqdm",
    ) -> Result:
    """
    Train and test the given model with the given parameters.

    Every epoch runs one pass over ``train_loader`` (with optimisation) and one
    pass over ``test_loader`` (evaluation only), recording the mean batch loss
    and the class-averaged accuracy of each. Per-class accuracy, precision,
    recall and F1 score on the test set are computed in the final epoch only.

    Args:
        model: Model to train; expected to already live on ``device``.
        optimizer: Optimizer bound to ``model``'s parameters.
        train_loader: Loader yielding ``(images, labels)`` training batches.
        test_loader: Loader yielding ``(images, labels)`` test batches.
        E: Number of epochs (at least 1).
        verbose: "epoch_tqdm" shows a progress bar over epochs, "loader_tqdm"
            shows one per loader pass, "prints" prints a line per epoch and
            "none" does none of these. The final summary is always printed.

    Returns:
        A Result holding the per-epoch curves and the final-epoch test metrics.

    Raises:
        ValueError: If ``E`` is smaller than 1.
    """
    if E < 1:
        raise ValueError(f"E must be at least 1, got {E}")

    loss_function = nn.CrossEntropyLoss().to(device)

    # average='none' keeps one value per class; class means are taken explicitly below.
    accuracy_metric = MulticlassAccuracy(average='none', num_classes=NUM_CLASSES).to(device)
    precision_metric = MulticlassPrecision(average='none', num_classes=NUM_CLASSES).to(device)
    recall_metric = MulticlassRecall(average='none', num_classes=NUM_CLASSES).to(device)
    f1_metric = MulticlassF1Score(average='none', num_classes=NUM_CLASSES).to(device)

    train_losses = []
    avg_train_accuracies = []
    test_losses = []
    test_accuracies = 0
    avg_test_accuracies = []
    test_precision = 0
    test_recall = 0
    test_f1score = 0
    avg_test_precision = 0
    avg_test_recall = 0
    avg_test_f1score = 0

    for epoch in tqdm(range(E), total=E, disable=verbose!="epoch_tqdm"):
        is_final_epoch = epoch >= E - 1

        # TRAINING
        model.train()
        batch_losses = []
        accuracy_metric.reset()
        for images, labels in tqdm(train_loader, total=len(train_loader), disable=verbose!="loader_tqdm"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = loss_function(outputs, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            batch_losses.append(loss.item())
            # Metrics do not need gradients; detach so no graph is kept alive.
            accuracy_metric.update(outputs.detach(), labels)
        train_loss = float(np.mean(np.array(batch_losses)))
        train_losses.append(train_loss)
        train_acc = accuracy_metric.compute()
        avg_train_accuracies.append(train_acc.mean().item())

        # TESTING
        model.eval()
        test_batch_losses = []
        accuracy_metric.reset()
        precision_metric.reset()
        recall_metric.reset()
        f1_metric.reset()
        # Evaluation needs no autograd graph; no_grad saves memory and time.
        with torch.no_grad():
            for images, labels in tqdm(test_loader, total=len(test_loader), disable=verbose!="loader_tqdm"):
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                test_batch_losses.append(loss_function(outputs, labels).item())
                accuracy_metric.update(outputs, labels)
                if is_final_epoch:
                    precision_metric.update(outputs, labels)
                    recall_metric.update(outputs, labels)
                    f1_metric.update(outputs, labels)
        test_loss = float(np.mean(np.array(test_batch_losses)))
        test_losses.append(test_loss)
        test_acc = accuracy_metric.compute()
        avg_test_accuracies.append(test_acc.mean().item())
        if is_final_epoch:
            # Per-class metrics are only materialised once, for the trained model.
            test_accuracies = test_acc.cpu().numpy()
            test_precision = precision_metric.compute().cpu().numpy()
            test_recall = recall_metric.compute().cpu().numpy()
            test_f1score = f1_metric.compute().cpu().numpy()
            avg_test_precision = test_precision.mean().item()
            avg_test_recall = test_recall.mean().item()
            avg_test_f1score = test_f1score.mean().item()

        if verbose=="prints":
            print(f"Epoch [{epoch+1}/{E}]: Train Accuracy: {avg_train_accuracies[-1]*100:.2f}%, Train Loss: {train_loss:.4f}, Test Accuracy: {avg_test_accuracies[-1]*100:.2f}%, Test Loss: {test_loss:.4f}")

    print(f"\nEvaluation results:\nTrain Accuracy: {avg_train_accuracies[-1]*100:.2f}%, Train Loss: {train_loss:.4f}\nTest Accuracy: {avg_test_accuracies[-1]*100:.2f}%, Test Loss: {test_loss:.4f}")

    return Result(
        train_losses=train_losses,
        avg_train_accuracies=avg_train_accuracies,
        test_losses=test_losses,
        test_accuracies=test_accuracies,
        avg_test_accuracies=avg_test_accuracies,
        test_precision=test_precision,
        test_recall=test_recall,
        test_f1score=test_f1score,
        avg_test_precision=avg_test_precision,
        avg_test_recall=avg_test_recall,
        avg_test_f1score=avg_test_f1score,
    )

In [24]:
# Real images only: the full real training split, no AI-generated images.
train_loader, test_loader = get_loader(real_img_percent=1.0, ai_img_percent=0.0, num_workers=0)

In [24]:
# Hyperparameters of this run; they are also used to name the saved results file.
model_params = dict(
    name="cnn_tune",
    learning_rate=0.0075,
    weight_decay=0.0001,
    epochs=50,
)

# Fresh CNN on the selected device, optimised with Adam.
model = CNN(input_channels=NUM_CHANNELS, n_classes=NUM_CLASSES)
model = model.to(device)
optimizer = optim.Adam(
    model.parameters(),
    lr=model_params["learning_rate"],
    weight_decay=model_params["weight_decay"],
)

# Train and evaluate, showing a single progress bar over the epochs.
results_1 = train_and_test_model(
    model=model,
    optimizer=optimizer,
    train_loader=train_loader,
    test_loader=test_loader,
    E=model_params["epochs"],
    verbose="epoch_tqdm",
)

  0%|          | 0/50 [00:00<?, ?it/s]

100%|██████████| 50/50 [37:41<00:00, 45.23s/it]


Evaluation results:
Train Accuracy: 37.26%, Train Loss: 2.3609
Test Accuracy: 14.85%, Test Loss: 4.1641





In [27]:
# Persist the run's Result so it can be reloaded for evaluation without retraining.
# The file name encodes the run name, the epoch count and the characters after
# "0." of the learning rate and weight decay (e.g. 0.0075 -> "0075").
results_dir = Path("./results")
# open() fails if the folder is missing; create it so a long run is not lost.
results_dir.mkdir(parents=True, exist_ok=True)
lr_tag = str(model_params["learning_rate"])[2:]
wd_tag = str(model_params["weight_decay"])[2:]
# Single quotes inside the f-string keep this valid on Python versions before 3.12.
results_path = results_dir / f"{model_params['name']}_{model_params['epochs']}e_{lr_tag}lr_{wd_tag}wd.pkl"
with open(results_path, "wb") as f:
    pkl.dump(results_1, f)

# Evaluation

# Plot Results