In [1]:
import os, json, time, uuid
from typing import List, Dict

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.retrievers import EnsembleRetriever

In [None]:
# ========= 사용자 설정 =========
INPUT_JSONL  = "./eval_driver_insurance.jsonl"   # 기존 데이터셋
OUTPUT_JSONL = "./eval_driver_insurance_rebuilt.jsonl"  # 결과 파일
REWRITE_CONTEXTS = True       # 컨텍스트도 새로 검색해 갈아끼우기 여부
K = 3                         # 검색 문서 k
MODEL = "gpt-4o-mini"         # LLM 모델
TEMP  = 0.1                   # LLM temperature
MAX_TOKENS = 1500
RATE_LIMIT_SLEEP = 0.8        # 과한 호출 방지용 딜레이(초)

In [4]:
# ======== 벡터DB/리트리버 연결 ========
connection_string = "postgresql+psycopg2://play:123@192.168.0.8:5432/team3"
embedding = OpenAIEmbeddings()

collection_names = ['DB', 'samsung', 'hanwha', "현대해상", "test", "meritzfire"]
retrievers = []
for name in collection_names:
    vectorstore = PGVector(
        embeddings=embedding,
        collection_name=name,
        connection=connection_string,
        use_jsonb=True
    )
    retrievers.append(vectorstore.as_retriever(search_kwargs={'k': K}))

combined_retriever = EnsembleRetriever(retrievers=retrievers)

# ======== LLM & 프롬프트 ========
# ※ 프롬프트는 원하시는 대로 자유롭게 바꾸시면 됩니다.
template_dw =   """당신은 여러 보험사의 상품을 비교 분석하여 사용자에게 가장 쉽고 친절하게 정보를 전달하는 보험 전문가 챗봇입니다.

다음 지시사항을 철저히 따라 사용자의 질문에 한국어로 답변하세요.

답변의 서식:
마크다운(Markdown)을 사용하여 답변을 구조화하세요.
줄바꿈은 최대 두 칸만 사용하세요.
주요 정보는 볼드체로 강조하세요.

모든 섹션은 반드시 아래 계층을 따릅니다.
1) 제목: ##  (한 답변에 2~4개 이하 권장)
2) 소제목: ###  (상품명/항목명/소단락 제목은 반드시 소제목으로, 불릿 금지)
3) 세부 포인트: -  불릿 목록 (각 소제목 아래 설명은 모두 불릿으로 정리)
불릿은 “- ” + 공백 + 텍스트 형식만 사용합니다.
헤딩 앞뒤 빈 줄은 딱 1줄만, 불릿 사이에는 빈 줄 금지.

여러 회사를 비교하는 경우, 반드시 마크다운 표를 활용하여 설명과 함께 항목별로 정리하세요.
표를 만들 때는 필요시 '추천도' 열을 포함하세요.
추천도 열에만 ⭕ / ▲ / ○(보통) / × / - 다섯 가지 기호만 사용하세요.
추천도 열 사용시, 표 바로 아래줄에 반드시 범례를 추가하세요: ⭕ 강력 추천 / ▲ 조건부 / ○ 보통 / × 없음 / - 미확인


답변의 내용:
전문 용어는 피하고, 누구나 이해할 수 있도록 쉽고 자세하게 설명하세요.
답변은 질문에 대한 직접적인 내용만을 포함하며, 불필요한 서론이나 결론은 제외하세요.
제시된 '컨텍스트' 내의 정보만 사용하세요. 컨텍스트에 없는 내용은 절대로 지어내지 마세요.
컨텍스트에 없는 내용을 질문할 시, 모른다고 말하지말고 질문한 회사의 전화번호나 링크를 줘서 직접 내용을 탐색하도록 유도하세요.
비교할때는 차이점을 명확하게 구체적으로 설명하세요.
추천 시에는 그 이유(보장 범위, 특화 옵션, 지급 조건 등)를 구체적으로 설명하세요.
사용자 상황(사고 유형/과실/연령 등)을 반영해 차이점·추천 근거를 구체적으로 제시.

출처 표기:
답변에 사용된 모든 정보는 마지막에 출처(파일명/페이지)를 명확히 명시하세요.
출처는 항상 괄호(()) 안에 (출처: 파일명, 페이지 p.N) 형식으로 표시하세요.

마지막에는 항상 더 물어볼 내용이 있는지 물어보면서 대화를 더 길게하도록 유도해

[대화 기록]
{history}

[질문]
{question}

[컨텍스트]
{context}
"""


prompt_dowon = ChatPromptTemplate.from_template(template_dw)

