In [81]:
from dotenv import load_dotenv

load_dotenv()

import csv
import random
from glob import glob

from tqdm import tqdm
from openai import OpenAI
import pandas as pd

# Module-level OpenAI client, constructed with no explicit arguments.
# NOTE(review): presumably picks up its credentials from the environment
# populated by load_dotenv() above — confirm. It is not referenced in any of
# the visible cells.
OPENAI_CLIENT = OpenAI()

In [82]:

# Language pair under evaluation: three-letter code and display name.
SRC_LANG, SRC_NAME = 'fra', 'French'
TGT_LANG, TGT_NAME = 'eng', 'English'

# Annotator whose ratings are loaded; also embedded in the output filenames.
ANNOTATOR = 'A1'

# Side of each example pair that gets scored: 'src' or 'tgt'.
EVAL_ON = 'src'
# EVAL_ON = 'tgt'

# Forwarded to get_relevant_files(ratings_for_model=...); empty string here.
RATINGS_FOR_MODEL = ''

# whether 2 ratings for the same sentences will be averaged
# NOTE(review): not referenced in the visible cells — confirm it is still used.
AVERAGE_OUT_RATINGS = True


### Get annotations

Then remap the example columns to the standard names `src_example` and `tgt_example`.

In [None]:
# { 'src_word': ..., 'tgt_word': ..., 'src_example': ..., 'tgt_example': ..., 'rating': ...}
from utils import get_relevant_files, extract_rows, remap_columns

# Locate the annotation files for this language / model / annotator, pull out
# their rows, and keep only the rows that carry an 'Overall rating'.
files = get_relevant_files(
    src_lang=SRC_LANG,
    ratings_for_model=RATINGS_FOR_MODEL,
    annotator=ANNOTATOR,
)
rows = [
    r
    for r in extract_rows(files, remove_empty=False)
    if r['Overall rating']
]

# Rename, per row, the column that starts with 'example' and ends with 'src'
# to 'src_example', and likewise for 'tgt_example'.
# NOTE(review): the return value is ignored, so remap_columns presumably
# mutates the row in place — confirm against utils.
for r in rows:
    remap_columns(r)

print(f"Total of {len(rows)} rated examples")
print(random.choice(rows))

## Perplexity

In [84]:
# Accumulator for every correlation computed below: one row per
# (model output, rating column) pair.
results_tracker = pd.DataFrame(
    columns=[
        'Language',
        'Model',
        'Model output',
        'correlated with',
        'correlation',
        'p-value',
    ]
)
# An extra 'Annotator' column is only present when an annotator is configured.
if ANNOTATOR:
    results_tracker['Annotator'] = ANNOTATOR

In [85]:
from transformers import AutoTokenizer, AutoModelForMaskedLM


# Masked-LM checkpoint per source language; any other language falls back to
# the multilingual XLM-R large checkpoint.
# Alternatives noted so far:
#   fra: 'almanach/camemberta-base' (camemberta, based on deberta)
#   ind: 'LazarusNLP/NusaBERT-large' - very bad results with this model,
#        none of them significant
MODEL_PATH = {
    'fra': 'almanach/camembert-large',
    'ind': "indolem/indobert-base-uncased",
    'tdt': 'raphaelmerx/xlm-roberta-large-tetun',
}.get(SRC_LANG, "FacebookAI/xlm-roberta-large")


# Tokenizer and model are both loaded from the selected checkpoint.
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForMaskedLM.from_pretrained(MODEL_PATH)


In [86]:
import torch
from transformers import pipeline 
import math


# Fill-mask pipeline built from the same checkpoint as `tokenizer` / `model`.
# Use the first CUDA device when one is available and fall back to CPU (-1)
# otherwise, so the notebook also runs on machines without a GPU.
fill_mask = pipeline(
    "fill-mask",
    model=MODEL_PATH,
    tokenizer=MODEL_PATH,
    device=0 if torch.cuda.is_available() else -1,
)

def replace_word_with_token(word, text, mask_token):
    """Lower-case `text` and replace every occurrence of `word` with `mask_token`.

    Matching is a plain, case-insensitive substring replacement, so `word` is
    also replaced where it occurs inside longer words. When the lower-cased
    word is absent from the text, a few language-specific spelling/lemma
    substitutions are applied to the text first (French and Tetun only).

    Args:
        word: the word to mask.
        text: the sentence containing the word.
        mask_token: the string inserted in place of the word.

    Returns:
        The lower-cased text with the word replaced by `mask_token`.
    """
    lowered, target = text.lower(), word.lower()

    if target not in lowered:
        if SRC_LANG == 'fra':
            # some French lemmas
            fixes = (
                ('média', 'media'),
                ('avalé', 'avaler'),
                ('tombé', 'tomber'),
                ('exaucés', 'exaucer'),
            )
        elif SRC_LANG == 'tdt':
            fixes = (
                ("ha'u-nia", "ha’u-nia"),
                ('orgaun', 'órgaun'),
                ("exemplu", "examplu"),
                ("kompaniia", "kompañia"),
            )
        else:
            fixes = ()
        # Applied in order, exactly like a chain of str.replace calls.
        for old, new in fixes:
            lowered = lowered.replace(old, new)

    return lowered.replace(target, mask_token)


