# Question 1

In [None]:
import itertools
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
from datasets import load_dataset
from IPython.display import display
from models.auto_encoder import AutoEncoder
from sklearn.decomposition import PCA
from torchsummary import summary
from torchvision import transforms
from torchviz import make_dot
from training_testing import train_autoencoder
from utils.data_loader import CustomImageDataset
from utils.transformations import augment_dataset_with_replacement, resize_dataset

In [None]:
# Seed every global RNG this notebook touches (Python's `random`, NumPy's
# legacy global RNG and PyTorch) so that repeated runs are comparable.
random_seed = 42
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

## Data Preprocessing

In [None]:
# Load the emoji dataset, using the local `data/` folder as the cache directory.
dataset = load_dataset("valhalla/emoji-dataset", cache_dir="data")

### Creating a Subset

In [None]:
# Keywords used to pick the emojis of interest out of the full training split.
expression_categories = [
    "face",
    "vampire",
    "elf",
    "mage",
    "hero",
    "villain",
    "evil monkey",
    "zombie",
    "haircut",
    "juggling",
]

# Keep a row when its caption contains at least one keyword.
# NOTE(review): assuming `text` is a str, `in` is a substring test, so a
# keyword also matches inside longer words (e.g. "hero" in "superhero").
data_subset = dataset["train"].filter(
    lambda example: any(
        category in example["text"] for category in expression_categories
    )
)

In [None]:
# Sanity check: report the subset size and show one arbitrary row (index 25)
# together with its caption.
print("Number of images related to expression categories:", len(data_subset))
print("Subset example:", data_subset[25]["text"])
sample_image = data_subset[25]["image"]
display(sample_image)

In [None]:
# Single pass over the subset, tracking the largest width and height seen.
# The two maxima are independent and may come from different images.
max_width = max_height = 0

for item in data_subset:
    width, height = item["image"].size
    max_width = max(max_width, width)
    max_height = max(max_height, height)

max_size = (max_width, max_height)
print("Maximum size of all images:", max_size)

### Splitting the data

Dividing this subset into training, validation and test sets using a 60/20/20 ratio.

In [None]:
# 60/20/20 split sizes. `int()` truncates, so the test split takes whatever is
# left and the three sizes always add up to `total_size`.
total_size = len(data_subset)
train_size = int(0.6 * total_size)
val_size = int(0.2 * total_size)
test_size = total_size - train_size - val_size

In [None]:
# Random, non-overlapping split of the subset. No `generator` is passed, so the
# permutation is drawn from PyTorch's global RNG (seeded at the top).
train, val, test = torch.utils.data.random_split(
    data_subset, [train_size, val_size, test_size]
)

In [None]:
# Report how many original images ended up in each split.
print("Split Size:\n----------")
print("Train dataset size:", len(train))
print("Validation dataset size:", len(val))
print("Test dataset size:", len(test))

### Augmenting to 600/200/200

In [None]:
# Augmentation pipeline; the transforms are applied in the order listed, so the
# rotation from RandomAffine is added on top of the one from RandomRotation.
augmentation_transforms = transforms.Compose(
    [
        transforms.RandomRotation(degrees=15),  # Random rotation of up to +/-15 degrees
        transforms.RandomHorizontalFlip(
            p=0.5
        ),  # Random horizontal flip with a probability of 0.5
        transforms.RandomVerticalFlip(
            p=0.5
        ),  # Random vertical flip with a probability of 0.5
        transforms.ColorJitter(
            brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1
        ),  # Randomly Adjust brightness, contrast, saturation, and hue
        transforms.RandomAffine(
            degrees=5, translate=(0.1, 0.1)
        ),  # Random affine transformation
        transforms.RandomApply(
            [transforms.GaussianBlur(kernel_size=3)], p=0.1
        ),  # Random Gaussian blur
    ]
)

* We're augmenting data after sampling with replacement.
* This method increases the chances of seeing more varied augmented versions of the same image.
* It's particularly useful when your original dataset is small, as it helps to introduce more variability and potentially prevent overfitting.

In [None]:
# Grow each split to a fixed target size (600 / 200 / 200) using the
# augmentation pipeline. Splitting happened first, so every augmented image is
# derived from its own split only.
# NOTE(review): the helper presumably draws source images with replacement and
# applies `augmentation_transforms` to each draw; confirm in utils.transformations.
train_aug = augment_dataset_with_replacement(train, 600, augmentation_transforms)
val_aug = augment_dataset_with_replacement(val, 200, augmentation_transforms)
test_aug = augment_dataset_with_replacement(test, 200, augmentation_transforms)

