In [None]:
!pip install -q transformers torch datasets rouge-score sacrebleu grapheme

In [None]:
from transformers import AutoTokenizer, TrainingArguments, Trainer
import os
import torch
import string
import numpy as np
import pandas as pd
import collections
from pathlib import Path
from datetime import datetime
from rouge_score import rouge_scorer
import grapheme
import seaborn as sns
from sacrebleu.metrics import BLEU
from sklearn.metrics import f1_score
from transformers import Trainer, TrainingArguments, default_data_collator, AutoTokenizer, AutoModelForQuestionAnswering, EarlyStoppingCallback
import tensorflow as tf
from datasets import Dataset
from tqdm.auto import tqdm
from rouge_score import rouge_scorer
import nltk
import matplotlib.pyplot as plt
import os
from transformers import TrainerCallback
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from google.colab import drive

In [None]:
# Mount Google Drive so the /content/drive/MyDrive/... paths used below resolve.
drive.mount('/content/drive')

In [None]:
# Load the evaluation split. Later cells read its 'id', 'question', 'context'
# and 'answer_text' columns.
test = pd.read_csv('/content/drive/MyDrive/Research_folder/data/test.csv')

In [None]:
#test = pd.read_csv('/content/drive/MyDrive/Research_folder/data/SEEDData.csv')

In [None]:
# Preview the first rows of the loaded test set.
test.head()

In [None]:
# test_1.head()

In [None]:
# Directory on the mounted Drive holding the checkpoint to evaluate. The tokenizer
# here and the model loaded in a later cell both come from this same path.
model_checkpoint = "/content/drive/MyDrive/Research_folder/models/XLM-R-75-metrics"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [None]:
# Settings read as module-level globals by prepare_validation_features
batch_size = 16
max_length = 512  # Maximum number of tokens per tokenized feature
doc_stride = 128  # Overlap (in tokens) between consecutive windows of a long context
pad_on_right = tokenizer.padding_side == "right"  # True when the tokenizer pads on the right; decides question/context order

In [None]:
# Load the question-answering model from the same checkpoint directory as the tokenizer.
model = AutoModelForQuestionAnswering.from_pretrained(model_checkpoint)

# Move the model to the GPU when one is visible; otherwise it stays where it was loaded.
gpu_available = torch.cuda.is_available()
if gpu_available:
    model = model.cuda()

In [None]:
def prepare_validation_features(examples):
    """Tokenize a batch of question/context pairs into padded inference features.

    Long contexts are split into overlapping windows, so one input example can
    produce several features. Each feature records the ``example_id`` it was
    cut from, and its ``offset_mapping`` keeps character offsets only for
    context tokens (every other position becomes ``None``).

    Relies on the module-level ``tokenizer``, ``max_length``, ``doc_stride``
    and ``pad_on_right``. ``examples["question"]`` is overwritten in place with
    the left-stripped questions.

    Args:
        examples: Batch mapping with "id", "question" and "context" columns.

    Returns:
        The tokenizer output with "overflow_to_sample_mapping" removed and an
        "example_id" entry added.
    """
    # Leading whitespace in a question only wastes token budget.
    examples["question"] = [question.lstrip() for question in examples["question"]]

    # The padding side decides which text goes first and which one may be truncated.
    if pad_on_right:
        first_column, second_column, truncation_mode = "question", "context", "only_second"
        context_sequence_id = 1
    else:
        first_column, second_column, truncation_mode = "context", "question", "only_first"
        context_sequence_id = 0

    encoded = tokenizer(
        examples[first_column],
        examples[second_column],
        truncation=truncation_mode,
        max_length=max_length,
        stride=doc_stride,  # overlap between consecutive windows
        return_overflowing_tokens=True,  # emit one feature per window
        return_offsets_mapping=True,  # needed to map tokens back to characters
        padding="max_length",
    )

    # One entry per feature: the index of the example that window was cut from.
    feature_to_example = encoded.pop("overflow_to_sample_mapping")

    example_ids = []
    masked_offsets = []
    for feature_idx, offsets in enumerate(encoded["offset_mapping"]):
        segment_ids = encoded.sequence_ids(feature_idx)
        example_ids.append(examples["id"][feature_to_example[feature_idx]])
        # Keep offsets for context tokens only, so a None offset marks a
        # position that cannot be part of an answer.
        masked_offsets.append([
            span if segment_ids[position] == context_sequence_id else None
            for position, span in enumerate(offsets)
        ])

    encoded["example_id"] = example_ids
    encoded["offset_mapping"] = masked_offsets
    return encoded

