In [None]:
# Upgrade the transformers package to the newest release pip can find.
!pip install -U transformers

In [None]:
import csv
import random

import pandas as pd
import torch

# Seed every random number generator the notebook draws from so that a
# fresh run reproduces the same splits and the same training batches.
seed = 19
# get_batch_pairs picks rows with the stdlib `random` module, which the
# torch seeds below do not affect.
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the data and model paths used in
# this notebook all live under that directory.
drive.mount('/content/drive')

In [None]:
# get rid of duplicates in the source language:
# only the first translation seen for each English sentence is kept, and
# the pairs are written back out in their original order.
d = {}
with open('/content/drive/MyDrive/data/en-kw.tsv', 'rt', encoding='utf-8') as f:
  for line in f:
    try:
      en, kw = line.strip().split('\t')
    except ValueError:
      # The line did not split into exactly two tab-separated fields:
      # show it and move on to the next one.
      print(line)
      continue
    # setdefault leaves an already-stored translation untouched.
    d.setdefault(en, kw)

with open('/content/drive/MyDrive/data/en-kw-no-dups.tsv', 'wt', encoding='utf-8') as f:
  f.writelines(f'{en}\t{kw}\n' for en, kw in d.items())


In [None]:
# Load the de-duplicated corpus and hold out 10% of the pairs for testing.
# The file is written as raw tab-separated text with no CSV quoting, so quote
# characters must be read literally; with pandas' default quoting an
# unbalanced '"' would merge several lines into a single field.
trans_df = pd.read_csv(
    '/content/drive/MyDrive/data/en-kw-no-dups.tsv',
    sep="\t",
    quoting=csv.QUOTE_NONE,
)
trans_df = trans_df.dropna()
# Fixed random_state keeps the split identical between runs.
train = trans_df.sample(frac=0.9, random_state=seed)
test = trans_df.drop(train.index)

In [None]:
# Alternative corpus: a headerless two-column CSV whose columns are named
# "en" and "kw" on load. Rows with a missing value are dropped, then 95% of
# the rows are sampled for training and the remainder becomes the test set.
trans_df = pd.read_csv(
    '/content/drive/MyDrive/data/cornwall-council-2025-09-17.csv',
    sep=",",
    header=None,
    names=["en", "kw"],
).dropna()
train = trans_df.sample(frac=0.95, random_state=seed)
test = trans_df.drop(index=train.index)

In [None]:
# Show the (rows, columns) shape of the training split, then of the test split.
print(train.shape, test.shape, sep="\n")

In [None]:
# using tatoeba for train and wikimedia for test
# Each file is read as tab-separated text and rows with missing values are
# dropped; the resulting shapes are printed, train first.
train = pd.read_csv('/content/drive/MyDrive/data/tatoeba.tsv', sep="\t").dropna()
test = pd.read_csv('/content/drive/MyDrive/data/wikimedia.tsv', sep="\t").dropna()
print(train.shape, test.shape, sep="\n")

In [None]:
from transformers import NllbTokenizer

def fix_tokenizer(tokenizer, new_lang="cor_Latn"):
    """Make sure the language token `new_lang` exists in `tokenizer`.

    If the token is missing from the vocabulary it is registered as an
    additional special token (the tokenizer is modified in place); either
    way a one-line status message is printed.

    Args:
        tokenizer: object exposing `get_vocab()` and `add_special_tokens()`.
        new_lang: language token to register.

    Returns:
        None.

    NOTE(review): depending on the installed transformers version, passing
    "additional_special_tokens" may replace the tokenizer's existing list of
    additional special tokens instead of extending it — confirm against the
    `add_special_tokens` documentation.
    """
    if new_lang in tokenizer.get_vocab():
        print(f"Language token {new_lang} already exists in vocab.")
        return

    # Add as a new special token in the tokenizer
    tokenizer.add_special_tokens({"additional_special_tokens": [new_lang]})
    print(f"Added new language token: {new_lang}")


In [None]:
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
model_name = "facebook/nllb-200-distilled-600M"
transfer_embeds = True

# Load the pretrained tokenizer and seq2seq model.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

# Register the new language token, then resize the embedding matrix so it
# has one row per tokenizer entry.
fix_tokenizer(tokenizer)
model.resize_token_embeddings(len(tokenizer))

# Row indices of the new token and of the language it is initialised from.
added_token_id = tokenizer.convert_tokens_to_ids('cor_Latn')
similar_lang_id = tokenizer.convert_tokens_to_ids('cym_Latn')
embeds = model.model.shared.weight.data

