# MultiBERTs Evaluation

In [None]:
!pip install transformers jsonlines

In [None]:
from typing import Type

import torch
from transformers import AutoTokenizer, PreTrainedModel


class MaskPredictor:
    """
    Scores sentences with a masked language model.

    Loads a tokenizer and a masked-LM checkpoint by name and exposes
    `pll_score`, which masks one token at a time and turns the model's loss
    over the masked positions into a single score.
    """

    def __init__(
            self, model_name: str, model: "Type[PreTrainedModel]", revision: str = None
    ):
        """
        Args:
            model_name: Name or path handed to ``from_pretrained``.
            model: Model class whose ``from_pretrained`` builds the network.
            revision: Optional revision to load. When falsy, no revision is
                passed and the library default applies.
        """
        # Forward the revision to BOTH loaders so tokenizer and weights always
        # come from the same snapshot.
        load_kwargs = {"revision": revision} if revision else {}
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, **load_kwargs)
        self.model = model.from_pretrained(model_name, **load_kwargs)
        # Prefer the GPU, but fall back to the CPU instead of crashing when
        # CUDA is unavailable.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def pll_score(self, sentence: str) -> float:
        """
        Return exp(loss) for ``sentence``, where the loss is computed with each
        inner token masked in its own copy of the input. Lower is better.

        The first and last positions of the encoding are never masked
        (presumably the special tokens the tokenizer adds, e.g. [CLS]/[SEP]).

        References:
        - https://stackoverflow.com/questions/70464428/how-to-calculate-perplexity-of-a-sentence-using-huggingface-masked-language-mode
        - https://arxiv.org/abs/1910.14659
        """
        tensor_input = self.tokenizer.encode(sentence, return_tensors="pt")
        seq_len = tensor_input.size(-1)
        # One copy of the sentence per maskable position.
        repeat_input = tensor_input.repeat(seq_len - 2, 1)
        # Ones on the first super-diagonal with the last two rows dropped:
        # row i selects position i + 1.
        mask = torch.ones(seq_len - 1).diag(1)[:-2] == 1
        masked_input = repeat_input.masked_fill(mask, self.tokenizer.mask_token_id)
        # Derive the labels from the mask itself (not from the token ids), so a
        # literal mask token already present in the sentence is not scored in
        # every row. -100 marks positions excluded from the loss.
        labels = repeat_input.masked_fill(~mask, -100)
        with torch.inference_mode():
            # The loss is the average negative log likelihood; it is positive
            # because each log likelihood is the log of a probability in (0, 1).
            loss = self.model(
                masked_input.to(self.device), labels=labels.to(self.device)
            ).loss
        # Exponentiate in double precision with torch so this method does not
        # depend on numpy having been imported by another cell.
        return torch.exp(loss.double()).item()

In [None]:
import jsonlines
import numpy as np
from transformers import BertForMaskedLM


def test_pll_score(sentences: list[tuple[str, str]], seed: int, steps_to_test: list[int]) -> dict[int, float]:
    """
    Measure, per checkpoint step, how often the right sentence scores lower
    than the wrong one.

    Args:
        sentences: (right_sentence, wrong_sentence) pairs to compare.
        seed: Seed number substituted into the checkpoint name.
        steps_to_test: Steps (in thousands) substituted into the checkpoint name.

    Returns:
        Mapping from step to success rate in percent.

    Raises:
        ValueError: If ``sentences`` is empty (a rate cannot be computed).
    """
    # Fail fast: without this an empty input only surfaced as a
    # ZeroDivisionError after a checkpoint had already been loaded.
    if not sentences:
        raise ValueError("sentences must contain at least one (right, wrong) pair")

    success_rates = {}
    for step in steps_to_test:
        print(f"(*) Calculating PLL score for step {step}k")
        predictor = MaskPredictor(
            f"google/multiberts-seed_{seed}-step_{step}k", BertForMaskedLM
        )
        success_count = 0
        for right_sentence, wrong_sentence in sentences:
            right_score = predictor.pll_score(right_sentence)
            wrong_score = predictor.pll_score(wrong_sentence)
            # A lower score means the model finds the sentence more likely.
            if right_score < wrong_score:
                success_count += 1

        success_rate = success_count / len(sentences) * 100
        success_rates[step] = success_rate

        print(f"\t(*) Success rate: {success_rate}%")

    return success_rates

In [None]:
from pathlib import Path

def parse_gordon_questions_file(file_path: Path):
    """
    Read a JSON Lines file and group (right, wrong) sentence pairs by condition.

    Every record contributes one pair, built by appending its true / false
    question to its sentence. The pair is filed under the record's
    "condition_sentence" label and under its "condition_distractor" label.

    Returns a dict mapping each condition label to its list of pairs.
    """
    condition_labels = (
        'subject_extracted',
        'definite_description',
        'object_extracted',
        'indexical_pronoun',
        'name',
    )
    questions = {label: [] for label in condition_labels}

    with jsonlines.open(file_path) as reader:
        for record in reader:
            context = record['sentence']
            pair = (
                f"{context} {record['true_question']}",
                f"{context} {record['false_question']}",
            )

            sentence_label = record["condition_sentence"]
            distractor_label = record["condition_distractor"]
            # Read for parity with the record layout; grouping by this label is
            # deliberately not done for now.
            question_label = record["condition_question"]

            for label in (sentence_label, distractor_label):
                questions[label].append(pair)

    return questions

