In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoModelForMaskedLM
from peft import PeftModel, PeftConfig
import os
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import random

# Seed the random number generators so any stochastic step is repeatable.
random_seed = 42
random.seed(random_seed)
# `random.seed` only covers Python's own generator; seed PyTorch as well.
torch.manual_seed(random_seed)

# --- Model preparation ---
model_id = "beomi/polyglot-ko-12.8b-safetensors"  # repo with the safetensors-converted weights

# 4-bit NF4 quantization with double quantization; matmuls are computed in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)

# device_map={'': 0} maps the whole model onto device 0 (later cells send inputs to 'cuda:0').
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map={'': 0},
)
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    padding_side="left",
    model_max_length=512,
)

# Use the end-of-sequence token for padding on both the tokenizer and the model config.
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = model.config.eos_token_id



In [None]:
# Wrap the quantized base model with the PEFT adapter stored under ./output/SFT_aihub.
# NOTE: this rebinds `model`, so re-running the cell wraps the already-wrapped model again.
model = PeftModel.from_pretrained(model = model, model_id = './output/SFT_aihub')

In [None]:
import pandas as pd
from tqdm import trange


# Load the evaluation split; its `input` and `output` columns are read in the next cell.
prompt_df = pd.read_json('SFT_aihub_eval.json')

prompt_df

In [None]:
import json
import pandas as pd
from utils.prompter import Prompter
from accelerate import Accelerator

# Helpers used by the generation loop later in the notebook.
accelerator = Accelerator()
prompter = Prompter()

# Extract the model inputs and their reference outputs positionally.
# The original looped over `prompt_df['input'][i]`, which is a *label* lookup and
# raises KeyError as soon as the frame's index is not exactly 0..n-1.
source = prompt_df['input'].tolist()
target = prompt_df['output'].tolist()

len(source)

In [None]:
import pickle

# Load the pickled unigram and bigram models; later cells call `.score(...)` on both.
# NOTE: unpickling can execute arbitrary code, so only load files you trust.
with open('../ngram/uni_model.pkl', 'rb') as uni_file:
    uni_model = pickle.load(uni_file)

with open('../ngram/bi_model.pkl', 'rb') as bi_file:
    bi_model = pickle.load(bi_file)
    
def load_ngram_counts(filename):
    """
    Read a pickled ``(bigram_counts, trigram_counts)`` pair from ``filename``.

    Returns the two objects as a tuple, in that order. Raises whatever
    ``open`` / ``pickle.load`` raise, and ``ValueError`` if the pickled object
    does not unpack into exactly two items.
    """
    with open(filename, 'rb') as pkl_file:
        payload = pickle.load(pkl_file)
    bigram_counts, trigram_counts = payload
    return bigram_counts, trigram_counts

def calculate_trigram_probabilities(sentence, bigram_counts, trigram_counts):
    """
    Sum the conditional trigram estimates over a whitespace-tokenized sentence.

    For every window of three consecutive words, ``trigram_counts[w1, w2, w3] /
    bigram_counts[w1, w2]`` is added to the total, but only when the trigram
    and its leading bigram are both present in the count tables; other windows
    contribute nothing. The result is a sum, not a normalized probability, and
    is ``0`` when no window matches (including sentences shorter than three
    words).
    """
    tokens = sentence.split()
    total = 0

    for first, second, third in zip(tokens, tokens[1:], tokens[2:]):
        context = (first, second)
        gram = (first, second, third)
        # Skip windows that are missing from either table.
        if gram not in trigram_counts or context not in bigram_counts:
            continue
        total += trigram_counts[gram] / bigram_counts[context]

    return total

# Load the bigram/trigram count tables and try the scorer on a sample sentence.
bigram_counts, trigram_counts = load_ngram_counts('../ngram/tri_model.pkl')
probs = calculate_trigram_probabilities(
    sentence='한국 정부도 알고 있다',
    bigram_counts=bigram_counts,
    trigram_counts=trigram_counts,
)
probs  # display the summed trigram score of the sample sentence


In [None]:
# One empty list per feature column; the generation loop appends one value per candidate.
data = {
    column: []
    for column in ('prob', 'edit', 'lcs', 'gleu', 'label', 'uni', 'bi', 'tri')
}

In [None]:
"""aihub 데이터"""
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np
from tqdm.auto import trange
from soynlp.hangle import jamo_levenshtein
from nltk.translate.gleu_score import sentence_gleu
import sys

