# Installations & Configuration

In [None]:
!pip install kaggle
!pip install tqdm

import os
import torch
import pandas as pd
import torch.nn as nn
import numpy as np
import torchvision.transforms.v2 as transforms
import matplotlib.pyplot as plt

from PIL import Image
from tqdm import tqdm
from google.colab import files
from torch.optim import Adam
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from google.colab import drive
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, precision_recall_curve, average_precision_score


# Mount Google Drive so files under /content/drive/MyDrive are reachable from this runtime.
drive.mount('/content/drive')
kaggle_json_path = '/content/drive/MyDrive/ColabNotebooks/A5/kaggle.json'

# Copy kaggle.json to the correct location
!mkdir -p ~/.kaggle
!cp {kaggle_json_path} ~/.kaggle/
# Restrict the credentials file to owner read/write only.
!chmod 600 ~/.kaggle/kaggle.json

# Data Setup

In [None]:
# Download and unzip dataset
!kaggle datasets download -d rezaunderfit/48k-imdb-movies-with-posters > /dev/null 2>&1
!unzip -q 48k-imdb-movies-with-posters.zip

# Load title basics
tsv_path = '/content/drive/MyDrive/ColabNotebooks/A5/title.basics.tsv'
title_basics = pd.read_csv(tsv_path, sep='\t', na_values='\\N')

# List all files in the Poster directory
poster_dir = 'Poster'
poster_files = []
for root, _, files in os.walk(poster_dir):
    for file in files:
        if file.endswith('.jpg'):
            file_path = os.path.join(root, file)
            if os.path.getsize(file_path) > 0:  # Only include non-zero byte files
                poster_files.append(file_path)

# Extract tconst and startYear from file paths
poster_info = []
for file_path in poster_files:
    parts = file_path.split('/')
    start_year = parts[1]
    tconst = parts[2]
    poster_info.append((start_year, tconst))

# Convert to DataFrame
poster_df = pd.DataFrame(poster_info, columns=['startYear', 'tconst'])

# Ensure startYear is an integer
poster_df['startYear'] = poster_df['startYear'].astype(int)
title_basics['startYear'] = title_basics['startYear'].astype(float).fillna(0).astype(int)  # Handle missing startYear and convert to int

# Filter movies from the past 50 years
current_year = 2024
past_50_years = current_year - 50
poster_df = poster_df[poster_df['startYear'] >= past_50_years]

# Merge with title_basics to keep only relevant records
title_basics_filtered = pd.merge(title_basics, poster_df, on=['startYear', 'tconst'])

# Function to count genres
def count_genres(metadata):
    """Tally movies by their first-listed genre.

    Args:
        metadata: DataFrame with a 'genres' column holding comma-separated
            genre strings; missing values are ignored.

    Returns:
        A Counter mapping each first-listed genre to its number of rows.
    """
    return Counter(
        genre_list.split(',')[0] for genre_list in metadata['genres'].dropna()
    )

# Tally first-listed genres over the merged dataset
filtered_genre_counts = count_genres(title_basics_filtered)

# Size of the merged dataset, used as the base for the 1% cut-off
total_movies = len(title_basics_filtered)

# A genre is kept only if it is the first-listed genre of at least 1% of all movies
min_count = total_movies * 0.01
valid_genres = set(
    genre for genre, count in filtered_genre_counts.items() if not count < min_count
)

# Filter the dataset to only include valid genres
def filter_genres(row):
    """Return True when the row lists at least one genre found in ``valid_genres``.

    Args:
        row: a mapping/Series with a 'genres' entry holding a comma-separated
            genre string (or a missing value).

    Returns:
        False for rows without genres or without any valid genre, else True.
    """
    genre_field = row['genres']
    if pd.isna(genre_field):
        return False
    # NOTE(review): this accepts a row if ANY listed genre is valid, whereas
    # count_genres() counts only the FIRST genre. Rows whose first genre is
    # rare can therefore pass - confirm whether that is intended.
    return any(genre in valid_genres for genre in genre_field.split(','))

# Keep only the rows accepted by filter_genres
keep_mask = title_basics_filtered.apply(filter_genres, axis=1)
title_basics_filtered = title_basics_filtered[keep_mask]

# Create your data splits: 20% is held out for testing, then 25% of the
# remaining 80% becomes the validation set.
train_val_metadata, test_metadata = train_test_split(
    title_basics_filtered, test_size=0.2, random_state=42
)
train_metadata, val_metadata = train_test_split(
    train_val_metadata, test_size=0.25, random_state=42
)
print(f"Train size: {len(train_metadata)}, Validation size: {len(val_metadata)}, Test size: {len(test_metadata)}")