if transfer_embeds:
    # start the new language token from the welsh (cym_Latn) embedding
    new_token_vector = embeds[similar_lang_id]
else:
    # otherwise draw a random vector scaled by the model's init_std
    new_token_vector = torch.randn(embeds.shape[1]) * model.config.init_std
embeds[added_token_id] = new_token_vector


In [None]:
import gc
import random
import numpy as np
import torch
from tqdm.auto import tqdm, trange
from transformers.optimization import Adafactor
from transformers import get_constant_schedule_with_warmup

def cleanup() -> None:
    """Run Python's garbage collector, then empty PyTorch's CUDA cache."""
    # Free up GPU memory
    gc.collect()
    torch.cuda.empty_cache()

# Start from a clean slate before the model is moved to the GPU.
cleanup()

In [None]:
# Move the model's parameters to the GPU.
model.cuda()

In [None]:
# Only parameters that require gradients are handed to the optimizer.
trainable_params = list(filter(lambda p: p.requires_grad, model.parameters()))

# Adafactor with an explicit learning rate: relative_step and scale_parameter
# are both switched off, so `lr` is used as given.
optimizer = Adafactor(
    trainable_params,
    lr=1e-4,
    relative_step=False,
    scale_parameter=False,
    clip_threshold=1.0,
    weight_decay=1e-3,
)

In [None]:
# Number of sentence pairs sampled per optimisation step.
batch_size = 8
# Token limit applied when tokenizing training batches (longer inputs are truncated).
max_length = 128
# Number of warm-up steps handed to the learning-rate scheduler.
warmup_steps = 500
# Optimisation steps per epoch: one pass' worth of batches over the training set.
training_steps = len(train) // batch_size

In [None]:
# Running list of per-step loss values; the training loop appends to it.
losses = []
# Learning-rate schedule: warm-up for `warmup_steps` steps, constant afterwards.
scheduler = get_constant_schedule_with_warmup(optimizer, num_warmup_steps=warmup_steps)

In [None]:

def get_batch_pairs(batch_size, data=None):
    """Sample `batch_size` random sentence pairs from `data`, with replacement.

    Args:
        batch_size: number of pairs to draw.
        data: DataFrame with 'en' and 'kw' columns. When omitted, the
            module-level `train` frame is used; it is looked up at call time,
            so re-running a data-loading cell takes effect without having to
            redefine this function.

    Returns:
        A tuple `(xx, yy)` of two lists of length `batch_size` holding the
        'en' and 'kw' values of the sampled rows, aligned by position.

    Raises:
        ValueError: if `data` has no rows and `batch_size` > 0 (raised by
            `random.randint` on an empty range).
    """
    if data is None:
        data = train
    last_row = len(data) - 1
    # Draw all positions first (same stdlib RNG as before), then fetch the
    # rows with a single positional lookup instead of one lookup per row.
    positions = [random.randint(0, last_row) for _ in range(batch_size)]
    picked = data.iloc[positions]
    return picked['en'].tolist(), picked['kw'].tolist()

# Sanity check: draw and show a single random training pair.
print(get_batch_pairs(1))

In [None]:
# Fine-tuning loop: each step samples a random batch of sentence pairs,
# computes the seq2seq loss and updates the model. A RuntimeError raised
# during a step (e.g. running out of GPU memory) skips that batch.
model.train()
# Drop any tensors left over from a previous run before starting.
x, y, loss = None, None, None
cleanup()

# Source and target language codes used when tokenizing.
lang1 = 'eng_Latn'
lang2 = 'cor_Latn'

epochs = 1
for epoch in range(epochs):
    tq = trange(training_steps, desc=f"Epoch {epoch+1}/{epochs}")
    for i in tq:
      xx, yy = get_batch_pairs(batch_size)
      try:
          # Tokenize the source sentences with the source language code.
          tokenizer.src_lang = lang1
          x = tokenizer(xx, return_tensors='pt', padding=True, truncation=True, max_length=max_length).to(model.device)
          # Tokenize the targets the same way, with src_lang switched to the target language code.
          tokenizer.src_lang = lang2
          y = tokenizer(yy, return_tensors='pt', padding=True, truncation=True, max_length=max_length).to(model.device)
          # Replace padding positions in the labels with -100, the label value the loss ignores.
          y.input_ids[y.input_ids == tokenizer.pad_token_id] = -100

          loss = model(**x, labels=y.input_ids).loss
          loss.backward()
          losses.append(loss.item())

          optimizer.step()
          optimizer.zero_grad(set_to_none=True)
          scheduler.step()

      except RuntimeError as e:
          # Discard the partial gradients and release the batch tensors, then
          # report the longest string (in characters) in the batch and move on.
          optimizer.zero_grad(set_to_none=True)
          x, y, loss = None, None, None
          cleanup()
          print('error', max(len(s) for s in xx + yy), e)
          continue

      # Every 1000 steps, print the step index and the mean of the most recent (up to 1000) losses.
      if i % 1000 == 0:
          print(i, np.mean(losses[-1000:]))