llm = ChatOpenAI(model=MODEL, temperature=TEMP, max_tokens=MAX_TOKENS)
chain = (
    {
        "context": RunnablePassthrough(),  # 컨텍스트는 바깥에서 준비해 주입
        "question": RunnablePassthrough()
    }
    | prompt_dowon
    | llm
    | StrOutputParser()
)

# ======== 유틸 함수들 ========
def format_docs_for_prompt(docs: List[Document]) -> str:
    """프롬프트 주입용: 본문 + (파일명/페이지) 라벨 포함"""
    chunks = []
    source_map = {}
    counter = 1
    for d in docs:
        src = d.metadata.get("source", "알 수 없는 출처")
        page = d.metadata.get("page", None)
        if isinstance(page, int):
            page_disp = page + 1
        else:
            page_disp = page or "?"
        if src not in source_map:
            source_map[src] = counter
            counter += 1
        doc_id = source_map[src]
        base = os.path.basename(src)
        chunks.append(f"[문서 {doc_id} - {base}, 페이지 {page_disp}]\n{d.page_content}\n")
    return "\n".join(chunks)

def docs_to_context_list(docs: List[Document], max_chars: int = 1200) -> List[str]:
    """데이터셋 저장용: contexts 필드(List[str])에 넣을 적당한 길이의 청크"""
    out = []
    for d in docs:
        src = d.metadata.get("source", "unknown")
        page = d.metadata.get("page", None)
        page_disp = page + 1 if isinstance(page, int) else page or "?"
        head = f"[{os.path.basename(src)} p.{page_disp}] "
        text = (d.page_content or "").strip()
        # 너무 긴 경우 잘라서 저장
        clipped = (text[:max_chars] + "…") if len(text) > max_chars else text
        out.append(head + clipped)
    return out

def retrieve_contexts(question: str, k: int = K) -> List[Document]:
    return combined_retriever.get_relevant_documents(question)

def load_jsonl(path: str) -> List[Dict]:
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or not line.startswith("{"):
                continue
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError:
                # 만약 }{ 형태로 붙어있는 라인이 있다면 보정 시도
                try:
                    fixed = "[" + line.replace("}\n{", "},{") + "]"
                    rows.extend(json.loads(fixed))
                except Exception:
                    pass
    return rows

def save_jsonl(rows: List[Dict], path: str):
    with open(path, "w", encoding="utf-8") as f:
        for r in rows:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")


In [5]:

# ======== 본 작업 파이프라인 ========
dataset = load_jsonl(INPUT_JSONL)
print(f"입력 샘플 수: {len(dataset)}")

rebuilt = []
for i, row in enumerate(dataset, 1):
    q = row.get("question", "").strip()
    if not q:
        continue

    # 컨텍스트 새로 뽑기 또는 기존 유지
    if REWRITE_CONTEXTS:
        docs = retrieve_contexts(q)
        ctx_for_prompt = format_docs_for_prompt(docs)
        ctx_for_json = docs_to_context_list(docs)
    else:
        # 기존 contexts를 프롬프트에 주입 가능한 문자열로 변환
        old_ctx_list = row.get("contexts", []) or []
        ctx_for_prompt = "\n".join(f"[컨텍스트 {idx+1}]\n{c}\n" for idx, c in enumerate(old_ctx_list))
        ctx_for_json = old_ctx_list

    # LLM으로 새 answer 생성
    try:
        answer = chain.invoke({"question": q, "context": ctx_for_prompt})
    except Exception as e:
        # 실패 시 빈 문자열로 남기고 계속 진행
        answer = f"(생성 실패: {e})"

    # 결과 레코드 구성: id/timestamp/ground_truth 등은 원본 유지
    new_row = dict(row)
    new_row["answer"] = answer
    new_row["contexts"] = ctx_for_json
    # 필요 시 prompt 버전 표시(옵션)
    new_row["prompt_version"] = "template_dw.v2"

    rebuilt.append(new_row)

    if i % 5 == 0:
        print(f"진행 {i}/{len(dataset)}")
    time.sleep(RATE_LIMIT_SLEEP)


print(ctx_for_json)

save_jsonl(rebuilt, OUTPUT_JSONL)
print("완료:", OUTPUT_JSONL, "샘플 수:", len(rebuilt))

입력 샘플 수: 30


  return combined_retriever.get_relevant_documents(question)


