In [2]:
import os
import re
import json
import yaml
import torch
import pandas as pd

from accelerate import Accelerator

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
)

from dotenv import load_dotenv

load_dotenv(override=True)

from huggingface_hub import login

login(token=os.getenv("HUGGINGFACE_TOKEN"))

seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.set_float32_matmul_precision("high")
torch.cuda.empty_cache()

In [None]:
# 중국어 문자 감지 함수 (감지 기준 조절 가능)
def contains_chinese(text, min_count=1):
    chinese_count = 0
    for char in text:
        if "\u4e00" <= char <= "\u9fff":
            chinese_count += 1
            if chinese_count >= min_count:
                return True
    return False


# 특수문자/이모티콘 감지 함수
def contains_special_chars(text):
    import re

    # 일반 문자, 숫자, 기본 문장부호 외의 문자 감지
    special_pattern = re.compile(r"[^\w\s,.?!:;()\-\'\"가-힣ㄱ-ㅎㅏ-ㅣa-zA-Z0-9]")
    return bool(special_pattern.search(text))

# Data Load

In [None]:
local_path = "."  # your path

data_path = f"{local_path}/data"

test_path = f"{data_path}/ver-1-test.csv"  # processed test data
prompts_path = f"{data_path}/prompts.yaml"  # prompts

qe_filename = "ver-1-preprocessed.jsonl"  # Query Expansion 결과 저장
if os.path.exists(f"{data_path}/{qe_filename}"):
    raise FileExistsError(f"'{qe_filename}' already exists.")

temp_qe_filename = "ver-1.jsonl"  # Query Expansion 임시 저장
if os.path.exists(f"{local_path}/{temp_qe_filename}"):
    raise FileExistsError(f"'{temp_qe_filename}' already exists.")


# CSV
test = pd.read_csv(test_path, encoding="utf-8-sig")


# Prompts
with open(prompts_path, "r", encoding="utf-8-sig") as f:
    prompts = yaml.safe_load(f)
system_prompt = prompts["query_expansion"]["ver_0"]["system_prompt"]
user_prompt_template = prompts["query_expansion"]["ver_1"]["user_prompt_template"]

# Query Expansion (Ver 1)

## Setup the basic query

In [None]:
cols = ["공종2", "작업프로세스", "사고객체1", "사고객체2", "인적사고1", "사고원인"]

query_list = []
for i in range(test.shape[0]):
    gongjong = test.loc[i, "공종2"]
    job_process = test.loc[i, "작업프로세스"]
    accident_object = test.loc[i, "사고객체1"] + ", " + test.loc[i, "사고객체2"]
    human_accident = test.loc[i, "인적사고1"]
    accident_cause = test.loc[i, "사고원인"]

    user_prompt = user_prompt_template.format(
        job_process=job_process,
        gongjong=gongjong,
        human_accident=human_accident,
        accident_object=accident_object,
        accident_cause=accident_cause,
    )
    query_list.append(user_prompt)

## Model Load

In [None]:
qwen_model_id = "Qwen/Qwen2.5-7B-Instruct"
gemma_model_id = "rtzr/ko-gemma-2-9b-it"