# Build one feature row per generated candidate and append it to `data`.
# NOTE: `data` is created in an earlier cell and only appended to here, so re-running
# this cell without re-creating `data` duplicates rows.

NUM_CANDIDATES = 5  # beams searched and candidates returned per source sentence
INSTRUCTION = '맞춤법에 맞게 수정하세요'

# Prepare the model once. The original called `accelerator.prepare(model, prompt)` on
# every iteration, repeating the wrapping for each sentence and passing the prompt
# string through `prepare` for no benefit.
model = accelerator.prepare(model)
generator = accelerator.unwrap_model(model)

for i in trange(len(source)):
    prompt = prompter.generate_prompt(INSTRUCTION, source[i])
    batch = tokenizer(prompt, return_tensors='pt')
    ids = batch['input_ids'].to('cuda:0', dtype=torch.long)
    masks = batch['attention_mask'].to('cuda:0', dtype=torch.long)

    with torch.no_grad():
        outputs = generator.generate(
            input_ids=ids,
            attention_mask=masks,
            num_beams=NUM_CANDIDATES,
            max_new_tokens=100,
            return_dict_in_generate=True,
            output_scores=True,
            temperature=1,
            eos_token_id=tokenizer.eos_token_id,
            do_sample=False,
            num_return_sequences=NUM_CANDIDATES,
            repetition_penalty=2.0,
        )

    # Generated sequences start with the prompt tokens; slice them off before decoding.
    input_length = ids.shape[1]
    reference = [source[i].split()]

    for t in range(NUM_CANDIDATES):
        sentence = tokenizer.decode(outputs.sequences[t, input_length:], skip_special_tokens=True)
        words = sentence.split()

        # exp(sequence score) of this beam, as reported by `generate`.
        prob = np.exp(outputs.sequences_scores[t].cpu().numpy())
        # Jamo-level edit distance between the source and the candidate.
        edit = jamo_levenshtein(source[i], sentence)

        # Character-level LCS length (single-row dynamic programme).
        cache = [0] * len(sentence)
        for src_char in source[i]:
            cnt = 0
            for k, cand_char in enumerate(sentence):
                if cnt < cache[k]:
                    cnt = cache[k]
                elif src_char == cand_char:
                    cache[k] = cnt + 1
        # `default=0` covers an empty candidate without a bare `except`.
        lcs = max(cache, default=0)

        # GLEU of the candidate against the *source* sentence.
        gleu = sentence_gleu(reference, words)

        label = 1 if target[i] == sentence else 0

        # Length-normalized n-gram scores. The branch condition and the
        # parenthesized `(len - 2)` denominator mirror the candidate-scoring cell
        # later in this notebook; the original wrote `tri_score/len(...)-2`,
        # which subtracted 2 from the quotient instead of from the length.
        tri_score = calculate_trigram_probabilities(sentence, bigram_counts, trigram_counts)
        if len(words) > 2:
            uni = sum(uni_model.score(word) for word in words) / len(words)
            bi = sum(
                bi_model.score(nxt, [prev]) for prev, nxt in zip(words, words[1:])
            ) / (len(words) - 1)
            tri = tri_score / (len(words) - 2)
        else:
            # Too short to normalize: score the raw sentence string as a single item.
            uni = uni_model.score(sentence)
            bi = bi_model.score(sentence)
            tri = tri_score

        data['prob'].append(prob)
        data['edit'].append(edit)
        data['lcs'].append(lcs)
        data['gleu'].append(gleu)
        data['label'].append(label)
        data['uni'].append(uni)
        data['bi'].append(bi)
        data['tri'].append(tri)

# Show how many rows were collected per column instead of dumping every value.
{name: len(values) for name, values in data.items()}


In [None]:
# Turn the accumulated feature lists into a table (one row per appended candidate).
df = pd.DataFrame(data)
df

In [None]:
# Persist the feature table; later cells reload it from this CSV.
df.to_csv("ngram_aihub_tri.csv", index=False)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import RobustScaler
import pandas as pd

# Reload the saved feature table so this cell does not depend on in-memory state.
data = pd.read_csv("ngram_aihub_tri.csv")
# Select from the frame that was just read. The original indexed the in-memory `df`
# here, which threw away the CSV and failed on a fresh kernel where `df` is undefined.
data = data[['prob', 'edit', 'lcs', 'gleu', 'uni', 'bi', 'tri', 'label']]