# First-listed genre tallies and row count for the training split
train_genre_counts = count_genres(train_metadata)
train_total_movies = len(train_metadata)

# Print each genre's count and its share of the training split
print("\nGenre Distribution in Training Dataset:")
for genre, count in train_genre_counts.items():
    percentage = (count / train_total_movies) * 100
    print(f"{genre} - {count} ({percentage:.2f}%)")

# Define the image transformations
# Every dataset below (train/val/test) is constructed with this same pipeline.
image_transforms = transforms.Compose([
    # 299 is the square size produced by the resize + crop pair below.
    transforms.Resize(299),
    transforms.CenterCrop(299),
    transforms.ToTensor(),
    # Per-channel mean/std; these are the standard ImageNet statistics.
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

class MovieDataset(Dataset):
    """Poster dataset yielding ``(image, genre_index)`` pairs.

    Each item's image is read from
    ``<img_dir>/<startYear>/<tconst>/<tconst>.jpg`` and its label is the index
    of the movie's first-listed genre.

    Args:
        metadata: DataFrame with 'tconst', 'startYear' and 'genres' columns.
        img_dir: root directory containing the poster folders.
        transform: optional callable applied to the RGB PIL image.
        genre_to_index: mapping from genre name to class index. It needs an
            'Unknown' entry only if missing or unmapped genres can occur.
    """

    # Label used for rows with no genre or a genre missing from the mapping.
    UNKNOWN_GENRE = 'Unknown'

    def __init__(self, metadata, img_dir, transform=None, genre_to_index=None):
        self.metadata = metadata
        self.img_dir = img_dir
        self.transform = transform
        self.genre_to_index = genre_to_index

    def __len__(self):
        """Return the number of metadata rows."""
        return len(self.metadata)

    def __getitem__(self, idx):
        """Load the poster and label for the row at position ``idx``."""
        # One positional lookup instead of three separate iloc calls.
        row = self.metadata.iloc[idx]
        tconst = row['tconst']
        img_name = os.path.join(
            self.img_dir, str(row['startYear']), tconst, f"{tconst}.jpg"
        )
        # convert() returns a fully loaded copy, so the file can be closed
        # as soon as the with-block ends.
        with Image.open(img_name) as raw_image:
            image = raw_image.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, self.genres_to_tensor(row['genres'])

    def genres_to_tensor(self, genres):
        """Convert a comma-separated genre string to a long tensor class index.

        Only the first-listed genre is used. Missing values and genres absent
        from ``genre_to_index`` map to the 'Unknown' index.

        Raises:
            KeyError: if the fallback is needed but 'Unknown' is not mapped.
        """
        if pd.notna(genres):
            first_genre = genres.split(',')[0]
        else:
            first_genre = self.UNKNOWN_GENRE
        # Look up the fallback lazily: a mapping without 'Unknown' must still
        # work for genres it does contain.
        try:
            genre_index = self.genre_to_index[first_genre]
        except KeyError:
            genre_index = self.genre_to_index[self.UNKNOWN_GENRE]
        return torch.tensor(genre_index, dtype=torch.long)

# Create a mapping from genre to index based on the filtered dataset.
# The genres are sorted before numbering so the mapping is identical on every
# run. Iterating the raw set would give an order that can change between
# interpreter sessions, which would silently scramble the class indices of
# any checkpoint saved in an earlier session.
filtered_genres = set(g.split(',')[0] for g in title_basics_filtered['genres'].dropna())
genre_to_index = {genre: idx for idx, genre in enumerate(sorted(filtered_genres))}

# Ensure 'Unknown' genre is included in genre_to_index (appended as the last
# index; an existing entry is left untouched so indices stay contiguous).
genre_to_index.setdefault('Unknown', len(genre_to_index))

# Directory containing images
img_dir = 'Poster'

# Create datasets; all three splits share the same transforms and label mapping
train_dataset = MovieDataset(metadata=train_metadata, img_dir=img_dir, transform=image_transforms, genre_to_index=genre_to_index)
val_dataset = MovieDataset(metadata=val_metadata, img_dir=img_dir, transform=image_transforms, genre_to_index=genre_to_index)
test_dataset = MovieDataset(metadata=test_metadata, img_dir=img_dir, transform=image_transforms, genre_to_index=genre_to_index)


# Hyperparameter Tuning via Optuna

In [None]:
!pip install optuna
!pip install optuna-integration
import optuna
from optuna.integration import PyTorchLightningPruningCallback
from optuna.trial import TrialState

def objective(trial):
    """Optuna objective: briefly fine-tune InceptionV3 and score it on validation loss.

    Samples a learning rate, batch size and weight decay from ``trial``,
    trains a freshly loaded pretrained InceptionV3 on ``train_dataset`` for
    up to 5 epochs, and evaluates on ``val_dataset`` after every epoch.

    Args:
        trial: the optuna trial used to sample hyperparameters and to report
            intermediate validation losses.

    Returns:
        The lowest mean per-batch validation loss seen during the trial.

    Raises:
        optuna.TrialPruned: when the study's pruner decides to stop the trial.
    """
    # Hyperparameter values. suggest_float(..., log=True) samples on a log
    # scale and replaces the deprecated suggest_loguniform.
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

    # Data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    # Load pretrained model and replace the classifier head
    model = torch.hub.load('pytorch/vision:v0.10.0', 'inception_v3', pretrained=True)
    model.aux_logits = False  # forward() then returns only the main logits
    num_genres = len(genre_to_index)
    model.fc = nn.Linear(model.fc.in_features, num_genres)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Training loop
    num_epochs = 5  # Reduced number of epochs for hyperparameter tuning
    best_val_loss = float('inf')
    patience = 2  # Early stopping patience
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        model.train()
        for images, genres in train_loader:
            images = images.to(device)
            genres = genres.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, genres)
            loss.backward()
            optimizer.step()

        # Validation phase
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for val_images, val_genres in val_loader:
                val_images = val_images.to(device)
                val_genres = val_genres.to(device)
                val_outputs = model(val_images)
                val_loss += criterion(val_outputs, val_genres).item()

        val_loss /= len(val_loader)
        trial.report(val_loss, epoch)

        # Pruning
        if trial.should_prune():
            raise optuna.TrialPruned()

        # Early stopping: stop after `patience` consecutive epochs without improvement
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                break

    return best_val_loss

# Create the study and optimize
# direction='minimize' because objective() returns a validation loss (lower is better).
study = optuna.create_study(direction='minimize')
# Two limits are set: at most 50 trials and a 3600-second time budget.
study.optimize(objective, n_trials=50, timeout=3600)  # Set a timeout to limit the total tuning time

# Best hyperparameters
print("Best hyperparameters: ", study.best_params)

# Transfer Learning --> Fine Tuning (Training & Validation)

In [None]:
# Best hyperparameters, hard-coded here (presumably copied from an earlier run
# of the Optuna search in this notebook - confirm before reusing).
batch_size = 32
lr = 2.9829823735090913e-05
weight_decay = 7.167993883340144e-06
num_epochs = 10

# Create dataloaders
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=2
)
val_loader = DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, num_workers=2
)
test_loader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, num_workers=2
)

