# 개발 환경 OS

In [None]:
# !nvidia-smi

In [None]:
# !head /proc/cpuinfo

In [None]:
# !head -n 3 /proc/meminfo

In [None]:
# import sys
# print(f"Python version: {sys.version}")

#Library

In [None]:
!pip install torch

In [None]:
!pip install datasets transformers

In [None]:
import os
import re
import json
import torch
import random
import numpy as np
import pandas as pd
import tqdm
import datasets
import transformers
import pkg_resources
import sentence_transformers

from tqdm import tqdm
from copy import deepcopy

from sklearn.model_selection import train_test_split

from datasets import load_dataset, Dataset
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback,  BartForConditionalGeneration, PreTrainedTokenizerFast
from sentence_transformers import SentenceTransformer

In [None]:
# # 각 라이브러리의 버전 출력
# print(f"torch version: {torch.__version__}")
# print(f"numpy version: {np.__version__}")
# print(f"pandas version: {pd.__version__}")
# print(f"tqdm version: {pkg_resources.get_distribution('tqdm').version}")
# print(f"scikit-learn version: {pkg_resources.get_distribution('scikit-learn').version}")
# print(f"datasets version: {pkg_resources.get_distribution('datasets').version}")
# print(f"transformers version: {pkg_resources.get_distribution('transformers').version}")
# print(f"sentence_transformers version: {pkg_resources.get_distribution('sentence-transformers').version}")

# 경로 설정

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# path = '/content/drive/MyDrive/A3/' # Colab에서 실행 시
path = '/' # Local에서 실행 시
db_path = path + 'DB/'
vectorstore_path = path + 'VectorStore/'
model_path = path + 'Model/'
score_path = path + 'TestScore/'

# 데이터 로드 및 전처리

In [None]:
train = pd.read_csv(db_path + 'train_preprocessing.csv')
test = pd.read_csv(db_path + 'test_preprocessing.csv')

In [None]:
# 데이터 전처리
train['공사종류(대분류)'] = train['공사종류'].str.split(' / ').str[0]
train['공사종류(중분류)'] = train['공사종류'].str.split(' / ').str[1]
train['공종(대분류)'] = train['공종'].str.split(' > ').str[0]
train['공종(중분류)'] = train['공종'].str.split(' > ').str[1]
train['사고객체(대분류)'] = train['사고객체'].str.split(' > ').str[0]
train['사고객체(중분류)'] = train['사고객체'].str.split(' > ').str[1]

test['공사종류(대분류)'] = test['공사종류'].str.split(' / ').str[0]
test['공사종류(중분류)'] = test['공사종류'].str.split(' / ').str[1]
test['공종(대분류)'] = test['공종'].str.split(' > ').str[0]
test['공종(중분류)'] = test['공종'].str.split(' > ').str[1]
test['사고객체(대분류)'] = test['사고객체'].str.split(' > ').str[0]
test['사고객체(중분류)'] = test['사고객체'].str.split(' > ').str[1]

In [None]:
# 공사종류 대분류, 공종 중분류, 사고객체 중분류, 작업프로세스
# 훈련 데이터 생성
data = train.apply(
    lambda row: {
        "process": row["작업프로세스"],
        "construct_type": row["공종(중분류)"],
        "object_type": row["사고객체(중분류)"],
        "situation": (
            f"'{row['공사종류(대분류)']}' 공사 중 '{row['공종(중분류)']}' 작업 중 '{row['사고원인']}'으로 인해 사고가 발생하였습니다."),
        "response": row['재발방지대책 및 향후조치계획']
    },
    axis=1
)

# DataFrame으로 변환
data = pd.DataFrame(list(data))

In [None]:
train, val = train_test_split(data, test_size=0.1, random_state=42)

In [None]:
# 텐서 데이터셋
train_ds = Dataset.from_pandas(train)
val_ds = Dataset.from_pandas(val)

# 파인튜닝


## 모델 로드

