In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [9]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision import models, transforms
from PIL import Image
from transformers import BertModel, BertTokenizer

# Load data
train_data = pd.read_csv('/kaggle/input/ml-project/train/subtask_a_train.csv')
target_data = pd.read_csv('/kaggle/input/target-files/target_t.csv')

# Image transformations
image_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Dataset class
class IdiomImageDataset(Dataset):
    def __init__(self, dataframe, target_df, image_dir):
        self.dataframe = dataframe
        self.target_df = target_df
        self.image_dir = image_dir
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, index):
        row = self.dataframe.iloc[index]
        target_row = self.target_df.iloc[index]
        sentence = row['sentence']
        idiom_name = row['compound'].replace("'", "_")
        image_names = [row[f'image{i}_name'] for i in range(1, 6)]
        
        expected_order = torch.tensor(eval(target_row['target']), dtype=torch.long) - 1

        # Tokenize text
        inputs = self.tokenizer(sentence, return_tensors='pt', padding='max_length', truncation=True, max_length=128)
        
        # Load images
        images = []
        for img_name in image_names:
            img_path = os.path.join(self.image_dir, idiom_name, img_name)
            img = Image.open(img_path).convert('RGB')
            img = image_transforms(img)
            images.append(img)
        images_tensor = torch.stack(images)

        return inputs['input_ids'].squeeze(0), inputs['attention_mask'].squeeze(0), images_tensor, expected_order

# Updated model
class AdvancedMultimodalRankingModel(nn.Module):
    def __init__(self):
        super(AdvancedMultimodalRankingModel, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.resnet = models.resnet50(weights='DEFAULT')
        self.resnet.fc = nn.Identity()
        
        # Feature projectors
        self.fc_text = nn.Linear(768, 128)
        self.fc_image = nn.Linear(2048, 128)
        
        # Rank prediction layers
        self.fc_combined = nn.Sequential(
            nn.Linear(128*2, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, input_ids, attention_mask, images):
        text_features = self.bert(input_ids=input_ids, attention_mask=attention_mask).last_hidden_state
        text_features = text_features.mean(dim=1)
        text_features = self.fc_text(text_features)
        
        batch_size, num_images, channels, height, width = images.size()
        images = images.view(batch_size * num_images, channels, height, width)
        image_features = self.resnet(images)
        image_features = image_features.view(batch_size, num_images, -1)
        
        image_features = self.fc_image(image_features)
        text_features = text_features.unsqueeze(1).repeat(1, num_images, 1)
        
        combined_features = torch.cat((text_features, image_features), dim=2)
        rankings = self.fc_combined(combined_features).squeeze(-1)
        
        return rankings

# Set up data loaders and model
image_folder = '/kaggle/input/ml-project/train'  
dataset = IdiomImageDataset(train_data, target_data, image_folder)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Training setup
model = AdvancedMultimodalRankingModel()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# Update the criterion to MSELoss if comparing scores directly
criterion = nn.MSELoss()  # Or nn.MarginRankingLoss() for ranking comparisons

def train_model(model, data_loader, criterion, optimizer, epochs=5):
    model.train()
    for epoch in range(epochs):
        total_loss = 0
        for input_ids, attention_mask, images, expected_order in data_loader:
            optimizer.zero_grad()
            
            # Forward pass
            outputs = model(input_ids, attention_mask, images)
            
            # If using MSELoss
            expected_order = expected_order.float()  # Ensure dtype consistency
            loss = criterion(outputs, expected_order)
            
            # Backpropagation
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(data_loader)}')


# Run the training process
train_model(model, train_loader, criterion, optimizer, epochs=5)

torch.save(model.state_dict(), 'advanced_multimodal_ranking_model.pth')

# Evaluation function
def evaluate_model(model, data_loader):
    model.eval()
    all_predictions = []
    with torch.no_grad():
        for input_ids, attention_mask, images, _ in data_loader:
            outputs = model(input_ids, attention_mask, images)
            rankings = torch.argsort(outputs, dim=1)
            all_predictions.extend(rankings.cpu().numpy())
    return all_predictions

predicted_rankings = evaluate_model(model, test_loader)
predicted_rankings_final = [[x + 1 for x in ranking] for ranking in predicted_rankings]
predicted_rankings_final 
print(predicted_rankings_final)

Epoch 1/5, Loss: 5.19644021987915
Epoch 2/5, Loss: 2.606148898601532
Epoch 3/5, Loss: 1.590873658657074
Epoch 4/5, Loss: 1.031388521194458
Epoch 5/5, Loss: 0.6192614510655403
[[3, 4, 5, 1, 2], [5, 4, 3, 2, 1], [1, 4, 5, 3, 2], [2, 5, 4, 3, 1], [4, 1, 5, 2, 3], [4, 1, 2, 3, 5], [3, 4, 1, 5, 2], [1, 5, 4, 3, 2], [1, 2, 3, 4, 5], [5, 3, 4, 2, 1], [2, 3, 1, 4, 5], [1, 2, 5, 4, 3], [3, 2, 4, 1, 5], [5, 4, 1, 2, 3]]


In [4]:
# Get the indices of the test data points
test_indices = test_dataset.indices

# Display the indices
print("Test Indices:", test_indices)

# Optionally, print the actual test data points (e.g., true rankings or other features)
true_test_rankings = [eval(target_data.iloc[idx]['target']) for idx in test_indices]
true_test_rankings

Test Indices: [3, 52, 23, 8, 62, 0, 14, 64, 6, 12, 65, 54, 2, 19]


[[4, 3, 1, 2, 5],
 [1, 3, 2, 4, 5],
 [4, 3, 1, 5, 2],
 [2, 3, 4, 5, 1],
 [4, 1, 5, 2, 3],
 [1, 3, 2, 5, 4],
 [3, 1, 2, 4, 5],
 [3, 1, 2, 5, 4],
 [5, 3, 4, 2, 1],
 [5, 3, 4, 1, 2],
 [2, 5, 1, 3, 4],
 [2, 1, 4, 5, 3],
 [3, 2, 1, 5, 4],
 [2, 4, 5, 1, 3]]

In [10]:
import numpy as np
def mean_reciprocal_rank(true_rankings, predicted_rankings):
    reciprocal_ranks = []
    for true, pred in zip(true_rankings, predicted_rankings):
        for i, p in enumerate(pred):
            if p == true[i]:
                reciprocal_ranks.append(1 / (i + 1))
                break
        else:
            reciprocal_ranks.append(0)
    return np.mean(reciprocal_ranks)
print(mean_reciprocal_rank(true_test_rankings,predicted_rankings_final ))

0.5166666666666667


In [11]:
# Calculate accuracy by checking exact matches between the two arrays
def calculate_accuracy(predicted, true_test):
    # Convert to numpy arrays for easy comparison
    predicted_np = np.array(predicted)
    true_test_np = np.array(true_test)
    
    # Check for exact matches row by row
    matches = np.all(predicted_np == true_test_np, axis=1)
    accuracy = np.mean(matches) * 100  # Percentage of exact matches
    return accuracy

# Calculate and print the accuracy
accuracy = calculate_accuracy(true_test_rankings,predicted_rankings_final )
print(f"Accuracy: {accuracy:.2f}%")

Accuracy: 7.14%