In [None]:
# Confirm the augmented splits reached their requested sizes.
print("Augmented train dataset size:", len(train_aug))
print("Augmented validation dataset size:", len(val_aug))
print("Augmented test dataset size:", len(test_aug))

### Resizing to 64x64

In [None]:
# Fixed 64x64 target size (height, width); the aspect ratio is not preserved.
resize_transform = transforms.Resize((64, 64))

In [None]:
# Run the resize transform over each augmented split.
# NOTE(review): `resize_dataset` is a project helper; assumed to return a new
# dataset with the same rows and resized images. Confirm in utils.transformations.
train_aug_resized = resize_dataset(train_aug, resize_transform)
val_aug_resized = resize_dataset(val_aug, resize_transform)
test_aug_resized = resize_dataset(test_aug, resize_transform)

### Sample data

In [None]:
# Spot-check one augmented and resized training example (arbitrary index 500).
print("Subset example:", train_aug_resized[500]["text"])
sample_image = train_aug_resized[500]["image"]
display(sample_image)

### Tensor Dataset

In [None]:
# Wrap each split in the project's CustomImageDataset. The cells below read its
# items as dicts with an "image" tensor and a "text" caption.
train_dataset = CustomImageDataset(train_aug_resized)
val_dataset = CustomImageDataset(val_aug_resized)
test_dataset = CustomImageDataset(test_aug_resized)

In [None]:
# Scan the training tensors for the global pixel range. The result guides the
# choice of output activation for the autoencoder.
min_value, max_value = float("inf"), float("-inf")

for item in train_dataset:
    image_tensor = item["image"]
    min_value = min(image_tensor.min().item(), min_value)
    max_value = max(image_tensor.max().item(), max_value)

print(f"Minimum value across the dataset: {min_value}")
print(f"Maximum value across the dataset: {max_value}")

* These images appear to be normalized, so we can use a Sigmoid activation as the final layer of the decoder.
* We can also use LeakyReLU at the bottleneck.

In [None]:
# Mini-batch size shared by the training and validation loaders.
batch_size = 16

In [None]:
# Only the training batches are reshuffled each epoch; validation batches are
# served in a fixed order.
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True
)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False
)

## Hyperparameters

Do not run the code below unless you want to perform an extensive grid search.

In [None]:
# One record per trained configuration is appended here by the search loop.
results = []

# Search space: 2 channel layouts x 2 learning rates x 2 weight decays, with a
# single kernel size / stride / padding, i.e. 8 training runs in total.
learning_rates = [0.001, 0.0001]
weight_decays = [1e-5, 1e-4]
encoder_channel_options = [
    (32, 16, 16),
    (64, 32, 32),
]  # last element of the tuple is the latent size
kernel_sizes = [3]
strides = [2]
paddings = [1]

# Epochs per configuration.
num_epochs = 350

In [None]:
# Create the output folder up front: without it, `savefig` would raise only
# after the first (long) training run has already finished.
curves_dir = Path("learning_curves/question_1")
curves_dir.mkdir(parents=True, exist_ok=True)

# Exhaustive sweep: a fresh AutoEncoder is built and trained for every
# combination of the hyperparameter lists defined in the previous cell.
for (
    encoder_channels,
    lr,
    weight_decay,
    kernel_size,
    stride,
    padding,
) in itertools.product(
    encoder_channel_options,
    learning_rates,
    weight_decays,
    kernel_sizes,
    strides,
    paddings,
):
    # By convention the last entry of the channel tuple is the latent size.
    latent_size = encoder_channels[-1]
    model = AutoEncoder(
        latent_size=latent_size,
        input_channels=3,
        hidden_layer_1=encoder_channels[0],
        hidden_layer_2=encoder_channels[1],
        kernel_size=kernel_size,
        stride=stride,
        padding=padding,
    )

    # The first return value is not needed here; only the loss histories are.
    _, train_losses, val_losses = train_autoencoder(
        model,
        train_loader,
        val_loader,
        num_epochs=num_epochs,
        lr=lr,
        weight_decay=weight_decay,
    )

    # Plot and save losses
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label="Train Loss")
    plt.plot(val_losses, label="Validation Loss")
    plt.title(
        f"LS: {latent_size}, LR: {lr}, WD: {weight_decay}, EC: {encoder_channels}, KS: {kernel_size}, S: {stride}, P: {padding}"
    )
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.savefig(
        curves_dir
        / f"losses_ls{latent_size}_lr{lr}_wd{weight_decay}_ec{encoder_channels}_ks{kernel_size}_s{stride}_p{padding}.png"
    )
    # Close the figure so the sweep does not accumulate open figures.
    plt.close()

    # Store results (losses of the last epoch, not the best epoch)
    results.append(
        {
            "latent_size": latent_size,
            "learning_rate": lr,
            "weight_decay": weight_decay,
            "encoder_channels": encoder_channels,
            "kernel_size": kernel_size,
            "stride": stride,
            "padding": padding,
            "final_train_loss": train_losses[-1],
            "final_val_loss": val_losses[-1],
        }
    )

