# This script is used for evaluation; it loads the model produced by the training notebook.

# In [1]:
import json
import torch
from transformers import AutoTokenizer, RobertaForMaskedLM
import os
from nltk.corpus import stopwords
import random
import yake
import spacy
import collections
import argparse
from scipy.stats import kendalltau, pearsonr, spearmanr
import numpy as np
from tqdm import tqdm
import math
import pandas as pd

# In [2]:
# --- Evaluation settings -------------------------------------------------
# Checkpoint to evaluate; the literal value "roberta-base" selects the
# baseline branch of the setup code instead of a local directory.
model_dir = "trained_model/"
model_size = "base"

# Masking behaviour: `mask_random` switches between random word masking and
# keyword-driven masking; `add_pos` is the flag for the POS-tag fallback;
# `m_ratio` is the fraction of words used to size the keyword budget.
mask_random = False
add_pos = True
m_ratio = 0.15

batch_size = 64

# In [3]:
# Decide where results go and open the per-sentence mask log.
#
# The original code read the output directory name from an `args` mapping
# that is never defined in this file (NameError) and then unconditionally
# overwrote the branch-specific `out_dir`. The sub-directory name is now a
# plain constant and each branch keeps the directory it selected.
out_subdir = "results"  # NOTE(review): stands in for the undefined args['out']; rename if needed

if model_dir == "roberta-base":
    print("Evaluating with roberta-base model.")
    out_dir = "result_roberta_base"
    no_pre = True
else:
    if not os.path.isdir(model_dir):
        print("Model Directory does not exist.")
        # Exit with a non-zero status: a missing model directory is a failure.
        raise SystemExit(1)
    out_dir = os.path.join(model_dir, out_subdir)
    # Defined in both branches so later reads cannot hit a NameError.
    # NOTE(review): presumably "no_pre" means "no fine-tuned checkpoint" - confirm.
    no_pre = False

# makedirs(exist_ok=True) also creates missing parents and is safe on re-runs.
os.makedirs(out_dir, exist_ok=True)
dataset = "Golden_test_set_de_en.tsv"
out_path = os.path.join(out_dir, 'out_label_logs.txt')
# Explicit encoding: the masked words written to this log may be non-ASCII.
logger = open(out_path, "w", encoding="utf-8")

#----------------------------

# Fix the RNG seeds and pick the compute device.
SEED = 10
random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    # Seed every visible GPU as well.
    torch.cuda.manual_seed_all(SEED)
    print("Using GPUs!")
else:
    print('No GPU available, using the CPU instead.')

    
# Fine-grained POS tags that make a word eligible as a fallback mask target.
lst_pos_tags = [
    'NN', 'NNP', 'NNS', 'JJ', 'CD', 'VB', 'VBN',
    'VBD', 'VBG', 'RB', 'VBP', 'VBZ', 'NNPS', 'JJS',
]
stop_words = stopwords.words('english')

# Tokenizer checkpoint and the per-sentence token budget.
ROBERTA_MODEL = "xlm-roberta-base"
max_len_english = 64

# YAKE keyword-extractor configuration (single-word keywords, top 20).
top_n = 20
language = "en"
max_ngram_size = 1
deduplication_threshold = 0.9
deduplication_algo = 'seqm'
windowSize = 1
custom_kw_extractor = yake.KeywordExtractor(
    lan=language,
    n=max_ngram_size,
    dedupLim=deduplication_threshold,
    dedupFunc=deduplication_algo,
    windowsSize=windowSize,
    top=top_n,
    features=None,
)

# spaCy pipeline used for POS tagging.
nlp = spacy.load("en_core_web_sm")
print("Modules Loaded")


# Load the tokenizer and the masked-LM checkpoint, then move the model to the
# selected device and switch it to evaluation mode.
tokenizer = AutoTokenizer.from_pretrained(ROBERTA_MODEL)
model = RobertaForMaskedLM.from_pretrained(model_dir, use_safetensors=True)
model.to(device)
model.eval()
print("Model loaded")


#----------------------------

