In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import librosa
import pandas as pd

# Another version


In [None]:
import json
import os
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from sklearn.metrics import accuracy_score, classification_report
from transformers import AutoModel, AutoTokenizer, get_scheduler
from torch.utils.data import Dataset, DataLoader, RandomSampler, SequentialSampler
from torch.optim import AdamW,Adam
from tqdm.notebook import tqdm, trange
from time import perf_counter
from PIL import Image
import pandas as pd

In [None]:
# set random seeds for repeatability
import numpy as np
import random

def set_seed(seed_val):
    """Seed every random number generator used in this notebook.

    Args:
        seed_val (int): Value passed, in order, to Python's ``random``,
            NumPy's global generator, PyTorch's CPU generator, and the
            generators of all CUDA devices.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed_val)


# Seed used for the whole run; also recorded later in the training parameters.
seed_val = 0
set_seed(seed_val)

In [None]:
# Load the labelled training data; the last expression displays its row count.
df=pd.read_csv('/kaggle/input/dsc2024/train.csv')
len(df)

In [None]:
# Load the test data used for the final predictions and display its row count.
df_test=pd.read_csv('/kaggle/input/dsc2024/test.csv')
len(df_test)

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the labelled rows as a dev set; random_state keeps the split
# identical across runs.
df_train, df_dev = train_test_split(df, test_size=0.1, random_state=42)

In [None]:
# Report the row count of each split: train, dev, then test.
for split_frame in (df_train, df_dev, df_test):
    print(len(split_frame))

In [None]:
# Image directories. ResNetDataset builds each file path by plain string
# concatenation (folder + file name), so the trailing slash is required.
IMAGE_TRAIN_FOLDER='/kaggle/input/dsc2024/training-images/train-images/'
IMAGE_TEST_FOLDER='/kaggle/input/dsc2024/public-test-images/dev-images/'

In [None]:
# Encode labels.
# The mapping is built from the full labelled frame (`df`) rather than only the
# training split: a class that happens to land exclusively in the dev split
# would otherwise be missing from the mapping and raise KeyError when the dev
# dataset looks up its id. Sorting keeps the ids deterministic, and they are
# identical to the train-only mapping whenever the train split already contains
# every class.
label_to_id = {lab:i for i, lab in enumerate(df['label'].sort_values().unique())}
id_to_label = {v:k for k,v in label_to_id.items()}
label_to_id

In [None]:
# Number of distinct labels; passed to the model as `num_labels`.
num_out_labels = len(label_to_id)
print("Number of labels ", num_out_labels)

In [None]:
# extract layers of resnet-50 to build a new model

import torch.nn as nn
from torchvision.models.resnet import resnet50

class ResNetFeatureModel(nn.Module):
    """Pretrained ResNet-50 truncated after a named top-level child module.

    The children of torchvision's ``resnet50`` are collected in definition
    order up to and including ``output_layer`` and wrapped in an
    ``nn.Sequential`` stored as ``self.net``.

    Args:
        output_layer (str): Name of the last top-level child to keep
            (for example ``'avgpool'``).

    Raises:
        ValueError: If ``output_layer`` is not the name of a top-level child.
            Previously an unknown name silently kept the whole network,
            including the final ``fc`` layer.
    """

    def __init__(self, output_layer):
        super().__init__()
        self.output_layer = output_layer
        pretrained_resnet = resnet50(pretrained=True)

        # Plain Python list kept for backward compatibility; the modules are
        # registered as parameters through `self.net` below.
        self.children_list = []
        for n, c in pretrained_resnet.named_children():
            self.children_list.append(c)
            if n == self.output_layer:
                break
        else:
            # Loop finished without `break`: the requested layer does not exist.
            valid_names = [name for name, _ in pretrained_resnet.named_children()]
            raise ValueError(
                f"Unknown output_layer {output_layer!r}; expected one of {valid_names}"
            )

        self.net = nn.Sequential(*self.children_list)

    def forward(self, x):
        """Run the truncated network and flatten every dimension after the batch one."""
        x = self.net(x)
        x = torch.flatten(x, 1)
        return x

In [None]:
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms

class ResNetDataset(Dataset):
    """Caption + image dataset backed by a DataFrame.

    Each item is ``(text, label_id, image_tensor)`` when ``label_to_id`` is
    given and the label column exists, otherwise ``(text, image_tensor)``.
    Images are read from ``IMAGE_TEST_FOLDER`` in ``'test'`` mode and from
    ``IMAGE_TRAIN_FOLDER`` in every other mode.
    """

    # Channel statistics documented for torchvision's ImageNet-pretrained ResNet-50.
    # (The previous constants were the CLIP preprocessing statistics.)
    IMAGENET_MEAN = (0.485, 0.456, 0.406)
    IMAGENET_STD = (0.229, 0.224, 0.225)
    VALID_MODES = ('train', 'dev', 'test')

    def __init__(self, df, label_to_id=None, mode='train', text_field="caption", label_field="label",
                 image_path_field="image", mean=None, std=None):
        """
        Args:
            df (DataFrame): The DataFrame containing your data.
            label_to_id (dict): Dictionary for mapping labels to IDs. Set to None for test data.
            mode (str): Mode of the dataset. One of ['train', 'test', 'dev'].
            text_field (str): Column name for text data.
            label_field (str): Column name for label data.
            image_path_field (str): Column name for image paths.
            mean (sequence of float, optional): Per-channel mean for normalization.
                Defaults to the ImageNet statistics.
            std (sequence of float, optional): Per-channel std for normalization.
                Defaults to the ImageNet statistics.

        Raises:
            ValueError: If ``mode`` is not one of 'train', 'dev', 'test'.
        """
        # Fail early: an unknown mode used to surface only inside __getitem__
        # as an unbound-variable error.
        if mode not in self.VALID_MODES:
            raise ValueError(f"mode must be one of {self.VALID_MODES}, got {mode!r}")

        # Positional 0..n-1 index so that `.at[index, ...]` matches DataLoader indices.
        self.df = df.reset_index(drop=True)
        self.label_to_id = label_to_id
        self.mode = mode
        self.text_field = text_field
        self.label_field = label_field
        self.image_path_field = image_path_field

        # ResNet-50 input settings
        self.img_size = 224
        self.mean = tuple(mean) if mean is not None else self.IMAGENET_MEAN
        self.std = tuple(std) if std is not None else self.IMAGENET_STD

        # Training uses random augmentation; dev and test share one deterministic pipeline.
        self.train_transform_func = transforms.Compose([
            transforms.RandomResizedCrop(self.img_size, scale=(0.5, 1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(self.mean, self.std)
        ])
        self.test_transform_func = self._build_eval_transform()
        self.dev_transform_func = self._build_eval_transform()

    def _build_eval_transform(self):
        """Deterministic resize + center crop + normalize pipeline for dev/test."""
        return transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(self.img_size),
            transforms.ToTensor(),
            transforms.Normalize(self.mean, self.std)
        ])

    def _transform_for_mode(self):
        """Return the transform pipeline that matches ``self.mode``."""
        if self.mode == 'train':
            return self.train_transform_func
        if self.mode == 'test':
            return self.test_transform_func
        if self.mode == 'dev':
            return self.dev_transform_func
        raise ValueError(f"mode must be one of {self.VALID_MODES}, got {self.mode!r}")

    def __getitem__(self, index):
        """Return the text, (optional) label id, and transformed image at ``index``."""
        # str() so that non-string cells (e.g. missing values) still tokenize.
        text = str(self.df.at[index, self.text_field])

        # Select the image folder based on mode
        folder = IMAGE_TEST_FOLDER if self.mode == 'test' else IMAGE_TRAIN_FOLDER
        img_path = folder + self.df.at[index, self.image_path_field]

        # Load inside a context manager so the file handle is released promptly;
        # convert() returns a separate RGB image that outlives the `with` block.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')

        img = self._transform_for_mode()(image)

        # Labels are returned only when a mapping was supplied and the column exists.
        if self.label_to_id is not None and self.label_field in self.df.columns:
            label = self.label_to_id[self.df.at[index, self.label_field]]
            return text, label, img
        return text, img

    def __len__(self):
        """Number of rows in the underlying DataFrame."""
        return len(self.df)


In [None]:
#bert_tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
from transformers import AutoModel, AutoTokenizer
import torch


In [None]:
class VisoBertResNetModel(nn.Module):
    """Text + image classifier: transformer text encoder and ResNet-50 features.

    The first-token text embedding and the flattened image feature are
    concatenated, passed through a two-layer MLP, and classified by a final
    linear layer.

    Args:
        num_labels (int): Number of output classes.
        text_pretrained (str): Checkpoint name for both the text encoder and
            its tokenizer.
        mlp_hidden_size (int): Width of both MLP layers.
        dropout_prob (float): Dropout probability after the first MLP layer.
    """

    def __init__(self, num_labels, text_pretrained='google-bert/bert-base-uncased', mlp_hidden_size=512, dropout_prob=0.3):
        super().__init__()
        self.text_encoder = AutoModel.from_pretrained(text_pretrained)
        self.visual_encoder = ResNetFeatureModel(output_layer='avgpool')
        # Load the tokenizer from the same checkpoint as the encoder. It used to
        # be hard-coded, which mismatched the vocabulary whenever a different
        # `text_pretrained` was passed.
        self.tokenizer = AutoTokenizer.from_pretrained(text_pretrained)
        # Width of the flattened image feature fed to the MLP below.
        self.image_hidden_size = 2048

        # MLP with two hidden layers (dropout only after the first)
        self.mlp = nn.Sequential(
            nn.Linear(self.text_encoder.config.hidden_size + self.image_hidden_size, mlp_hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout_prob),
            nn.Linear(mlp_hidden_size, mlp_hidden_size),
            nn.ReLU(),
        )

        # Classifier
        self.classifier = nn.Linear(mlp_hidden_size, num_labels)

    def forward(self, text, image):
        """Compute class logits.

        Args:
            text: Mapping of tokenizer outputs, unpacked as keyword arguments
                into the text encoder.
            image: Batch of image tensors for the visual encoder.

        Returns:
            Logits produced by the final linear classifier.
        """
        # Encode text and image
        text_output = self.text_encoder(**text)
        text_feature = text_output.last_hidden_state[:, 0, :]  # first ([CLS]) token embedding
        img_feature = self.visual_encoder(image)

        # Concatenate text and image features along the feature dimension
        features = torch.cat((text_feature, img_feature), 1)

        mlp_output = self.mlp(features)
        logits = self.classifier(mlp_output)

        return logits


In [None]:
import torch

# Optional warm start: point `model_path` at a saved state_dict and uncomment
# the `load_state_dict` line below.
#model_path = "/kaggle/input/dsc2024/resnet_model_v7.pth"

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Build the text + image classifier and place it on the chosen device.
model = VisoBertResNetModel(
    num_labels=num_out_labels,
    text_pretrained='google-bert/bert-base-uncased',
)
#model.load_state_dict(torch.load(model_path))
model = model.to(device)


In [None]:
model

In [None]:
!nvidia-smi -L

In [None]:
# Hyper-parameters and split sizes for this run, kept in one dict so they can
# be written to the training report together with the results.
training_params = {
    "seed_val": seed_val,
    "training_size" : len(df_train),
    "dev_size": len(df_dev),
    "test_size": len(df_test),
    "num_train_epochs": 20,
    "batch_size": 64,  # batch size given to every DataLoader
    "learning_rate": 1e-5,
    "weight_decay": 0.01,
    # NOTE(review): passed to the scheduler as num_warmup_steps. Confirm it is
    # smaller than the total number of scheduler steps for this dataset,
    # otherwise the learning rate never reaches `learning_rate`.
    "warmup_steps": 10000,
    "max_seq_length": 64  # tokenizer truncation length
}


In [None]:
# Unpack the run configuration into the module-level names used by the
# training, evaluation, and prediction cells.
(
    seed_val,
    training_size,
    dev_size,
    test_size,
    num_train_epochs,
    batch_size,
    learning_rate,
    weight_decay,
    warmup_steps,
    max_seq_length,
) = (
    training_params[param_name]
    for param_name in (
        'seed_val',
        'training_size',
        'dev_size',
        'test_size',
        'num_train_epochs',
        'batch_size',
        'learning_rate',
        'weight_decay',
        'warmup_steps',
        'max_seq_length',
    )
)


In [None]:
training_params

In [None]:
# training step
import math
import time

import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.cuda.amp import autocast, GradScaler
# AdamW comes from torch.optim: the transformers re-export is deprecated and it
# shadowed the torch.optim import made at the top of the notebook.
from torch.optim import AdamW
from torch.utils.data import DataLoader, RandomSampler
from tqdm import trange, tqdm
from transformers import get_scheduler


# Gradient accumulation: one optimizer update every `accumulation_steps` batches.
accumulation_steps = 4
scaler = GradScaler()   # Loss scaling for mixed precision

train_dataset = ResNetDataset(df=df_train, label_to_id=label_to_id, mode='train', text_field='caption', label_field='label', image_path_field='image')
train_sampler = RandomSampler(train_dataset)
train_dataloader = DataLoader(dataset=train_dataset, batch_size=batch_size, sampler=train_sampler)

# The scheduler is stepped once per optimizer update (not once per batch), so
# the schedule length must count updates. Each epoch performs
# ceil(batches / accumulation_steps) updates because the final, possibly
# partial, accumulation window also triggers an update. Counting batches here
# stretched warm-up and cosine decay by a factor of `accumulation_steps`.
updates_per_epoch = math.ceil(len(train_dataloader) / accumulation_steps)
t_total = updates_per_epoch * num_train_epochs

if warmup_steps >= t_total:
    # Surface the misconfiguration instead of silently training in warm-up only.
    print(f'Warning: warmup_steps ({warmup_steps}) >= total optimizer steps ({t_total}); '
          f'the learning rate will never reach its configured peak.')

optimizer = AdamW(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = get_scheduler(name="cosine", optimizer=optimizer, num_warmup_steps=warmup_steps, num_training_steps=t_total)

criterion = nn.CrossEntropyLoss()

start = time.perf_counter()


# Per-epoch history for the plots below
epoch_losses = []
learning_rates = []

# Start from clean gradients in case an earlier, interrupted run of this cell
# left a partially accumulated window on the model.
optimizer.zero_grad()

# Progress bar over epochs only
for epoch_num in trange(num_train_epochs, desc='Epochs'):
    model.train()  # Set the model to training mode
    epoch_total_loss = 0

    for step, batch in enumerate(train_dataloader):
        b_text, b_labels, b_imgs = batch

        # Tokenize text input
        b_inputs = model.tokenizer(
            list(b_text), truncation=True, max_length=max_seq_length,
            return_tensors="pt", padding=True
        )

        # Move labels, images, and inputs to the device
        b_labels = b_labels.to(device)
        b_imgs = b_imgs.to(device)
        b_inputs = {k: v.to(device) for k, v in b_inputs.items()}

        # Forward pass and loss under mixed precision
        with autocast():
            b_logits = model(text=b_inputs, image=b_imgs)
            loss = criterion(b_logits, b_labels)

        # Divide so the accumulated gradient is the mean over the window
        loss = loss / accumulation_steps
        scaler.scale(loss).backward()

        # Update after a full accumulation window, or at the last batch of the epoch
        if (step + 1) % accumulation_steps == 0 or (step + 1) == len(train_dataloader):
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()
            scheduler.step()  # One scheduler step per optimizer update

        # Undo the division above so the logged loss is the per-batch loss
        epoch_total_loss += loss.item() * accumulation_steps

    # Compute average loss for the epoch
    avg_loss = epoch_total_loss / len(train_dataloader)

    # Save the average loss and learning rate for this epoch
    epoch_losses.append(avg_loss)
    learning_rates.append(optimizer.param_groups[0]['lr'])
    torch.cuda.empty_cache()  # Release cached GPU memory after each epoch


    # Print results after each epoch
    print(f'Epoch = {epoch_num + 1}')
    print(f'    Epoch loss = {epoch_total_loss}')
    print(f'    Average epoch loss = {avg_loss}')
    print(f'    Learning rate = {optimizer.param_groups[0]["lr"]}')

end = time.perf_counter()
resnet_training_time = end - start
print(f'Training completed in {resnet_training_time} seconds')

# Plot the loss and learning rate curves
plt.figure(figsize=(12, 5))

# Plot loss
plt.subplot(1, 2, 1)
plt.plot(range(1, num_train_epochs + 1), epoch_losses, label="Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss per Epoch")
plt.grid(True)

# Plot learning rate
plt.subplot(1, 2, 2)
plt.plot(range(1, num_train_epochs + 1), learning_rates, label="Learning Rate", color='orange')
plt.xlabel("Epoch")
plt.ylabel("Learning Rate")
plt.title("Learning Rate per Epoch")
plt.grid(True)

plt.tight_layout()
plt.show()




In [None]:
import matplotlib.pyplot as plt

In [None]:
#save model
import torch

# Assuming your trained model is called 'model'
import os
# Destination file for the trained weights (relative to the working directory)
model_save_path = 'resnet_model_v8.pth'

# Persist only the parameters (state_dict), not the whole module object
torch.save(obj=model.state_dict(), f=model_save_path)

print(f"Model saved to {model_save_path}")


In [None]:
# Evaluation loop on the held-out dev split

resnet_prediction_results = []

test_dataset = ResNetDataset(df=df_dev, label_to_id=label_to_id, mode='dev', text_field='caption', label_field='label', image_path_field='image')
test_sampler = SequentialSampler(test_dataset)  # keep row order so predictions align with df_dev
test_dataloader = DataLoader(dataset=test_dataset,
                            batch_size=batch_size,
                            sampler=test_sampler)

# Switch to inference mode once, outside the loop (disables dropout and uses
# running batch-norm statistics); no gradients are needed for evaluation.
model.eval()
with torch.no_grad():
    for batch in tqdm(test_dataloader):
        # Gold labels are not needed for prediction; they are compared later
        # through df_dev['label'].
        b_text, _, b_imgs = batch

        b_inputs = model.tokenizer(list(b_text), truncation=True, max_length=max_seq_length, return_tensors="pt", padding=True)

        b_imgs = b_imgs.to(device)
        b_inputs = {k: v.to(device) for k, v in b_inputs.items()}

        b_logits = model(text=b_inputs, image=b_imgs)
        b_logits = b_logits.detach().cpu()

        resnet_prediction_results += torch.argmax(b_logits, dim=-1).tolist()

# Map predicted class ids back to their label strings
resnet_prediction_labels = [id_to_label[p] for p in resnet_prediction_results]

In [None]:
len(resnet_prediction_labels)

In [None]:
# Sanity check: one prediction per dev row (prints True only on a match).
if len(df_dev) == len(resnet_prediction_labels):
    print(True)

In [None]:
# Compare the dev-set gold labels with the predicted labels.
resnet_class_report = classification_report(df_dev['label'], resnet_prediction_labels)



In [None]:
resnet_class_report

In [None]:
# Attach the dev-set report to the run configuration so both are written to the
# same report file.
training_params['results']=resnet_class_report

In [None]:
training_params

In [None]:
# Where the plain-text run report is written
file_path = "/kaggle/working/training_report_v6.txt"

# One "key: value" line per entry of the run configuration
report_content = "\n".join(
    f"{param_name}: {param_value}"
    for param_name, param_value in training_params.items()
)

# Write the report to disk
with open(file_path, "w") as file:
    file.write(report_content)

print(f"Report saved to {file_path}")


# Making prediction


In [None]:
df_test

In [None]:
# Predict labels for the unlabeled test set.
# Assumes df_test has the columns named below ('caption', 'image').

# Test-mode dataset: no label mapping, deterministic transforms, test image folder
test_dataset = ResNetDataset(
    df=df_test,  # DataFrame containing the new data
    label_to_id=None,  # No labels for prediction
    mode='test',  # selects the evaluation transforms and IMAGE_TEST_FOLDER
    text_field="caption",  # Column for captions
    image_path_field="image"  # Column for image paths
)

# Sequential order keeps predictions aligned with the rows of df_test
test_sampler = SequentialSampler(test_dataset)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    sampler=test_sampler
)

prediction_results = []

# Switch to inference mode once, outside the loop; no gradients are needed.
model.eval()
with torch.no_grad():
    for batch in tqdm(test_dataloader):
        b_text, b_imgs = batch  # No labels are unpacked here

        # Tokenize the input text (captions)
        b_inputs = model.tokenizer(
            list(b_text), truncation=True, max_length=max_seq_length, return_tensors="pt", padding=True
        )

        # Move inputs to the correct device
        b_imgs = b_imgs.to(device)
        b_inputs = {k: v.to(device) for k, v in b_inputs.items()}

        b_logits = model(text=b_inputs, image=b_imgs)
        b_logits = b_logits.detach().cpu()  # Move logits to CPU for further processing

        # Collect predicted class ids
        prediction_results += torch.argmax(b_logits, dim=-1).tolist()

# Convert class ids to label strings when the mapping is available
if 'id_to_label' in locals():
    predicted_labels = [id_to_label[p] for p in prediction_results]
else:
    predicted_labels = prediction_results  # Fall back to raw class ids

# Output predictions


In [None]:
df_test.sample(5)

In [None]:
# Sanity check: prints True when there is exactly one prediction per test row,
# False otherwise.
print(len(predicted_labels) == len(df_test))

In [None]:
# make submission
# Pair each prediction with the `_key` of the test row at the same position.
prediction_results = {
    "results": {
        str(df_test['_key'].iloc[row_pos]): row_label
        for row_pos, row_label in enumerate(predicted_labels)
    },
    "phase": "dev"  # Set phase as 'test', 'dev', or 'train' as appropriate
}

# Print the structured results

In [None]:
import json

# Submission file, written to the current working directory
output_file = "results3.json"

# Serialize the submission dictionary with 4-space indentation and write it out
with open(output_file, 'w') as f:
    f.write(json.dumps(prediction_results, indent=4))

print(f"Prediction results saved to {output_file}")


In [None]:
model