진행 5/30
진행 10/30
진행 15/30
진행 20/30
진행 25/30
진행 30/30
['[무배당 삼성화재 운전자보험 안전운전 파트너 플러스(2504.8) 1종(연만기, 납입면제형).pdf p.460] 외상성 상부관절와순 파열로 수술을 시행한 상해 \n10. 어깨관절 탈구로 수술을 시행한 상해 \n11. 어깨관절의 골절 및 탈구로 수술을 시행하지 않은 상해 \n12. 위팔뼈 대결절 견열 골절 \n13. 위팔뼈 먼쪽 부위 견열골절(외상과 골절, 내상과 골절 등에 해당한다) \n14. 팔꿈치관절 부위 골절 및 탈구로 수술을 시행하지 않은 상해 \n15.', '[한화 운전자상해보험 무배당 2504.pdf p.258] 경찰서에서 발행한 교통사고사실확인원3. 경찰서 또는 검찰청에 제출된 자동차 교통사고 형사합의서(단, 합의금액이 명시되어 있어야 하며, 합의금액을 장래에 지급한다는 내용이 포함되어 있어야 합니다)4. 검찰에 의해 기소된 경우 검찰청에서 발행한 공소장5.', '[DB운전자보험.pdf p.42] - 경찰서에 제출된 형사합의서(합의금액 명시)\n- 피해자 공탁금 출금 확인서(미합의시) 및\n  공탁서\n- 경찰서/법원\n- 법원\n- 법원', '[현대해상약관.pdf p.26] 방어비용 - 공소 제기시(약식기소 포함)   ① 교통사고사실확인원   ② 약식명령문 또는 법원 판결문경찰서법원 - 구속시   ① 구속영장 또는 사건처분증명원   ② 재소 또는 출소증명원법원구치소변호사선임비용 - 교통사고사실확인원 - 판결문, 공소장, 변호사가 발행한 세금계산서 - 구속영장 또는 사건처분증명원 - 재소 또는 출소증명원경찰서/법원구치소변호사사무소면허정지/취소위로금 - 교통사고사실확인원 - 운전경력증명서 - 면허정지확인원(교육 이수 후) - 면허취소확인원경찰서★ 당사 자동차보험 가입자는 별도 서류 없이 당사에서 확인 가능합니다.', '[unknown p.?] 한국의 수도는 서울입니다.', '[메리츠 운전자 상해 종합보험.pdf p.429] 경골하 3분의 1부 분쇄성골절

In [None]:
# -*- coding: utf-8 -*-
import os, json, time
from typing import List, Dict
from datasets import Dataset

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_postgres import PGVector
from langchain.retrievers import EnsembleRetriever

# =========================
# 사용자 설정
# =========================
INPUT_JSONL  = "./eval_driver_insurance.jsonl"  # 기존 jsonl 파일 경로
REWRITE_CONTEXTS = True      # True: retriever로 contexts 새로 생성 / False: 기존 contexts 사용
K = 3                        # 검색 문서 수
MODEL = "gpt-4o-mini"
TEMP  = 0.1
MAX_TOKENS = 1500
RATE_LIMIT_SLEEP = 0.7       # 과도 호출 방지(초)

# =========================
# 프롬프트 (원하시면 자유 수정)
# =========================
template_dw =  """당신은 여러 보험사의 상품을 비교 분석하여 사용자에게 가장 쉽고 친절하게 정보를 전달하는 보험 전문가 챗봇입니다.

다음 지시사항을 철저히 따라 사용자의 질문에 한국어로 답변하세요.

답변의 서식:
- 마크다운(Markdown)을 사용하여 답변을 구조화하세요.
- 주요 정보는 **굵게** 강조하세요.
- 여러 회사를 비교하는 경우, 반드시 표를 활용하여 항목별로 정리하세요.
- 항목 나열은 체크리스트(-, *, 숫자 목록)로 표현하세요.

답변의 내용:
- 전문 용어는 피하고, 누구나 이해할 수 있도록 쉽고 자세하게 설명하세요.
- 질문에 대한 직접적인 내용만 포함하세요. 불필요한 서론/결론 금지.
- 반드시 [컨텍스트] 내 정보만 사용하세요. 컨텍스트에 없는 내용은 절대로 지어내지 마세요.
- 사용자의 상황(사고 유형, 과실 비율, 연령 등)을 고려해서 답변하세요.
- 비교 시 차이점을 구체적으로 설명하세요.
- 추천 시 이유(보장 범위, 특화 옵션, 지급 조건 등)를 구체적으로 설명하세요.

출처 표기:
- 답변 마지막에 사용한 모든 정보의 출처를 (출처: 파일명, 페이지 p.N) 형식으로 괄호에 넣어 명시하세요.

마지막에는 항상 더 물어볼 내용이 있는지 1문장으로 물어보세요.

[질문]
{question}

[컨텍스트]
{context}
"""
prompt_dowon = ChatPromptTemplate.from_template(template_dw)

# =========================
# 벡터DB/리트리버 준비 (환경에 맞게 connection_string/collection_names 수정 가능)
# =========================
connection_string = "postgresql+psycopg2://play:123@192.168.0.1:5432/team3"
embedding = OpenAIEmbeddings()

collection_names = ['DB', 'samsung', 'hanwha', "현대해상", "test", "meritzfire"]
retrievers = []
for name in collection_names:
    vectorstore = PGVector(
        embeddings=embedding,
        collection_name=name,
        connection=connection_string,
        use_jsonb=True
    )
    retrievers.append(vectorstore.as_retriever(search_kwargs={'k': K}))
combined_retriever = EnsembleRetriever(retrievers=retrievers)

# =========================
# LLM 체인
# =========================
llm = ChatOpenAI(model=MODEL, temperature=TEMP, max_tokens=MAX_TOKENS)
chain = (
    {
        "context": RunnablePassthrough(),   # 바깥에서 문자열 형태로 주입
        "question": RunnablePassthrough()
    }
    | prompt_dowon
    | llm
    | StrOutputParser()
)

# =========================
# 유틸 함수
# =========================
def load_jsonl(path: str) -> List[Dict]:
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            s = line.strip()
            if not s or not s.startswith("{"):
                continue
            try:
                rows.append(json.loads(s))
            except json.JSONDecodeError:
                # 깨진 라인이 있으면 스킵
                continue
    return rows

def format_docs_for_prompt(docs: List[Document]) -> str:
    """프롬프트 주입용: 본문 + (파일명/페이지) 라벨 포함"""
    chunks = []
    source_map = {}
    counter = 1
    for d in docs:
        src = d.metadata.get("source", "알 수 없는 출처")
        page = d.metadata.get("page", None)
        page_disp = page + 1 if isinstance(page, int) else (page or "?")
        if src not in source_map:
            source_map[src] = counter
            counter += 1
        doc_id = source_map[src]
        base = os.path.basename(src)
        chunks.append(f"[문서 {doc_id} - {base}, 페이지 {page_disp}]\n{(d.page_content or '').strip()}\n")
    return "\n".join(chunks)

def docs_to_context_list(docs: List[Document], max_chars: int = 1200) -> List[str]:
    """데이터셋 저장용 contexts(List[str]) 생성 (길이 제한)"""
    out = []
    for d in docs:
        src = d.metadata.get("source", "unknown")
        page = d.metadata.get("page", None)
        page_disp = page + 1 if isinstance(page, int) else (page or "?")
        head = f"[{os.path.basename(src)} p.{page_disp}] "
        text = (d.page_content or "").strip()
        clipped = (text[:max_chars] + "…") if len(text) > max_chars else text
        out.append(head + clipped)
    return out

# =========================
# 1) JSONL 로드
# =========================
rows = load_jsonl(INPUT_JSONL)
print(f"로드된 샘플 수: {len(rows)}")

# =========================
# 2) 새 answer/contexts 생성
# =========================
new_questions: List[str] = []
new_answers:   List[str] = []
new_contexts:  List[List[str]] = []
new_ground:    List[str] = []

for i, r in enumerate(rows, 1):
    q = (r.get("question") or "").strip()
    if not q:
        continue

    # 컨텍스트 준비
    if REWRITE_CONTEXTS:
        docs = combined_retriever.get_relevant_documents(q)
        ctx_for_prompt = format_docs_for_prompt(docs)
        ctx_for_json   = docs_to_context_list(docs)
    else:
        ctx_list = r.get("contexts") or []
        # 프롬프트 주입용 문자열로 변환
        ctx_for_prompt = "\n".join(f"[컨텍스트 {idx+1}]\n{c}\n" for idx, c in enumerate(ctx_list))
        ctx_for_json   = ctx_list

    # LLM으로 answer 생성
    try:
        answer = chain.invoke({"question": q, "context": ctx_for_prompt})
    except Exception as e:
        answer = f"(생성 실패: {e})"

    # 누적
    new_questions.append(q)
    new_answers.append(answer)
    new_contexts.append(ctx_for_json)
    new_ground.append((r.get("ground_truth") or "").strip())

    if i % 5 == 0:
        print(f"진행: {i}/{len(rows)}")
    time.sleep(RATE_LIMIT_SLEEP)

print("생성 완료.")

# =========================
# 3) RAGAS용 Dataset 생성
# =========================
eval_data = {
    "question": new_questions,
    "answer": new_answers,
    "contexts": new_contexts,       # List[List[str]]
    "ground_truth": new_ground
}
dataset = Dataset.from_dict(eval_data)

# 확인
print(dataset)
print("열:", dataset.column_names)
print("샘플 예시:", dataset[0]["question"][:40], " | ctx len:", len(dataset[0]["contexts"]))

# (선택) 디스크 저장: Arrow 포맷으로 저장하고 싶다면
# dataset.save_to_disk("/mnt/data/ragas_eval_dataset")