def tokenize_sentence(txt, tokenizer, max_length=None):
    """
    Tokenize one sentence, padded/truncated to a fixed length.

    Args:
        txt: the sentence to tokenize.
        tokenizer: a callable tokenizer exposing ``is_fast``; its result must
            support item assignment and, for fast tokenizers, ``word_ids()``.
        max_length: fixed output length; defaults to the module-level
            ``max_len_english`` when omitted.

    Returns:
        The tokenizer's encoding. For fast tokenizers it additionally carries
        a ``"word_ids"`` entry holding the per-token word index list.
    """
    if max_length is None:
        max_length = max_len_english
    result = tokenizer(txt, max_length=max_length, padding='max_length', truncation=True)
    # Only query word_ids() when the tokenizer is fast: the original called it
    # before the guard, which made the guard ineffective.
    if tokenizer.is_fast:
        result["word_ids"] = list(result.word_ids())
    return result

def get_word_mapping(tok):
    """
    Group token positions by the word they belong to.

    Walks ``tok["word_ids"]`` and assigns consecutive indices (0, 1, 2, ...)
    to each run of identical, non-None word ids. Positions whose word id is
    None are skipped.

    Returns:
        A ``defaultdict(list)`` mapping running word index -> token positions.
    """
    token_word_ids = tok["word_ids"].copy()
    word_to_tokens = collections.defaultdict(list)
    word_counter = -1
    last_seen = None
    for position, wid in enumerate(token_word_ids):
        if wid is None:
            continue
        if wid != last_seen:
            # A new word starts here.
            last_seen = wid
            word_counter += 1
        word_to_tokens[word_counter].append(position)
    return word_to_tokens

def get_pos_tags(doc):
    """
    Collect the POS tag of every content word in a parsed document.

    A token is kept when it is not a stop word, punctuation or whitespace
    (per the token's own flags and the ``stop_words`` list) and its
    fine-grained tag is listed in ``lst_pos_tags``.

    Returns:
        dict mapping token text -> tag, in document order; a repeated token
        text keeps its first position and takes the last tag seen.
    """
    selected = {}
    for tok in doc:
        if tok.is_stop or tok.is_punct or tok.is_space:
            continue
        if tok.text.lower() in stop_words:
            continue
        if tok.tag_ in lst_pos_tags:
            selected[tok.text] = tok.tag_
    return selected

def get_mask_phrases(txt, tok, mapping, add_pos):
    """
    Choose the words to mask, expanding each keyword into a short phrase.

    With ``mask_random`` set, a random sample of word indices is returned.
    Otherwise keywords are extracted from the sentence and every keyword is
    masked together with the word right before it and the word right after it.

    Args:
        txt: the sentence text that ``tok`` was built from.
        tok: encoding offering ``word_to_chars(word_index)``.
        mapping: word index -> token positions (see ``get_word_mapping``).
        add_pos: when true and too few keywords were found, top the keyword
            list up with POS-selected words from ``get_pos_tags``.

    Returns:
        (mask, mask_words): the masked word indices and their lowercased text.
    """
    if mask_random:
        # Use the configured ratio rather than a second hard-coded 0.15.
        n_sample = math.ceil(m_ratio * len(mapping))
        mask = random.sample(range(len(mapping)), n_sample)
        mask_words = []
        for idx in mask:
            start, end = tok.word_to_chars(idx)
            mask_words.append(txt[start:end].lower())
        return mask, mask_words

    # Strip the tokenizer's sentence markers before keyword extraction.
    yake_doc = txt.replace(tokenizer.eos_token, "")
    yake_doc = yake_doc.replace(tokenizer.bos_token, "")
    yake_doc = yake_doc.strip()
    max_keyword = max(3, math.ceil(m_ratio * len(mapping)))
    keywords = custom_kw_extractor.extract_keywords(yake_doc)[:max_keyword]
    lst_kw = [kw[0].lower() for kw in keywords]

    if add_pos and len(lst_kw) < max_keyword:
        remaining = max_keyword - len(lst_kw)
        for w in get_pos_tags(nlp(txt)):
            candidate = w.lower()
            # Compare the lowercased form: lst_kw holds lowercase entries, so
            # testing the raw word let duplicates in and wasted the budget.
            if candidate not in lst_kw:
                lst_kw.append(candidate)
                remaining -= 1
                if remaining == 0:
                    break

    mask = []
    mask_words = []
    prev_word = None        # last unmasked word, candidate "word before"
    prev_id = None
    mask_following = False  # true right after a keyword: mask the next word too
    for idx in mapping:
        start, end = tok.word_to_chars(idx)
        word = txt[start:end].lower()
        is_keyword = word in lst_kw
        if is_keyword or mask_following:
            if prev_word is not None:
                # Pull the preceding unmasked word into the phrase.
                mask.append(prev_id)
                mask_words.append(prev_word)
                prev_word = None
            mask.append(idx)
            mask_words.append(word)
            # Only a real keyword extends the phrase to the following word.
            mask_following = is_keyword
        else:
            prev_word = word
            prev_id = idx
            mask_following = False
    return mask, mask_words


