In [None]:
import os
os.environ["GRPC_VERBOSITY"] = "ERROR"
os.environ["GLOG_minloglevel"] = "2"
import json
from collections import defaultdict
from kiwipiepy import Kiwi
from typing import TypedDict, List, Literal, Any
from langchain_core.documents import Document
from langchain_core.runnables import RunnableParallel, RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from langchain_naver import ChatClovaX, ClovaXEmbeddings
from langchain_community.retrievers import BM25Retriever
from langchain_milvus import Milvus
from sentence_transformers import CrossEncoder
from langgraph.graph import StateGraph, START, END

In [None]:
llm = ChatClovaX(
    model="HCX-DASH-002", 
    max_tokens=1024,
    temperature=0,
    api_key=""
)

embeddings = ClovaXEmbeddings(
    model="bge-m3",
    api_key=""
)

In [3]:
kiwi = Kiwi()

def kiwi_tokenize(text):
    return [token.form for token in kiwi.tokenize(text) if token.tag.startswith('N')]

def get_documents_from_milvus(vector_db):
    collection = vector_db.col
    res = collection.query(
        expr="", 
        output_fields=["text", "idx"], 
        limit=16384
    )
    
    docs = []
    for item in res:
        text = item.get('text')
        idx = item.get('idx') 
        
        if text:
            docs.append(Document(
                page_content=text,
                metadata={"idx": idx} 
            ))
            
    return docs

Quantization is not supported for ArchType::neon. Fall back to non-quantized model.


In [4]:
class ManualEnsembleRetriever:
    def __init__(self, retrievers, weights=None, c=60):
        self.retrievers = retrievers
        self.weights = weights if weights else [0.5] * len(retrievers)
        self.c = c 

    def invoke(self, query):
        results = [r.invoke(query) for r in self.retrievers]
        rrf_score = defaultdict(float)
        doc_map = {}

        for r_idx, docs in enumerate(results):
            weight = self.weights[r_idx]
            for rank, doc in enumerate(docs):
                doc_key = doc.page_content
                if doc_key not in doc_map:
                    doc_map[doc_key] = doc
                
                score = weight * (1 / (rank + 1 + self.c))
                rrf_score[doc_key] += score

        sorted_keys = sorted(rrf_score.keys(), key=lambda k: rrf_score[k], reverse=True)
        return [doc_map[k] for k in sorted_keys]


def create_hybrid_retriever(vector_db, k=20):
    raw_docs = get_documents_from_milvus(vector_db)
    vector_retriever = vector_db.as_retriever(search_kwargs={"k": k})
    
    if not raw_docs:
        return vector_retriever
        
    bm25_retriever = BM25Retriever.from_documents(
        raw_docs, 
        preprocess_func=kiwi_tokenize
    )
    bm25_retriever.k = k
    
    return ManualEnsembleRetriever(
        retrievers=[bm25_retriever, vector_retriever],
        weights=[0.5, 0.5]
    )

In [None]:
title_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="title",                   
    auto_id=True
)

standards_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="standards",                   
    auto_id=True
)

outline_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="outline",                   
    auto_id=True
)

problems_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="problems",                   
    auto_id=True
)

opinion_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="opinion",                   
    auto_id=True
)

criteria_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="criteria",                   
    auto_id=True
)

action_db = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "https://in03-6919b557b41d797.serverless.gcp-us-west1.cloud.zilliz.com",   
        "token": ""          
    },
    collection_name="action",                   
    auto_id=True
)

retrievers = {
    "title": create_hybrid_retriever(title_db),
    "standards": create_hybrid_retriever(standards_db),
    "outline": create_hybrid_retriever(outline_db),
    "problems": create_hybrid_retriever(problems_db),
    "opinion": create_hybrid_retriever(opinion_db),
    "criteria": create_hybrid_retriever(criteria_db),
    "action": create_hybrid_retriever(action_db),
}

In [39]:
class GraphState(TypedDict):
    question: str
    search_query: str
    selected_fields: List[str]
    selected_fields_cot: List[str]
    documents: List[Any]
    is_valid: str
    validator_cot: List[str]
    analysis_decision: str
    strategy_decider_cot: List[str]
    retry_count: int

