In [None]:
!pip install transformers
!pip install datasets
!pip install accelerate
!pip install google-transliteration-api

In [None]:
import json
import pandas as pd
import numpy as np
from sklearn.utils.class_weight import compute_class_weight
import torch
from datasets import Dataset
from datasets import load_dataset
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from transformers import AutoTokenizer, AutoModelForMaskedLM,AutoModelForSequenceClassification
from transformers import TrainerCallback, EarlyStoppingCallback
from transformers import TrainingArguments, Trainer
import wandb
from transformers.integrations import WandbCallback

In [None]:
# Read each CSV split into a DataFrame and drop the two length-statistics columns.
# NOTE(review): test_df comes from test.csv while the HF dataset below reads
# final_test.csv; predictions are later written into test_df row by row, so both
# files presumably hold the same rows in the same order -- confirm.
train_df, test_df, val_df = (
    pd.read_csv(csv_path).drop(columns=['Num_Tokens','Num_Sentences'])
    for csv_path in ("train.csv", "test.csv", "val.csv")
)

In [None]:
# Load the three CSV files as one dataset object keyed by 'train' / 'val' / 'test'.
dataset = load_dataset(
    'csv',
    data_files=dict(
        train="final_train.csv",
        val="final_val.csv",
        test="final_test.csv",
    ),
)

In [None]:
import pickle

# Word lookup table handed to the preprocessing functions as `related_words`.
# NOTE(review): pickle.load can execute arbitrary code from the file -- only load
# a pickle you produced yourself or otherwise trust.
with open('/content/rh-code-mixed-2.pkl', 'rb') as pkl_file:
    dict_words = pickle.load(pkl_file)

In [None]:
def corrected_preprocess(sentences, related_words):
    """Replace every word of ``clean_text`` by its corrected spelling, when one is known.

    Args:
        sentences: Batch mapping holding a ``'clean_text'`` list of strings
            (this function is used with ``dataset.map(..., batched=True)``).
        related_words: Mapping ``word -> dict``. When an entry has a
            ``'corrected_word'`` value that is not ``None``, it replaces the word.

    Returns:
        ``{"corrected_text": [...]}`` with one space-joined string per input text.
        Words without an entry, or without a correction, are kept unchanged.
    """
    processed_texts = []

    for text in sentences['clean_text']:
        corrected_words = []
        for word in text.split():
            # Look the word up with .get(): indexing related_words[word] raised
            # KeyError for any word absent from the table instead of keeping it.
            entry = related_words.get(word) or {}
            corrected = entry.get('corrected_word')
            corrected_words.append(word if corrected is None else corrected)

        processed_texts.append(" ".join(corrected_words))

    return {"corrected_text": processed_texts}

# Add the `corrected_text` column to every split, processing rows in batches.
dataset = dataset.map(
    corrected_preprocess,
    batched=True,
    fn_kwargs=dict(related_words=dict_words),
)

In [None]:
# Tokenizer of the checkpoint that is fine-tuned in the cells below.
tokenizer = AutoTokenizer.from_pretrained("l3cube-pune/hing-mbert")

# Tokenize `corrected_text`; padding='max_length' + truncation fix every sequence at 97 tokens.
mbert_dataset = dataset.map(
    lambda batch: tokenizer(
        batch['corrected_text'],
        truncation=True,
        padding='max_length',
        max_length=97,
    ),
    batch_size=64,
    batched=True,
).remove_columns(["clean_text","corrected_text","language_tags"])
mbert_dataset.set_format("torch")

# Training labels: they define the set of classes and drive the class weights.
labels = mbert_dataset['train']['labels']
class_labels = np.unique(labels)

# 'balanced' class weights, stored as a float tensor for use in the weighted loss.
class_weights = torch.tensor(
    compute_class_weight(class_weight='balanced', classes=class_labels, y=labels.numpy()),
    dtype=torch.float,
)
print(class_weights)