# gemma 
gemma_model = AutoModelForCausalLM.from_pretrained(
    gemma_model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
gemma_tokenizer = AutoTokenizer.from_pretrained(gemma_model_id)

# qwen 
qwen_model = AutoModelForCausalLM.from_pretrained(
    qwen_model_id,
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
qwen_tokenizer = AutoTokenizer.from_pretrained(qwen_model_id)

## RUN

- 사용한 프롬프트: 

    - system 의 경우 `ver 0`
    
    - user 의 경우 `ver 1`

- 기본 쿼리로 사용한 컬럼

    - `"공종2", "작업프로세스", "사고객체1", "사고객체2", "인적사고1", "사고원인"`

- 1단계: gemma 모델 사용 (메인)

    - 최대 5회 재시도 (빈 값일 경우)

- 2단계: qwen 모델 사용 (gemma 실패시)

    - 최대 10회 재시도 (중국어, 이모티콘, 빈 값일 경우)

- 3단계: 다시 gemma 모델 사용 (qwen 실패시)

    - 최대 3회 재시도

- 마지막 단계

    - 빈 값 저장

In [None]:
temp_qe_path = f"{local_path}/{temp_qe_filename}"

# 메인 모델, 서브 모델 설정
main_model = gemma_model
main_tokenizer = gemma_tokenizer
sub_model = qwen_model
sub_tokenizer = qwen_tokenizer

for i in range(len(query_list)):
    user_prompt = query_list[i]

    # 1단계: gemma 모델 사용 (메인)
    print(f"[{i+1}/{len(query_list)}] Level 1: Gemma", "-----" * 10)
    retry_count = 0
    max_retries_main = 5

    # 최대 5회 재시도 (빈 값일 경우)
    while retry_count < max_retries_main:
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        text = main_tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        model_inputs = main_tokenizer([text], return_tensors="pt").to(main_model.device)

        generated_ids = main_model.generate(
            **model_inputs,
            max_new_tokens=1024,
            do_sample=True,
            temperature=0.1,
        )
        generated_ids = [
            output_ids[len(input_ids) :]
            for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
        ]

        response = main_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[
            0
        ]
        response = response.strip()

        # 빈 값 확인 (gemma는 빈 값만 확인)
        if not response:
            retry_count += 1
            print(
                f"[{i+1}/{len(query_list)}] Empty Response. Retrying... ({retry_count}/{max_retries_main})"
            )
            continue

        # 유효한 응답을 얻었으면 저장하고 종료
        response_split = response.strip().split("\n")
        five_q = {"questions": response_split, "test_id": test.loc[i, "ID"]}
        with open(temp_qe_path, "a", encoding="utf-8-sig") as f:
            f.write(json.dumps(five_q, ensure_ascii=False) + "\n")
        print(response)
        break

    # 2단계: qwen 모델 사용 (gemma 실패시)
    if retry_count >= max_retries_main:
        print(f"[{i+1}/{len(query_list)}] Level 2: Qwen", "-----" * 10)
        retry_count = 0
        max_retries_sub = 10

        # 최대 10회 재시도 (중국어, 이모티콘, 빈 값일 경우)
        while retry_count < max_retries_sub:
            messages = [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ]

            text = sub_tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            model_inputs = sub_tokenizer([text], return_tensors="pt").to(
                sub_model.device
            )

            generated_ids = sub_model.generate(
                **model_inputs,
                max_new_tokens=1024,
                do_sample=True,
                temperature=0.1,
            )
            generated_ids = [
                output_ids[len(input_ids) :]
                for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
            ]

            response = sub_tokenizer.batch_decode(
                generated_ids, skip_special_tokens=True
            )[0]
            response = response.strip()

            # qwen은 중국어와 특수문자 모두 확인
            if contains_chinese(response) or contains_special_chars(response):
                retry_count += 1
                print(
                    f"[{i+1}/{len(query_list)}] Chinese/Special Character in Response. Retrying... ({retry_count}/{max_retries_sub})"
                )
                continue

            # 빈 응답인 경우도 재시도
            if not response:
                retry_count += 1
                print(
                    f"[{i+1}/{len(query_list)}] Empty Response. Retrying... ({retry_count}/{max_retries_sub})"
                )
                continue

            # 유효한 응답을 얻었으면 저장하고 종료
            response_split = response.strip().split("\n")
            five_q = {"questions": response_split, "test_id": test.loc[i, "ID"]}
            with open(temp_qe_path, "a", encoding="utf-8-sig") as f:
                f.write(json.dumps(five_q, ensure_ascii=False) + "\n")
            print(response)
            break

        # 3단계: 다시 gemma 모델 사용 (qwen 실패시)
        if retry_count >= max_retries_sub:
            print(f"[{i+1}/{len(query_list)}] Level 3: Gemma", "-----" * 10)
            retry_count = 0
            max_retries_final = 3

            # 최대 3회 재시도
            while retry_count < max_retries_final:
                messages = [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ]

                text = main_tokenizer.apply_chat_template(
                    messages, tokenize=False, add_generation_prompt=True
                )
                model_inputs = main_tokenizer([text], return_tensors="pt").to(
                    main_model.device
                )

                generated_ids = main_model.generate(
                    **model_inputs,
                    max_new_tokens=1024,
                    do_sample=True,
                    temperature=0.1,
                )
                generated_ids = [
                    output_ids[len(input_ids) :]
                    for input_ids, output_ids in zip(
                        model_inputs.input_ids, generated_ids
                    )
                ]

                response = main_tokenizer.batch_decode(
                    generated_ids, skip_special_tokens=True
                )[0]
                response = response.strip()

                # 빈 값 확인 (gemma는 빈 값만 확인)
                if not response:
                    retry_count += 1
                    print(
                        f"[{i+1}/{len(query_list)}] Empty Response. Retrying... ({retry_count}/{max_retries_final})"
                    )
                    continue

                # 유효한 응답을 얻었으면 저장하고 종료
                response_split = response.strip().split("\n")
                five_q = {"questions": response_split, "test_id": test.loc[i, "ID"]}
                with open(temp_qe_path, "a", encoding="utf-8-sig") as f:
                    f.write(json.dumps(five_q, ensure_ascii=False) + "\n")
                print(response)
                break

            # 모든 단계 실패시 빈 리스트 저장
            if retry_count >= max_retries_final:
                print(f"[{i+1}/{len(query_list)}] All Steps Failed. Empty List Saved.")
                five_q = {"questions": [], "test_id": test.loc[i, "ID"]}
                with open(temp_qe_path, "a", encoding="utf-8-sig") as f:
                    f.write(json.dumps(five_q, ensure_ascii=False) + "\n")

# Preprocessing

- `questions` 는 저장될 때 `\n` 를 기준으로 split 되어 저장됨
    
    - 아래처럼 태그만 추출하면 intro 설명(Here ~), 빈 값, Explanation 등 필요없는 부분을 쉽게 처리할 수 있음

    - user prompt 를 ver 1 로 사용한 이유임

    - `questions` 에서 `"<q{N}>` 은 각 964개로 test 개수와 동일하게 생성됨 (간단하게 ctrl+f) 

    - `</q{N}>` 태그 제거 후, 질문 number 도 제거하면 쉽게 전처리 가능
        
        ```json
        "questions": [
            "Here are 5 questions in Korean to help identify preventive measures from safety guideline documents:",
            "",
            "<q1> TSC GIRDER 조립 시 SPLICE PLATE 설치 시 작업자의 안전 거리 및 작업 공간 확보에 대한 안전 지침은 무엇인가요? ",
            "<q2> 철근 및 건설 자재의 안전 보관 및 이동 시 적용되는 안전 장비 및 절차는 무엇인가요?",
            "<q3> 후두부 부딪힘 사고 예방을 위한 TSC GIRDER 조립 작업 시 안전 교육 및 훈련 프로그램은 무엇인가요?",
            "<q4>  SPLICE PLATE 설치 작업 시 작업자의 시야 확보를 위한 안전 장치나 설계 요구 사항은 무엇인가요?",
            "<q5>  작업 환경의 높은 곳에서 작업 시 안전 벨트, 안전줄 등의 안전 장비 사용 의무 및 점검 기준은 무엇인가요? ",
            "",
            "",
            "",
            "**Explanation:**",
            "",
            "* **q1:** Focuses on safe distances and workspace during SPLICE PLATE installation.",
            "* **q2:**  Addresses safe handling and storage of materials to prevent accidental collisions.",
            "* **q3:**  Seeks information on training programs specifically designed to prevent head injuries.",
            "* **q4:**  Investigates safety devices or design requirements to ensure clear visibility during installation.",
            "* **q5:**  Covers mandatory use and inspection protocols for fall protection equipment in elevated work areas."
        ],
        ```

- 영어 결과가 남아있음

    - 2차 전처리 필요할 듯

    - 볼드체 기호 "**" 가 2개가 있는데 마지막인 2번째 기호에서 split 하면 됨

    - 이후 남아있는 볼드체 기호를 replace 로 제거하면 될 듯

        ```json
        "questions": [
            "**E/V홀 작업 시,  자재 보관 및 안전 거리에 대한 안전 지침은 무엇인가요?**  (What are the safety guidelines for storing materials and maintaining safe distances during E/V hall work?)",
            "**기계설비공사 중 이동 시,  높은 곳에서의 물체 낙하 방지 조치는 어떻게 해야 하나요?** (What measures should be taken to prevent falling objects during movement in machinery installation work?)",
            "**집수정 막음 조치 후 작업자의 안전을 위한 추가적인 안전 절차는 무엇인가요?** (What additional safety procedures are required for workers after implementing water stop measures?)",
            "**E/V문틀과 같은 건설 자재의 안전한 운반 및 이동 방법은 무엇인가요?** (What are the safe methods for transporting and moving construction materials like E/V frames?)",
            "**작업 공간 내 물체 낙하 사고 예방을 위한 정기적인 점검 및 관리 계획은 어떻게 수립되어야 하나요?** (How should a regular inspection and maintenance plan be established to prevent falling object accidents in the work area?)"
        ],
        ```

In [5]:
with open(f"{local_path}/{temp_qe_filename}", "r", encoding="utf-8-sig") as f:
    qe_data = [json.loads(line) for line in f]

# 각 데이터에서 <q1>, <q2>, ...<q5>로 시작하는 질문만 추출하고 태그와 번호 제거
# 그리고 영어 설명이 있는 경우 제거 (볼드체 기호 "**" 기준으로 분리)
processed_qe_data = []
for data in qe_data:
    questions = data["questions"]
    filtered_questions = []

    for q in questions:
        for i in range(1, 6):
            if q.strip().startswith(f"<q{i}>"):
                # 태그와 번호 제거 처리
                clean_q = q
                # <q1>, <q2> 등의 태그 제거
                clean_q = re.sub(r"<q\d+>", "", clean_q)
                # </q1>, </q2> 등의 닫는 태그 제거
                clean_q = re.sub(r"</q\d+>", "", clean_q)
                # 숫자와 점으로 시작하는 패턴 제거 (예: "1. ", "2. ")
                clean_q = re.sub(r"^\s*\d+\.\s*", "", clean_q)
                # 앞뒤 공백 제거
                clean_q = clean_q.strip()

                # 영어 설명이 있는 경우 처리 (볼드체 기호 "**" 기준으로 분리)
                if "**" in clean_q:
                    # 마지막 볼드체 기호를 기준으로 분리
                    parts = clean_q.split("**")
                    if len(parts) >= 2:
                        clean_q = "**".join(parts[:-1])
                    # 남아있는 볼드체 기호 제거
                    clean_q = clean_q.replace("**", "")

                # 앞뒤 공백 제거
                clean_q = clean_q.strip()

                filtered_questions.append(clean_q)
                break

    if len(filtered_questions) == 5:
        processed_qe_data.append(
            {"questions": filtered_questions, "test_id": data["test_id"]}
        )
    else:
        print(
            f"Warning: {data['test_id']}에서 5개의 질문을 찾지 못했습니다. 찾은 질문 수: {len(filtered_questions)}"
        )

# jsonl 형식으로 전처리 결과 저장
with open(f"{data_path}/{qe_filename}", "w", encoding="utf-8-sig") as f:
    for data in processed_qe_data:
        f.write(json.dumps(data, ensure_ascii=False) + "\n")