In [None]:
field_selector_system = """
[역할]
당신은 감사 보고서 기반 RAG 시스템의 필드 선택기(Field Selector)입니다.

[목표]
사용자 질문을 분석하여,
아래에 정의된 7개의 필드 중 질문과 직접적으로 관련된 필드를 정확히 선택하십시오.

[핵심 원칙]
- 모든 판단은 질문의 의미와 의도를 기준으로 수행합니다.
- 추측이나 일반적 관행에 근거한 선택은 허용되지 않습니다.
- 질문에서 명시적으로 요구되거나 논리적으로 필수적인 필드만 선택합니다.

[필수 포함 규칙]
- "outline"과 "problems"는 질문의 유형과 무관하게 항상 포함해야 합니다.

[조건부 선택 규칙]
- title: 사건명이나 특정 사안의 명칭 자체를 묻는 경우에만 선택합니다.
- standards: 법령, 규정, 기준, 위반 여부, 적법성 판단이 질문의 핵심인 경우에만 선택합니다.
- opinion: 관계기관의 입장, 해명, 평가, 의견을 묻는 경우에만 선택합니다.
- criteria: 개선 방안, 재발 방지 대책, 내부통제 강화, 절차 보완을 묻는 경우에만 선택합니다.
- action: 처분, 제재, 징계, 후속 조치의 내용이나 수준을 묻는 경우에만 선택합니다.

[선택 가능한 필드 정의]
- title: 사건 제목, 사안명
- standards: 법령, 규정, 기준, 위반 여부
- outline: 사건의 개요, 배경, 전체 상황
- problems: 위반 사항, 문제점, 부적정 사례
- opinion: 관계기관의 의견, 평가, 입장
- criteria: 개선 방안, 내부통제, 절차 보완
- action: 처분, 제재, 징계, 후속 조치

[출력 형식]
- 출력은 반드시 하나의 JSON 객체여야 합니다.
- JSON에는 아래 두 개의 키만 포함해야 합니다.

{{
  "selected_fields": [소문자 문자열 리스트],
  "cot": [
    "Step 1: 질문의 핵심 의도와 요구 정보를 분석한다.",
    "Step 2: 필수 규칙에 따라 outline과 problems를 포함한다.",
    "Step 3: 질문의 내용에 따라 추가로 필요한 필드를 판단하여 선택한다."
  ]
}}

[출력 규칙]
- selected_fields에는 항상 "outline"과 "problems"가 포함되어야 합니다.
- selected_fields_cot는 단계적 판단 과정을 나타내는 문자열 리스트여야 합니다.
- selected_fields_cot는 최소 3단계 이상 작성해야 합니다.
- JSON 외의 텍스트는 절대 출력하지 마십시오.

[출력 예시]
{{
  "selected_fields": ["outline", "problems", "standards"],
  "selected_fields_cot": [
    "Step 1: 질문은 사건에서 법령이나 규정 위반 여부를 확인하려는 목적이다.",
    "Step 2: 모든 질문에 필수로 포함해야 하는 outline과 problems를 선택한다.",
    "Step 3: 위반된 법령과 기준 판단이 필요하므로 standards를 추가한다."
  ]
}}
"""

field_selector_user = """
[Question]
{question}
"""

field_selector_template = ChatPromptTemplate.from_messages([
    ("system", field_selector_system),
    ("human", field_selector_user)
])

field_selector_chain = (
    field_selector_template
    | llm
    | JsonOutputParser()
)


