<a href="https://colab.research.google.com/github/ulingga/Manchu-English_babyMT/blob/main/Manchu_English_NMT_Tokenization_%26_Fine_Tuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize
from collections import Counter


# Read the parallel corpus (a CSV with 'Manchu' and 'English' columns) from GitHub
url = 'https://raw.githubusercontent.com/ulingga/dataset/main/Manchu_English.csv'
df = pd.read_csv(url)

# Column views reused by the later cells
source_data, target_data = df['Manchu'], df['English']

# Tokenize the whole Manchu side as one string and tally every token
combined_text = ' '.join(source_data)
tokens = word_tokenize(combined_text)
word_counts = Counter(tokens)

# The Counter's keys are exactly the distinct tokens
unique_words = list(word_counts)
num_unique_words = len(unique_words)

print(f"Total unique words: {num_unique_words}")
# print(f"Each word's count: {word_counts}")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Total unique words: 1873


In [None]:
# Build one {"translation": {"mc": ..., "en": ...}} record per sentence pair,
# trimming leading/trailing whitespace on both sides.
source_data = df['Manchu']
target_data = df['English']
data = [
    {"translation": {"mc": src.strip(), "en": tgt.strip()}}
    for src, tgt in zip(source_data, target_data)  # zip() pairs the two columns row by row
]

print(f'total size of data is {len(data)}')

total size of data is 771


In [None]:
!pip install datasets
!pip install sentencepiece
!pip install transformers[torch]
!pip install accelerate -U
!pip install transformers

In [None]:
from datasets import load_dataset
from transformers import (
    MBartForConditionalGeneration, MBartTokenizer,
     Seq2SeqTrainingArguments, Seq2SeqTrainer
   )

import torch
from torch.utils.data import random_split

In [None]:
# Mount Google Drive at /content/drive; later cells read and write the tokenizer
# and model files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from tokenizers import ByteLevelBPETokenizer
from tokenizers.processors import BertProcessing
import os

# 'source_data' contains Manchu texts and 'target_data' contains English translations;
# one shared vocabulary is trained over both sides.
texts = source_data.tolist() + target_data.tolist()  # Combine lists

# Initialize an untrained byte-level BPE tokenizer
tokenizer = ByteLevelBPETokenizer()

# Train directly from the in-memory list of sentences
tokenizer.train_from_iterator(
    texts,
    vocab_size=5000,  # You can adjust this size as needed
    min_frequency=2,
    special_tokens=[
        "<s>",
        "<pad>",
        "</s>",
        "<unk>",
        "<mask>",
    ],
)

# Directory on Google Drive that receives the tokenizer files
tokenizer_path = '/content/drive/MyDrive/tokenizer'

# exist_ok=True makes this safe to re-run and avoids a separate
# "does it exist?" check before creating the directory.
os.makedirs(tokenizer_path, exist_ok=True)

# Save the trained tokenizer model files into the directory
tokenizer.save_model(tokenizer_path)

# Also save the complete tokenizer as a single JSON file next to them
tokenizer_json_path = os.path.join(tokenizer_path, "tokenizer.json")
tokenizer.save(tokenizer_json_path, pretty=True)  # Saves tokenizer configuration in JSON format

In [None]:
from transformers import PreTrainedTokenizerFast

# Directory holding the trained tokenizer files (vocab.json, merges.txt, tokenizer.json).
tokenizer_path = "/content/drive/MyDrive/tokenizer"  # Adjust this path if necessary.

# Load the tokenizer back from that directory
tokenizer = PreTrainedTokenizerFast.from_pretrained(tokenizer_path)

# Smoke test: encode a Manchu/English sentence pair to token ids,
# decode the ids back to text, and print the result.
encoded_input = tokenizer.encode(
    "orin de jiyanggūn ing iliha",
    "the General built camp on the 20th",
)
decoded_output = tokenizer.decode(encoded_input)
print(decoded_output)


In [None]:
from tokenizers import Tokenizer, models, trainers, pre_tokenizers, decoders, processors


# Load the trained tokenizer
tokenizer = Tokenizer.from_file("/content/drive/MyDrive/tokenizer/tokenizer.json")  # Adjust the path if needed

# Ensure the "<pad>" token is set correctly
tokenizer.enable_padding(pad_id=tokenizer.token_to_id("<pad>"), pad_token="<pad>")

