# Apply the models on the Test set

In [None]:
!pip install sentencepiece transformers==4.33 datasets sacremoses sacrebleu -q

In [None]:
import pandas as pd
from tqdm.auto import tqdm, trange
from transformers import AutoModelForSeq2SeqLM, NllbTokenizer

In [None]:
# The project folder can be stored in Google Drive and accessed from Google Colab.
from google.colab import drive
# Mount Google Drive at /content/drive; the paths configured in this notebook live under it.
drive.mount('/content/drive')

In [None]:
import os

# ------------------------------------------------------------------
# User settings: adapt both paths to your environment before running.
# ------------------------------------------------------------------
# Project folder; the relative "Data/..." and "Results/..." paths used
# further down are resolved against it.
path_project = "/content/drive/MyDrive/German_to_Swiss_Translation_ANLP_2023"

# Folder in which the model checkpoints are stored.
PATH_MODELS = "/content/drive/MyDrive/anlp_project/NLLB-200/ANLP_SUBMISSION/models"

# Make the project folder the working directory.
os.chdir(path_project)

In [None]:
def translate(text, src_lang, tgt_lang, a=16, b=1.5, max_input_length=1024, **kwargs):
    """
    Translate `text` from `src_lang` to `tgt_lang` using the global `model` and `tokenizer`.

    The input is tokenized with padding and truncated to `max_input_length`
    tokens. Generation is forced to start with the `tgt_lang` token and may
    produce at most `int(a + b * L)` new tokens, where `L` is the (padded)
    input length. Extra keyword arguments are forwarded to `model.generate`.

    Returns the list of strings produced by `tokenizer.batch_decode`.
    """
    tokenizer.src_lang = src_lang
    tokenizer.tgt_lang = tgt_lang

    encoded = tokenizer(
        text,
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=max_input_length,
    )
    # Put the input tensors on the same device as the model.
    encoded = encoded.to(model.device)

    target_bos_id = tokenizer.convert_tokens_to_ids(tgt_lang)
    # Output budget grows linearly with the input length.
    token_budget = int(a + b * encoded.input_ids.shape[1])

    generated = model.generate(
        **encoded,
        forced_bos_token_id=target_bos_id,
        max_new_tokens=token_budget,
        **kwargs
    )
    return tokenizer.batch_decode(generated, skip_special_tokens=True)

def batched_translate(texts, batch_size=16, **kwargs):
    """
    Translate texts in batches of similar length.

    The texts are sorted by length (longest first) so that each batch needs
    little padding, translated batch by batch with `translate`, and the
    translations are put back into the order of the input.

    Args:
        texts: iterable of strings to translate; may be empty.
        batch_size: number of texts passed to `translate` per call.
        **kwargs: forwarded to `translate` (e.g. `src_lang`, `tgt_lang`).

    Returns:
        List of translations in the same order as `texts`; `[]` for empty input.
    """
    texts = list(texts)
    # Nothing to translate: unpacking the zip of an empty sequence would raise.
    if not texts:
        return []

    # Original positions, ordered from the longest text to the shortest.
    order = sorted(range(len(texts)), key=lambda idx: len(texts[idx]), reverse=True)
    ordered_texts = [texts[idx] for idx in order]

    results = []
    for start in trange(0, len(ordered_texts), batch_size):
        results.extend(translate(ordered_texts[start:start + batch_size], **kwargs))

    # Undo the length sort so that the output lines up with the input.
    restored = sorted(zip(order, results), key=lambda pair: pair[0])
    return [translation for _, translation in restored]

In [None]:
# Load the test set (path is relative to the project folder set as working directory).
df_test = pd.read_csv("Data/df_test.csv")