In [None]:
tokenizer = PreTrainedTokenizerFast.from_pretrained('hyunwoongko/kobart')
model = BartForConditionalGeneration.from_pretrained('hyunwoongko/kobart',num_labels=2)

In [None]:
# 패딩 토큰 설정 확인
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

def preprocess_function(data):
    tokenizer.pad_token = tokenizer.eos_token  # 패딩 토큰 설정

    # Encoder Input (입력)
    inputs = [
        f"""제공된 data는 건설 공사현장에서 발생한 사고 상황입니다. 주어진 situation을 분석하고 재발방지 대책을 포함한 대응책을 작성하세요.
        ### process:
        {p}

        ### construct_type:
        {ct}

        ### object_type:
        {ot}

        ### situation:
        {q}

        ### response:"""
        for p, ct, ot, q in zip(
            data["process"],
            data["construct_type"],
            data["object_type"],
            data["situation"]
        )
    ]

    # Decoder Target (출력)
    targets = [
        f"{res}{tokenizer.eos_token}"
        for res in data["response"]
    ]

    # 토큰화 - 입력
    encodings = tokenizer(
        inputs,
        truncation=True,
        padding="max_length",
        max_length=512,  # 충분한 입력 길이 확보
        return_tensors="pt"
    )

    # 토큰화 - 출력
    target_encodings = tokenizer(
        targets,
        truncation=True,
        padding="max_length",
        max_length=128,  # 출력은 비교적 짧게
        return_tensors="pt"
    )

    # 라벨 설정 (Loss 계산 시 패딩 무시)
    encodings["labels"] = target_encodings["input_ids"].clone()
    encodings["labels"][encodings["labels"] == tokenizer.pad_token_id] = -100

    return encodings


# 데이터셋 변환 (불필요한 컬럼 삭제)
train_ds = train_ds.map(preprocess_function, batched=True, remove_columns=train_ds.column_names)
val_ds = val_ds.map(preprocess_function, batched=True, remove_columns=val_ds.column_names)

# 필요한 컬럼만 유지
columns = ["input_ids", "attention_mask", "labels"]
train_ds.set_format(type="torch", columns=columns)
val_ds.set_format(type="torch", columns=columns)

## 하이퍼파라미터

In [None]:
training_args = TrainingArguments(
    output_dir="./results",               # 출력 디렉토리
    eval_strategy="epoch",                # 에폭마다 평가
    save_strategy="epoch",                # 에폭마다 체크포인트 저장
    learning_rate=1e-5,                   # <--[수정] 학습률 증가
    per_device_train_batch_size=4,        # <--[수정] 배치 사이즈 감소
    per_device_eval_batch_size=4,         # <--[수정] 평가 배치 사이즈 감소
    gradient_accumulation_steps=2,        # 배치 사이즈 효과를 늘리는 대안
    num_train_epochs=10,                  # <--[수정] Epoch 감소
    weight_decay=0.01,                    # <--[수정] weight decay 조정
    load_best_model_at_end=True,          # 가장 좋은 모델을 마지막에 로드
    logging_dir="./logs",                 # 로깅 디렉토리
    logging_steps=100,                    # <--[수정] 로깅 간격 증가
    report_to="tensorboard"               # TensorBoard에 로깅
)


In [None]:
# 패딩 토큰 추가
tokenizer.add_special_tokens({'pad_token': tokenizer.eos_token})

# 모델에 tokenizer 변경 사항 반영
model.resize_token_embeddings(len(tokenizer))  # mean_resizing 제거

## 학습

In [None]:
# Trainer 설정
trainer = Trainer(
    model=model,                         # 학습할 모델
    args=training_args,                  # TrainingArguments
    train_dataset = train_ds,
    eval_dataset = val_ds,
    processing_class=tokenizer,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=1)], # 조기 종료
)

# 모델 학습
trainer.train()

## 모델 저장

In [None]:
model_name = 'kobart7'
output_dir = model_path + model_name

trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)

# 모델 테스트(Val)

## 모델 불러오기

In [None]:
model_name = 'kobart7'

# 저장된 모델 경로
load_model_path = model_path + model_name

# 모델 & 토크나이저 로드
model = BartForConditionalGeneration.from_pretrained(load_model_path, device_map="auto", num_labels=2)
tokenizer = PreTrainedTokenizerFast.from_pretrained(load_model_path)

print("모델과 토크나이저가 정상적으로 로드되었습니다!")

print("pad_token:", tokenizer.pad_token)
print("pad_token_id:", tokenizer.pad_token_id)

## 텍스트 추론

In [None]:
def generate_answer_val(val, model, tokenizer, max_new_tokens=50):
    # 🔹 프롬프트 템플릿
    prompt = '''
    제공된 data는 건설 공사현장에서 발생한 사고 상황입니다. 주어진 situation을 분석해 재발방지 대책을 포함한 대응책을 response에 작성하세요.

    ### process:
    {process}

    ### construct_type:
    {construct_type}

    ### object_type:
    {object_type}

    ### situation:
    {situation}

    ※ Example response:
    - 작업자 안전교육 및 재발 방지 대책 수립과 작업 전 안전교육 철저 및 관리자 추가 배치를 통한 동종 사고 예방.
    - 근로자 보행 통로 구간 안전표지판 설치와 특별안전교육 실시, 일일 작업 투입 전 상시 교육, 관리 대상 선정 등을 통한 이동 구간 보행 안전 확보와 작업자 안전 교육 지시.
    ### response:
    '''

    # 값 추출
    process = val['process']
    construct_type = val['construct_type']
    object_type = val['object_type']
    situation = val['situation']

    # 프롬프트 완성
    formatted_prompt = prompt.format(
        process=process,
        construct_type=construct_type,
        object_type=object_type,
        situation=situation
    )

    # 토큰화
    inputs = tokenizer(
        formatted_prompt,
        return_tensors="pt",
        truncation=True,
        max_length=512,
    )

    # 디바이스 설정
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)
    inputs = {k: v.to(device) for k, v in inputs.items()}

    # 생성
    outputs = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=True,  # 샘플링 활성화
        top_k=40,
        temperature=0.7,
        top_p=0.85,
        repetition_penalty=1.2,
        num_return_sequences=1
    )

    # 디코딩 및 후처리
    generated_text = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]

    # 자연스러운 한 문장만 추출
    def postprocess_korean_sentence(text):
        sentences = re.split(r'(?<=[다|요|함|됨|합니다|됩니다|있습니다|예정입니다])\s+', text.strip())
        return sentences[0].strip() + '.' if sentences else text.strip()

    return postprocess_korean_sentence(generated_text)

In [None]:
# 하나만 테스트하기
ex = val.iloc[0]
process = ex['process']
construct_type = ex['construct_type']
object_type = ex['object_type']
situation = ex['situation']
response = ex['response']
generated_text = generate_answer_val(ex, model, tokenizer)

print(f"정답: {response}")
print(f"생성된 문장: {generated_text}")

## 임베딩 모델 로드

In [None]:
sbert_model = SentenceTransformer('jhgan/ko-sbert-sts')

## 평가

In [None]:
def cosine_similarity(a, b):
    """코사인 유사도 계산"""
    dot_product = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    return dot_product / (norm_a * norm_b) if norm_a != 0 and norm_b != 0 else 0

def jaccard_similarity(text1, text2):
    """자카드 유사도 계산"""
    set1, set2 = set(text1.split()), set(text2.split())  # 단어 집합 생성
    intersection = len(set1.intersection(set2))  # 교집합 크기
    union = len(set1.union(set2))  # 합집합 크기
    return intersection / union if union != 0 else 0