In [None]:
# Collect the per-configuration records into a single table.
results_df = pd.DataFrame(results)

In [None]:
# Persist the search results. The folder is created if it is missing so the
# export cannot fail after the whole grid search has completed.
results_path = Path("results/question_1/hyperparam_results.csv")
results_path.parent.mkdir(parents=True, exist_ok=True)
results_df.to_csv(results_path, index=False)

In [None]:
# Ten best configurations by final-epoch validation loss (lowest first).
results_df.sort_values(by="final_val_loss", ascending=True).head(10)

## Testing Best Model

In [None]:
# Reload the saved grid-search table so this section works without re-running
# the search above.
results_df = pd.read_csv("results/question_1/hyperparam_results.csv")

In [None]:
# Ten best configurations by final-epoch validation loss (lowest first).
results_df.sort_values(by="final_val_loss", ascending=True).head(10)

In [None]:
# Same batch size as the training/validation loaders defined earlier.
batch_size = 16

In [None]:
# Test batches are served in dataset order (no shuffling), so positions in
# anything collected from this loader line up with positions in `test_dataset`.
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False
)

In [None]:
# Configuration chosen for the final model.
# NOTE(review): the grid search trained for 350 epochs; this run uses 150.
encoder_channels = (64, 32, 32)
latent_size = encoder_channels[-1]  # last channel entry is the latent size
learning_rate = 1e-3
weight_decay = 1e-5
kernel_size, stride, padding = 3, 2, 1
num_epochs = 150

In [None]:
# Build the final model from the selected configuration. `input_channels=3` is
# passed explicitly, exactly as in the grid-search construction, so this model
# does not depend on the class's default for that argument.
model = AutoEncoder(
    latent_size=latent_size,
    input_channels=3,
    hidden_layer_1=encoder_channels[0],
    hidden_layer_2=encoder_channels[1],
    kernel_size=kernel_size,
    stride=stride,
    padding=padding,
)

In [None]:
# Train the selected configuration. The first return value is discarded; the
# two loss histories are kept for the learning-curve plot below.
_, train_losses, val_losses = train_autoencoder(
    model,
    train_loader,
    val_loader,
    num_epochs=num_epochs,
    lr=learning_rate,
    weight_decay=weight_decay,
)

In [None]:
# Learning curves of the final run, written to disk and then closed (the figure
# is saved, not shown inline).
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(train_losses, label="Train Loss")
ax.plot(val_losses, label="Validation Loss")
ax.set_title(
    f"LS: {latent_size}, LR: {learning_rate}, WD: {weight_decay}, EC: {encoder_channels}, KS: {kernel_size}, S: {stride}, P: {padding}"
)
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.legend()
fig.savefig(
    "results/question_1/"
    + f"losses_ls{latent_size}_lr{learning_rate}_wd{weight_decay}_ec{encoder_channels}_ks{kernel_size}_s{stride}_p{padding}.png"
)
plt.close(fig)

In [None]:
# Mean reconstruction MSE over the test set. Each batch mean is weighted by its
# batch size so a smaller final batch does not skew the average.
criterion = torch.nn.MSELoss()
model.eval()

test_loss = 0.0
with torch.no_grad():
    for batch in test_loader:
        images = batch["image"]
        batch_mse = criterion(model(images), images).item()
        test_loss += batch_mse * images.size(0)

test_loss /= len(test_loader.dataset)

In [None]:
# Display the sample-weighted mean test MSE computed in the previous cell.
test_loss

## Saving Best Model

In [None]:
# architecture
# `summary` (imported in the top import cell) prints the layer table itself, so
# wrapping the call in print() only added a stray line showing its return value.
summary(model=model, input_size=(3, 64, 64), batch_size=batch_size)

In [None]:
# Render the computation graph of one forward pass as a PNG under
# results/question_1/.
model.eval()

# Gradient tracking is intentionally left on: make_dot builds the picture from
# the autograd graph attached to `output`.
dummy_input = torch.randn(1, 3, 64, 64)
output = model(dummy_input)