# Features are every column but the last; the last column is the label.
X_train, X_test, y_train, y_test = train_test_split(
    data.iloc[:, :-1],
    data.iloc[:, -1],
    test_size=0.2,
    random_state=42,
)

X_train

In [None]:
from xgboost import XGBClassifier

# Fit an XGBoost classifier on the unscaled split and report accuracy.
# NOTE: this rebinds `model`, the name earlier cells use for the language model.
model = XGBClassifier()
model.fit(X_train, y_train)

# Print the training-set score, then the test-set score.
for caption, features, labels in (
    ("훈련세트 점수: {:.2f}", X_train, y_train),
    ("테스트세트 점수: {:.2f}", X_test, y_test),
):
    print(caption.format(model.score(features, labels)))

In [None]:
from xgboost import plot_importance
# Plot feature importances of the classifier currently bound to `model`.
ax = plot_importance(model)

In [None]:
import lightgbm as lgb

# Fit a LightGBM classifier on the same unscaled split and report accuracy.
# NOTE: this rebinds `model` again, replacing the classifier from the previous cell.
model = lgb.LGBMClassifier()
model.fit(X_train, y_train)

# Print the training-set score, then the test-set score.
for caption, features, labels in (
    ("훈련세트 점수: {:.2f}", X_train, y_train),
    ("테스트세트 점수: {:.2f}", X_test, y_test),
):
    print(caption.format(model.score(features, labels)))

In [None]:
from lightgbm import plot_importance
# Plot feature importances of the classifier currently bound to `model`
# (here `plot_importance` is the LightGBM one imported on the line above).
ax = plot_importance(model)

# 데이터 생성

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler, StandardScaler
import pandas as pd


# Feature columns, in the order used for scaling and training.
columns = ['prob', 'edit', 'lcs', 'gleu', 'uni', 'bi', 'tri']

data = pd.read_csv("ngram_aihub_tri.csv")
data = data[columns + ['label']]

# Standardize the features (all columns except the trailing label).
# `scaler` is reused later to transform the features of generated candidates.
scaler = StandardScaler()
std_data = scaler.fit_transform(data.iloc[:, :-1])
df = pd.DataFrame(std_data, columns=columns)

# shuffle=False keeps row order, so the final 20% of rows become the test split.
X_train, X_test, y_train, y_test = train_test_split(
    df,
    data.iloc[:, -1],
    shuffle=False,
    test_size=0.2,
)

X_train


In [None]:
import lightgbm as lgb
import matplotlib.pyplot as plt

# Fit LightGBM on the standardized features and report accuracy.
cls = lgb.LGBMClassifier()
cls.fit(X_train, y_train)

# Print the training-set score, then the test-set score.
for caption, features, labels in (
    ("훈련세트 점수: {:.2f}", X_train, y_train),
    ("테스트세트 점수: {:.2f}", X_test, y_test),
):
    print(caption.format(cls.score(features, labels)))

# Visualize feature importance by split count ('gain' is the alternative importance type).
lgb.plot_importance(
    cls,
    importance_type='split',
    figsize=(10, 6),
)
plt.show()

In [None]:
from xgboost import XGBClassifier

# Fit XGBoost on the standardized features and report accuracy.
# NOTE: this rebinds `cls`, so the candidate-scoring loop later in the notebook uses
# this classifier when the cells are run in order.
cls = XGBClassifier()
cls.fit(X_train, y_train)

# Print the training-set score, then the test-set score.
for caption, features, labels in (
    ("훈련세트 점수: {:.2f}", X_train, y_train),
    ("테스트세트 점수: {:.2f}", X_test, y_test),
):
    print(caption.format(cls.score(features, labels)))

In [None]:
import pickle

# Load the pickled unigram and bigram models; the scoring loop calls `.score(...)` on both.
# NOTE: unpickling can execute arbitrary code, so only load files you trust.
with open('../ngram/uni_model.pkl', 'rb') as uni_file:
    uni_model = pickle.load(uni_file)

with open('../ngram/bi_model.pkl', 'rb') as bi_file:
    bi_model = pickle.load(bi_file)
    
def load_ngram_counts(filename):
    """
    Read a pickled ``(bigram_counts, trigram_counts)`` pair from ``filename``.

    Returns the two objects as a tuple, in that order. Raises whatever
    ``open`` / ``pickle.load`` raise, and ``ValueError`` if the pickled object
    does not unpack into exactly two items.
    """
    with open(filename, 'rb') as pkl_file:
        payload = pickle.load(pkl_file)
    bigram_counts, trigram_counts = payload
    return bigram_counts, trigram_counts

