In [1]:
import pandas as pd 
import numpy as np
import torch

# Read the error dataset CSV from the Kaggle input directory into a DataFrame,
# then show it as the cell output.
data = pd.read_csv("/kaggle/input/luat123/error_dataset.csv")
data

In [2]:
from sklearn.model_selection import train_test_split
# Hold out 10,000 rows for validation (an integer test_size is an absolute row count);
# the fixed random_state makes the split reproducible across runs.
train_df, val_df = train_test_split(data, test_size=10000, random_state=42)

In [3]:
# Display the validation split for a quick visual check.
val_df

In [4]:
def split_token(text):
    """Break a sentence into its whitespace-separated tokens.

    Leading and trailing whitespace is removed first; runs of whitespace between
    words count as a single separator, so no empty tokens are returned.
    """
    trimmed = text.strip()
    return trimmed.split()
def split_label(mask_string):
    """Turn a string of digit characters into a list of ints, one per character.

    Surrounding whitespace is stripped before conversion; any remaining character
    that ``int`` cannot parse raises ``ValueError``.
    """
    digits = mask_string.strip()
    return list(map(int, digits))

# val_df comes out of train_test_split; take an explicit copy before adding columns
# so pandas does not emit SettingWithCopyWarning for writing into a slice.
val_df = val_df.copy()

# Tokenised version of the corrupted sentence (list of words per row).
val_df['corrupted_2'] = val_df['corrupted'].apply(split_token)

# Parse the mask string into a list of ints. Values that are already lists are kept
# as they are, so re-running this cell does not fail on previously converted rows.
val_df['mask'] = val_df['mask'].apply(
    lambda mask: mask if isinstance(mask, list) else split_label(mask)
)
val_df

In [5]:
def detect_error(model, tokenizer, tokens, max_len=128, device='cuda'):
    """Predict one label per input word with a token-classification model.

    Args:
        model: Model that is called with the tokenizer output as keyword arguments
            and returns an object exposing ``.logits``.
        tokenizer: Tokenizer whose output exposes ``word_ids()`` (fast Hugging Face
            tokenizers do).
        tokens: One sentence, already split into a list of words.
        max_len: Length the encoded sequence is padded and truncated to.
        device: Device the model and the encoded inputs are moved to.

    Returns:
        A list of plain Python ints with one entry per element of ``tokens``. Each
        word takes the label predicted for its first sub-token. Words cut off by
        truncation keep the default label 0. An empty ``tokens`` yields ``[]``.
    """
    # Nothing to classify; skip the forward pass entirely.
    if len(tokens) == 0:
        return []

    model.eval()
    model.to(device)

    encoding = tokenizer(tokens,
                         is_split_into_words=True,
                         padding="max_length",
                         truncation=True,
                         max_length=max_len,
                         return_tensors='pt')
    # word_ids() lives on the tokenizer output object, so read it before the
    # encoding is turned into a plain dict below.
    word_ids = encoding.word_ids()
    # Move every tensor of the encoding to the target device.
    model_inputs = {key: value.to(device) for key, value in encoding.items()}

    # Inference only: no gradients needed.
    with torch.no_grad():
        logits = model(**model_inputs).logits
    # Highest-scoring label per position of the first (only) sequence, converted to
    # plain Python ints so the returned list has a single, uniform element type.
    preds = torch.argmax(logits, dim=-1)[0].tolist()

    result = [0] * len(tokens)
    prev_word_idx = None
    for token_idx, word_idx in enumerate(word_ids):
        # Special and padding positions carry no word index.
        if word_idx is None:
            continue
        # Only the first sub-token of each word decides the word's label.
        if word_idx != prev_word_idx:
            result[word_idx] = preds[token_idx]
            prev_word_idx = word_idx

    return result


In [6]:
!pip install Levenshtein

In [7]:
import torch
from Levenshtein import distance as levenshtein_distance