In [None]:
def postprocess_qa_predictions(examples, features, raw_predictions, n_best_size=20, max_answer_length=30):
    """Turn raw start/end logits into one predicted answer string per example.

    For every example, each of its features contributes candidate spans built
    from the ``n_best_size`` highest start logits and the ``n_best_size``
    highest end logits. A candidate is discarded when its start or end token
    has no offset (``None`` or out of range), when it ends before it starts, or
    when it is longer than ``max_answer_length`` tokens. The remaining
    candidate with the highest ``start_logit + end_logit`` is sliced out of the
    example's context; on ties the first candidate encountered wins.

    Args:
        examples: Original examples; indexing by "id" must give all ids and
            each item must expose "id" and "context".
        features: Tokenized features; each item must expose "example_id" and
            "offset_mapping".
        raw_predictions: Pair ``(all_start_logits, all_end_logits)`` indexed by
            feature position.
        n_best_size: Number of top start and top end indexes to combine.
        max_answer_length: Maximum answer length, in tokens.

    Returns:
        collections.OrderedDict mapping each example id to its predicted answer
        text (an empty string when no candidate span is valid).
    """
    all_start_logits, all_end_logits = raw_predictions

    # Group feature positions by the index of the example they belong to.
    example_id_to_index = {k: i for i, k in enumerate(examples["id"])}
    features_per_example = collections.defaultdict(list)
    for i, feature in enumerate(features):
        features_per_example[example_id_to_index[feature["example_id"]]].append(i)

    predictions = collections.OrderedDict()
    print(f"Post-processing {len(examples)} examples with {len(features)} features.")

    for example_index, example in enumerate(tqdm(examples)):
        context = example["context"]
        best_text = ""  # fallback when no candidate span is valid
        best_score = None

        for feature_index in features_per_example[example_index]:
            start_logits = all_start_logits[feature_index]
            end_logits = all_end_logits[feature_index]
            offset_mapping = features[feature_index]["offset_mapping"]
            num_tokens = len(offset_mapping)

            # Indexes of the n_best_size largest logits, best first.
            start_indexes = np.argsort(start_logits)[-1 : -n_best_size - 1 : -1].tolist()
            end_indexes = np.argsort(end_logits)[-1 : -n_best_size - 1 : -1].tolist()

            for start_index in start_indexes:
                # A start token without an offset can never begin an answer.
                if start_index >= num_tokens or offset_mapping[start_index] is None:
                    continue
                for end_index in end_indexes:
                    if end_index >= num_tokens or offset_mapping[end_index] is None:
                        continue
                    if end_index < start_index or end_index - start_index + 1 > max_answer_length:
                        continue

                    # Track the running best instead of collecting and sorting
                    # every candidate; strict ">" keeps the first of any tie.
                    score = start_logits[start_index] + end_logits[end_index]
                    if best_score is None or score > best_score:
                        best_score = score
                        start_char = offset_mapping[start_index][0]
                        end_char = offset_mapping[end_index][1]
                        best_text = context[start_char:end_char]

        predictions[example["id"]] = best_text

    return predictions

In [None]:
# Convert the test DataFrame to a Hugging Face Dataset, then tokenize it in batches.
# The original columns are removed so that only the fields returned by
# prepare_validation_features remain in test_features.
test_dataset = Dataset.from_pandas(test)

test_features = test_dataset.map(
    prepare_validation_features,
    batched=True,
    remove_columns=test_dataset.column_names
)

In [None]:
# Drop the bookkeeping columns ('example_id' and 'offset_mapping') before the features
# are passed to trainer.predict. remove_columns returns a new dataset and avoids
# running an identity map over every row just to discard columns.
test_feats_small = test_features.remove_columns(['example_id', 'offset_mapping'])