def calculate_trigram_probabilities(sentence, bigram_counts, trigram_counts):
    """
    Sum the conditional trigram estimates over a whitespace-tokenized sentence.

    For every window of three consecutive words, ``trigram_counts[w1, w2, w3] /
    bigram_counts[w1, w2]`` is added to the total, but only when the trigram
    and its leading bigram are both present in the count tables; other windows
    contribute nothing. The result is a sum, not a normalized probability, and
    is ``0`` when no window matches (including sentences shorter than three
    words).
    """
    tokens = sentence.split()
    total = 0

    for first, second, third in zip(tokens, tokens[1:], tokens[2:]):
        context = (first, second)
        gram = (first, second, third)
        # Skip windows that are missing from either table.
        if gram not in trigram_counts or context not in bigram_counts:
            continue
        total += trigram_counts[gram] / bigram_counts[context]

    return total

# Load the bigram/trigram count tables used by calculate_trigram_probabilities.
bigram_counts, trigram_counts = load_ngram_counts('../ngram/tri_model.pkl')

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoModelForMaskedLM
from peft import PeftModel, PeftConfig
import os
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import random

# Seed the random number generators so any stochastic step is repeatable.
random_seed = 42
random.seed(random_seed)
# `random.seed` only covers Python's own generator; seed PyTorch as well.
torch.manual_seed(random_seed)

# --- Model preparation ---
model_id = "beomi/polyglot-ko-12.8b-safetensors"  # repo with the safetensors-converted weights

# 4-bit NF4 quantization with double quantization; matmuls are computed in bfloat16.
# (8-bit alternative that was tried: load_in_8bit=True)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)

# device_map={'': 0} maps the whole model onto device 0 (later cells send inputs to 'cuda:0').
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=bnb_config,
    device_map={'': 0},
)
tokenizer = AutoTokenizer.from_pretrained(
    model_id,
    padding_side="left",
    model_max_length=512,
)

# Use the end-of-sequence token for padding on both the tokenizer and the model config.
tokenizer.pad_token = tokenizer.eos_token
model.config.pad_token_id = model.config.eos_token_id

# Wrap the quantized base model with the PEFT adapter stored under ./output/SFT_aihub.
model = PeftModel.from_pretrained(model=model, model_id='./output/SFT_aihub')


In [None]:
import json
import pandas as pd
from utils.prompter import Prompter
from accelerate import Accelerator

# Helpers used by the generation loop in the next cell.
accelerator = Accelerator()
prompter = Prompter()

# 'utf-8-sig' transparently drops a leading byte-order mark if the file has one.
with open('SFT_aihub_test.json', 'r', encoding='utf-8-sig') as f:
    json_read = json.load(f)

# Each record carries the model input and its reference output.
source = [record['input'] for record in json_read]
target = [record['output'] for record in json_read]

len(source)

In [None]:
"""aihub 데이터"""
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np
from tqdm import trange
from soynlp.hangle import jamo_levenshtein
from nltk.translate.gleu_score import sentence_gleu

# For every test sentence: generate candidates, score each with the trained
# classifier (`cls`, on features standardized by `scaler`), and keep the best one.

NUM_CANDIDATES = 5  # beams searched and candidates returned per source sentence
INSTRUCTION = '맞춤법에 맞게 수정하세요'

pred = []

# Prepare the model once. The original called `accelerator.prepare(model, prompt)` on
# every iteration, repeating the wrapping for each sentence and passing the prompt
# string through `prepare` for no benefit.
model = accelerator.prepare(model)
generator = accelerator.unwrap_model(model)