def parse_naama_questions_file(file_path: Path):
    """
    Read a JSON Lines file and group (right, wrong) sentence pairs.

    Every record contributes one pair: its sentence followed by "Therefore,"
    and the true / false question. The pair is filed under the record's
    'dependency' value and under its 'animacy' value.

    Returns a dict mapping each group label to its list of pairs.
    """
    questions = {label: [] for label in ('S-V', 'F-G', 'animate', 'inanimate')}

    with jsonlines.open(file_path) as reader:
        for record in reader:
            dependency_label = record['dependency']
            animacy_label = record['animacy']
            prefix = f"{record['sentence']} Therefore,"
            pair = (
                f"{prefix} {record['true_question']}",
                f"{prefix} {record['false_question']}",
            )
            for label in (dependency_label, animacy_label):
                questions[label].append(pair)

    return questions

In [None]:
import seaborn as sns
import pandas as pd


def plot_success_rates(rate_per_seed: dict[int, dict[int, float]]):
    """
    Draw a seaborn line plot of success rate against step, one hue per seed.

    rate_per_seed maps seed -> {step -> success rate}.
    X axis - step
    Y axis - success rate
    Color - seed
    """
    # Flatten the nested mapping into one record per (seed, step) point
    records = [
        {"seed": seed, "step": step, "rate": rate}
        for seed, rates in rate_per_seed.items()
        for step, rate in rates.items()
    ]
    frame = pd.DataFrame(records, columns=["seed", "step", "rate"])

    sns.lineplot(data=frame, x="step", y="rate", hue="seed")

In [None]:
# Seeds to evaluate; each is substituted into the checkpoint name
# (google/multiberts-seed_{seed}-step_{step}k) by test_pll_score.
SEEDS_TO_TEST = [0, 1, 2]
# Checkpoint steps to evaluate, in thousands (the "k" suffix of the checkpoint name).
STEPS_TO_TEST = [0, 40, 100, 140, 200]

# Gordon

In [None]:
import csv

gordon_questions = parse_gordon_questions_file(Path("gordon_questions.jsonl"))

# Results are written to 'gordon_output.csv' (created, or overwritten if present)
with open('gordon_output.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)

    # Header row
    csv_writer.writerow(['group', 'seed', 'step', 'success_rate'])

    x = {}
    for group_tuple in [
        ("subject_extracted", "definite_description"),
        ("object_extracted", "definite_description"),
        ("subject_extracted", "indexical_pronoun"),
        ("object_extracted", "indexical_pronoun"),
    ]:
        first_condition, second_condition = group_tuple
        question_group = "_".join(group_tuple)
        print(f"(*) Testing question group {question_group}")

        first_pairs = gordon_questions[first_condition]
        second_pairs = gordon_questions[second_condition]
        print('\t(*) Questions: ', len(first_pairs), len(second_pairs))

        # Keep only the pairs that were filed under both conditions of the group
        questions = list(set(first_pairs).intersection(second_pairs))
        print('\t(*) Questions after intersection: ', len(questions))

        x[question_group] = questions

        # Rebuilt for every group: after the loop it holds the last group only
        rates_per_seed = {}

        for seed in SEEDS_TO_TEST:
            print(f"(*) Testing seed {seed}")
            success_rates = test_pll_score(questions, seed, STEPS_TO_TEST)
            rates_per_seed[seed] = success_rates

            # One CSV row per (group, seed, step)
            for step in STEPS_TO_TEST:
                csv_writer.writerow([question_group, seed, step, success_rates[step]])

        # plot_success_rates(rates_per_seed)

In [None]:
# rates_per_seed is rebuilt for every question group in the cell above, so at
# this point it holds the rates of the last group only.
plot_success_rates(rates_per_seed)

# Naama

In [None]:
import csv

naama_questions = parse_naama_questions_file(Path("naama_questions.jsonl"))

# Results are written to 'naama_output.csv' (created, or overwritten if present)
with open('naama_output.csv', 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)

    # Header row
    csv_writer.writerow(['group', 'seed', 'step', 'success_rate'])

    x = {}
    for group_tuple in [
        ("S-V", "animate"),
        ("S-V", "inanimate"),
        ("F-G", "animate"),
        ("F-G", "inanimate"),
    ]:
        dependency_label, animacy_label = group_tuple
        question_group = "_".join(group_tuple)
        print(f"(*) Testing question group {question_group}")

        dependency_pairs = naama_questions[dependency_label]
        animacy_pairs = naama_questions[animacy_label]
        print('\t(*) Questions: ', len(dependency_pairs), len(animacy_pairs))

        # Keep only the pairs that belong to both the dependency and the animacy group
        questions = list(set(dependency_pairs).intersection(animacy_pairs))
        print('\t(*) Questions after intersection: ', len(questions))

        x[question_group] = questions

        # Rebuilt for every group: after the loop it holds the last group only
        rates_per_seed = {}

        for seed in SEEDS_TO_TEST:
            print(f"(*) Testing seed {seed}")
            success_rates = test_pll_score(questions, seed, STEPS_TO_TEST)
            rates_per_seed[seed] = success_rates

            # One CSV row per (group, seed, step)
            for step in STEPS_TO_TEST:
                csv_writer.writerow([question_group, seed, step, success_rates[step]])

        # plot_success_rates(rates_per_seed)