# Set up model for fine tuning
model = torch.hub.load("pytorch/vision:v0.10.0", "inception_v3", pretrained=True)
model.aux_logits = False  # Disable auxiliary logits
num_genres = len(genre_to_index)
model.fc = nn.Linear(model.fc.in_features, num_genres)  # Adjust the final layer

# Move model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

# Lists to store the mean per-batch loss of every epoch
train_losses = []
val_losses = []

# Training loop with validation
best_val_loss = float("inf")

for epoch in tqdm(range(num_epochs), desc="Epochs", unit="epoch"):
    model.train()
    running_loss = 0.0

    train_loader_tqdm = tqdm(
        train_loader, desc=f"Epoch {epoch+1}/{num_epochs}", unit="batch"
    )

    for images, genres in train_loader_tqdm:
        images = images.to(device)
        genres = genres.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, genres)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    train_loss = running_loss / len(train_loader)
    train_losses.append(train_loss)

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for val_images, val_genres in val_loader:
            val_images = val_images.to(device)
            val_genres = val_genres.to(device)
            val_outputs = model(val_images)
            val_loss += criterion(val_outputs, val_genres).item()

    val_loss /= len(val_loader)
    val_losses.append(val_loss)

    print(
        f"Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss}, Validation Loss: {val_loss}"
    )

    # Save the model if validation loss decreases
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(
            model.state_dict(),
            "/content/drive/MyDrive/ColabNotebooks/A5/best_model_inception.pth",
        )