validator_system = """
[역할]
당신은 감사 보고서 검색 결과의 유효성을 판별하는 검증관(Validator)입니다.

[목표]
사용자의 질문(question)과 검색된 문서(context)를 비교하여,
제공된 문서만을 근거로 질문에 대해 직접적인 답변이 가능한지를 판단하십시오.

[사고 방식]
- 판단은 단계적 사고 과정을 거쳐 수행해야 합니다.
- 각 단계는 질문 요구, 문서 확인, 답변 가능성 판단의 흐름의 순서를 따라야 합니다.
- 판단 과정은 validator_cot에 반드시 기록해야 합니다.

[판단 원칙]
- 판단은 오직 제공된 문서(context)에 근거해야 합니다.
- 외부 지식, 일반 상식, 추론 보완은 허용되지 않습니다.
- 문서에 질문의 핵심 정보가 명시적으로 없으면 반드시 "no"로 판단하십시오.
- 애매한 경우에는 반드시 "no"를 선택하십시오.

[출력 형식]
출력은 반드시 하나의 JSON 객체여야 하며,
아래 두 개의 키만 포함해야 합니다.

{{
  "is_valid": "yes" 또는 "no",
  "validator_cot": [
    "Step 1: 질문이 요구하는 핵심 정보와 판단 기준을 식별한다.",
    "Step 2: 문서에서 해당 정보가 명시적으로 존재하는지 확인한다.",
    "Step 3: 문서만으로 질문에 직접 답변 가능한지 판단한다."
  ]
}}

[출력 규칙]
- is_valid 값은 반드시 소문자 문자열 "yes" 또는 "no"만 허용됩니다.
- validator_cot는 반드시 단계별 사고 흐름을 나타내는 문자열 리스트여야 합니다.
- validator_cot는 최소 3단계 이상 작성해야 합니다.
- JSON 외의 텍스트는 절대 출력하지 마십시오.

[출력 예시]
{{
  "is_valid": "no",
  "validator_cot": [
    "Step 1: 질문은 출장비 부당 집행 이후 어떤 징계나 처분이 있었는지를 묻고 있다.",
    "Step 2: 검색된 문서에는 출장비 부당 집행 사례와 문제점은 있으나, 징계나 처분 내용은 명시되어 있지 않다.",
    "Step 3: 문서 내용만으로는 처분 여부를 답할 수 없으므로 유효하지 않다고 판단했다."
  ]
}}
"""

validator_user = """
[Question]
{question}

[Retrieved Documents]
{context}
"""

validator_prompt = ChatPromptTemplate.from_messages([
    ("system", validator_system),
    ("human", validator_user)
])

validator_chain = (
    validator_prompt
    | llm
    | JsonOutputParser()
)