In [None]:
def fix_tokenizer2n(tokenizer, new_lang_tokens):
    """
    Add new language tokens to the tokenizer vocabulary,
    this should be done each time after its initialization.
    This function is used when the model already contains another newly added token
    e.g. "ch_vs" or "ch_gr"

    Args:
        tokenizer: tokenizer to patch in place.
        new_lang_tokens: sequence of language codes. The token at position `i`
            receives the id `old_len - i`, where `old_len` is the tokenizer
            length without the tokens found in `added_tokens_encoder`.

    Returns:
        None. The tokenizer is modified in place; the ids and tokens of the
        new codes and of "<mask>" are printed before and after the change.
    """
    # Tokens shown in the BEFORE/AFTER report (reversed codes, then the mask).
    probe_tokens = list(new_lang_tokens[::-1]) + ['<mask>']

    before_ids = tokenizer.convert_tokens_to_ids(probe_tokens)
    print(f"BEFORE IDs: {before_ids}")
    print(f"BEFORE Tokens: {tokenizer.convert_ids_to_tokens(before_ids)}")

    # how many of the tokens are actually in tokenizer.added_tokens_encoder
    n_added_tokens = sum(token in tokenizer.added_tokens_encoder for token in new_lang_tokens)

    # get the old/ original length of the tokenizer
    old_len = len(tokenizer) - n_added_tokens

    # move the new tokens in the previous position
    for i, token in enumerate(new_lang_tokens):
        token_id = old_len - i
        tokenizer.lang_code_to_id[token] = token_id
        # The reverse map has to use the very same id as the forward map,
        # otherwise id -> code lookups would not find the new token.
        tokenizer.id_to_lang_code[token_id] = token

    # always move "mask" to the last position
    tokenizer.fairseq_tokens_to_ids["<mask>"] = (
        len(tokenizer.sp_model) + len(tokenizer.lang_code_to_id) + tokenizer.fairseq_offset
    )

    tokenizer.fairseq_tokens_to_ids.update(tokenizer.lang_code_to_id)
    tokenizer.fairseq_ids_to_tokens = {v: k for k, v in tokenizer.fairseq_tokens_to_ids.items()}

    # if the token is not yet trained in the model, add it to the special tokens
    for token in new_lang_tokens:
        if token not in tokenizer._additional_special_tokens:
            tokenizer._additional_special_tokens.append(token)

    # clear the added token encoder; otherwise a new token may end up there by mistake
    tokenizer.added_tokens_encoder = {}
    tokenizer.added_tokens_decoder = {}

    after_ids = tokenizer.convert_tokens_to_ids(probe_tokens)
    print(f"AFTER IDs: {after_ids}")
    print(f"AFTER Tokens: {tokenizer.convert_ids_to_tokens(after_ids)}")

def fix_tokenizer1n(tokenizer, new_lang_token):
    """
    Add a new language token to the tokenizer vocabulary,
    this should be done each time after its initialization.
    This function is used when the model does not contain another newly added token.

    Args:
        tokenizer: tokenizer to patch in place.
        new_lang_token: language code to register. It receives the id
            `old_len - 1`, where `old_len` is the tokenizer length without
            the token if it is found in `added_tokens_encoder`.

    Returns:
        None. The tokenizer is modified in place; its length and the ids and
        tokens of the new code and of "<mask>" are printed.
    """
    old_len = len(tokenizer) - int(new_lang_token in tokenizer.added_tokens_encoder)
    new_id = old_len - 1
    tokenizer.lang_code_to_id[new_lang_token] = new_id
    tokenizer.id_to_lang_code[new_id] = new_lang_token
    # always move "mask" to the last position
    tokenizer.fairseq_tokens_to_ids["<mask>"] = (
        len(tokenizer.sp_model) + len(tokenizer.lang_code_to_id) + tokenizer.fairseq_offset
    )

    tokenizer.fairseq_tokens_to_ids.update(tokenizer.lang_code_to_id)
    tokenizer.fairseq_ids_to_tokens = {v: k for k, v in tokenizer.fairseq_tokens_to_ids.items()}
    if new_lang_token not in tokenizer._additional_special_tokens:
        tokenizer._additional_special_tokens.append(new_lang_token)
    # clear the added token encoder; otherwise a new token may end up there by mistake
    tokenizer.added_tokens_encoder = {}
    tokenizer.added_tokens_decoder = {}

    report_ids = tokenizer.convert_tokens_to_ids([new_lang_token, '<mask>'])
    print(f"Length of the tokenizer: {len(tokenizer)}")
    print(f"IDs: {report_ids}")
    # This line shows the tokens the ids map back to, so it is labelled accordingly.
    print(f"Tokens: {tokenizer.convert_ids_to_tokens(report_ids)}")