print("Training complete.")

# Plot the learning curves. The x-axis is derived from the recorded losses so
# the plot stays valid if the loop ever finishes with fewer than num_epochs.
epochs_axis = range(1, len(train_losses) + 1)
plt.figure(figsize=(10, 5))
plt.plot(epochs_axis, train_losses, label="Training Loss")
plt.plot(epochs_axis, val_losses, label="Validation Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title("Learning Curves")
plt.legend()
plt.grid(True)
# Save BEFORE show(): once show() returns the figure has been released, so a
# later savefig() would write an empty image.
plt.savefig("/content/drive/MyDrive/ColabNotebooks/A5/inceptionv3_learning_curves.png")
plt.show()


# TODO: If the validation loss keeps increasing for a set number of epochs, stop training early

# Transfer Learning --> Fine Tuning (Testing)

In [None]:
# Helper function to print unique labels
def print_unique_labels(all_labels, genre_to_index):
    """Print the labels seen in the data next to those defined by the mapping.

    Args:
        all_labels: sequence of integer class labels.
        genre_to_index: mapping from genre name to class index.

    Prints four lines: the distinct labels in ``all_labels``, the indices in
    ``genre_to_index``, the labels present in the data but absent from the
    mapping, and the mapping indices that never occur in the data.
    """
    observed = np.unique(all_labels)
    mapped = list(genre_to_index.values())
    print("Unique labels in the dataset:", observed)
    print("Labels in genre_to_index:", mapped)

    observed_set = set(observed)
    mapped_set = set(mapped)
    print("Missing labels in genre_to_index:", observed_set - mapped_set)
    print("Extra labels in genre_to_index:", mapped_set - observed_set)


# Load the best model (map_location lets a GPU-saved checkpoint load on a CPU-only runtime)
model.load_state_dict(
    torch.load(
        "/content/drive/MyDrive/ColabNotebooks/A5/best_model_inception.pth",
        map_location=device,
    )
)

# Evaluate on test set
model.eval()
test_loss = 0.0
correct = 0
total = 0
all_preds = []
all_labels = []
all_probs = []

with torch.no_grad():
    test_loader_tqdm = tqdm(test_loader, desc="Testing", unit="batch")
    for test_images, test_genres in test_loader_tqdm:
        test_images = test_images.to(device)
        test_genres = test_genres.to(device)
        test_outputs = model(test_images)
        test_loss += criterion(test_outputs, test_genres).item()
        _, predicted = torch.max(test_outputs, 1)
        total += test_genres.size(0)
        correct += (predicted == test_genres).sum().item()

        # Collect all predictions, true labels, and probabilities for precision-recall curves
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(test_genres.cpu().numpy())
        all_probs.extend(torch.softmax(test_outputs, dim=1).cpu().numpy())

test_loss /= len(test_loader)
test_accuracy = correct / total
print(f"Test Loss: {test_loss}, Test Accuracy: {test_accuracy}")

# Print unique labels and find mismatches
print_unique_labels(all_labels, genre_to_index)

# Convert once; these arrays are reused by every metric below
labels_array = np.array(all_labels)
preds_array = np.array(all_preds)
probs_array = np.array(all_probs)

# genre_to_index is intentionally left untouched: it is the mapping the model
# was trained with, and overwriting it here would corrupt any re-run of the
# cells above. Reporting uses derived lookups instead.
index_to_genre = {idx: genre for genre, idx in genre_to_index.items()}

# Classes that actually occur as true labels in the test set
used_labels = [int(label) for label in np.unique(labels_array)]
unmapped_labels = [label for label in used_labels if label not in index_to_genre]
if unmapped_labels:
    raise ValueError(
        f"Mismatch in number of genres and labels. Unmapped labels: {unmapped_labels}"
    )

# Classes that occur as a true label OR as a prediction. Passing these
# explicitly keeps the matrix/report rows aligned with their names even when
# the model predicts a class that has no test examples.
report_labels = sorted(set(labels_array.tolist()) | set(preds_array.tolist()))
report_genres = [index_to_genre[label] for label in report_labels]

# Create confusion matrix
cm = confusion_matrix(labels_array, preds_array, labels=report_labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=report_genres)

# Plot confusion matrix
fig, ax = plt.subplots(figsize=(12, 12))
disp.plot(ax=ax)
plt.title("Confusion Matrix")
plt.xticks(rotation=45)
# Save before show(); saving afterwards writes an empty figure
plt.savefig("/content/drive/MyDrive/ColabNotebooks/A5/inceptionv3_confusion_matrix.png")
plt.show()

# Print classification report with F1 score
print("Classification Report:")
report = classification_report(
    labels_array,
    preds_array,
    labels=report_labels,
    target_names=report_genres,
    output_dict=True,
)
print(
    classification_report(
        labels_array, preds_array, labels=report_labels, target_names=report_genres
    )
)

# Extract and print the F1 score
f1_scores = {genre: report[genre]["f1-score"] for genre in report_genres}
print("F1 Scores:")
for genre, f1 in f1_scores.items():
    print(f"{genre}: {f1:.2f}")

# Plot precision-recall curves for each genre that has test examples.
# Each curve uses the genre's real class index (the column of the softmax
# output), not its position in the filtered list, so the curves stay correct
# when some class index is absent from the test labels.
plt.figure(figsize=(12, 12))
for class_index in used_labels:
    genre = index_to_genre[class_index]
    is_positive = labels_array == class_index
    class_scores = probs_array[:, class_index]
    precision, recall, _ = precision_recall_curve(is_positive, class_scores)
    average_precision = average_precision_score(is_positive, class_scores)
    plt.plot(recall, precision, lw=2, label=f"{genre} (AP={average_precision:.2f})")

plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curves")
plt.legend(loc="best")
plt.grid(True)
# Save before show(); saving afterwards writes an empty figure
plt.savefig(
    "/content/drive/MyDrive/ColabNotebooks/A5/inceptionv3_precision_recall_curves.png"
)
plt.show()

# Grad-CAM Implementation

In [None]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import cv2
import numpy as np

# Load the fine-tuned checkpoint first so the classifier size can be read from
# it instead of being hard-coded. The path is the one the training cell saves
# to; map_location="cpu" lets a GPU-saved checkpoint load on any runtime.
checkpoint_path = "/content/drive/MyDrive/ColabNotebooks/A5/best_model_inception.pth"
state_dict = torch.load(checkpoint_path, map_location="cpu")

# Load the Pre-trained InceptionV3 Model
model = models.inception_v3(pretrained=True)

# Modify the fully connected layer to match the number of classes stored in
# the checkpoint (rows of the final layer's weight matrix)
num_classes = state_dict["fc.weight"].shape[0]
model.fc = torch.nn.Linear(model.fc.in_features, num_classes)

# Load the custom fine-tuned weights and switch to inference mode
model.load_state_dict(state_dict)
model.eval()


# Prepare the Image
def preprocess_image(img_path):
    """Load an image file and turn it into a normalized single-image batch.

    Args:
        img_path: path of the image file to open.

    Returns:
        The transformed image with an extra leading batch dimension.
    """
    pipeline = transforms.Compose(
        [
            transforms.Resize((299, 299)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )
    rgb_image = Image.open(img_path).convert("RGB")
    # unsqueeze(0) prepends the batch dimension
    return pipeline(rgb_image).unsqueeze(0)


# Pick a random row from title_basics_filtered
random_index = np.random.randint(0, len(title_basics_filtered))
sample_row = title_basics_filtered.iloc[random_index]

# No code in this notebook adds a "Poster" column to title_basics_filtered
# (NOTE(review): confirm the TSV does not provide one), so the path is rebuilt
# from the same <img_dir>/<startYear>/<tconst>/<tconst>.jpg layout that
# MovieDataset reads from.
sample_tconst = sample_row["tconst"]
img_path = os.path.join(
    img_dir, str(sample_row["startYear"]), sample_tconst, f"{sample_tconst}.jpg"
)

# Print the original poster using matplotlib
img = cv2.imread(img_path)
if img is None:
    # cv2.imread signals an unreadable or missing file by returning None
    raise FileNotFoundError(f"Could not read poster image: {img_path}")
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; matplotlib expects RGB
plt.imshow(img)
plt.axis("off")
plt.show()

# Prepare the image for the model
input_tensor = preprocess_image(img_path)


# Get the Gradients and Activations
def get_gradients_hook(module, grad_input, grad_output):
    """Backward hook: record the first output-gradient in ``gradients``.

    Args:
        module: the hooked module (unused).
        grad_input: gradients with respect to the module inputs (unused).
        grad_output: gradients with respect to the module outputs.
    """
    output_gradient = grad_output[0]
    gradients.append(output_gradient)


def get_activations_hook(module, input, output):
    """Forward hook: record the module's output in ``activations``.

    Args:
        module: the hooked module (unused).
        input: the module's forward inputs (unused).
        output: the module's forward output.
    """
    feature_map = output
    activations.append(feature_map)


# Filled by the hooks during the forward / backward pass below
gradients = []
activations = []

target_layer = model.Mixed_7c
forward_handle = target_layer.register_forward_hook(get_activations_hook)
# register_full_backward_hook replaces the deprecated register_backward_hook,
# which may report wrong gradients for modules made of several operations.
backward_handle = target_layer.register_full_backward_hook(get_gradients_hook)

try:
    # Make a Forward Pass and Compute Gradients
    output = model(input_tensor)
    pred_class = output.argmax().item()

    model.zero_grad()
    # Backpropagate only the score of the predicted class
    one_hot_output = torch.zeros((1, output.size()[-1]), dtype=torch.float32)
    one_hot_output[0][pred_class] = 1

    output.backward(gradient=one_hot_output)
finally:
    # Detach the hooks so re-running this cell does not stack duplicate hooks
    # on the same layer.
    forward_handle.remove()
    backward_handle.remove()


# Compute Grad-CAM
def compute_gradcam(activations, gradients):
    """Build a normalized Grad-CAM heatmap from captured activations and gradients.

    Args:
        activations: list whose first element is the target layer's output,
            indexed as (batch=1, channels, height, width).
        gradients: list whose first element is the gradient of the score with
            respect to that output, with the same layout.

    Returns:
        A CPU tensor of shape (height, width) with values in [0, 1]; all zeros
        when no location has a positive contribution.

    Raises:
        ValueError: if either list is empty (the hooks captured nothing).
    """
    if not activations or not gradients:
        raise ValueError("compute_gradcam needs at least one activation and one gradient")

    # One weight per channel: the gradient averaged over batch and spatial dims.
    channel_weights = gradients[0].detach().mean(dim=(0, 2, 3))
    feature_maps = activations[0].detach().squeeze(0)  # Remove batch dimension

    # Out-of-place multiply: the captured activation tensor must stay intact
    # so calling this function twice gives the same result.
    weighted_maps = feature_maps * channel_weights.view(-1, 1, 1)

    heatmap = torch.relu(weighted_maps.mean(dim=0).cpu())

    # Skip normalization for an all-zero map instead of producing NaNs.
    peak = heatmap.max()
    if peak > 0:
        heatmap = heatmap / peak

    return heatmap


# Build the Grad-CAM heatmap from the tensors captured by the two hooks
heatmap = compute_gradcam(activations, gradients)


# Visualize the Heatmap
def visualize_heatmap(img_path, heatmap, output_path="gradcam.jpg"):
    """Overlay a Grad-CAM heatmap on the original image, save it and display it.

    Args:
        img_path: path of the image the heatmap was computed for.
        heatmap: 2-D torch tensor or array with values in [0, 1].
        output_path: where the blended image is written (defaults to the
            original hard-coded "gradcam.jpg").

    Raises:
        FileNotFoundError: if the image at ``img_path`` cannot be read.
    """
    img = cv2.imread(img_path)
    if img is None:
        # cv2.imread signals an unreadable or missing file by returning None
        raise FileNotFoundError(f"Could not read image: {img_path}")

    if torch.is_tensor(heatmap):
        heatmap_array = heatmap.detach().cpu().numpy()
    else:
        heatmap_array = np.asarray(heatmap, dtype=np.float32)

    # cv2.resize takes (width, height), i.e. the reverse of img.shape[:2]
    resized = cv2.resize(heatmap_array, (img.shape[1], img.shape[0]))
    # Guard the uint8 conversion against NaNs and out-of-range values
    resized = np.clip(np.nan_to_num(resized), 0.0, 1.0)
    colored = cv2.applyColorMap(np.uint8(255 * resized), cv2.COLORMAP_JET)

    # Equal-weight blend of the poster and the colored heatmap
    superimposed_img = cv2.addWeighted(img, 0.5, colored, 0.5, 0)
    cv2.imwrite(output_path, superimposed_img)

    plt.imshow(cv2.cvtColor(superimposed_img, cv2.COLOR_BGR2RGB))
    plt.axis("off")
    plt.show()


# Overlay the heatmap on the sampled poster; this also writes the blended image to disk
visualize_heatmap(img_path, heatmap)