In [None]:
# Display the dataset summary to check which columns remain.
test_feats_small

In [None]:
%env WANDB_DISABLED=True
args = TrainingArguments(
    output_dir='./results',
    label_names=["start_positions", "end_positions"]
)
trainer = Trainer(model, args)

In [None]:
# Run inference on the test features. test_predictions.predictions is later
# unpacked by postprocess_qa_predictions as (start_logits, end_logits).
test_predictions = trainer.predict(test_feats_small)

In [None]:
# Re-apply the dataset's current format type while explicitly listing every column,
# so 'example_id' and 'offset_mapping' are returned when post-processing indexes the features.
test_features.set_format(type=test_features.format["type"], columns=list(test_features.features.keys()))

In [None]:
# Post-process the raw start/end logits into one answer string per example.
# The result maps each example 'id' to its predicted text.
final_test_predictions = postprocess_qa_predictions(test_dataset, test_features, test_predictions.predictions)

In [None]:
# Look up each row's predicted answer by its 'id' and store it in a new
# 'PredictionString' column; a missing id raises KeyError.
test['PredictionString'] = test['id'].apply(lambda r: final_test_predictions[r])
test.head()

Evaluation Metrics

In [None]:
def remove_punctuation(text):
    """Return ``text`` with every character listed in ``string.punctuation`` deleted."""
    # Mapping a code point to None tells str.translate to drop that character.
    deletion_table = {ord(symbol): None for symbol in string.punctuation}
    return text.translate(deletion_table)

In [None]:
def compute_exact_match(pred, truth):
    """Return 1 when prediction and reference are equal after normalisation, else 0.

    Both strings are stripped, lower-cased and passed through
    ``remove_punctuation`` before being compared.
    """
    normalized_pred, normalized_truth = (
        remove_punctuation(answer.strip().lower()) for answer in (pred, truth)
    )
    return 1 if normalized_pred == normalized_truth else 0

In [None]:
def compute_f1(pred, truth):
    """Token-level F1 between a predicted answer and the reference answer.

    Both strings are stripped, lower-cased, cleaned with ``remove_punctuation``
    and split on whitespace. The overlap is counted as a multiset (a token
    repeated in both answers counts once per matching occurrence), so precision
    and recall use the same units as the token counts they are divided by.

    Args:
        pred: Predicted answer text.
        truth: Reference answer text.

    Returns:
        float in [0, 1]. When either answer has no tokens the score is 1.0 if
        both are empty and 0.0 otherwise, matching how ``compute_jaccard``
        treats empty answers.
    """
    pred_tokens = remove_punctuation(pred.strip().lower()).split()
    truth_tokens = remove_punctuation(truth.strip().lower()).split()

    # With an empty side, precision/recall are undefined; agree only if both are empty.
    if not pred_tokens or not truth_tokens:
        return float(pred_tokens == truth_tokens)

    # Counter "&" keeps, per token, the smaller of the two occurrence counts.
    overlap = collections.Counter(pred_tokens) & collections.Counter(truth_tokens)
    num_common = sum(overlap.values())
    if num_common == 0:
        return 0.0

    precision = num_common / len(pred_tokens)
    recall = num_common / len(truth_tokens)
    return 2 * (precision * recall) / (precision + recall)

In [None]:
def compute_jaccard(pred, truth):
    """Jaccard similarity between the token sets of a prediction and its reference.

    Both strings are stripped, lower-cased, cleaned with ``remove_punctuation``
    and split on whitespace; duplicates are ignored because sets are compared.

    Returns:
        1.0 when both token sets are empty, 0.0 when exactly one is empty,
        otherwise ``|intersection| / |union|``.
    """
    pred_vocab = set(remove_punctuation(pred.strip().lower()).split())
    truth_vocab = set(remove_punctuation(truth.strip().lower()).split())

    if not pred_vocab or not truth_vocab:
        # Two empty answers agree perfectly; a single empty answer shares nothing.
        return 1.0 if pred_vocab == truth_vocab else 0.0

    shared = pred_vocab & truth_vocab
    combined = pred_vocab | truth_vocab
    return len(shared) / len(combined)