# model.save_pretrained(MODEL_SAVE_PATH)
# tokenizer.save_pretrained(MODEL_SAVE_PATH)

In [None]:
def translate(text, src_lang='eng_Latn', tgt_lang='cor_Latn', a=16, b=1.5, max_input_length=1024, **kwargs):
    """Translate `text` with the module-level `model` and `tokenizer`.

    Args:
        text: input passed straight to the tokenizer.
        src_lang: language code set on the tokenizer as the source language.
        tgt_lang: language code set on the tokenizer as the target language
            and forced as the first generated token.
        a: constant term of the generation budget.
        b: per-input-token factor of the generation budget; the model may
            produce at most int(a + b * input_length) new tokens.
        max_input_length: inputs are truncated to this many tokens.
        **kwargs: forwarded unchanged to `model.generate`.

    Returns:
        The decoded outputs from `tokenizer.batch_decode`, with special
        tokens skipped.
    """
    tokenizer.src_lang, tokenizer.tgt_lang = src_lang, tgt_lang

    inputs = tokenizer(
        text,
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=max_input_length,
    )
    model_inputs = inputs.to(model.device)
    # Output budget grows linearly with the (padded) input length.
    token_budget = int(a + b * inputs.input_ids.shape[1])

    generated = model.generate(
        **model_inputs,
        forced_bos_token_id=tokenizer.convert_tokens_to_ids(tgt_lang),
        max_new_tokens=token_budget,
        **kwargs,
    )
    return tokenizer.batch_decode(generated, skip_special_tokens=True)

In [None]:
# Qualitative check: pick one random pair from the test set, print the source
# and reference, then print the model's translation of the source.
lang1 = 'eng_Latn'
lang2 = 'cor_Latn'

xx, yy = get_batch_pairs(1, data=test)
print(xx)
print(yy)
# Put the model in evaluation mode before generating.
model.eval()
print(translate(xx[0], lang1, lang2, no_repeat_ngram_size=3, num_beams=5))

In [None]:
# SAVE MODEL
# Write the model and the tokenizer to the same Drive directory.
MODEL_SAVE_PATH = '/content/drive/MyDrive/models/nllb-eng-cor-v7'
model.save_pretrained(MODEL_SAVE_PATH)
tokenizer.save_pretrained(MODEL_SAVE_PATH)

In [None]:
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# LOAD MODEL
# Reload the saved model (moved straight to the GPU) and its tokenizer from Drive.
model_load_name = '/content/drive/MyDrive/models/nllb-eng-cor-v7'
model = AutoModelForSeq2SeqLM.from_pretrained(model_load_name).cuda()
# NOTE(review): the training cells load the tokenizer with AutoTokenizer, while
# this cell uses NllbTokenizer — confirm both tokenize identically.
tokenizer = NllbTokenizer.from_pretrained(model_load_name)
# Adds 'cor_Latn' only if the reloaded vocabulary lacks it; prints which case applied.
fix_tokenizer(tokenizer)

In [None]:
from tqdm import tqdm

# Translate every English test sentence, one at a time, and keep the first
# (only) output for each in a new 'translated' column.
translations = []
for sentence in tqdm(test.en):
    outputs = translate(sentence, 'eng_Latn', 'cor_Latn', no_repeat_ngram_size=3, num_beams=5)
    translations.append(outputs[0])
test['translated'] = translations


In [None]:
# Install sacrebleu, used below for the BLEU and chrF scores.
!pip install sacrebleu

In [None]:
import sacrebleu
# Metric objects reused for corpus-level scoring of the test translations.
bleu_calc = sacrebleu.BLEU()
chrf_calc = sacrebleu.CHRF(word_order=2)  # ChrF++

In [None]:
# Score the system output against the reference translations: BLEU first,
# then chrF++. The references are wrapped in a list because a single
# reference set is supplied.
hypotheses = test['translated'].tolist()
references = [test['kw'].tolist()]
for metric in (bleu_calc, chrf_calc):
    print(metric.corpus_score(hypotheses, references))


In [None]:
# Save the test set, including the new 'translated' column, as a tab-separated
# file on Drive (the row index is written too, as is pandas' default).
test.to_csv('/content/drive/MyDrive/data/translations.tsv', sep = "\t")