In [None]:
# Run this cell only once, then restart the runtime; do not run it again after the restart.
!pip install openai==0.28

In [None]:
# Mount Google Drive at /content/drive; the dataset CSV and the image folder used
# below are read from paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import ast
import os

import numpy as np
import openai
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from google.colab import userdata
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import BertTokenizer

# --- Credentials ---
# The OpenAI key is fetched through google.colab.userdata under the name 'api_key';
# every user has to create their own key to be able to generate representations.
openai.api_key = userdata.get('api_key')

# --- Locations on the mounted drive ---
data_path = '/content/drive/MyDrive/NLP_fall_2024/processed_datasets/new_russian_processed_data.csv'
img_folder_path = '/content/drive/MyDrive/NLP_fall_2024/russian-images-archive'

# --- Data ---
df = pd.read_csv(data_path)

# Keep 70% of the rows for training and hold out 30% for testing; the fixed
# random_state makes the split repeatable.
train_df, test_df = train_test_split(df, random_state=42, test_size=0.3)
print("After split, train_df columns:", train_df.columns)

# Count the number of image files in the folder
def count_images_in_folder(folder_path, extensions=('.png', '.jpg', '.jpeg')):
    """
    Count the entries of a folder whose names end with an image extension.

    Args:
        folder_path: Directory to scan. Only its direct entries are inspected
            (no recursion into sub-folders).
        extensions: File-name suffixes to accept, each including the leading
            dot. Matching is case-sensitive, like the lower-case extensions
            that load_image looks for.

    Returns:
        int: Number of entries whose name ends with one of ``extensions``.

    Raises:
        OSError: Propagated from ``os.listdir`` when the folder cannot be read.
    """
    # The leading dot matters: matching on a bare 'png' would also count names
    # such as 'notes_png' or a file literally called 'jpg'.
    suffixes = tuple(extensions)
    return sum(1 for name in os.listdir(folder_path) if name.endswith(suffixes))

# Report how many image files the image folder contains.
image_count = count_images_in_folder(img_folder_path)
print(f"Number of image files in the folder: {image_count}")

In [None]:
# Show only a short preview of the folder: printing every file name of a large
# image archive floods the cell output and bloats the saved notebook.
image_files = sorted(os.listdir(img_folder_path))
print("Files in image folder:")
print(f"{len(image_files)} entries; first 10: {image_files[:10]}")

def load_image(image_id, img_folder_path):
    """
    Load the image stored as ``<image_id>.<ext>`` inside ``img_folder_path``.

    The extensions 'png', 'jpg' and 'jpeg' are tried in that order and the first
    existing file wins. The image is converted to RGB, resized to 224x224 and
    returned as a NumPy array; a "Loaded image" line is printed on success.

    Returns:
        The image as ``np.ndarray``, or ``None`` when no matching file exists.
    """
    candidate_paths = (
        os.path.join(img_folder_path, f"{image_id}.{ext}")
        for ext in ['png', 'jpg', 'jpeg']
    )
    image_path = next((path for path in candidate_paths if os.path.exists(path)), None)

    # No file for any of the supported extensions.
    if image_path is None:
        return None

    image = Image.open(image_path).convert('RGB').resize((224, 224))
    print(f"Loaded image: {image_path}")
    return np.array(image)

# Smoke-check the loader on the first five rows of the training split.
for idx in range(5):
    sample_id = train_df.iloc[idx]['id']
    image = load_image(sample_id, img_folder_path)
    if image is not None:
        print(f"Image with ID {sample_id} loaded successfully.")
    else:
        print(f"Image with ID {sample_id} not loaded.")

In [None]:
def get_representation(text, model="gpt-4-turbo"):
    """
    Ask an OpenAI chat model for a concise representation of ``text``.

    Args:
        text: The input text, sent as the user message.
        model: Name of the chat model to query.

    Returns:
        The content of the first returned choice, or ``None`` when the request
        (or reading its response) raised an exception. Failures are reported on
        stdout instead of being propagated.
    """
    system_prompt = "You are an assistant that generates concise representations for embeddings."
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": text},
    ]

    try:
        response = openai.ChatCompletion.create(model=model, messages=conversation)
        # The reply text lives in the first choice of the response.
        representation = response['choices'][0]['message']['content']
        print(f"Representation generated for text: {text[:50]}...")
    except Exception as e:
        print(f"Error generating representation: {e}")
        return None

    return representation