strategy_decider_system = """
[역할]
당신은 검색 실패 원인을 분석하고 다음 행동 전략을 결정하는 전략 결정자(StrategyDecider)입니다.

[상황]
현재 검색 결과는 검증기(Validator)를 통과하지 못했습니다.
당신은 아래 세 가지 정보를 모두 참고하여,
다음 단계에서 취할 전략을 하나 선택해야 합니다.

- 필드 선택기의 사고 과정(selected_fields_cot)
- 현재 선택된 필드 목록(current_fields)
- 검증기의 사고 과정(validator_cot)

[선택 가능한 전략]
1. rewrite_query
   - 질문의 핵심 정보 유형은 맞으나, 현재 쿼리는 검색에 적합하지 않은 형태입니다.
   - 질문이 종합·판단형으로 작성되어 있어 사례(action)나 처분 결과가 직접 검색되지 않았습니다.
   - 쿼리를 사례 조회 중심, 처분·징계 중심의 검색형 문장으로 재작성하면
     다른 문서가 검색될 가능성이 높습니다.
   - new_query를 실제로 의미 있게 재작성할 수 있는 경우에만 선택해야 합니다.

2. update_fields
   - selected_fields_cot에서의 판단은 부분적으로 타당했습니다.
   - 문서의 주제는 질문과 대체로 일치합니다.
   - 그러나 질문이 요구하는 특정 정보 유형(규정, 조치, 기준 등)이 문서에 없습니다.
   - 현재 선택되지 않은 다른 필드를 추가하면 답변 가능성이 높아질 것으로 판단됩니다.

[선택 가능한 전체 필드 목록]
title, outline, problems, standards, criteria, action, opinion

[필수 제약 규칙 — 매우 중요]
- missing_fields는 반드시 current_fields에 포함되지 않은 필드만 선택해야 합니다.
- 이미 선택된 필드를 다시 선택하는 것은 절대 허용되지 않습니다.
- current_fields를 제외한 나머지 필드 중에서만 missing_fields를 구성하십시오.
- update_fields 전략을 선택했음에도 추가할 필드가 없다면,
  반드시 rewrite_query 전략을 선택해야 합니다.
- rewrite_query 전략을 선택한 경우,
  new_query는 원본 질문과 문장 구조 또는 검색 초점이 반드시 달라야 합니다.
- 원본 질문을 그대로 반복하거나 의미만 바꾼 수준의 쿼리는 허용되지 않습니다.

[사고 방식]
- Step 1에서는 selected_fields_cot를 분석하여,
  현재 필드 구성이 질문의 정보 요구에는 적절했는지 판단합니다.
- Step 2에서는 validator_cot를 분석하여,
  문서에 사례(action) 또는 처분 결과가 존재하지 않았다는 점을 명확히 식별합니다.
- Step 3에서는 다음 중 하나를 명확히 결정합니다.
  - 필드는 충분하지만, 사례·처분을 직접 검색할 수 있도록 쿼리를 재작성해야 한다.
  - 필드 자체가 부족하므로 필드를 확장해야 한다.
- 이 판단 과정은 strategy_decider_cot에 구체적인 판단 근거로 기록해야 합니다.

[출력 형식 - rewrite_query]
출력은 반드시 하나의 JSON 객체여야 하며,
아래 네 개의 키만 포함해야 합니다.

{{
  "strategy": "rewrite_query",
  "missing_fields": [],
  "new_query": "사례·처분·징계·조치(action)를 직접 검색할 수 있도록 재작성된 검색 쿼리",
  "strategy_decider_cot": [
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 1",
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 2",
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 3"
  ]
}}

[출력 형식 - update_fields]
출력은 반드시 하나의 JSON 객체여야 하며,
아래 네 개의 키만 포함해야 합니다.

{{
  "strategy": "update_fields",
  "missing_fields": ["current_fields에 포함되지 않은 필드명"],
  "new_query": "",
  "strategy_decider_cot": [
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 1",
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 2",
    "이번 질문과 검색 실패 상황에 근거한 실제 판단 내용 3"
  ]
}}

[출력 규칙]
- strategy는 반드시 두 값 중 하나여야 합니다.
- update_fields 전략인 경우 new_query는 반드시 빈 문자열 ""이어야 합니다.
- rewrite_query 전략인 경우 new_query는 반드시 비어 있지 않은 문자열이어야 하며,
  사례, 처분, 징계, 조치(action) 중심의 검색형 문장이어야 합니다.
- rewrite_query 전략이 아닌 경우 missing_fields는 반드시 빈 리스트 []여야 합니다.
- missing_fields에는 current_fields에 포함된 필드가 하나라도 있으면 안 됩니다.
- strategy_decider_cot는 최소 3단계 이상 작성해야 합니다.
- JSON 외의 텍스트는 절대 출력하지 마십시오.

[출력 예시 - rewrite_query]
{{
  "strategy": "rewrite_query",
  "missing_fields": [],
  "new_query": "출장비 부당 집행과 관련하여 감사 결과로 실제 내려진 징계 또는 처분 사례",
  "strategy_decider_cot": [
    "selected_fields_cot에 따르면 필드 구성은 질문 의도에 부합했으나, 쿼리가 종합 판단형으로 작성되어 사례 검색에 적합하지 않았다.",
    "validator_cot를 보면 문서에는 출장비 관련 일반적인 개선 사항만 존재하고, 처분 사례는 검색되지 않았다.",
    "처분 사례를 직접 검색하기 위해 질문을 사례·징계 중심의 검색형 쿼리로 재작성하기로 결정했다."
  ]
}}

[출력 예시 - update_fields]
{{
  "strategy": "update_fields",
  "missing_fields": ["action"],
  "new_query": "",
  "strategy_decider_cot": [
    "selected_fields_cot에 따르면 사건의 개요와 문제점 중심으로 필드가 선택되어 질문 주제와는 일치했다.",
    "validator_cot를 확인한 결과 문서에는 문제 상황은 있으나, 실제 처분이나 징계 정보가 포함되어 있지 않았다.",
    "처분 정보는 현재 필드에 없으므로 action 필드를 추가하는 전략이 필요하다고 판단했다."
  ]
}}
"""

strategy_decider_user = """
[Question]
{question}

[Selected Fields]
{current_fields}

[Field Selector COT]
{selected_fields_cot}

[Validator COT]
{validator_cot}
"""

strategy_decider_prompt = ChatPromptTemplate.from_messages([
    ("system", strategy_decider_system),
    ("human", strategy_decider_user)
])

strategy_decider_chain = (
    strategy_decider_prompt
    | llm
    | JsonOutputParser()
)

In [54]:
encoder_model = CrossEncoder("BAAI/bge-reranker-v2-m3")

In [55]:
def field_selector(state: GraphState) -> dict:
    current_input = state.get("search_query") if state.get("search_query") else state["question"]
    
    result = field_selector_chain.invoke({"question": current_input})

    new_selected = result.get("selected_fields", [])
    cot = result.get("selected_fields_cot", [])

    current_fields = state.get("selected_fields", [])
    merged_fields = list(set(current_fields + new_selected))

    if "outline" not in merged_fields:
        merged_fields.append("outline")
    if "problems" not in merged_fields:
        merged_fields.append("problems")

    return {
        "selected_fields": merged_fields,
        "selected_fields_cot": cot
    }