# Wrap it in transformers' PreTrainedTokenizerFast. The special tokens are
# declared explicitly (they are the ones the tokenizer was trained with) so
# that pad/eos/bos/unk/mask are all defined on the wrapper instead of relying
# on whatever the wrapper infers from the tokenizer object.
transformers_tokenizer = PreTrainedTokenizerFast(
    tokenizer_object=tokenizer,
    bos_token="<s>",
    eos_token="</s>",
    unk_token="<unk>",
    pad_token="<pad>",
    mask_token="<mask>",
)

# Verify that the padding token is set
print(transformers_tokenizer.pad_token_id)  # Should not be `None`
print(transformers_tokenizer.pad_token)    # Should be "<pad>"

# Helper that tokenizes a single text with the wrapped tokenizer
def encode_data(text):
    """Tokenize ``text`` with ``transformers_tokenizer``.

    The text is padded to, and truncated at, 256 tokens. Returns whatever
    the tokenizer call returns for a single input.
    """
    encoding_options = {"padding": 'max_length', "truncation": True, "max_length": 256}
    return transformers_tokenizer(text, **encoding_options)

# Tokenize every sentence on each side of the corpus
source_data_tokenized = list(map(encode_data, source_data.tolist()))
target_data_tokenized = list(map(encode_data, target_data.tolist()))






In [None]:
def tokenize_for_model(example, tokenizer=None, max_length=128):
    """Turn one translation record into model inputs and labels.

    Args:
        example: dict of the form ``{"translation": {"mc": str, "en": str}}``.
        tokenizer: object exposing ``encode_plus``; defaults to the
            notebook-level ``transformers_tokenizer``.
        max_length: length both sides are padded to / truncated at.

    Returns:
        dict with
        ``input_ids``      - token ids of the Manchu sentence,
        ``attention_mask`` - 1 for real input tokens, 0 for padding,
        ``labels``         - token ids of the English sentence, with every
                             padded position replaced by -100.
    """
    if tokenizer is None:
        tokenizer = transformers_tokenizer

    translation = example['translation']
    mc_tokenized = tokenizer.encode_plus(translation['mc'], padding='max_length', truncation=True, max_length=max_length)
    en_tokenized = tokenizer.encode_plus(translation['en'], padding='max_length', truncation=True, max_length=max_length)

    # -100 is the default ignore_index of PyTorch's cross-entropy loss, so
    # padded target positions do not contribute to the training loss.
    labels = [
        token_id if is_real_token else -100
        for token_id, is_real_token in zip(en_tokenized['input_ids'], en_tokenized['attention_mask'])
    ]

    return {
        "input_ids": mc_tokenized['input_ids'],  # Manchu tokens, serving as the model input
        "attention_mask": mc_tokenized['attention_mask'],  # Marks which input positions are padding
        "labels": labels  # English tokens, serving as the expected model output
    }

# Run the per-example tokenization over every record in `data`
tokenized_dataset = list(map(tokenize_for_model, data))


In [None]:
from datasets import Dataset
import numpy as np

# Fixed seed so the train/test/validation split is identical on every run
SPLIT_SEED = 42

# Create a Dataset from your processed data
tokenized_dataset = Dataset.from_dict({
    'input_ids': [entry['input_ids'] for entry in tokenized_dataset],
    'attention_mask': [entry['attention_mask'] for entry in tokenized_dataset],  # Optional
    'labels': [entry['labels'] for entry in tokenized_dataset],
})

# Calculate the size of each dataset split, based on the proportions.
total_size = len(tokenized_dataset)
train_size = int(0.75 * total_size)  # 75% for training
test_size = int(0.15 * total_size)  # 15% for testing
val_size = total_size - train_size - test_size  # Whatever remains (about 10%) for validation

# Shuffle all row indices once with a seeded generator, then cut the shuffled
# order into three consecutive, non-overlapping slices.
indices = np.random.default_rng(SPLIT_SEED).permutation(total_size)
train_indices = indices[:train_size]
test_indices = indices[train_size:train_size + test_size]
val_indices = indices[train_size + test_size:]

# Create data subsets using the `.select()` method and the indices.
train_dataset = tokenized_dataset.select(train_indices)
test_dataset = tokenized_dataset.select(test_indices)
val_dataset = tokenized_dataset.select(val_indices)

