In [None]:
import os, sys

import json
import numpy as np
import pandas as pd
import re
import string
from collections import Counter
from tqdm.auto import tqdm
from tqdm.notebook import trange

import torch
from torch.utils.data import DataLoader
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    TrainingArguments,
    Trainer
)
from datasets import load_dataset, Dataset, DatasetDict
from accelerate import Accelerator
from peft import LoraConfig, PeftConfig, PeftModel, AutoPeftModelForCausalLM
from trl import SFTTrainer

In [None]:
from huggingface_hub import login

# Authenticate against the Hugging Face Hub.
# Prefer a token supplied through the HF_TOKEN environment variable so the
# secret never has to be written into (and committed with) the notebook; the
# original placeholder is kept as a fallback for backward compatibility.
login(token=os.environ.get("HF_TOKEN", "YOUR_HUGGINGFACE_TOKEN"))

### 파인튜닝한 모델 불러오기

In [None]:
# Load the fine-tuned model (base model + PEFT adapter).
# model_path: name of the adapter repository on the Hugging Face Hub, or the
# path to a local checkpoint directory.
model_path = ""
config = PeftConfig.from_pretrained(model_path)
model = AutoModelForCausalLM.from_pretrained(
    config.base_model_name_or_path,
    torch_dtype="auto",
    # 4-bit loading is requested through an explicit quantization config
    # instead of the bare `load_in_4bit=True` keyword argument.
    quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    low_cpu_mem_usage=True,
    token=True,
)
tokenizer = AutoTokenizer.from_pretrained(config.base_model_name_or_path)
model = PeftModel.from_pretrained(model, model_path)

# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
# Pad on the LEFT so that, for a decoder-only model, newly generated tokens
# directly follow the prompt rather than a run of padding tokens.
tokenizer.padding_side = "left"

# Must run AFTER tokenizer.pad_token is assigned above; otherwise
# tokenizer.pad_token_id may still be None when it is copied here.
model.generation_config.pad_token_id = tokenizer.pad_token_id

# Do not inject the tokenizer's default system prompt.
tokenizer.use_default_system_prompt = False

## 전처리 함수 및 F1 스코어를 얻는 함수 정의

In [None]:
# Quote and bracket characters that are turned into a single space so the text
# on either side of them stays separated.
_SEPARATOR_TABLE = str.maketrans(dict.fromkeys("'\"《》<>〈〉()‘’", " "))
# Remaining ASCII punctuation is deleted outright (no space is inserted).
_PUNCT_TABLE = str.maketrans("", "", string.punctuation)


def normalize_answer(s):
    """Normalize an answer string before scoring.

    Steps, applied in this order:
      1. replace quote/bracket characters (' " 《 》 < > 〈 〉 ( ) ‘ ’) with a space,
      2. lowercase the text,
      3. delete every character in ``string.punctuation``,
      4. collapse runs of whitespace into one space and strip both ends.

    Args:
        s: The raw answer text.

    Returns:
        The normalized text.
    """
    text = s.translate(_SEPARATOR_TABLE)
    text = text.lower()
    text = text.translate(_PUNCT_TABLE)
    return " ".join(text.split())

# Character-level F1 score of a prediction against the reference answer.
def f1_score(prediction, ground_truth):
    """Return the character-level F1 between two answers.

    Both strings are passed through ``normalize_answer``; whitespace is then
    dropped and the remaining characters are compared as multisets.

    Args:
        prediction: The predicted answer text.
        ground_truth: The reference answer text.

    Returns:
        The F1 value as a float, or the integer ``0`` when the two answers
        share no characters (which includes either one being empty after
        normalization).
    """
    # Flatten the whitespace-separated tokens into one list of characters.
    pred_chars = list("".join(normalize_answer(prediction).split()))
    truth_chars = list("".join(normalize_answer(ground_truth).split()))

    # Multiset intersection: each character counts min(pred, truth) times.
    overlap = Counter(pred_chars) & Counter(truth_chars)
    num_same = sum(overlap.values())
    if num_same == 0:
        return 0

    precision = 1.0 * num_same / len(pred_chars)
    recall = 1.0 * num_same / len(truth_chars)
    return (2 * precision * recall) / (precision + recall)

def evaluate(ground_truth_df, predictions_df):
    """Compute the mean character-level F1 (in percent) over a question set.

    Predictions are matched to references by the text of the ``question``
    column. A reference question with no matching prediction contributes a
    score of 0 but still counts toward the denominator. If a question appears
    more than once in ``predictions_df``, the last occurrence is used.

    Args:
        ground_truth_df: DataFrame with ``question`` and ``answer`` columns
            holding the reference answers.
        predictions_df: DataFrame with ``question`` and ``answer`` columns
            holding the predicted answers.

    Returns:
        ``{'f1': value}`` where ``value`` is a float in the range 0-100.
        An empty ``ground_truth_df`` yields ``{'f1': 0.0}`` instead of raising
        ``ZeroDivisionError``.
    """
    predictions = dict(zip(predictions_df['question'], predictions_df['answer']))

    total = len(ground_truth_df)
    if total == 0:
        return {'f1': 0.0}

    f1_sum = 0
    for question_text, ground_truth in zip(
        ground_truth_df['question'], ground_truth_df['answer']
    ):
        if question_text not in predictions:
            continue
        f1_sum += f1_score(predictions[question_text], ground_truth)

    return {'f1': 100.0 * f1_sum / total}

## 모델 응답 확인하기

In [None]:
import random