def get_mask_words(txt, tok, mapping, add_pos):
    """
    Choose individual words to mask.

    With ``mask_random`` set, a random sample of word indices is returned.
    Otherwise keywords are extracted from the sentence and every word whose
    lowercased text equals a keyword is masked.

    Args:
        txt: the sentence text that ``tok`` was built from.
        tok: encoding offering ``word_to_chars(word_index)``.
        mapping: word index -> token positions (see ``get_word_mapping``).
        add_pos: when true and too few keywords were found, top the keyword
            list up with POS-selected words from ``get_pos_tags``.

    Returns:
        (mask, mask_words): the masked word indices and their lowercased text.
    """
    if mask_random:
        # Use the configured ratio rather than a second hard-coded 0.15.
        n_sample = math.ceil(m_ratio * len(mapping))
        mask = random.sample(range(len(mapping)), n_sample)
        mask_words = []
        for idx in mask:
            start, end = tok.word_to_chars(idx)
            mask_words.append(txt[start:end].lower())
        return mask, mask_words

    # Strip the tokenizer's sentence markers before keyword extraction.
    yake_doc = txt.replace(tokenizer.eos_token, "")
    yake_doc = yake_doc.replace(tokenizer.bos_token, "")
    yake_doc = yake_doc.strip()
    max_keyword = max(3, math.ceil(m_ratio * len(mapping)))
    keywords = custom_kw_extractor.extract_keywords(yake_doc)[:max_keyword]
    lst_kw = [kw[0].lower() for kw in keywords]

    if add_pos and len(lst_kw) < max_keyword:
        remaining = max_keyword - len(lst_kw)
        for w in get_pos_tags(nlp(txt)):
            candidate = w.lower()
            # Compare the lowercased form: lst_kw holds lowercase entries, so
            # testing the raw word let duplicates in and wasted the budget.
            if candidate not in lst_kw:
                lst_kw.append(candidate)
                remaining -= 1
                if remaining == 0:
                    break

    mask = []
    mask_words = []
    for idx in mapping:
        start, end = tok.word_to_chars(idx)
        word = txt[start:end].lower()
        if word in lst_kw:
            mask.append(idx)
            mask_words.append(word)
    return mask, mask_words

def get_masked_tokens(tokenizer, tok, mapping, mask):
    """
    Replace the tokens of the selected words with the mask token.

    Works on a copy of ``tok["input_ids"]``; the encoding itself is untouched.

    Returns:
        (input_ids, labels): the masked id list and a label list that holds
        the replaced id at masked positions and -100 everywhere else.
    """
    masked_ids = tok["input_ids"].copy()
    targets = [-100] * len(masked_ids)
    for word_index in mask:
        for pos in mapping[word_index]:
            # Remember the current id as the label, then overwrite it.
            targets[pos], masked_ids[pos] = masked_ids[pos], tokenizer.mask_token_id
    return masked_ids, targets

def evaluate(input_id, lbl, attn_mask):
    """
    Score a single example with the loaded model.

    Each argument is a flat list; it is wrapped into a batch of one, moved to
    ``device`` and fed to ``model`` through its word-embedding layer
    (``inputs_embeds``) rather than as raw ids.

    Returns:
        The loss reported by the model for ``lbl``, as a Python float.
    """
    ids_batch, labels_batch, mask_batch = (
        torch.tensor([seq], dtype=torch.long).to(device)
        for seq in (input_id, lbl, attn_mask)
    )

    with torch.no_grad():
        embedded = model.roberta.embeddings.word_embeddings(ids_batch)
        result = model(
            inputs_embeds=embedded,
            attention_mask=mask_batch,
            labels=labels_batch,
        )
    return result.loss.item()