In [None]:
# Checkpoint registry: output column name -> [checkpoint folder,
# language tokens to register in the tokenizer after loading].
models = {
    "init_none": [
        f"{PATH_MODELS}/nllb-de-be_initNONE_best_checkpoint",
        ["ch_be"],
    ],
    "init_vs": [
        f"{PATH_MODELS}/nllb-de-be_initvs_best_checkpoint",
        ["ch_be", "ch_vs"],
    ],
    "init_gr_large": [
        f"{PATH_MODELS}/nllb-de-be_initgrLarge_best_checkpoint",
        ["ch_be", "ch_gr"],
    ],
    "init_de": [
        f"{PATH_MODELS}/nllb-de-be_initde_best_checkpoint",
        ["ch_be"],
    ],
    "init_average": [
        f"{PATH_MODELS}/nllb-de-be_init_average_v1_best_checkpoint",
        ["ch_be"],
    ],
    "init_gr_small": [
        f"{PATH_MODELS}/nllb-de-be_initgrSmall_best_checkpoint",
        ["ch_be", "ch_gr"],
    ],
}

# Translate the "ch_be" column of the test set with every checkpoint and
# store the outputs in df_test, one column per model.
for key, value in models.items():
    checkpoint_dir, lang_tokens = value
    model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint_dir)
    tokenizer = NllbTokenizer.from_pretrained(checkpoint_dir)

    # Register the extra language token(s) with the matching helper.
    n_lang_tokens = len(lang_tokens)
    if n_lang_tokens == 2:
        fix_tokenizer2n(tokenizer, new_lang_tokens=lang_tokens)
    elif n_lang_tokens == 1:
        fix_tokenizer1n(tokenizer, new_lang_token=lang_tokens[0])

    # translate() returns a list; each call gets one sentence, so take item 0.
    df_test[key] = [
        translate(t, "ch_be", 'deu_Latn')[0]
        for t in tqdm(df_test["ch_be"])
    ]

In [None]:
# Filename
#df_test.to_excel("df_test_output.xlsx")

# Apply the models on the Synthetic Test Set

In [None]:
# Load the synthetic sentence file and keep only its "de" and "be" columns
df_syn = pd.read_excel("Results/Manual_Syntax_Evaluation_Synt_TestSet.xlsx")[["de", "be"]]

In [None]:
# Translate the "be" column of the synthetic set with every checkpoint and
# keep the outputs next to their inputs in df_syn, one column per model.
# When saving these results, export df_syn (not df_test).
for key, value in models.items():
    model = AutoModelForSeq2SeqLM.from_pretrained(value[0])
    tokenizer = NllbTokenizer.from_pretrained(value[0])

    # Register the extra language token(s) with the matching helper.
    if len(value[1]) == 2:
        fix_tokenizer2n(tokenizer, new_lang_tokens=value[1])
    elif len(value[1]) == 1:
        fix_tokenizer1n(tokenizer, new_lang_token=value[1][0])

    # The list has one entry per row of df_syn["be"], so it belongs to df_syn;
    # writing it to df_test would overwrite the test-set outputs or fail on a
    # length mismatch.
    df_syn[key] = [translate(t, "ch_be", 'deu_Latn')[0] for t in tqdm(df_syn["be"])]

In [None]:
# Filename
#df_test.to_excel("df_syn_test_output.xlsx")