# Every row must land in exactly one split
assert len(train_dataset) + len(test_dataset) + len(val_dataset) == total_size

In [None]:
# Reduce a tokenized example to just the two fields named below.
# Both values are handed through untouched.

def transform_data_for_dataset(data_entry):
    """Return a new dict holding only the 'input_ids' and 'labels' of ``data_entry``.

    The values are passed through unchanged; any other keys are dropped.
    Raises ``KeyError`` if either field is missing.
    """
    return {field: data_entry[field] for field in ('input_ids', 'labels')}

# Apply the transformation to every row of `tokenized_dataset`.
# NOTE(review): `transformed_data` is not referenced again in the visible cells;
# confirm whether it is still needed.
transformed_data = [transform_data_for_dataset(entry) for entry in tokenized_dataset]


# The splits available from here on are train_dataset, test_dataset, and val_dataset
# (training, testing, and validation, respectively).



In [None]:
from typing import List, Dict, Union
from torch.nn.utils.rnn import pad_sequence
import torch

# device = torch.device("cpu") # remove this if you have a GPU

class CustomDataCollator:
    """Collate tokenized examples into one right-padded batch of tensors.

    Each feature is a dict with ``input_ids`` and ``labels`` (list of ints or
    tensor) and, optionally, ``attention_mask``.

    Args:
        tokenizer: stored on the instance; not otherwise used by the collator.
        pad_token_id: value used to pad ``input_ids``.
        pad_attention_mask: when True, the batch also contains an
            ``attention_mask`` (taken from each feature, or all ones over the
            feature's ``input_ids`` when the feature has none), padded with 0.
        label_pad_token_id: value used to pad ``labels``. Defaults to -100,
            the default ignore_index of PyTorch's cross-entropy loss, so
            padded label positions are excluded from the loss.
    """

    # 'tokenizer' is the parameter name in the method definition.
    # It doesn't have to match the variable name outside of this method.
    def __init__(self, tokenizer, pad_token_id: int, pad_attention_mask: bool = False,
                 label_pad_token_id: int = -100):
        self.tokenizer = tokenizer
        self.pad_token_id = pad_token_id
        self.pad_attention_mask = pad_attention_mask
        self.label_pad_token_id = label_pad_token_id

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Build the batch dict (``labels``, ``input_ids`` and optionally ``attention_mask``).

        Raises:
            ValueError: if a feature's ``input_ids`` is neither a list nor a tensor.
            KeyError: if a feature has no ``labels``.
        """
        input_ids = []
        attention_masks = []
        labels = []

        for feature in features:
            ids = feature.get('input_ids')
            # Standardize input_ids to a tensor
            if isinstance(ids, list):
                ids = torch.tensor(ids, dtype=torch.long)
            elif not isinstance(ids, torch.Tensor):
                raise ValueError(f"Unexpected type for input_ids: {type(ids)}. Each 'input_ids' should be a list or a Tensor.")
            input_ids.append(ids)

            # as_tensor accepts lists as well as existing tensors
            labels.append(torch.as_tensor(feature['labels'], dtype=torch.long))

            if self.pad_attention_mask:
                attention_mask = feature.get('attention_mask')
                if attention_mask is None:
                    # No mask supplied: every input position is a real token.
                    # This must be a tensor, because pad_sequence only accepts tensors.
                    attention_mask = torch.ones(len(ids), dtype=torch.long)
                elif not isinstance(attention_mask, torch.Tensor):
                    attention_mask = torch.as_tensor(attention_mask, dtype=torch.long)
                attention_masks.append(attention_mask)

        batch_to_return = {
            'labels': pad_sequence(labels, batch_first=True, padding_value=self.label_pad_token_id),
            'input_ids': pad_sequence(input_ids, batch_first=True, padding_value=self.pad_token_id),
        }

        if self.pad_attention_mask:
            # 0 marks padded positions in an attention mask
            batch_to_return['attention_mask'] = pad_sequence(attention_masks, batch_first=True, padding_value=0)

        return batch_to_return


In [None]:
save_directory = "/content/drive/MyDrive/my_tokenizer"

# Save your tokenizer to the directory.
transformers_tokenizer.save_pretrained(save_directory)

# Load model
model = MBartForConditionalGeneration.from_pretrained("facebook/mbart-large-cc25")

# NOTE(review): the model is the pretrained mBART checkpoint, while the token ids
# come from the custom BPE tokenizer trained in this notebook; presumably those ids
# do not line up with the checkpoint's own vocabulary. Confirm this is intended.

# Padding id used by the collator: the custom tokenizer's own pad token.
pad_token_id = transformers_tokenizer.pad_token_id
if pad_token_id is None:
    raise ValueError("transformers_tokenizer has no pad token; cannot build padded batches.")

# 'transformers_tokenizer' is the variable name in the outer scope.
data_collator = CustomDataCollator(tokenizer=transformers_tokenizer, pad_token_id=pad_token_id, pad_attention_mask=True) # or False, based on your preference

# torch.cuda.empty_cache()


In [None]:
# Training arguments setting
training_args = Seq2SeqTrainingArguments(
    output_dir="/content/drive/MyDrive/fine_tune_mbart",  # The directory where the model predictions and checkpoints will be written.
    do_train=True,
    do_eval=True,
    evaluation_strategy="steps",
    per_device_train_batch_size=1,  # Adjust based on your GPU's capabilities
    per_device_eval_batch_size=1,
    predict_with_generate=True,
    logging_steps=100,
    save_steps=100,
    eval_steps=100,
    warmup_steps=100,
    num_train_epochs=1,  # You can adjust the number of training epochs
    # Mixed precision needs a CUDA device; enable it only when one is present
    # so the notebook also runs on a CPU-only runtime.
    fp16=torch.cuda.is_available(),
)

# Check if GPU is available and use it
# device = "cuda" if torch.cuda.is_available() else "cpu"
# model.to(device)



In [None]:
# Create Trainer
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,  # Periodic evaluation uses the validation split; test_dataset stays held out
    data_collator=data_collator,
    tokenizer=transformers_tokenizer,
)

# Start training
trainer.train()

# Define model's name
model_name = "manchu_to_english_babyMT"

# Final location of the named model
model_save_path = f"/content/drive/MyDrive/fine_tune_mbart/{model_name}"
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

# Directory the later inference cell loads from
path_to_model = "/content/drive/MyDrive/fine_tune_mbart/finished_model"

# Write the freshly fine-tuned model (and its tokenizer) to disk BEFORE reloading,
# so the reload below picks up this run's weights rather than whatever an
# earlier run may have left in the directory.
trainer.save_model(path_to_model)
transformers_tokenizer.save_pretrained(path_to_model)

# Load the model and tokenizer from the saved model directory
model = AutoModelForSeq2SeqLM.from_pretrained(path_to_model)
tokenizer = AutoTokenizer.from_pretrained(path_to_model)

# Keep a second, named copy of the model together with its tokenizer
model.save_pretrained(model_save_path)
tokenizer.save_pretrained(model_save_path)

In [None]:
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
# Ensure this path points to the directory where the model and tokenizer were saved
path_to_saved_model = "/content/drive/MyDrive/fine_tune_mbart/finished_model"  # Update with your path

# Load the model and tokenizer
model = AutoModelForSeq2SeqLM.from_pretrained(path_to_saved_model)
tokenizer = AutoTokenizer.from_pretrained(path_to_saved_model)

# Sample sentence for translation
sample_sentence = "dergici. hafan coohai sain be fonjiha."

# Encode the sentence
inputs = tokenizer(
    sample_sentence,
    return_tensors="pt",
    padding=True,
    truncation=True,
    max_length=512  # or another appropriate value based on your model's configuration
)

# Force the English language code as the first generated token only when the
# loaded tokenizer actually defines language codes; a tokenizer without a
# `lang_code_to_id` table would otherwise raise AttributeError here.
generate_kwargs = {}
lang_code_to_id = getattr(tokenizer, "lang_code_to_id", None)
if lang_code_to_id and "en_XX" in lang_code_to_id:
    generate_kwargs["forced_bos_token_id"] = lang_code_to_id["en_XX"]

# Translate the sentence
model.eval()  # Make sure the model is in evaluation mode
with torch.no_grad():  # No need to track gradients for inference
    # Pass only the two inputs generation needs. The tokenizer output may
    # contain further keys (e.g. token_type_ids) that generate() is not
    # expected to accept for this model.
    outputs = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        **generate_kwargs,
    )

# Decode the model output to get the translation
translated_sentence = tokenizer.decode(outputs[0], skip_special_tokens=True)

print("Translated Sentence: ", translated_sentence)
