In [None]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import datasets
import pandas as pd

import os
import json
from tqdm import tqdm
from datetime import datetime
import argparse

In [None]:
def load_test_datasets_from_csv(test_file):
    """Read a test CSV and expose it as a ``DatasetDict`` with a single 'test' split.

    Args:
        test_file: Location of the CSV, handed straight to ``pandas.read_csv``.

    Returns:
        A ``datasets.DatasetDict`` whose only key, 'test', holds the CSV rows.

    Raises:
        ValueError: If the CSV lacks an 'id' or an 'err_sentence' column.
    """
    frame = pd.read_csv(test_file)

    # Both columns are mandatory; bail out before building the dataset.
    if not ('id' in frame.columns and 'err_sentence' in frame.columns):
        raise ValueError("CSV file must contain 'id' and 'err_sentence' columns.")

    return datasets.DatasetDict({'test': datasets.Dataset.from_pandas(frame)})


In [None]:
def get_ngram(text, n_gram):
    """Return every contiguous slice of ``text`` that is ``n_gram`` items long.

    Slices are taken at each start offset from left to right. When ``text`` is
    shorter than ``n_gram`` the result is an empty list.

    Args:
        text: Sliceable sequence (typically a string) to cut into n-grams.
        n_gram: Length of each slice.

    Returns:
        List of slices of ``text``, in order of their start position.
    """
    last_start = len(text) - n_gram
    return [text[start:start + n_gram] for start in range(last_start + 1)]

def calc_f_05(cor_sentence, prd_sentence, n_gram):
    """Score a prediction against the reference with n-gram precision, recall and F0.5.

    A predicted n-gram at position ``idx`` counts as a hit when the same n-gram
    occurs in the reference at positions ``idx - 2`` .. ``idx + 1`` (the lower
    bound is clamped to 0).

    Note: several predicted n-grams may hit the same reference n-gram, so the
    hit count (and therefore recall) is not capped by the reference length.

    Args:
        cor_sentence: Reference (correct) sentence.
        prd_sentence: Predicted sentence.
        n_gram: Length of the n-grams to compare.

    Returns:
        Tuple ``(precision, recall, f_05)``. ``(0, 0, 0)`` is returned when
        either sentence yields no n-grams or when there are no hits.
    """
    prd_word_list = get_ngram(prd_sentence, n_gram)
    cor_word_list = get_ngram(cor_sentence, n_gram)

    # With no n-grams on either side precision or recall is undefined; checking
    # both lists up front avoids a ZeroDivisionError on an empty reference.
    if not prd_word_list or not cor_word_list:
        return 0, 0, 0

    cnt = 0
    for idx, prd_ngram in enumerate(prd_word_list):
        # Tolerate small positional shifts: look two n-grams back, one ahead.
        start_idx = max(idx - 2, 0)
        if prd_ngram in cor_word_list[start_idx:idx + 2]:
            cnt += 1

    precision = cnt / len(prd_word_list)
    recall = cnt / len(cor_word_list)

    # F-beta with beta = 0.5: (1 + beta^2) * P * R / (beta^2 * P + R)
    denominator = 0.25 * precision + recall
    if not denominator:
        return 0, 0, 0

    f_05 = 1.25 * (precision * recall) / denominator

    return precision, recall, f_05


In [None]:
def my_test(gpus='cpu', model_path=None, test_file=None, eval_length=None, save_path=None, pb=False):
    """Run the seq2seq model over a test CSV and write the predictions to a CSV file.

    Each 'err_sentence' is tokenized, passed through ``model.generate`` with beam
    search, decoded, printed, and collected. The results are saved to
    ``<save_path>/<test file name without extension>_predictions.csv`` with the
    columns 'id', 'err_sentence' and 'prd_sentence'.

    Args:
        gpus: Device string passed to ``torch.device`` (e.g. 'cpu' or 'cuda:0').
        model_path: Passed to ``from_pretrained`` for both the model and the tokenizer.
        test_file: Path of the test CSV; loaded via ``load_test_datasets_from_csv``.
        eval_length: Maximum number of rows to process. A falsy value (None, 0)
            processes the whole file.
        save_path: Directory for the predictions CSV. Created if it does not exist.
        pb: Forwarded to ``tqdm(disable=...)``, so True hides the progress bar.

    Returns:
        None. The predictions are written to disk.

    Raises:
        ValueError: Propagated from ``load_test_datasets_from_csv`` when the CSV
            lacks the 'id' or 'err_sentence' column.
    """
    # Set the device to CPU or CUDA based on the input
    device = torch.device(gpus)

    # Load model and tokenizer, then move the model to the requested device
    model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model.to(device)

    # Load dataset; only the 'test' split is used below
    test_split = load_test_datasets_from_csv(test_file)['test']

    # Resolve the output location before the slow inference loop so that an
    # unusable save_path fails early instead of after all predictions are made.
    # splitext drops whatever extension the input has (.csv, .json, ...).
    base_name = os.path.splitext(os.path.basename(test_file))[0]
    save_file_path = os.path.join(save_path, base_name + '_predictions.csv')
    if save_path:
        os.makedirs(save_path, exist_ok=True)

    data_len = len(test_split)
    if eval_length:
        data_len = min(eval_length, data_len)

    # inference data
    id_list = []
    err_sentence_list = []
    prd_sentence_list = []

    for n in tqdm(range(data_len), disable=pb):
        row = test_split[n]
        err_sentence = row['err_sentence']
        id_list.append(row['id'])
        err_sentence_list.append(err_sentence)

        input_ids = tokenizer(err_sentence, return_tensors='pt')['input_ids'].to(device)
        res = model.generate(
            inputs=input_ids,
            num_beams=5,  # Adjust beam size as needed
            max_length=128,  # Maximum length of the output
            no_repeat_ngram_size=2
        ).cpu().tolist()[0]
        prd_sentence = tokenizer.decode(res, skip_special_tokens=True).strip()
        prd_sentence_list.append(prd_sentence)

        # Print prediction results for each sentence
        print(f'Input Sentence: {err_sentence}')
        print(f'Predicted Sentence: {prd_sentence}')
        print('=' * 50)

    # Save predictions
    _df = pd.DataFrame({
        'id': id_list,
        'err_sentence': err_sentence_list,
        'prd_sentence': prd_sentence_list
    })
    _df.to_csv(save_file_path, index=False)
    print(f'Saved predictions to {save_file_path}')


In [None]:
# Configuration values
gpus = 'cpu'  # can also be set to 'cuda:0'
# NOTE(review): absolute, machine-specific path; adjust for your environment.
model_path = '/home/yjtech2/Desktop/yurim/LLM/Pre_processing/spelling/grm_model_checkpoint'
test_file = "./path/to/test.csv"  # path to the CSV file
eval_length = 10  # set to None to use the whole dataset
save_path = "./results"
pb = True  # passed to tqdm(disable=...) inside my_test, so True hides the progress bar

# Create the output directory
os.makedirs(save_path, exist_ok=True)

# Run
# NOTE(review): my_test() loads test_file again internally and is not passed this
# object, so this load is redundant unless `dataset` is used elsewhere.
dataset = load_test_datasets_from_csv(test_file)  # load the dataset from the CSV file
my_test(gpus=gpus, model_path=model_path, test_file=test_file, eval_length=eval_length, save_path=save_path, pb=pb)