def calculate_score(english, german):
    """
    Score an English output against its German source.

    The English sentence is phrase-masked, appended to the unmasked German
    sentence, and the model loss on the masked English tokens is the score.

    Args:
        english: the English sentence (masked side).
        german: the German sentence (context side, never masked).

    Returns:
        (score, mask_words): the loss rounded to 4 decimals and the list of
        lowercased words that were masked.
    """
    tok_english = tokenize_sentence(english, tokenizer)
    map_english_words = get_word_mapping(tok_english)
    # NOTE(review): add_pos is hard-coded to False here although the
    # module-level `add_pos` setting is True - confirm this is intended.
    mask, mask_words = get_mask_phrases(english, tok_english, map_english_words, False)
    english_masked, label = get_masked_tokens(tokenizer, tok_english, map_english_words, mask)

    tok_german = tokenize_sentence(german, tokenizer)
    german_ids = tok_german['input_ids']
    input_id = german_ids + english_masked
    # German positions carry -100 labels, i.e. no masked token on that side.
    label = [-100] * len(german_ids) + label

    # `max_len` was not defined anywhere in this file; each half is padded to
    # max_len_english by tokenize_sentence, so the pair spans twice that.
    max_len = 2 * max_len_english

    # NOTE(review): the mask is 1 for every position already present, which
    # includes the padding tokenize_sentence added inside each half - confirm
    # this matches how the model was trained before changing it.
    attn_mask = [1] * len(input_id)
    attn_mask.extend([0] * (max_len - len(input_id)))

    input_id.extend([tokenizer.pad_token_id] * (max_len - len(input_id)))
    label.extend([-100] * (max_len - len(label)))
    logger.write(f"mask_words: {mask_words}\n")
    logger.write(f"mask: {mask}\n")

    score = evaluate(input_id, label, attn_mask)
    return round(score, 4), mask_words

# In [4]:
# Load the tab-separated test set and expose the "wmt-z:seg" column under
# the name "HUMAN_score".
dataset = 'Golden_test_set_de_en.tsv'
df_test_set = pd.read_csv(dataset, sep="\t").rename(
    columns={"wmt-z:seg": "HUMAN_score"}
)

# In [5]:
# Run inference on every record: 'source' is the German side, 'output' the
# English side that gets masked and scored.
scores = []
masked_words_list = []
for i, row in df_test_set.iterrows():
    german = row['source']
    english = row['output']
    score, masked_words_ = calculate_score(english, german)
    scores.append(score)
    masked_words_list.append(masked_words_)
    # Overwrite the same console line with the current row index.
    print(i, end="\r")
# calculate_score writes to the log file for every row; flush so the log on
# disk is complete even though the file handle is kept open.
logger.flush()
df_test_set['dial-m_score'] = scores
df_test_set['mask_words'] = masked_words_list

# Output: 17203


# In [6]:
# evaluation: xlm-roberta-base + lr=5e-5 + phrase masking + separate embeddings
# Correlate the human scores with the metric scores.
x = df_test_set['HUMAN_score']
y = df_test_set['dial-m_score']

# Linear and rank correlations, each returned with its p-value.
pearson_corr, pearson_p_val = pearsonr(x, y)
spearman_corr, spearman_p_val = spearmanr(x, y)
tau_c, pval_c = kendalltau(x, y, variant='c')
tau_b, pval_b = kendalltau(x, y, variant='b')

print("Kendall tau-c:", (round(tau_c, 4), round(pval_c, 4)))
print("Kendall tau-b:", (round(tau_b, 4), round(pval_b, 4)))
print(
    "Correlation between GT and score: "
    f"Pearson = {(round(pearson_corr, 4), round(pearson_p_val, 4))}, "
    f"Spearman = {(round(spearman_corr, 4), round(spearman_p_val, 4))}"
)

# Output:
# Kendall tau-c: (-0.3249, 0)
# Kendall tau-b: (-0.3378, 0)
# Correlation between GT and score: Pearson = (-0.4534, 0.0), Spearman = (-0.4509, 0.0)