In [None]:
def tokenize_xlm_r(text):
    """Split ``text`` with the module-level ``tokenizer`` and return the pieces as one space-separated string.

    The space-joined form is what ``compute_rouge`` hands to the ROUGE scorer.
    """
    pieces = tokenizer.tokenize(text)
    space_joined = " ".join(pieces)
    return space_joined

In [None]:
class _WhitespaceTokenizer:
    """Minimal tokenizer for ``RougeScorer``: split on whitespace and keep tokens unchanged."""

    def tokenize(self, text):
        return text.split()


# rouge_score's built-in tokenizer lower-cases the text and keeps only runs of [a-z0-9],
# which would erase non-Latin sub-word tokens (e.g. Sinhala) and drive the scores to 0.
# The inputs are already split by tokenize_xlm_r, so whitespace splitting is enough.
scorer = rouge_scorer.RougeScorer(
    ["rouge2", "rouge3", "rouge4", "rougeL"],
    use_stemmer=False,
    tokenizer=_WhitespaceTokenizer(),
)

def compute_rouge(pred, truth):
    """Compute ROUGE-2, ROUGE-3, ROUGE-4 and ROUGE-L F-measures for one prediction.

    Both texts are first converted to space-separated sub-word tokens with
    ``tokenize_xlm_r`` and then scored with the module-level ``scorer``.

    Args:
        pred: Predicted answer text.
        truth: Reference answer text.

    Returns:
        Tuple ``(rouge2_f, rouge3_f, rouge4_f, rougeL_f)``.
    """
    pred_tokens = tokenize_xlm_r(pred)
    truth_tokens = tokenize_xlm_r(truth)
    # RougeScorer.score takes the reference first, then the prediction.
    scores = scorer.score(truth_tokens, pred_tokens)
    return scores['rouge2'].fmeasure, scores['rouge3'].fmeasure, scores['rouge4'].fmeasure, scores['rougeL'].fmeasure

In [None]:
def tokenize_graphemes(text):
    """Split ``text`` into grapheme clusters using the ``grapheme`` library.

    Returns:
        list: the grapheme clusters in order. ``grapheme.graphemes`` produces
        them lazily, so the result is materialised to let callers index it or
        iterate over it more than once.
    """
    return list(grapheme.graphemes(text))

In [None]:
def compute_bleu(pred, truth, n_gram):
    """Sentence-level BLEU of ``pred`` against ``truth`` over grapheme-separated text.

    Args:
        pred: Predicted answer text.
        truth: Reference answer text.
        n_gram: Highest n-gram order, passed to sacrebleu as ``max_ngram_order``.

    Returns:
        The ``score`` attribute of the sacrebleu sentence score.
    """
    # The hypothesis is a single string; the references are passed as a list of strings.
    hypothesis = " ".join(tokenize_graphemes(pred))
    references = [" ".join(tokenize_graphemes(truth))]

    bleu_options = {
        "max_ngram_order": n_gram,
        "smooth_method": "exp",
        "effective_order": True,  # set by the original author to silence sacrebleu warnings
    }
    sentence_result = BLEU(**bleu_options).sentence_score(hypothesis, references)
    return sentence_result.score

In [None]:
def update_metrics_csv(model_name, metrics_dict, csv_file):
    """Append one row of model metrics to a CSV file, preserving existing rows.

    The row holds the model name, an ISO-format timestamp taken at call time
    and one column per entry of ``metrics_dict``. If the file already exists it
    is read, the new row is appended and the whole file is rewritten; otherwise
    a new file is created. Missing parent directories are created first so the
    save does not fail at the end of an evaluation run.

    Args:
        model_name (str): Name of the model.
        metrics_dict (dict): Metric name -> value to save.
        csv_file (str): Path to the CSV file.
    """
    csv_path = Path(csv_file)

    # Single-row frame: the scalar metric values are broadcast against the
    # one-element 'model' and 'timestamp' lists.
    new_data = {
        'model': [model_name],
        'timestamp': [datetime.now().isoformat()],
        **metrics_dict
    }
    new_df = pd.DataFrame(new_data)

    if csv_path.exists():
        existing_df = pd.read_csv(csv_path)
        updated_df = pd.concat([existing_df, new_df], ignore_index=True)
    else:
        # First write: make sure the output directory exists.
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        updated_df = new_df

    updated_df.to_csv(csv_path, index=False)
    print(f"Metrics saved to {csv_file}")