In [None]:
# Quick sanity check: send one sample sentence through get_representation and show
# the result (None is printed when the API call failed).
test_text = "This is a sample text for testing."
representation = get_representation(test_text)
print("Generated representation:", representation)

In [None]:
from transformers import AutoTokenizer

# Tokenizer loaded from the "bert-base-uncased" checkpoint; it converts the generated
# representations into token ids in generate_and_tokenize_representations below.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def generate_and_tokenize_representations(dataframe, model="gpt-4-turbo"):
    """
    Build one token-id list per row of ``dataframe`` from its 'text' column.

    Each text is sent to ``get_representation``; the returned representation is
    tokenized with the module-level ``tokenizer`` (truncated/padded to 128 ids).

    Args:
        dataframe: DataFrame with a 'text' column.
        model: Chat model name forwarded to ``get_representation``.

    Returns:
        list: For every row, in order, either the list of input ids or ``None``
        when no representation could be generated (a message naming the row
        index is printed in that case).
    """
    def _encode(row_label, text):
        generated = get_representation(text, model)
        if not generated:
            print(f"Failed to generate representation for row {row_label}")
            return None
        encoded = tokenizer(generated, truncation=True, padding="max_length", max_length=128)
        return encoded['input_ids']

    return [_encode(i, row['text']) for i, row in dataframe.iterrows()]

# Run the generation + tokenization over every row of the dataset and keep the
# result next to the original columns.
tokenized_column = generate_and_tokenize_representations(df)
df['tokenized'] = tokenized_column

# Persist the token ids twice: as a standalone .npy array and, together with all
# other columns, as a CSV that later cells reload.
token_array = np.array(df['tokenized'].tolist(), dtype=object)
np.save("tokenized_representations.npy", token_array)
df.to_csv("processed_dataset.csv", index=False)
print("Tokenized representations saved to 'tokenized_representations.npy'")

In [None]:
class MultimodalDatasetWithTokens(Dataset):
    """
    Dataset pairing each row's pre-tokenized text with its image and label.

    Rows are read from a DataFrame with the columns 'id', 'label', 'text' and
    'tokenized'. The image for a row is looked up in ``img_folder`` as
    ``<id>.png``, ``<id>.jpg`` or ``<id>.jpeg`` (first match wins).
    """

    def __init__(self, dataframe, img_folder, transform=None):
        # Re-number the rows 0..n-1 so positional access lines up with __len__.
        self.dataframe = dataframe.reset_index(drop=True)
        self.img_folder = img_folder
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """
        Return the sample at position ``idx`` as a dict with the keys 'id',
        'text', 'text_tokens', 'image' and 'label', or ``None`` when no image
        file exists for the row (the collate function is expected to drop it).
        """
        row = self.dataframe.iloc[idx]
        image_id, label, text = row['id'], row['label'], row['text']
        text_tokens = torch.tensor(row['tokenized'], dtype=torch.long)

        # First existing file among the supported extensions, if any.
        candidate_paths = (
            os.path.join(self.img_folder, f"{image_id}.{ext}")
            for ext in ['png', 'jpg', 'jpeg']
        )
        image_path = next((path for path in candidate_paths if os.path.exists(path)), None)
        if image_path is None:
            return None

        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)

        label_tensor = None if label is None else torch.tensor(label, dtype=torch.long)
        return {
            "id": image_id,
            "text": text,
            "text_tokens": text_tokens,
            "image": image,
            "label": label_tensor,
        }

In [None]:
# Image transformations: resize to 224x224, convert to a tensor and normalize each
# channel (these mean/std values are the ones commonly used with ImageNet-pretrained
# torchvision models).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the preprocessed dataset
processed_df = pd.read_csv("processed_dataset.csv")

# Rows whose representation could not be generated were stored with None in
# 'tokenized', which comes back from the CSV as NaN. They carry no text input for
# the model (and cannot be parsed), so drop them before decoding.
processed_df = processed_df.dropna(subset=['tokenized']).copy()