dot = make_dot(output, params=dict(model.named_parameters()))
dot.render("results/question_1/autoencoder_graph", format="png")

In [None]:
# saving weights (state_dict only: the AutoEncoder class and the same
# constructor arguments are needed to load them again)
torch.save(model.state_dict(), "results/question_1/q1_model_weights.pth")

In [None]:
# latent representation
# Encode the whole test set batch by batch. `test_loader` does not shuffle, so
# row i of the final array corresponds to item i of `test_dataset`; the
# composite-image cells further down rely on that positional mapping.
model.eval()
latent_representations = []

with torch.no_grad():
    for batch in test_loader:
        images = batch["image"]
        latent = model.encoder(images)
        latent_representations.append(latent.cpu().numpy())

# Stack the per-batch arrays along the sample axis (the name is rebound from a
# list of arrays to a single ndarray).
latent_representations = np.concatenate(latent_representations, axis=0)

latent_representations_path = "results/question_1/latent_representations.npy"
np.save(latent_representations_path, latent_representations)

## Sample Image

In [None]:
# Reconstruct a single test image as a quick visual check.
model.eval()

# take the first image
sample_batch = next(iter(test_loader))
images = sample_batch["image"]
sample_image = images[0]

# The model expects a batch dimension, hence unsqueeze(0).
with torch.no_grad():
    reconstructed = model(sample_image.unsqueeze(0))

In [None]:
# convert image: move the first axis to the end (channels-first tensor ->
# channels-last array) so that plt.imshow can render it
sample_image_np = sample_image.numpy().transpose(1, 2, 0)
reconstructed_np = reconstructed.squeeze(0).numpy().transpose(1, 2, 0)

In [None]:
# Side-by-side comparison of the sample and its reconstruction, using the
# explicit Axes interface and plain strings (the titles have no placeholders,
# so the f-prefix was unnecessary).
fig, (ax_original, ax_reconstructed) = plt.subplots(1, 2, figsize=(12, 6))

# Original Image
ax_original.imshow(sample_image_np)
ax_original.set_title("Original Image")

# Reconstructed Image
ax_reconstructed.imshow(reconstructed_np)
ax_reconstructed.set_title("Reconstructed Image")

plt.show()

## Plotting Reconstructed Images vs Original Images

In [None]:
# Exact captions to compare in the PCA plot, each paired with a marker colour.
selected_classes = ["face with tears of joy", "face palm", "selfie"]
colors = ["red", "green", "blue"]
color_map = {caption: colour for caption, colour in zip(selected_classes, colors)}

In [None]:
# Per-batch accumulators filled by the collection loop in the next cell.
reconstructed_images = []
original_images = []
filtered_labels = []

In [None]:
# Start from empty accumulators every time this cell runs. Otherwise a re-run
# would append duplicates, or fail outright once `filtered_labels` has been
# turned into an ndarray by a later cell.
reconstructed_images = []
original_images = []
filtered_labels = []

model.eval()
with torch.no_grad():
    for batch in test_loader:
        images, labels = batch["image"], batch["text"]

        # Explicit boolean mask tensor (one flag per sample) selecting the
        # captions that exactly match one of the chosen classes.
        mask = torch.tensor(
            [label in selected_classes for label in labels],
            dtype=torch.bool,
            device=images.device,
        )
        if not mask.any():
            continue

        filtered_images = images[mask]
        filtered_labels.extend(
            label for label, keep in zip(labels, mask.tolist()) if keep
        )

        # Get the latent representations
        latent = model.encoder(filtered_images)

        # Reconstruct images from the latent representations
        reconstructed = model.decoder(latent)

        # Flatten the images for PCA (one row per image); flatten() also copes
        # with non-contiguous tensors, which view() does not.
        images_flat = filtered_images.flatten(start_dim=1)
        reconstructed_flat = reconstructed.flatten(start_dim=1)

        original_images.append(images_flat.cpu().numpy())
        reconstructed_images.append(reconstructed_flat.cpu().numpy())

In [None]:
# Stack the per-batch arrays into one (n_images, n_features) matrix each.
original_images_flat = np.concatenate(original_images, axis=0)
reconstructed_images_flat = np.concatenate(reconstructed_images, axis=0)
# Rebinds `filtered_labels` from a list to an ndarray so it can be compared
# element-wise against a class name when plotting.
filtered_labels = np.array(filtered_labels)