def retriever(state: GraphState) -> dict:
    search_query = state.get("search_query", state["question"])
    original_question = state["question"]
    
    target_fields = state["selected_fields"]
    
    selected_retrievers = {
        field: RunnableLambda(retrievers[field].invoke) 
        for field in target_fields
    }
    
    results = RunnableParallel(**selected_retrievers).invoke(search_query)
    
    grouped_content = defaultdict(list)
    for field, docs in results.items():
        for doc in docs:
            grouped_content[doc.metadata["idx"]].append((field, doc.page_content))

    new_docs = []
    for idx, contents in grouped_content.items():
        contents.sort(key=lambda x: x[0]) 
        full_text = "\n\n".join([f"[{field.upper()}]\n{text}" for field, text in contents])
        
        new_docs.append(Document(
            page_content=full_text,
            metadata={"idx": idx, "fields": [c[0] for c in contents]}
        ))

    previous_docs = state.get("documents", [])
    all_docs = previous_docs + new_docs
    
    unique_docs_map = {doc.metadata["idx"]: doc for doc in all_docs}
    unique_docs = list(unique_docs_map.values())
    
    if not unique_docs:
        return {"documents": []}

    pairs = [[original_question, doc.page_content] for doc in unique_docs]
    scores = encoder_model.predict(pairs)
    
    scored_docs = sorted(zip(unique_docs, scores), key=lambda x: x[1], reverse=True)
    top5_docs = [doc for doc, _ in scored_docs[:5]]
    
    return {"documents": top5_docs}


def validator(state: GraphState) -> dict:
    question = state["question"]
    documents = state.get("documents", [])

    if not documents:
        return {"is_valid": "no", "validator_cot": ["문서 없음"]}

    context = "\n\n".join(doc.page_content for doc in documents)

    result = validator_chain.invoke({
        "question": question,
        "context": context
    })

    return {
        "is_valid": str(result.get("is_valid", "no")).lower().strip(),
        "validator_cot": result.get("validator_cot", [])
    }


def strategy_decider(state: GraphState) -> dict:
    question = state["question"]
    current_fields = state["selected_fields"]
    
    sel_cot_text = "\n".join(state.get("selected_fields_cot", []))
    val_cot_text = "\n".join(state.get("validator_cot", []))
    
    result = strategy_decider_chain.invoke({
        "question": question,
        "current_fields": str(current_fields),
        "selected_fields_cot": sel_cot_text,
        "validator_cot": val_cot_text
    })

    strategy = result.get("strategy", "rewrite_query")
    llm_suggested_missing = result.get("missing_fields", [])
    new_query = result.get("new_query", "").strip()
    decider_cot = result.get("strategy_decider_cot", [])
    
    final_missing_fields = list(set(llm_suggested_missing) - set(current_fields))
    
    if strategy == "update_fields" and not final_missing_fields:
        strategy = "rewrite_query"
        if not new_query:
            new_query = question 

    updates = {
        "analysis_decision": strategy,
        "strategy_decider_cot": decider_cot,
        "retry_count": state.get("retry_count", 0) + 1
    }
    
    if strategy == "update_fields" and final_missing_fields:
        new_fields = list(set(current_fields + final_missing_fields))
        updates["selected_fields"] = new_fields
        
    elif strategy == "rewrite_query" and new_query:
        updates["search_query"] = new_query
        
    return updates

In [56]:
workflow = StateGraph(GraphState)

workflow.add_node("field_selector", field_selector)
workflow.add_node("retriever", retriever)
workflow.add_node("validator", validator)
workflow.add_node("strategy_decider", strategy_decider)

workflow.add_edge(START, "field_selector")
workflow.add_edge("field_selector", "retriever")
workflow.add_edge("retriever", "validator")

def check_validation(state):
    if state.get("retry_count", 0) >= 3:
        return "pass"
    if state["is_valid"] == "yes":
        return "pass"
    return "fail"

workflow.add_conditional_edges(
    "validator",
    check_validation,
    {
        "pass": END,
        "fail": "strategy_decider"
    }
)

def check_strategy(state):
    return state["analysis_decision"]

workflow.add_conditional_edges(
    "strategy_decider",
    check_strategy,
    {
        "rewrite_query": "field_selector",
        "update_fields": "retriever"
    }
)

app = workflow.compile()

In [1]:
# generation은 빠져있습니다.