# The token ids were written to the CSV as the text form of a Python list.
# ast.literal_eval parses literals only, so unlike eval() it cannot execute code
# that happens to be present in the file.
processed_df['tokenized'] = processed_df['tokenized'].apply(ast.literal_eval)

# Create train/test splits again with the tokenized dataset: 80% of the rows are
# sampled for training (fixed seed), the remaining rows form the test set.
train_df = processed_df.sample(frac=0.8, random_state=42)
test_df = processed_df.drop(train_df.index)

# Create Datasets
train_dataset = MultimodalDatasetWithTokens(train_df, img_folder_path, transform=transform)
test_dataset = MultimodalDatasetWithTokens(test_df, img_folder_path, transform=transform)

def custom_collate_fn(batch):
    """
    Collate dataset samples into one batch dict, ignoring ``None`` samples.

    Args:
        batch: List of sample dicts (or ``None`` for samples to skip).

    Returns:
        ``None`` when no usable sample remains; otherwise a dict with the keys of
        the first usable sample, where 'id' and 'text' are plain lists and every
        other field is stacked into a single tensor with ``torch.stack``.
    """
    usable = [sample for sample in batch if sample is not None]
    if not usable:
        return None

    # Fields that are not tensors and are therefore passed through as lists.
    passthrough_fields = ("id", "text")

    collated = {}
    for field in usable[0]:
        values = [sample[field] for sample in usable]
        collated[field] = values if field in passthrough_fields else torch.stack(values)
    return collated

# Create DataLoaders with the custom collate function, which drops samples that the
# dataset returned as None (missing image). Only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=custom_collate_fn)

In [None]:
import torchvision.models as models