def calculate_perplexity(text):
    """Return exp(loss) of the masked LM for `text`.

    The text is tokenized and its input ids are passed to the model both as
    inputs and as labels; the exponential of the reported loss is returned.
    NOTE(review): no token is masked here, so this is presumably not a
    standard pseudo-perplexity — confirm this is the intended score.

    Args:
        text: the string to score.

    Returns:
        The exponentiated loss as a Python float.
    """
    token_ids = tokenizer(text, return_tensors="pt", padding=True)['input_ids']

    # Inference only: no gradients are needed.
    with torch.no_grad():
        loss = model(token_ids, labels=token_ids).loss

    return torch.exp(torch.tensor(loss.item())).item()

def calculate_masked_word_prob(word, text):
    """Return the fill-mask score the model gives `word` at its own position.

    `word` is masked in `text` and the fill-mask pipeline is run on the
    result. The scores of all returned candidates whose `token_str` equals
    `word` are summed, so the result is 0 when `word` is not among the
    candidates.

    Args:
        word: the word to mask and look for among the predictions.
        text: the sentence containing the word.

    Returns:
        The summed score, or 0 if the pipeline raised (the failing text and
        word are printed in that case).
    """
    text = replace_word_with_token(word, text, tokenizer.mask_token)
    try:
        results = fill_mask(text)
    except Exception:
        # Best-effort: a failing sentence is reported and scored 0. Catching
        # Exception (not a bare except) keeps Ctrl-C / kernel interrupts
        # working during the long scoring loop.
        print(f"Error for text: {text} and word: {word}")
        return 0
    # can be more than 1 result, if the word appears multiple times
    # if mask appears multiple times, take the first one
    if text.count(tokenizer.mask_token) > 1:
        results = results[0]
    # NOTE(review): the text was lower-cased before masking but `word` is
    # compared here in its original case — confirm this is intended.
    return sum(r['score'] for r in results if r['token_str'] == word)

def calculate_entropy(word, text):
    """Return the base-2 entropy of the fill-mask candidates for `word`'s slot.

    `word` is masked in `text`, the fill-mask pipeline is run, and
    -sum(p * log2(p)) is computed over the scores of the returned candidates
    only (not over the full vocabulary).

    Args:
        word: the word to mask.
        text: the sentence containing the word.

    Returns:
        The entropy in bits, or 0 if the pipeline raised (the failing text is
        printed in that case).
    """
    text = replace_word_with_token(word, text, tokenizer.mask_token)
    try:
        results = fill_mask(text)
    except Exception:
        # Best-effort: a failing sentence is reported and scored 0. Catching
        # Exception (not a bare except) keeps Ctrl-C / kernel interrupts
        # working during the long scoring loop.
        print(f"Error for text: {text}")
        return 0

    # If the mask appears multiple times, use the candidates for the first one.
    if text.count(tokenizer.mask_token) > 1:
        results = results[0]

    # A score that underflowed to 0 contributes nothing (0 * log 0 := 0);
    # skipping it avoids the ValueError math.log raises for 0.
    entropy = -sum(
        r['score'] * math.log(r['score'], 2)
        for r in results
        if r['score'] > 0
    )
    return entropy



In [87]:
# preprocessing

def convert_to_float(value):
    """Map a yes/no annotation to a number.

    Returns 1 for "Yes", 0 for "No", and 0.5 for any other value.
    """
    return 1 if value == "Yes" else 0 if value == "No" else 0.5

# Rating columns that each model score is correlated against. The first four
# are yes/no annotations; 'rating' is the numeric overall rating.
metrics = ['Typical', 'Informative', 'Intelligible', 'Translation correct', 'rating']

# One record per rated row: the word and example text on the evaluated side,
# the leading digit of 'Overall rating' as an int, and each yes/no annotation
# converted to a number.
sentences_with_rating = [
    {
        'word': r[SRC_LANG if EVAL_ON == 'src' else TGT_LANG],
        'rating': int(r['Overall rating'][0]),
        'text': r['src_example' if EVAL_ON == 'src' else 'tgt_example'],
        # metrics[:-1] are the four yes/no columns, in the same order as above.
        **{field: convert_to_float(r[field]) for field in metrics[:-1]},
    }
    for r in rows
]


