In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from datasets import load_dataset
from collections import Counter
from nltk.tokenize import word_tokenize
from nltk.translate.bleu_score import corpus_bleu
from rouge import Rouge
from PIL import Image
from tqdm import tqdm
import evaluate
import nltk
# word_tokenize needs the Punkt sentence-tokenizer data. Newer NLTK releases load it
# from the 'punkt_tab' package instead of 'punkt', so fetch both to work on either.
nltk.download('punkt')
nltk.download('punkt_tab')

In [None]:
class ConsumerComplaintDataset(Dataset):
    """Pairs each image with its answer text encoded as a fixed-length index sequence.

    Args:
        dataset_split: indexable collection whose items provide ``'image'`` and
            ``'answer'`` entries. ``'image'`` may be a PIL image or anything
            ``PIL.Image.open`` accepts.
        word_to_ix: mapping from token string to integer index; must contain the
            module-level PAD/SOS/EOS/UNK tokens.
        transform: optional callable applied to the RGB PIL image.
        max_seq_length: length of every returned caption, special tokens included.
    """

    def __init__(self, dataset_split, word_to_ix, transform=None, max_seq_length=20):
        self.dataset = dataset_split
        self.word_to_ix = word_to_ix
        self.transform = transform
        self.max_seq_length = max_seq_length

    def __len__(self):
        """Number of examples in the wrapped split."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(image, caption)`` where caption is a 1-D tensor of ``max_seq_length`` indices."""
        # Fetch the row once; indexing the split a second time would repeat the lookup.
        row = self.dataset[idx]
        image = row['image']
        answer = row['answer']
        if isinstance(image, Image.Image):
            image = image.convert("RGB")
        else:
            image = Image.open(image).convert("RGB")
        if self.transform is not None:
            image = self.transform(image)

        tokens = word_tokenize(answer.lower())
        # Reserve two slots for SOS/EOS *before* truncating, so a long answer still
        # ends with EOS instead of having it sliced off.
        tokens = tokens[:max(self.max_seq_length - 2, 0)]
        caption = [SOS_TOKEN] + tokens + [EOS_TOKEN]
        unk_ix = self.word_to_ix[UNK_TOKEN]
        caption = [self.word_to_ix.get(w, unk_ix) for w in caption]

        missing = self.max_seq_length - len(caption)
        if missing > 0:
            caption += [self.word_to_ix[PAD_TOKEN]] * missing
        else:
            # Only reachable when max_seq_length < 2; keeps the output length fixed.
            caption = caption[:self.max_seq_length]
        return image, torch.tensor(caption)