In [None]:
# The 3-component PCA is fitted on the reconstructions only; the originals are
# then projected into that same basis so both sets share one coordinate system.
pca_3d = PCA(n_components=3)
reconstructed_3d = pca_3d.fit_transform(reconstructed_images_flat)
original_3d = pca_3d.transform(original_images_flat)

In [None]:
# Interactive 3D scatter of the PCA projections: one colour per class, crosses
# for reconstructions and circles for originals.
fig = go.Figure()
fig.update_layout(
    autosize=False,
    width=1000,
    height=800,
    title="3D Visualization of Original and Reconstructed Images",
    scene=dict(
        xaxis_title="Principal Component 1",
        yaxis_title="Principal Component 2",
        zaxis_title="Principal Component 3",
    ),
    margin=dict(l=0, r=0, b=0, t=0),
)

for class_name in selected_classes:
    # Boolean row selector for the current class.
    class_mask = filtered_labels == class_name

    # Reconstructed images first, then original images (trace order matters
    # for the legend).
    for projected, symbol, kind in (
        (reconstructed_3d, "x", "Reconstructed"),
        (original_3d, "circle", "Original"),
    ):
        class_points = projected[class_mask]
        fig.add_trace(
            go.Scatter3d(
                x=class_points[:, 0],
                y=class_points[:, 1],
                z=class_points[:, 2],
                mode="markers",
                marker=dict(size=5, symbol=symbol, color=color_map[class_name]),
                name=f"{class_name} - {kind}",
            )
        )

fig.show()

# Question 3

## Composite Images

In [None]:
def display_reconstructed_image(idx, latent_representations, model):
    """Decode one stored latent vector with ``model.decoder`` and show the image.

    Args:
        idx: Position of the latent to decode inside ``latent_representations``.
        latent_representations: Indexable collection of per-sample latents
            (for example the array collected from ``model.encoder``).
        model: Module exposing a ``decoder`` submodule. It is switched to eval
            mode as a side effect.

    Returns:
        None. The decoded image is drawn with matplotlib, axes hidden.
    """
    model.eval()

    # Decode on whatever device the model lives on (CPU, CUDA, MPS, ...). A
    # model without parameters falls back to the CPU instead of raising.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Add a batch dimension and match the float32 weights.
    latent_vector = (
        torch.as_tensor(latent_representations[idx], dtype=torch.float32)
        .unsqueeze(0)
        .to(device)
    )

    with torch.no_grad():
        reconstructed_img = model.decoder(latent_vector).squeeze(0)

    reconstructed_img = reconstructed_img.cpu().numpy()
    # Move the first axis last (channels-first -> channels-last) for imshow.
    reconstructed_img = np.transpose(reconstructed_img, (1, 2, 0))

    plt.imshow(reconstructed_img)
    plt.axis("off")
    plt.show()

In [None]:
# woman superhero light skin tone
# NOTE(review): 123 is a row index into `latent_representations` (test-loader
# order); the caption above is the author's note. Confirm it against the test set.
display_reconstructed_image(123, latent_representations, model)

In [None]:
# superhero
# NOTE(review): 48 is a row index into `latent_representations` (test-loader
# order); the caption above is the author's note. Confirm it against the test set.
display_reconstructed_image(48, latent_representations, model)

In [None]:
# man getting face massage type 4
# NOTE(review): 105 is a row index into `latent_representations` (test-loader
# order); the caption above is the author's note. Confirm it against the test set.
display_reconstructed_image(105, latent_representations, model)

In [None]:
# Latent-space arithmetic: decode (vector1 - vector2 + vector3) into a new,
# composite image.
model.eval()

# Work on whatever device the model lives on (CPU, CUDA, MPS, ...) instead of
# special-casing CUDA; a parameter-free model falls back to the CPU.
first_param = next(model.parameters(), None)
device = first_param.device if first_param is not None else torch.device("cpu")

# woman superhero light skin tone
latent_vector1 = (
    torch.tensor(latent_representations[123]).unsqueeze(0).float().to(device)
)
# superhero
latent_vector2 = (
    torch.tensor(latent_representations[48]).unsqueeze(0).float().to(device)
)
# man getting face massage type 4
latent_vector3 = (
    torch.tensor(latent_representations[105]).unsqueeze(0).float().to(device)
)

result_vector = latent_vector1 - latent_vector2 + latent_vector3

with torch.no_grad():
    new_image = model.decoder(result_vector).squeeze(0)

# Back to a channels-last NumPy array for imshow.
new_image = new_image.cpu().numpy()
new_image = np.transpose(new_image, (1, 2, 0))

plt.imshow(new_image)
plt.axis("off")
plt.show()