In [None]:
def add_perplexity(sentences_with_rating):
    """Add perplexity scores to every record, in place.

    Each record gains 'perplexity' (score of its 'text'), 'word_perplexity'
    (score of its 'word' alone) and 'weighted_perplexity' (their ratio).
    """
    for entry in tqdm(sentences_with_rating, desc='Calculating perplexity'):
        sentence_score = calculate_perplexity(entry['text'])
        entry['perplexity'] = sentence_score
        word_score = calculate_perplexity(entry['word'])
        entry['word_perplexity'] = word_score
        entry['weighted_perplexity'] = sentence_score / word_score

def add_masked_probabilities(sentences_with_rating):
    """Add 'word_probability' (masked-word score) to every record, in place."""
    progress = tqdm(sentences_with_rating, desc='Calculating masked probabilities')
    for entry in progress:
        entry['word_probability'] = calculate_masked_word_prob(
            entry['word'], entry['text']
        )

def add_entropy(sentences_with_rating):
    """Add 'entropy' (entropy of the mask candidates) to every record, in place."""
    progress = tqdm(sentences_with_rating, desc='Calculating entropy')
    for entry in progress:
        entry['entropy'] = calculate_entropy(entry['word'], entry['text'])

# Enrich every record in place with the three model scores. Each call walks
# the full list and runs the model per sentence.
add_masked_probabilities(sentences_with_rating)
add_entropy(sentences_with_rating)
add_perplexity(sentences_with_rating)


In [None]:
from scipy.stats import pearsonr

# Tabular view of the scored records: one row per sentence, one column per
# record key. The get_corel_* functions below read this frame as a global.
data = pd.DataFrame(sentences_with_rating)

def _record_correlation(column, label, metric):
    """Correlate one model-score column with one rating column and log it.

    Computes the Pearson correlation between `data[column]` and
    `data[metric]` and appends a row to the global `results_tracker`
    (language, model path, `label`, `metric`, correlation, p-value, plus the
    annotator when one is configured).

    Args:
        column: name of the model-score column in `data`.
        label: value stored in the 'Model output' column of the tracker.
        metric: name of the rating column in `data`.

    Returns:
        The (correlation, p_value) pair returned by pearsonr.
    """
    correlation, p_value = pearsonr(data[column], data[metric])
    row = [SRC_LANG, MODEL_PATH, label, metric, correlation, p_value]
    if ANNOTATOR:
        row.append(ANNOTATOR)
    # Append at the next integer label; every call adds exactly one row.
    results_tracker.loc[len(results_tracker)] = row
    return correlation, p_value


def get_corel_perplexity(metric = 'rating'):
    """Log and print the correlation between 'perplexity' and `metric`."""
    correlation, p_value = _record_correlation('perplexity', 'Perplexity', metric)
    # The p-value is printed unrounded here, unlike in the two functions below.
    print(f"Perplexity correlation for {metric}: {correlation:.3f}, p-value: {p_value}")


def get_corel_word_prob(metric = 'Informative'):
    """Log and print the correlation between 'word_probability' and `metric`."""
    correlation, p_value = _record_correlation('word_probability', 'Masked LM', metric)
    print(f"Masked LM correlation for {metric}: {correlation:.3f}, p-value: {p_value:.3f}")


def get_corel_entropy(metric = 'rating'):
    """Log and print the correlation between 'entropy' and `metric`."""
    correlation, p_value = _record_correlation('entropy', 'Entropy', metric)
    print(f"Entropy correlation for {metric}: {correlation:.3f}, p-value: {p_value:.3f}")

# Header describing the run, followed by a blank line.
print(
    f"Working with language {SRC_LANG} and model {MODEL_PATH}",
    f"Annotator: {ANNOTATOR}, model: {RATINGS_FOR_MODEL}",
    "",
    sep="\n",
)

# For every rating column, log the three correlations in a fixed order
# (masked-LM probability, perplexity, entropy); this order determines the
# row order in results_tracker.
for metric in metrics:
    get_corel_word_prob(metric)
    get_corel_perplexity(metric)
    get_corel_entropy(metric)

In [None]:
# Display every correlation recorded so far.
results_tracker

In [91]:
# Per-language results file; the annotator is part of the name when one is set.
filename = (
    f'{SRC_LANG}_{ANNOTATOR}_results_tracker.csv'
    if ANNOTATOR
    else f'{SRC_LANG}_results_tracker.csv'
)

results_tracker.to_csv(filename, index=False)

In [None]:
# Merge the per-language result files of one annotator into a single CSV.
langs = ['fra', 'ind', 'tdt']
annotator = 'A1'

# Keep only the per-language files that actually exist on disk.
allfiles = []
for lang in langs:
    files = glob(f'{lang}_{annotator}_results_tracker.csv')
    allfiles += files

print(allfiles)

# dataframe with all results
df = pd.concat(list(map(pd.read_csv, allfiles)))
df.to_csv('all_results_tracker.csv', index=False)
df