In [None]:
class VGG16LSTM(nn.Module):
    """Image-conditioned caption generator: frozen VGG-16 conv features feeding an LSTM decoder.

    The image is encoded into a single ``embed_size`` vector that is fed to the
    LSTM as the first time step; word embeddings follow.

    Args:
        embed_size: size of the image/word embedding fed to the LSTM.
        hidden_size: LSTM hidden state size.
        vocab_size: number of output classes / embedding rows.
        num_layers: number of stacked LSTM layers.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(VGG16LSTM, self).__init__()
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        self.num_layers = num_layers
        # `pretrained=True` is deprecated in newer torchvision in favour of the
        # `weights=` enum; IMAGENET1K_V1 is the checkpoint the legacy flag maps to.
        # Fall back to the legacy flag when the enum is not available.
        if hasattr(models, "VGG16_Weights"):
            vgg = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
        else:
            vgg = models.vgg16(pretrained=True)
        modules = list(vgg.features.children())
        self.vgg = nn.Sequential(*modules)
        # The convolutional backbone is used as a fixed feature extractor.
        for param in self.vgg.parameters():
            param.requires_grad = False
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.fc = nn.Linear(512 * 7 * 7, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.embed = nn.Embedding(vocab_size, embed_size)

    def _encode_images(self, images):
        """Map a batch of images to a ``(batch, 1, embed_size)`` tensor usable as an LSTM step."""
        with torch.no_grad():
            features = self.vgg(images)
        features = self.avgpool(features)
        features = features.view(features.size(0), -1)
        return self.fc(features).unsqueeze(1)

    def forward(self, images, captions):
        """Teacher-forced pass.

        The LSTM input is the image feature followed by the embeddings of every
        caption token except the last, so the output has one step per caption
        position: ``(batch, captions.size(1), vocab_size)``.
        """
        features = self._encode_images(images)
        embeddings = self.embed(captions[:, :-1])
        embeddings = torch.cat((features, embeddings), 1)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.linear(hiddens)
        return outputs

    def sample(self, images, max_seq_length=20):
        """Greedy decoding.

        Starts from the image feature and repeatedly feeds back the arg-max token.
        Always runs ``max_seq_length`` steps (no early stop), returning a
        ``(batch, max_seq_length)`` tensor of token indices.
        """
        inputs = self._encode_images(images)
        outputs = []
        states = None
        for _ in range(max_seq_length):
            hiddens, states = self.lstm(inputs, states)
            output = self.linear(hiddens.squeeze(1))
            predicted = output.max(1)[1]
            outputs.append(predicted)
            # The chosen token becomes the next single-step input.
            inputs = self.embed(predicted).unsqueeze(1)
        outputs = torch.stack(outputs, 1)
        return outputs


In [None]:

# Special vocabulary symbols. ConsumerComplaintDataset reads these module-level names.
PAD_TOKEN = '<PAD>'
SOS_TOKEN = '<SOS>'
EOS_TOKEN = '<EOS>'
UNK_TOKEN = '<UNK>'
# NOTE(review): load_dataset is given a bare CSV path; confirm it resolves to a dataset
# exposing 'train' and 'test' splits with 'image' and 'answer' columns.
dataset = load_dataset("/com-vid.csv")

# Collect answer texts by column so rows (and their images) are not materialised
# just to read one field.
answers = []
for split in ['train', 'test']:
    answers.extend(dataset[split]['answer'])

# NOTE(review): the vocabulary is built from train AND test answers. The evaluation
# cell rebuilds reference text from indices, so it relies on test words being present.
word_counts = Counter()
for ans in answers:
    tokens = word_tokenize(ans.lower())
    word_counts.update(tokens)

threshold = 1  # minimum corpus frequency for a word to get its own index
words = [w for w, c in word_counts.items() if c >= threshold]
words = [PAD_TOKEN, SOS_TOKEN, EOS_TOKEN, UNK_TOKEN] + words
word_to_ix = {w: i for i, w in enumerate(words)}
ix_to_word = {i: w for w, i in word_to_ix.items()}
vocab_size = len(word_to_ix)

# Resize to a fixed resolution: the default DataLoader collation stacks image tensors,
# which fails when images in a batch differ in size.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std =[0.229, 0.224, 0.225])
])

batch_size = 8
train_dataset = ConsumerComplaintDataset(dataset['train'], word_to_ix, transform=transform)
test_dataset = ConsumerComplaintDataset(dataset['test'], word_to_ix, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
embed_size = 256
hidden_size = 512
num_layers = 1
model = VGG16LSTM(embed_size, hidden_size, vocab_size, num_layers).to(device)
# Padding positions contribute nothing to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=word_to_ix[PAD_TOKEN])
# Only hand the optimizer parameters that can actually receive gradients.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=1e-3)
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    tqdm_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}")
    for images, captions in tqdm_bar:
        images = images.to(device)
        captions = captions.to(device)
        optimizer.zero_grad()
        outputs = model(images, captions)
        # Flatten (batch, steps, vocab) and (batch, steps) so each step is one sample.
        outputs = outputs.reshape(-1, vocab_size)
        targets = captions.reshape(-1)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        tqdm_bar.set_postfix(loss=loss.item())
    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Training Loss: {avg_loss:.4f}')

In [None]:
# Decode the test split greedily and collect prediction / reference strings.
model.eval()
special_tokens = {PAD_TOKEN, SOS_TOKEN, EOS_TOKEN}
eos_ix = word_to_ix[EOS_TOKEN]
ground_truths = []
predicted_answers = []
with torch.no_grad():
    for images, answers in tqdm(test_loader, desc="Evaluating"):
        images = images.to(device)
        outputs = model.sample(images, max_seq_length=20)
        for output_indices, answer_indices in zip(outputs.tolist(), answers.tolist()):
            # sample() always runs the full length, so anything from the first <EOS>
            # onward is not part of the generated answer and must not be scored.
            if eos_ix in output_indices:
                output_indices = output_indices[:output_indices.index(eos_ix)]
            hypo = [ix_to_word.get(idx, UNK_TOKEN) for idx in output_indices]
            ref = [ix_to_word.get(idx, UNK_TOKEN) for idx in answer_indices[1:]]  # Exclude SOS token
            hypo = [w for w in hypo if w not in special_tokens]
            ref = [w for w in ref if w not in special_tokens]
            predicted_answers.append(' '.join(hypo))
            ground_truths.append(' '.join(ref))

In [None]:
import csv

# Write one (prediction, reference) pair per row under a header line.
csv_filename = "lstmvgg16__all_results.csv"
result_rows = zip(predicted_answers, ground_truths)

with open(csv_filename, mode='w', newline='', encoding='utf-8') as results_file:
    results_writer = csv.writer(results_file)
    results_writer.writerow(["predicted_answer", "ground_truth"])
    results_writer.writerows(result_rows)
print(f"Results saved to {csv_filename}")

In [None]:
import csv
import pandas as pd
import evaluate
from transformers import GPT2Tokenizer, GPT2LMHeadModel
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from textstat import textstat
from nltk.translate.meteor_score import meteor_score
from moverscore import get_idf_dict, word_mover_score
import torch
import numpy as np
input_csv = "lstmvgg16__all_results.csv"
# Read every cell as a plain string. With pandas' defaults an empty prediction, or a
# text such as "null", "nan" or "n/a", is parsed as NaN (a float), and the string
# operations used by the metrics below would then fail.
data = pd.read_csv(input_csv, dtype=str, keep_default_na=False)
predicted_answers = data["predicted_answer"].tolist()
ground_truths = data["ground_truth"].tolist()
# video_ids = data["video_id"].tolist()
rouge_metric = evaluate.load("rouge")
bert_score_metric = evaluate.load("bertscore")
# NOTE(review): meteor_metric is never called in this cell (nltk's meteor_score is used
# instead); loading it presumably fetches NLTK resources meteor_score needs. Confirm
# before removing.
meteor_metric = evaluate.load("meteor")
# GPT-2 is the language model used by calculate_perplexity.
gpt2_tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2")
def calculate_bleu(pred, ref):
    """Return smoothed sentence-level ``[BLEU-1, BLEU-2, BLEU-3]`` for one prediction.

    Args:
        pred: hypothesis, either a whitespace-separated string or a token sequence.
        ref: single reference, in the same form.

    Returns:
        List of three floats (cumulative 1-, 2- and 3-gram BLEU).
    """
    # sentence_bleu compares sequences element by element. Passing raw strings would
    # score character n-grams, so split text into word tokens first.
    pred_tokens = pred.split() if isinstance(pred, str) else list(pred)
    ref_tokens = ref.split() if isinstance(ref, str) else list(ref)
    smoothing = SmoothingFunction().method1
    # Uniform weights over the n-gram orders in use (1/3 each for BLEU-3, not 0.33).
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (1 / 3, 1 / 3, 1 / 3, 0),
    )
    return [
        sentence_bleu([ref_tokens], pred_tokens, weights=weights, smoothing_function=smoothing)
        for weights in weight_sets
    ]

def calculate_perplexity(text):
    """Perplexity of ``text`` under the module-level GPT-2 model.

    Computed as ``exp(loss)``, where ``loss`` is what the model reports when the
    tokenized text is passed as its own labels.

    Returns:
        0-d float tensor. NaN when the text tokenizes to fewer than two tokens.
    """
    inputs = gpt2_tokenizer(text, return_tensors="pt")
    input_ids = inputs["input_ids"]
    # With fewer than two tokens there is no next-token prediction to score
    # (an empty prediction would otherwise crash the model call).
    if input_ids.shape[-1] < 2:
        return torch.tensor(float("nan"))
    with torch.no_grad():
        outputs = gpt2_model(**inputs, labels=input_ids)
        mean_loss = outputs.loss.item()
    return torch.exp(torch.tensor(mean_loss))

def calculate_jaccard(pred, ref):
    """Jaccard similarity between the whitespace-token sets of two strings.

    Returns shared tokens divided by distinct tokens overall, or 0 when neither
    string contains any token.
    """
    pred_tokens, ref_tokens = set(pred.split()), set(ref.split())
    all_tokens = pred_tokens | ref_tokens
    if len(all_tokens) == 0:
        return 0
    shared_tokens = pred_tokens & ref_tokens
    return len(shared_tokens) / len(all_tokens)

# MoverScore needs IDF statistics for hypotheses and references separately.
idf_dict_hyp = get_idf_dict(predicted_answers)
idf_dict_ref = get_idf_dict(ground_truths)
mover_scores = word_mover_score(ground_truths, predicted_answers, idf_dict_ref, idf_dict_hyp, stop_words=[], n_gram=1, remove_subwords=True)

rouge_scores = rouge_metric.compute(predictions=predicted_answers, references=ground_truths)
bleu_scores = [calculate_bleu(pred, ref) for pred, ref in zip(predicted_answers, ground_truths)]
bert_scores = bert_score_metric.compute(predictions=predicted_answers, references=ground_truths, lang="en")
flesch_scores = [textstat.flesch_reading_ease(text) for text in predicted_answers]
coleman_liau_scores = [textstat.coleman_liau_index(text) for text in predicted_answers]
perplexity_scores = [calculate_perplexity(pred) for pred in predicted_answers]
meteor_scores = [meteor_score([ref.split()], pred.split()) for pred, ref in zip(predicted_answers, ground_truths)]
# Character-level Hamming distance. zip() stops at the shorter string, so positions
# that exist in only one of the two texts are counted as mismatches explicitly;
# otherwise an empty prediction would score a perfect 0.
hamming_distances = [
    sum(el1 != el2 for el1, el2 in zip(pred, ref)) + abs(len(pred) - len(ref))
    for pred, ref in zip(predicted_answers, ground_truths)
]
jaccard_similarities = [calculate_jaccard(pred, ref) for pred, ref in zip(predicted_answers, ground_truths)]

# Collapse every metric to a single mean value, keyed by its display name.
metrics_summary = {}
for label, key in (("ROUGE-1", "rouge1"), ("ROUGE-2", "rouge2"), ("ROUGE-L", "rougeL")):
    metrics_summary[label] = np.mean(rouge_scores[key])
# bleu_scores holds one [BLEU-1, BLEU-2, BLEU-3] triple per example.
for order in range(3):
    metrics_summary[f"BLEU-{order + 1}"] = np.mean([score[order] for score in bleu_scores])
metrics_summary.update({
    "BERT Score": np.mean(bert_scores['f1']),
    "FLESCH Readability Ease Score": np.mean(flesch_scores),
    "Coleman-Liau Readability Score": np.mean(coleman_liau_scores),
    "Perplexity Score": np.mean(perplexity_scores),
    "Meteor Score": np.mean(meteor_scores),
    "Mover Score": np.mean(mover_scores),
    "Hamming Distance": np.mean(hamming_distances),
    "Jaccard Similarity": np.mean(jaccard_similarities),
})

# Persist the summary as a two-column CSV: header first, then one row per metric.
summary_csv = "metrics_summary_lstmvgg16.csv"
with open(summary_csv, mode='w', newline='', encoding='utf-8') as summary_file:
    summary_writer = csv.writer(summary_file)
    summary_writer.writerow(["Metric", "Score"])
    summary_writer.writerows([name, value] for name, value in metrics_summary.items())
print(f"Metrics summary saved to {summary_csv}")