for i in trange(len(source)):
    prompt = prompter.generate_prompt(INSTRUCTION, source[i])
    batch = tokenizer(prompt, return_tensors='pt')
    ids = batch['input_ids'].to('cuda:0', dtype=torch.long)
    masks = batch['attention_mask'].to('cuda:0', dtype=torch.long)

    with torch.no_grad():
        # NOTE(review): the feature-extraction loop earlier in this notebook generates
        # with temperature=1 and repetition_penalty=2.0; confirm the different
        # settings here are intentional.
        outputs = generator.generate(
            input_ids=ids,
            attention_mask=masks,
            num_beams=NUM_CANDIDATES,
            max_new_tokens=100,
            return_dict_in_generate=True,
            output_scores=True,
            temperature=0.5,
            eos_token_id=tokenizer.eos_token_id,
            do_sample=False,
            num_return_sequences=NUM_CANDIDATES,
        )

    # Generated sequences start with the prompt tokens; slice them off before decoding.
    input_length = ids.shape[1]
    reference = [source[i].split()]
    candidate_scores = []

    for t in range(NUM_CANDIDATES):
        sentence = tokenizer.decode(outputs.sequences[t, input_length:], skip_special_tokens=True)
        words = sentence.split()

        # exp(sequence score) of this beam, as reported by `generate`.
        prob = np.exp(outputs.sequences_scores[t].cpu().numpy())
        # Jamo-level edit distance between the source and the candidate.
        edit = jamo_levenshtein(source[i], sentence)

        # Character-level LCS length (single-row dynamic programme).
        cache = [0] * len(sentence)
        for src_char in source[i]:
            cnt = 0
            for k, cand_char in enumerate(sentence):
                if cnt < cache[k]:
                    cnt = cache[k]
                elif src_char == cand_char:
                    cache[k] = cnt + 1
        # `default=0` covers an empty candidate without a bare `except`.
        lcs = max(cache, default=0)

        # GLEU of the candidate against the *source* sentence.
        gleu = sentence_gleu(reference, words)

        # Length-normalized n-gram scores.
        tri_score = calculate_trigram_probabilities(sentence, bigram_counts, trigram_counts)
        if len(words) > 2:
            uni = sum(uni_model.score(word) for word in words) / len(words)
            bi = sum(
                bi_model.score(nxt, [prev]) for prev, nxt in zip(words, words[1:])
            ) / (len(words) - 1)
            tri = tri_score / (len(words) - 2)
        else:
            # Too short to normalize: score the raw sentence string as a single item.
            uni = uni_model.score(sentence)
            bi = bi_model.score(sentence)
            # The original never assigned `tri` on this path, so it either raised
            # NameError or silently reused the previous candidate's value.
            tri = tri_score

        feature = np.array([[prob, edit, lcs, gleu, uni, bi, tri]])
        feature = scaler.transform(feature)

        # Probability of the positive class (label 1 = candidate equals the reference).
        candidate_scores.append(cls.predict_proba(feature)[0][1])

    # Highest-scoring candidate; ties resolve to the lowest beam index, as the
    # original stable descending sort did.
    best = max(range(NUM_CANDIDATES), key=candidate_scores.__getitem__)
    preds = tokenizer.decode(outputs.sequences[best, input_length:])
    pred.append(preds.replace("<|endoftext|>", "").replace('<|unused0|>', ''))

# Preview the first predictions instead of dumping the whole list.
pred[:5]
        

In [None]:
# Convert the prediction list into a single-column frame (the column label is 0).
# NOTE: this rebinds `pred` from a list to a DataFrame; later cells index it as pred[0].
pred = pd.DataFrame(pred)
pred

In [None]:
# Write the predictions, one per row, with no index column and no header row.
# (Earlier variant, kept for reference: pred_df = pd.DataFrame({'pred': pred}))
pred.to_csv('all_xbg_pred_tri.csv', index=False, header=False)
pred

In [None]:
import pandas as pd
import re

# pred = pd.read_csv('all_xbg_pred.csv', header=None, names=['pred'])
# Exact-match accuracy of the predictions in column 0 of `pred` against `target`.
total = len(pred)

if total:
    # Strip special-token markers and surrounding whitespace from every prediction.
    # Assigning the whole column replaces the original chained `pred[0][i] = ...`
    # write, which pandas does not guarantee reaches the frame (and which is a
    # no-op under copy-on-write).
    pred[0] = [
        text.replace("<|endoftext|>", "").replace("<|unused0|>", "").strip()
        for text in pred[0]
    ]
    # Strict string equality. The punctuation-stripped variants the original
    # computed were never used in the comparison, so they are dropped.
    correct_pred = sum(1 for i, hyp in enumerate(pred[0]) if target[i] == hyp)
else:
    correct_pred = 0

# Guard the ratio so an empty prediction set reports 0.0 instead of raising.
accuracy = correct_pred / total if total else 0.0
print(f"pred acc:{correct_pred}/{total} = {accuracy}")