def set_seed(seed: int = 43569):
    """Fix the random seeds of ``random``, NumPy and PyTorch.

    Also switches cuDNN to deterministic mode and turns off its benchmark
    auto-tuner.

    Args:
        seed (:obj:`int`): The seed to set.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,  # covers the multi-GPU case
    )
    for seeder in seeders:
        seeder(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed()

In [None]:
# Disable the memory-efficient scaled-dot-product-attention kernel.
torch.backends.cuda.enable_mem_efficient_sdp(False)
# torch.backends.cuda.enable_flash_sdp(False)
# qa_pipeline = pipeline("text-generation", model=model, tokenizer=tokenizer)

# Generation settings used by the inference helper below: the model generates
# text, the output is post-processed, and the cleaned answer is returned.
# See https://huggingface.co/docs/transformers/v4.40.2/en/main_classes/configuration
# See https://velog.io/@eenzeenee/hugginface-generate-option
#
# NOTE(review): this rebinds the module-level name `config`, which the
# model-loading cell used for the PeftConfig object; re-running that cell's
# later lines after this one would see the dict instead.
# NOTE(review): `epsilon_cutoff` of 0.3 looks far above the range the
# transformers documentation suggests (on the order of 1e-4) — confirm it is
# intentional, since it presumably restricts sampling to high-probability tokens.
config = {
    "max_new_tokens": 50,
    "num_beams": 2,
    # "num_beam_groups": 3,
    # "diversity_penalty": 0.2,
    "do_sample": True,
    "top_k":40,
    # "no_repeat_ngram_size":5,
    # # # # # "penalty_alpha": 0.5,
    "top_p":0.989,
    "epsilon_cutoff": 0.3,
    # "eta_cutoff": 0.3,
    # # "length_penalty": -20,
    # "temperature": 0.2,
    "renormalize_logits": True,
    # "early_stopping": True
}
def generate_response(question_prompt):
    """Generate an answer for an already chat-templated prompt.

    The prompt is tokenized, passed to ``model.generate`` with the module-level
    ``config`` settings, and the decoded output is cleaned up.

    Args:
        question_prompt: The full prompt string (chat template already applied).

    Returns:
        The text that follows the first ``assistant<|end_header_id|>`` marker,
        cut at the first ``<|eot_id|>`` and with ``<|start_header_id|>``,
        ``Context:`` and newline characters removed. If the marker is absent,
        the whole decoded sequence is returned unmodified.
    """
    # NOTE(review): the prompt is produced by apply_chat_template(tokenize=False);
    # tokenizing it again with default settings may prepend a second BOS token.
    # Confirm against how the model was fine-tuned before changing this.
    encoded = tokenizer(question_prompt, return_tensors="pt")

    # Follow the device the model actually lives on instead of assuming "cuda".
    device = model.device
    with torch.no_grad():
        outputs = model.generate(
            input_ids=encoded["input_ids"].to(device),
            attention_mask=encoded["attention_mask"].to(device),
            **config,
            pad_token_id=tokenizer.eos_token_id,
        )

    response = tokenizer.batch_decode(outputs)[0]

    marker = "assistant<|end_header_id|>"
    if marker not in response:
        return response

    # Keep only what was generated after the assistant header ...
    response = response.split(marker, 1)[1]
    # ... and stop at the end-of-turn token if one was produced.
    response = response.split("<|eot_id|>", 1)[0]

    # Strip repeated-token / noise artefacts left in the generated text.
    for noise in ("<|start_header_id|>", "Context:", "\n"):
        response = response.replace(noise, "")
    return response

## test.csv 로 평가하기 - 제출을 위한 인풋 데이터셋


In [None]:
file_path = './data/test.csv'
test_data = pd.read_csv(file_path)

# Model inference.

# Instruction block inserted into every user message.
# NOTE(review): the backslash line continuations join the numbered items
# without any separator (e.g. "...the question.2. When ..."). The text is left
# untouched because changing the prompt changes model output — confirm whether
# newlines between the items were intended.
prompting = "###What you must follow###\n \
1. Understand context completely before solving the question.\
2. When talking about a reason(이유) or purpose(목적), you MUST write it in a sentence.\
3. Think step by step\
4. Make sure you understand what the QUESTION is asking. . You MUST need to know how many things to LOOK for and what you WANT.\
If you follow the above instructions, a tip of $1000 will be provided."


# Maps each test-row id to the generated answer.
submission_dict = {}

# Count rows directly instead of deriving the count from size / 3, which is
# only right when the CSV has exactly three columns.
num_rows = len(test_data)
print("테스트 데이터 개수 : ", num_rows)
count = 0

for _, row in tqdm(test_data.iterrows(), total=num_rows):
    # Re-seed before every row so each answer is reproducible on its own.
    set_seed()
    row_id = None
    try:
        row_id = row['id']
        context = row['context']
        question = row['question']

        # pandas represents missing cells as NaN, which `is not None` does not
        # catch; pd.notna handles both None and NaN.
        if pd.notna(context) and pd.notna(question):
            inputs = [
                    {"role": "system", "content": "A chat between a curious user and an artificial intelligence assistant. The assistant gives answers to the user's questions."},
                    {"role": "user", "content": f"Your goal is to provide an appropriate answer to a given question based on the given context. {prompting}\n###Context:###{context}\n###Question:## {question}? "},
                ]
            question_prompt = tokenizer.apply_chat_template(inputs, tokenize=False, add_generation_prompt=True)
            answer = generate_response(question_prompt)
            submission_dict[row_id] = answer

            print("Processed count:", count)
            print("Answer for question:",question,":", answer)
            # print(context)
            count += 1

        else:
            submission_dict[row_id] = 'Invalid question or context'

    except Exception as e:
        # Best effort: a failing row is reported and left out of the submission
        # rather than aborting the whole run.
        print(f"Error processing question {row_id}: {e}")

# Write the submission file.
df = pd.DataFrame(list(submission_dict.items()), columns=['id', 'answer'])
df.to_csv('./submission.csv', index=False)