class MultimodalGPTModel(nn.Module):
    """
    Two-branch classifier that fuses a text vector with ResNet-18 image features.

    Image branch: pretrained ResNet-18 without its final classification layer,
    flattened and projected to 256 dimensions.
    Text branch: a linear projection of a 128-dimensional text vector to 256
    dimensions.
    Fusion: two softmax weights (one per modality) are predicted from the sum of
    both projections and used to blend them; the blend is concatenated with the
    raw text vector and classified into 2 classes.

    NOTE(review): the training/testing loops pass BERT token ids cast to float as
    the text vector rather than learned embeddings - confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        # Pretrained ResNet-18. `weights=` replaces the deprecated
        # `pretrained=True` flag; DEFAULT selects the ImageNet weights.
        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        self.image_encoder = nn.Sequential(
            *(list(resnet.children())[:-1]),  # Remove the final classification layer
            nn.Flatten()
        )
        self.image_fc = nn.Linear(resnet.fc.in_features, 256)
        self.text_fc = nn.Linear(128, 256)
        self.weight_fc = nn.Linear(256, 2)  # Learnable weights for text and image
        self.fc = nn.Sequential(
            nn.Linear(256 + 128, 128),
            nn.ReLU(),
            nn.Linear(128, 2)
        )

    def forward(self, text_embeddings, images):
        """
        Args:
            text_embeddings: Float tensor of shape (batch_size, 128).
            images: Image batch accepted by the ResNet encoder, e.g.
                (batch_size, 3, 224, 224).

        Returns:
            Tensor of shape (batch_size, 2) with the unnormalized class scores.
        """
        image_features = self.image_encoder(images)
        image_features = self.image_fc(image_features)

        text_features = self.text_fc(text_embeddings)

        # Learnable modality weights, one pair per sample: (batch_size, 2)
        weights = torch.softmax(self.weight_fc(text_features + image_features), dim=1)

        # Blend the two 256-d projections with the learned weights
        text_weight = weights[:, 0].unsqueeze(1)
        image_weight = weights[:, 1].unsqueeze(1)
        combined_weighted_features = text_weight * text_features + image_weight * image_features

        # Concatenate the blend with the raw text vector: (batch_size, 256 + 128)
        combined_features = torch.cat((combined_weighted_features, text_embeddings), dim=1)

        outputs = self.fc(combined_features)
        return outputs

In [None]:
# Seed torch's random number generator so that the initialization of the new layers
# and the shuffling order of the training DataLoader are repeatable between runs
# (the data splits in this notebook already use the same fixed seed).
torch.manual_seed(42)

# Initialize Model, Optimizer, and Loss Function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available
model = MultimodalGPTModel().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()  # expects raw class scores and integer class labels

In [None]:
# Dummy data for a shape check of the model
dummy_text_embeddings = torch.randn(32, 128).to(device)  # Batch of 32, 128-dimensional embeddings
dummy_images = torch.randn(32, 3, 224, 224).to(device)   # Batch of 32 images, 3 channels, 224x224 size

# Run the check in eval mode and without gradients: in training mode the BatchNorm
# layers of the pretrained ResNet encoder would update their running statistics
# from this random noise before any real training has happened.
was_training = model.training
model.eval()
with torch.no_grad():
    outputs = model(dummy_text_embeddings, dummy_images)
model.train(was_training)  # restore whatever mode the model was in

print(f"Model output shape: {outputs.shape}")  # Should be [32, 2] for binary classification

In [None]:
# Training loop: 10 passes over the training data.
for epoch in range(10):  # Number of epochs
    model.train()
    total_loss = 0.0
    num_batches = 0  # batches that actually contributed to total_loss

    for batch in train_loader:
        # The collate function returns None when every sample of a batch was dropped.
        if batch is None:
            continue
        # Token ids arrive as integers; the model's linear text layer needs floats.
        text_tokens = batch["text_tokens"].to(device).float()
        images = batch["image"].to(device)
        labels = batch["label"].to(device)

        optimizer.zero_grad()

        # Forward pass
        outputs = model(text_tokens, images)

        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Average over the batches that were processed. Dividing by len(train_loader)
    # would also count skipped (None) batches and under-report the loss.
    avg_loss = total_loss / num_batches if num_batches else float("nan")
    print(f"Epoch {epoch+1}, Loss: {avg_loss}")


In [None]:
# Do not run by default: this cell is only used for a full comparison between the original label, our model's label, and the GPT-4 label.
def _parse_sarcasm_reply(reply):
    """Map a lower-cased, stripped model reply to 1 (sarcastic) or 0 (not sarcastic)."""
    tokens = reply.split()
    first_word = tokens[0].strip(".,!?:;'\"") if tokens else ""
    if first_word == "yes":
        return 1
    if first_word == "no":
        return 0

    # Free-form replies: check negated forms first, because every one of them
    # also contains the substring "sarcastic"/"sarcasm".
    negative_markers = (
        "not sarcastic", "non-sarcastic", "nonsarcastic", "not_sarcastic",
        "isn't sarcastic", "unsarcastic", "no sarcasm",
    )
    if any(marker in reply for marker in negative_markers):
        return 0
    if "sarcastic" in reply or "sarcasm" in reply:
        return 1
    return 0


def openai_predict(text, model="gpt-4-turbo"):
    """
    Ask an OpenAI chat model whether ``text`` is sarcastic.

    Args:
        text: The text to classify.
        model: Name of the chat model to query.

    Returns:
        int: 1 for sarcastic, 0 for not sarcastic, -1 when the request (or
        reading its response) failed; failures are reported on stdout.
    """
    try:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[
                # Constrain the answer format so that it can be parsed reliably.
                {"role": "system", "content": "You are a classifier for sarcasm detection. "
                                              "Answer with exactly one word: 'yes' if the text is sarcastic, otherwise 'no'."},
                {"role": "user", "content": f"Is the following sarcastic? {text}"}
            ]
        )
        prediction = response['choices'][0]['message']['content'].strip().lower()
    except Exception as e:
        print(f"OpenAI prediction failed: {e}")
        return -1  # Indicate failure

    # A plain `"sarcastic" in prediction` check would label "not sarcastic" as
    # sarcastic and a bare "yes" as not sarcastic.
    return _parse_sarcasm_reply(prediction)

# Evaluate the trained model on the test set and, for every test text, also ask
# GPT for a prediction so the two can be compared with the original label.
model.eval()
true_labels = []
predicted_labels = []
openai_predictions = []
results = []  # To store detailed results for comparison

print("\nTesting starts...")
with torch.no_grad():
    for batch_idx, batch in enumerate(test_loader):
        # The collate function returns None when every sample of a batch was dropped.
        if batch is None:
            continue

        images = batch["image"].to(device)
        text_tokens = batch["text_tokens"].to(device).float()
        labels = batch["label"].to(device)
        texts = batch["text"]

        # Forward pass through the trained model
        model_outputs = model(text_tokens, images)
        model_preds = torch.argmax(model_outputs, dim=1).cpu().tolist()

        # Get predictions from OpenAI for THIS batch. They are kept in a per-batch
        # list: indexing the global openai_predictions list with a within-batch
        # index would pair every later batch with the first batch's predictions.
        batch_openai_preds = [openai_predict(text) for text in texts]
        openai_predictions.extend(batch_openai_preds)

        # Log results for comparison
        batch_true_labels = labels.cpu().tolist()
        for text, true_label, model_pred, openai_pred in zip(
            texts, batch_true_labels, model_preds, batch_openai_preds
        ):
            results.append({
                "text": text,
                "true_label": true_label,
                "model_pred": model_pred,
                "openai_pred": openai_pred
            })

        true_labels.extend(batch_true_labels)
        predicted_labels.extend(model_preds)

# Save detailed results for comparison
results_df = pd.DataFrame(results)
results_df.to_csv("full_comparison_results.csv", index=False)
print("Full comparison results saved to full_comparison_results.csv.")


In [None]:
# Testing loop: run the trained model over the test set, collect per-sample
# predictions and store them next to the original labels.
model.eval()
true_labels, predicted_labels, results = [], [], []

print("\nTesting starts...")
with torch.no_grad():
    for batch_idx, batch in enumerate(test_loader):
        batch_number = batch_idx + 1
        print(f"\nBatch {batch_number}:")

        # The collate function yields None when all samples of a batch were filtered out.
        if batch is None:
            print(f"Batch {batch_number} skipped due to missing data.")
            continue

        # Tensors go to the device; ids and texts stay plain Python lists.
        images = batch["image"].to(device)
        text_tokens = batch["text_tokens"].to(device).float()
        labels = batch["label"].to(device)
        ids, texts = batch["id"], batch["text"]

        # Forward pass; the predicted class is the index of the highest score.
        outputs = model(text_tokens, images)
        preds = torch.argmax(outputs, dim=1).cpu().tolist()
        batch_true_labels = labels.cpu().tolist()

        # One record per sample: original label next to the model's prediction.
        results.extend(
            {
                "id": sample_id,
                "text": sample_text,
                "true_label": true_label,
                "predicted_label": predicted_label,
            }
            for sample_id, sample_text, true_label, predicted_label
            in zip(ids, texts, batch_true_labels, preds)
        )

        # Keep flat label lists for the metric computation.
        true_labels.extend(batch_true_labels)
        predicted_labels.extend(preds)
        print(f"Predictions for batch: {preds}")

# Save detailed results to CSV
results_df = pd.DataFrame(results)
results_df.to_csv("comparison_results.csv", index=False)
print("Detailed results saved to comparison_results.csv.")


In [None]:
# Calculate metrics from the labels collected by the testing loop
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report

# 'weighted' averages the per-class scores weighted by each class's support.
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels, average='weighted')
recall = recall_score(true_labels, predicted_labels, average='weighted')
f1 = f1_score(true_labels, predicted_labels, average='weighted')

# Create a DataFrame for display
metrics_df = pd.DataFrame({
    "Metric": ["Accuracy", "Precision", "Recall", "F1-Score"],
    "Value": [accuracy, precision, recall, f1]
})

print("\nClassification Report:")
# Pin the label set to both classes: without `labels`, the report raises when only
# one class occurs in the labels/predictions, because the two target_names no
# longer match the number of classes found.
print(classification_report(
    true_labels,
    predicted_labels,
    labels=[0, 1],
    target_names=["Non-Sarcastic", "Sarcastic"],
))

print("\nMetrics Summary:")
print(metrics_df)