def correct_error_encoder_top_k(model, tokenizer, corrupted_tokens, error_flags, ground_truth_tokens,
                                           top_k=10, max_len=128, device='cuda'):
    """Replace flagged words using the top-k predictions of a masked language model.

    Every token with a truthy flag is replaced by the tokenizer's mask token before
    the sentence is encoded. For each masked word whose flag equals 1, the ``top_k``
    highest-scoring vocabulary entries at the mask position are collected and the
    one with the smallest Levenshtein distance to the ground-truth word is used as
    the replacement. Because the ground truth picks the winner, the result is an
    upper bound on what the top-k candidates can achieve, not a blind prediction.

    Args:
        model: Masked-LM model returning an object with ``.logits``.
        tokenizer: Tokenizer exposing ``mask_token``, ``mask_token_id`` and
            ``convert_ids_to_tokens``.
        corrupted_tokens: Words of the corrupted sentence.
        error_flags: One flag per word; 1 marks a word to be corrected.
        ground_truth_tokens: Words of the reference sentence, same length.
        top_k: Number of candidates considered per masked word (clamped to the
            size of the model's output vocabulary).
        max_len: Length the encoded sequence is padded and truncated to.
        device: Device the model and inputs are moved to.

    Returns:
        The corrected sentence as a single space-joined string. Progress and the
        final comparison are also printed.

    Raises:
        AssertionError: If the three token/flag sequences differ in length.
    """
    assert len(corrupted_tokens) == len(error_flags) == len(ground_truth_tokens), "Unmatch Length!"
    model.eval()
    model.to(device)

    mask_token = tokenizer.mask_token
    masked_tokens = [
        mask_token if flag else tok
        for tok, flag in zip(corrupted_tokens, error_flags)
    ]
    # Word positions that appear as a mask token in the text, in reading order.
    masked_word_indices = [
        word_idx for word_idx, tok in enumerate(masked_tokens) if tok == mask_token
    ]
    masked_text = " ".join(masked_tokens)

    inputs = tokenizer(masked_text,
                       return_tensors="pt",
                       max_length=max_len,
                       padding="max_length",
                       truncation=True,
                       is_split_into_words=False).to(device)

    # Locate the mask tokens directly instead of relying on word_ids(): for raw
    # text the tokenizer's own pre-tokenizer defines what a "word" is, which need
    # not coincide with the whitespace-split tokens used for the flags.
    input_ids = inputs["input_ids"][0]
    mask_positions = (input_ids == tokenizer.mask_token_id).nonzero(as_tuple=True)[0].tolist()

    with torch.no_grad():
        logits = model(**inputs).logits  # indexed below as [batch, position, vocab]

    replaced_tokens = list(corrupted_tokens)
    # torch.topk raises if k exceeds the size of the dimension.
    k = min(top_k, logits.shape[-1])

    # The n-th mask position belongs to the n-th masked word. Masks lost to
    # truncation are simply left out by zip, so those words stay unchanged.
    for word_idx, token_idx in zip(masked_word_indices, mask_positions):
        if error_flags[word_idx] != 1:
            continue

        token_logits = logits[0, token_idx]
        topk_ids = torch.topk(token_logits, k).indices.tolist()
        topk_tokens = tokenizer.convert_ids_to_tokens(topk_ids)
        # Drop the leading word-boundary marker that appears on vocabulary pieces.
        topk_tokens = [tok.lstrip("▁") for tok in topk_tokens]

        gt_token = ground_truth_tokens[word_idx]
        best_pred = min(topk_tokens, key=lambda x: levenshtein_distance(x, gt_token))

        print(f"\t(idx {word_idx}) Error token: '{corrupted_tokens[word_idx]}' -> {topk_tokens}")
        print(f"\t\tBest: '{best_pred}' vs Ground Truth: '{gt_token}'\n")

        replaced_tokens[word_idx] = best_pred

    corrected_sentence = " ".join(replaced_tokens)
    print(f"Final corrected sentence: {corrected_sentence}")
    print("Ground truth Sentence:   ", " ".join(ground_truth_tokens))
    return corrected_sentence


In [8]:
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification, AutoModelForSeq2SeqLM, AutoModelForMaskedLM
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import f1_score
# Use the GPU when available; the same device is passed to both helper calls below
# so the cell also runs on a CPU-only machine.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

###### Error detection: token-classification checkpoint
checkpoint_path = "/kaggle/input/train-token-classification-roberta/checkpoint/best"
tokenizer_detect = AutoTokenizer.from_pretrained(checkpoint_path, use_fast=True)
model_detect = AutoModelForTokenClassification.from_pretrained(checkpoint_path)
model_detect.to(device)

##### Error correction: masked language model
model_name_2 = "xlm-roberta-base"
# Alternative checkpoint kept for reference:
#model_name_2 = "/kaggle/input/mlm-encoder/mlm_best_model"
tokenizer_correct_en = AutoTokenizer.from_pretrained(model_name_2)
model_correct_en = AutoModelForMaskedLM.from_pretrained(model_name_2)

###### Run detection and correction on one validation example
idx = 8
sample_row = val_df.iloc[idx]

error_flag = detect_error(model_detect, tokenizer_detect, sample_row['corrupted_2'], device=device)
print("Prediction mask:   ", error_flag)
print("Ground truth mask: ", sample_row['mask'])
print("="*100)


print("\n\nEncoder-only")

corrected_sentence = correct_error_encoder_top_k(
    model_correct_en,
    tokenizer_correct_en,
    corrupted_tokens=sample_row['corrupted_2'],
    error_flags=error_flag,
    # Split on any whitespace, like the corrupted tokens, so irregular spacing in
    # the reference cannot produce empty tokens and a length mismatch.
    ground_truth_tokens=str(sample_row['ground_truth']).split(),
    top_k=10,
    device=device,
)
print("Error Sentence:          ", sample_row['corrupted'])