In [None]:
from transformers import TrainingArguments, Trainer
# Define a custom Trainer class to include class weights
class WeightedLossTrainer(Trainer):
    """``Trainer`` whose loss is cross-entropy weighted by the notebook-level ``class_weights``."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run a forward pass and return the class-weighted cross-entropy loss.

        Args:
            model: Model being trained; its ``device`` decides where tensors are moved.
            inputs: Batch mapping of tensors; must contain ``"labels"``.
            return_outputs: When true, return ``(loss, outputs)`` instead of ``loss``.
            num_items_in_batch: Accepted and ignored. Recent transformers releases
                pass this keyword to ``compute_loss``; without the parameter those
                versions fail with a ``TypeError``.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        device = model.device
        labels = inputs.get("labels").to(device)  # labels must live on the model's device
        inputs = {k: v.to(device) for k, v in inputs.items()}  # same for every other input tensor
        # Forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Weighted loss; class_weights is the module-level tensor computed from the training labels
        loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights.to(device))
        loss = loss_fct(logits, labels)
        return (loss, outputs) if return_outputs else loss

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy plus precision/recall/F1 variants for one evaluation pass.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class is the arg-max
            of ``logits`` over the last axis.

    Returns:
        Dict with accuracy, weighted precision, and recall / F1 in weighted, micro
        and macro form, plus recall / F1 computed with ``pos_label=1`` ("positive")
        and ``pos_label=0`` ("negative").
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)

    def recall(**options):
        # recall_score on this evaluation's labels/predictions
        return recall_score(labels, predictions, **options)

    def f1(**options):
        # f1_score on this evaluation's labels/predictions
        return f1_score(labels, predictions, **options)

    return {
        'accuracy': accuracy_score(labels, predictions),
        'precision_weighted': precision_score(labels, predictions, average='weighted'),
        'recall_weighted': recall(average='weighted'),
        'recall_micro': recall(average='micro'),
        'recall_macro': recall(average='macro'),
        'recall_positive': recall(pos_label=1),
        'recall_negative': recall(pos_label=0),
        'f1_weighted': f1(average='weighted'),
        'f1_micro': f1(average='micro'),
        'f1_macro': f1(average='macro'),
        'f1_positive': f1(pos_label=1),
        'f1_negative': f1(pos_label=0),
    }

In [None]:
# Experiment 1: fine-tune hing-mbert on the spelling-corrected text.
wandb.init(project="rh1", name="Hing_mBERT_corrected")
model = AutoModelForSequenceClassification.from_pretrained("l3cube-pune/hing-mbert", num_labels=2)

arguments = TrainingArguments(
    output_dir="sample_HingMBert_trainer_5k_corrected_text",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    optim = 'adamw_torch',
    evaluation_strategy="epoch", # run validation at the end of each epoch
    save_strategy="epoch",
    learning_rate=2e-5,
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1_weighted',  # metric used to select the best checkpoint
    greater_is_better=True,  # True: a higher weighted F1 is better (this is not a loss)
    save_total_limit=1,
    seed=224,
    report_to="wandb"

)



# Trainer with the class-weighted loss; validation split is used for per-epoch evaluation.
trainer = WeightedLossTrainer(
    model=model,
    args=arguments,
    train_dataset=mbert_dataset['train'],
    eval_dataset=mbert_dataset['val'], # change to test when you do your final evaluation!
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
class LoggingCallback(TrainerCallback):
    """Append every log record emitted by the Trainer to a JSON-lines file."""

    def __init__(self, log_path):
        """Remember the path of the ``.jsonl`` file that records are appended to."""
        self.log_path = log_path

    # on_log is called at each logging step configured through TrainingArguments (logging_steps).
    def on_log(self, args, state, control, logs=None, **kwargs):
        """Write ``logs`` (minus ``total_flos``) as one JSON line, on the local main process only."""
        if logs is None or not state.is_local_process_zero:
            return

        import os  # local import: only needed here, keeps the block self-contained

        # Filter into a new dict instead of logs.pop(): `logs` is the same object
        # handed to the other callbacks and must not be modified here.
        record = {key: value for key, value in logs.items() if key != "total_flos"}

        # open(..., "a") does not create missing parent directories.
        parent_dir = os.path.dirname(self.log_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(self.log_path, "a") as f:
            f.write(json.dumps(record) + "\n")


In [None]:
# Early stopping watches metric_for_best_model (eval_f1_weighted) with patience 4 / threshold 0.01.
trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=4, early_stopping_threshold=0.01))
trainer.add_callback(LoggingCallback("sample_HingMBert_trainer_5k_corrected_text/log.jsonl"))

# train the model
trainer.train()

# Metrics on the validation split (the trainer's eval_dataset).
mbert_results = trainer.evaluate()

# Predict the test split and store the arg-max class per row.
# (The former stand-alone argmax expression was computed and discarded, so it is dropped.)
test_results_mbert = trainer.predict(mbert_dataset['test'])
test_df['corrected'] = test_results_mbert.predictions.argmax(axis=1)

## Transliterated Words

In [None]:
from google.transliteration import transliterate_word
def transliterated_preprocess(sentences, related_words, transliterate=None):
    """Replace every word of ``clean_text`` by its ``h_script`` form.

    Args:
        sentences: Batch mapping holding a ``'clean_text'`` list of strings.
        related_words: Mapping ``word -> dict``; a non-``None`` ``'h_script'``
            value replaces the word.
        transliterate: Optional callable ``(word, lang_code=...) -> list[str]``
            used for words without a stored ``h_script``. Defaults to the
            notebook-level ``transliterate_word``.

    Returns:
        ``{"transliterated_text": [...]}`` with one space-joined string per input text.
        A word is kept unchanged when the transliterator returns no suggestion.
    """
    fallback_cache = {}  # word -> transliteration, so repeated words cost a single call

    def fallback(word):
        if word not in fallback_cache:
            # Resolve the default lazily: it is only needed when a word has no h_script.
            translit = transliterate if transliterate is not None else transliterate_word
            suggestions = translit(word, lang_code='hi')
            # An empty suggestion list used to raise IndexError on suggestions[0].
            fallback_cache[word] = suggestions[0] if suggestions else word
        return fallback_cache[word]

    processed_texts = []

    for text in sentences['clean_text']:
        converted_words = []
        for word in text.split():
            # .get() on the table itself: related_words[word] raised KeyError
            # for words that are not in the table.
            h_script = (related_words.get(word) or {}).get('h_script')
            converted_words.append(fallback(word) if h_script is None else h_script)

        processed_texts.append(" ".join(converted_words))

    return {"transliterated_text": processed_texts}

# Add the `transliterated_text` column to every split, processing rows in batches.
trans_dataset = dataset.map(
    transliterated_preprocess,
    batched=True,
    fn_kwargs=dict(related_words=dict_words),
)

In [None]:
# Tokenize `transliterated_text` to fixed-length (97 token) sequences, then drop
# the listed raw-text columns and switch the dataset to torch tensors.
mbert_trans_dataset = trans_dataset.map(
    lambda batch: tokenizer(
        batch['transliterated_text'],
        truncation=True,
        padding='max_length',
        max_length=97,
    ),
    batch_size=64,
    batched=True,
).remove_columns(["clean_text","language_tags","transliterated_text"])
mbert_trans_dataset.set_format("torch")

In [None]:
# Experiment 2: fresh hing-mbert classifier fine-tuned on the transliterated text.
model = AutoModelForSequenceClassification.from_pretrained("l3cube-pune/hing-mbert", num_labels=2)

model.resize_token_embeddings(len(tokenizer))

arguments = TrainingArguments(
    output_dir="sample_HingMBert_trainer_5k_hi_dev",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    optim = 'adamw_torch',
    evaluation_strategy="epoch", # run validation at the end of each epoch
    save_strategy="epoch",
    learning_rate=3e-5,
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1_weighted',  # metric used to select the best checkpoint
    greater_is_better=True,  # True: a higher weighted F1 is better
    save_total_limit=1,
    seed=224,
    report_to="wandb"
)


trans_trainer = WeightedLossTrainer(
    model=model,
    args=arguments,
    # Train and validate on the transliterated tokenization. Using mbert_dataset here
    # would train on the corrected text and then predict on transliterated text.
    train_dataset=mbert_trans_dataset['train'],
    eval_dataset=mbert_trans_dataset['val'], # change to test when you do your final evaluation!
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Early stopping watches metric_for_best_model (eval_f1_weighted) with patience 4 / threshold 0.01.
trans_trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=4, early_stopping_threshold=0.01))
trans_trainer.add_callback(LoggingCallback("sample_HingMBert_trainer_5k_hi_dev/log.jsonl"))
trans_trainer.train()

# Metrics on the trainer's eval_dataset.
mbert_trans_results = trans_trainer.evaluate()

# Predict the transliterated test split and store the arg-max class per row.
# (The former stand-alone argmax expression was computed and discarded, so it is dropped.)
test_results_mbert = trans_trainer.predict(mbert_trans_dataset['test'])
test_df['transliterated'] = test_results_mbert.predictions.argmax(axis=1)

## Transliterated + Skip English Words

In [None]:
def transliterated_preprocess(sentences, related_words):
    """Use ``h_script`` for words tagged ``'HI'`` and the corrected spelling otherwise.

    NOTE: this redefines ``transliterated_preprocess`` from the "Transliterated
    Words" section; after this cell runs, the name refers to this variant.

    Args:
        sentences: Batch mapping holding a ``'clean_text'`` list of strings.
        related_words: Mapping ``word -> dict`` that may carry ``'language'``,
            ``'h_script'`` and ``'corrected_word'`` values.

    Returns:
        ``{"transliterated_skip_eng": [...]}`` with one space-joined string per
        input text. A word falls back to its ``corrected_word`` and finally to
        itself when no replacement is stored.
    """
    processed_texts = []

    for text in sentences['clean_text']:
        converted_words = []

        for word in text.split():
            # .get() on the table itself: related_words[word] raised KeyError
            # for words that are not in the table.
            entry = related_words.get(word) or {}

            replacement = None
            if entry.get('language') == 'HI':
                # May be missing or None; appending None would break " ".join below.
                replacement = entry.get('h_script')
            if replacement is None:
                replacement = entry.get('corrected_word')

            converted_words.append(word if replacement is None else replacement)

        # Join the list back into a string
        processed_texts.append(" ".join(converted_words))

    return {"transliterated_skip_eng": processed_texts}

# Add the `transliterated_skip_eng` column to every split, processing rows in batches.
trans_skip_dataset = dataset.map(
    transliterated_preprocess,
    batched=True,
    fn_kwargs=dict(related_words=dict_words),
)

In [None]:
# Tokenize `transliterated_skip_eng` to fixed-length (97 token) sequences, then drop
# the listed raw-text columns and switch the dataset to torch tensors.
mbert_trans_skip_dataset = trans_skip_dataset.map(
    lambda batch: tokenizer(
        batch['transliterated_skip_eng'],
        truncation=True,
        padding='max_length',
        max_length=97,
    ),
    batch_size=64,
    batched=True,
).remove_columns(["clean_text","language_tags","transliterated_skip_eng"])
mbert_trans_skip_dataset.set_format("torch")

In [None]:
# Experiment 3: start from a fresh pretrained checkpoint. Without this line the
# trainer below would continue fine-tuning the `model` object that the previous
# experiment already trained, so the runs would not be comparable.
model = AutoModelForSequenceClassification.from_pretrained("l3cube-pune/hing-mbert", num_labels=2)

arguments = TrainingArguments(
    output_dir="sample_HingMBert_trainer_5k_skip_eng",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    optim = 'adamw_torch',
    evaluation_strategy="epoch", # run validation at the end of each epoch
    save_strategy="epoch",
    learning_rate=5e-5,
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1_weighted',  # metric used to select the best checkpoint
    greater_is_better=True,  # True: a higher weighted F1 is better
    save_total_limit=1,
    seed=224,
    report_to="wandb"
)

trans_skip_trainer = WeightedLossTrainer(
    model=model,
    args=arguments,
    train_dataset=mbert_trans_skip_dataset['train'],
    eval_dataset=mbert_trans_skip_dataset['val'], # change to test when you do your final evaluation!
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Early stopping watches metric_for_best_model (eval_f1_weighted) with patience 4 / threshold 0.01.
trans_skip_trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=4, early_stopping_threshold=0.01))
trans_skip_trainer.add_callback(LoggingCallback("sample_HingMBert_trainer_5k_skip_eng/log.jsonl"))
trans_skip_trainer.train()

# Metrics on the trainer's eval_dataset.
mbert_trans_skip_results = trans_skip_trainer.evaluate()

# Predict the test split and store the arg-max class per row.
# (The former stand-alone argmax expression was computed and discarded, so it is dropped.)
test_results_mbert = trans_skip_trainer.predict(mbert_trans_skip_dataset['test'])
test_df['transliterated_skip'] = test_results_mbert.predictions.argmax(axis=1)

## Interleaved Language Tags

In [None]:
def interleaved_preprocess(sentences, related_words=None):
    """Build ``"<word> [<tag>]"`` sequences: each word followed by its language tag.

    Args:
        sentences: Batch mapping with ``'clean_text'`` (list of strings) and
            ``'language_tags'`` (per text, a ``", "``-separated string or a
            sequence of tags).
        related_words: Mapping ``word -> dict``; a non-``None``
            ``'corrected_word'`` value replaces the word. Defaults to the
            notebook-level ``dict_words``. The function previously read an
            undefined ``related_words`` name and raised ``NameError``.

    Returns:
        ``{"interleaved_text_tags": [...]}`` with one string per input text.
        Words and tags are paired with ``zip``, so surplus words or tags are dropped.
    """
    if related_words is None:
        related_words = dict_words

    processed_texts = []

    for text, tag in zip(sentences['clean_text'], sentences['language_tags']):
        words = text.split()
        tags_list = tag.split(", ") if isinstance(tag, str) else list(tag)

        pieces = []
        for word, word_tag in zip(words, tags_list):
            # Safe lookup: unknown words are kept, and a stored None correction
            # no longer ends up in the output as the text "None".
            corrected = (related_words.get(word) or {}).get('corrected_word')
            pieces.append(f"{word if corrected is None else corrected} [{word_tag}]")

        processed_texts.append(" ".join(pieces))

    return {"interleaved_text_tags": processed_texts}

# Add the `interleaved_text_tags` column returned by interleaved_preprocess to every split.
itags_dataset = dataset.map(interleaved_preprocess, batched=True)

In [None]:
# Experiment 4: fresh hing-mbert classifier fine-tuned on word/tag interleaved text.
wandb.init(project="rh4", name="Hing_mBERT_itag")
model = AutoModelForSequenceClassification.from_pretrained("l3cube-pune/hing-mbert", num_labels=2)

model.resize_token_embeddings(len(tokenizer))

# Tokenize the interleaved text. The trainer was previously handed the raw-text
# dataset, which contains no input_ids / attention_mask for the model.
# The result is stored back under the same name so the later
# itags_trainer.predict(itags_dataset['test']) call receives tokenized rows;
# the remaining text columns are not model inputs and are dropped by the Trainer
# (remove_unused_columns defaults to True).
# NOTE(review): the tags lengthen every sequence, so max_length=97 (kept for parity
# with the other experiments) may truncate more text here -- confirm.
itags_dataset = itags_dataset.map(
    lambda example: tokenizer(example['interleaved_text_tags'], max_length=97, padding='max_length', truncation=True),
    batched=True,
    batch_size=64
)
itags_dataset.set_format("torch")

arguments = TrainingArguments(
    output_dir="sample_HingMBert_trainer_5k_itags",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    optim = 'adamw_torch',
    evaluation_strategy="epoch", # run validation at the end of each epoch
    save_strategy="epoch",
    learning_rate=5e-5,
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1_weighted',  # metric used to select the best checkpoint
    greater_is_better=True,  # True: a higher weighted F1 is better
    save_total_limit=1,
    seed=224,
    report_to="wandb"
)


itags_trainer = WeightedLossTrainer(
    model=model,
    args=arguments,
    train_dataset=itags_dataset['train'],
    eval_dataset=itags_dataset['val'], # change to test when you do your final evaluation!
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)

In [None]:
# Early stopping watches metric_for_best_model (eval_f1_weighted) with patience 4 / threshold 0.01.
itags_trainer.add_callback(EarlyStoppingCallback(early_stopping_patience=4, early_stopping_threshold=0.01))
# Log into this experiment's own output directory; the skip_eng path used before
# mixed these records into another run's log file.
itags_trainer.add_callback(LoggingCallback("sample_HingMBert_trainer_5k_itags/log.jsonl"))
itags_trainer.train()

# Metrics on the trainer's eval_dataset.
itags_results = itags_trainer.evaluate()

test_results_mbert = itags_trainer.predict(itags_dataset['test'])
# Store in a dedicated column: writing to 'transliterated_skip' overwrote the
# predictions of the skip-English experiment.
test_df['interleaved_tags'] = test_results_mbert.predictions.argmax(axis=1)

## Adjacent Sentence Language Tags

In [None]:
def adjacent_preprocess(sentences, related_words=None):
    """Build ``"<corrected text> [tag] [tag] ..."``: the text followed by all its tags.

    Args:
        sentences: Batch mapping with ``'clean_text'`` (list of strings) and
            ``'language_tags'`` (per text, a ``", "``-separated string or a
            sequence of tags).
        related_words: Mapping ``word -> dict``; a non-``None``
            ``'corrected_word'`` value replaces the word. Defaults to the
            notebook-level ``dict_words``. The function previously read an
            undefined ``related_words`` name and raised ``NameError``.

    Returns:
        ``{"adjacent_text_tags": [...]}`` with one string per input text.
    """
    if related_words is None:
        related_words = dict_words

    def corrected_or_same(word):
        # Safe lookup: unknown words are kept, and a stored None correction
        # no longer ends up in the output as the text "None".
        corrected = (related_words.get(word) or {}).get('corrected_word')
        return word if corrected is None else corrected

    processed_texts = []

    for text, tag in zip(sentences['clean_text'], sentences['language_tags']):
        if isinstance(tag, str):
            tag = tag.split(', ')  # Convert a string of tags into a list of tags

        tag_string = " ".join(f"[{label}]" for label in tag)
        # split(" ") (not split()) keeps the original single-space word boundaries
        corrected_text = " ".join(corrected_or_same(word) for word in text.split(" "))
        processed_texts.append(f"{corrected_text} {tag_string}")

    return {"adjacent_text_tags": processed_texts}

# Add the `adjacent_text_tags` column returned by adjacent_preprocess to every split.
atags_dataset = dataset.map(adjacent_preprocess, batched=True)

In [None]:
# Experiment 5: fresh hing-mbert classifier fine-tuned on text followed by its tags.
# Run name and output_dir are specific to this experiment; reusing the "itag"
# names would write this run's checkpoints into the interleaved experiment's directory.
wandb.init(project="rh4", name="Hing_mBERT_atag")
model = AutoModelForSequenceClassification.from_pretrained("l3cube-pune/hing-mbert", num_labels=2)

model.resize_token_embeddings(len(tokenizer))

# Tokenize the text+tags strings. The trainer was previously handed the raw-text
# dataset, which contains no input_ids / attention_mask for the model.
# The result is stored back under the same name so any later use of
# atags_dataset[...] receives tokenized rows; the remaining text columns are not
# model inputs and are dropped by the Trainer (remove_unused_columns defaults to True).
# NOTE(review): the appended tags lengthen every sequence, so max_length=97 (kept for
# parity with the other experiments) may cut tags off -- confirm.
atags_dataset = atags_dataset.map(
    lambda example: tokenizer(example['adjacent_text_tags'], max_length=97, padding='max_length', truncation=True),
    batched=True,
    batch_size=64
)
atags_dataset.set_format("torch")

arguments = TrainingArguments(
    output_dir="sample_HingMBert_trainer_5k_atags",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=16,
    num_train_epochs=10,
    optim = 'adamw_torch',
    evaluation_strategy="epoch", # run validation at the end of each epoch
    save_strategy="epoch",
    learning_rate=5e-5,
    warmup_steps=500,                # number of warmup steps for learning rate scheduler
    weight_decay=0.01,               # strength of weight decay
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1_weighted',  # metric used to select the best checkpoint
    greater_is_better=True,  # True: a higher weighted F1 is better
    save_total_limit=1,
    seed=224,
    report_to="wandb"
)


atags_trainer = WeightedLossTrainer(
    model=model,
    args=arguments,
    train_dataset=atags_dataset['train'],
    eval_dataset=atags_dataset['val'], # change to test when you do your final evaluation!
    tokenizer=tokenizer,
    compute_metrics=compute_metrics
)