# 단일 평가
def calculate_final_score(bs, gs):
    """두 문장 간 유사도 계산"""
    # SBERT 임베딩 벡터 변환
    bs_embedding = sbert_model.encode(bs)
    gs_embedding = sbert_model.encode(gs)

    # 유사도 계산
    cosine_sim = cosine_similarity(bs_embedding, gs_embedding)
    jaccard_sim = jaccard_similarity(bs, gs)

    # 최종 점수 계산 (공식 적용)
    final_score = 0.7 * max(cosine_sim, 0) + 0.3 * max(jaccard_sim, 0)

    return {
        "bs": bs,
        "gs": gs,
        "cosine_similarity": cosine_sim,
        "jaccard_similarity": jaccard_sim,
        "final_score": final_score
    }

# 다중 평가
def calculate_final_scores(best_sentences, generated_sentences):
    """여러 문장 간 유사도 계산"""
    # SBERT로 문장 임베딩을 한번에 생성
    bs_embeddings = sbert_model.encode(best_sentences)  # 정답 문장들
    gs_embeddings = sbert_model.encode(generated_sentences)  # 생성된 문장들

    # 코사인 유사도 계산 (벡터화)
    cosine_sims = cosine_similarity(bs_embeddings, gs_embeddings)

    # 자카드 유사도 계산 (벡터화)
    jaccard_sims = jaccard_similarity(best_sentences, generated_sentences)

    # 최종 점수 계산
    final_scores = 0.7 * np.maximum(cosine_sims, 0) + 0.3 * np.maximum(jaccard_sims, 0)

    results = []
    for bs, gs, cosine_sim, jaccard_sim, final_score in zip(best_sentences, generated_sentences, cosine_sims, jaccard_sims, final_scores):
        results.append({
            "best_sentence": bs,
            "generated_sentence": gs,
            "cosine_similarity": cosine_sim,
            "jaccard_similarity": jaccard_sim,
            "final_score": final_score
        })

    return pd.DataFrame(results)

### 랜덤 5가지

In [None]:
# val에서 랜덤하게 5개의 인덱스 추출
random_indices = random.sample(range(len(val)), 5)

for i in random_indices:
    ex = val.iloc[i]
    process = ex['process']
    construct_type = ex['construct_type']
    object_type = ex['object_type']
    situation = ex['situation']
    response = ex['response']
    generated_text = generate_answer_val(val, model, tokenizer)
    results = calculate_final_score(response, generated_text)
    print(f"정답: {response}")
    print(f"생성된 문장: {generated_text}")
    print(f"Cosine Similarity: {results['cosine_similarity']}")
    print(f"Jaccard Similarity: {results['jaccard_similarity']}")
    print(f"Final Score: {results['final_score']}\n")

### 전체

In [None]:
# 결과 저장 리스트
all_results = []

# val에서 situation과 response 추출
situations = val['situation'].tolist()
responses = val['response'].tolist()

# 생성된 문장들을 리스트에 담기
generated_sentences = []
for ex in tqdm(val.to_dict(orient='records'), desc="Generating sentences", total=len(val)):
    generated_sentence = generate_answer_val(ex, model, tokenizer)
    generated_sentences.append(generated_sentence)

# 최종 점수 계산
df_scores_all = calculate_final_scores(responses, generated_sentences)

# 결과를 all_results 리스트에 추가
for text, best_sentence, generated_sentence, row in zip(situations, responses, generated_sentences, df_scores_all.to_dict(orient='records')):
    all_results.append({
        "input_prompt": text,
        **row
    })

# 리스트를 DataFrame으로 변환
df_scores_all = pd.DataFrame(all_results)
# 결과 확인
print(df_scores_all.head())

## 테스트 결과 저장

In [None]:
# CSV 파일로 저장
csv_name = f"{model_name}_eval.csv"
output_path = score_path + csv_name

df_scores_all.to_csv(output_path, index=False, encoding='utf-8-sig')
print(f"CSV 파일이 '{output_path}' 경로에 저장되었습니다.")

### 테스트 결과 확인

In [None]:
df = pd.read_csv(output_path)
df.describe()