In [None]:
# Exact Match per row; the mean is the fraction (0-1) of rows that match exactly
test['exact_match'] = test.apply(lambda row: compute_exact_match(row['PredictionString'], row['answer_text']), axis=1)
exact_match_score = test['exact_match'].mean()

# Token-level F1 per row, averaged the same way as the other metrics
test['f1_scores'] = test.apply(lambda row: compute_f1(row['PredictionString'], row['answer_text']), axis=1)
f1_score_value = test['f1_scores'].mean()

# Token-set Jaccard similarity per row
test['jaccard_scores'] = test.apply(lambda row: compute_jaccard(row['PredictionString'], row['answer_text']), axis=1)
jaccard_score_value = test['jaccard_scores'].mean()


# Print the averaged results for the test set
print(f"Exact Match Score for test set: {exact_match_score:.4f}")
print(f"F1 Score for test set: {f1_score_value:.4f}")
print(f'Jaccard scores: {jaccard_score_value:.4f}')

In [None]:
# Calculate ROUGE-2, ROUGE-3, ROUGE-4 and ROUGE-L F-measures for every test row.
# result_type="expand" spreads the 4-tuple returned by compute_rouge over four columns.
test[["rouge_2","rouge_3", "rouge_4", "rouge_L"]] = test.apply(
    lambda row: compute_rouge(row['PredictionString'], row['answer_text']), axis=1, result_type="expand"
)
avg_rouge_2 = test['rouge_2'].mean()
avg_rouge_3 = test['rouge_3'].mean()
avg_rouge_4 = test['rouge_4'].mean()
avg_rouge_L = test['rouge_L'].mean()

# These averages are computed on the test DataFrame, so label them as test-set results.
print(f"Average ROUGE-2 for Test set: {avg_rouge_2:.4f}")
print(f"Average ROUGE-3 for Test set: {avg_rouge_3:.4f}")
print(f"Average ROUGE-4 for Test set: {avg_rouge_4:.4f}")
print(f"Average ROUGE-L for Test set: {avg_rouge_L:.4f}")

In [None]:
# Calculate grapheme-level BLEU per row with maximum n-gram order 1 (BLEU-1) and 2 (BLEU-2).
# NOTE(review): these values are whatever sacrebleu's `.score` returns; presumably on a
# 0-100 scale, unlike the 0-1 metrics above. Confirm before comparing them side by side.
test['bleu_1'] = test.apply(lambda row: compute_bleu(row['PredictionString'], row['answer_text'], n_gram=1), axis=1)
test['bleu_2'] = test.apply(lambda row: compute_bleu(row['PredictionString'], row['answer_text'], n_gram=2), axis=1)

# Average over all test rows
avg_bleu_1 = test['bleu_1'].mean()
avg_bleu_2 = test['bleu_2'].mean()

# Print the averaged results for the test set
print(f"Average BLEU-1 for Test set: {avg_bleu_1:.4f}")
print(f"Average BLEU-2 for Test set: {avg_bleu_2:.4f}")

In [None]:
# Collect the test-set averages computed above; each key becomes a column of the results CSV.
metrics = {
    'avg_bleu_1': avg_bleu_1,
    'avg_bleu_2': avg_bleu_2,
    'avg_rouge_2': avg_rouge_2,
    'avg_rouge_3': avg_rouge_3,
    'avg_rouge_4': avg_rouge_4,
    'avg_rouge_L': avg_rouge_L,
    'exact_match_score': exact_match_score,
    'f1_score_value': f1_score_value,
    'jaccard_score_value': jaccard_score_value
}
# Append one row for this model to the shared results file on Drive.
update_metrics_csv("XLM-R-75", metrics, csv_file = "/content/drive/MyDrive/Research_folder/outputs/Results/Evaluation_Results_Test.csv") # update the model name here