In [24]:
# ! pip install langgraph elasticsearch openai python-dotenv


In [25]:
# load 
from typing import TypedDict, Literal, List, Dict, Any
from collections import defaultdict

from dotenv import load_dotenv
import os

from elasticsearch import Elasticsearch
from openai import OpenAI

from langgraph.graph import StateGraph, END

from typing import Annotated
from langgraph.graph.message import add_messages

# Load environment variables from .env
load_dotenv()

# --- OpenAI ---
# Fail fast when the key is missing: every LLM and embedding helper in this
# notebook goes through `client`.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY is not set")

client = OpenAI(api_key=OPENAI_API_KEY)

# --- Elasticsearch ---
# Connection settings are read from the environment, with fallbacks when unset.
# NOTE(review): the fallback credentials ("elastic" / "changeme") and the fixed
# "http" scheme look intended for a local cluster only — confirm before
# pointing this notebook at a shared deployment.
ES_HOST = os.getenv("ES_HOST", "localhost")
ES_PORT = int(os.getenv("ES_PORT", "9200"))
ES_USER = os.getenv("ES_USER", "elastic")
ES_PASSWORD = os.getenv("ES_PASSWORD", "changeme")
ES_INDEX = os.getenv("ES_INDEX", "papers_index")

es = Elasticsearch(
    hosts=[{"host": ES_HOST, "port": ES_PORT, "scheme": "http"}],
    basic_auth=(ES_USER, ES_PASSWORD),
)

# Field names of the documents in ES_INDEX, as read by the search nodes
TITLE_FIELD = "title"
CONTENT_FIELD = "content"
YEAR_FIELD = "year"
CITATION_FIELD = "citations"
EMBED_FIELD = "embedding"  # vector field targeted by the kNN query

# Search settings
TOP_K_SPARSE = 30  # `size` of the BM25 (multi_match) request
TOP_K_DENSE = 30   # `size` and `k` of the kNN request
TOP_K_FINAL = 10   # number of papers kept after RRF fusion


In [36]:
# State definition + shared LLM / embedding helpers
# State definition
# Intent label that classify_utterance_agent asks the LLM to choose from.
UtterType = Literal["KEYWORD_TOPIC", "NL_TOPIC", "SPECIFIC_PAPER"]
# Retrieval mode written by strategy_agent and read by both search nodes.
# Note: strategy_agent as written only ever emits "sparse" or "hybrid".
SearchStrategy = Literal["sparse", "dense", "hybrid"]

class AgentState(TypedDict, total=False):
    """Shared state passed between the graph nodes.

    ``total=False`` makes every key optional, so a run can start from just
    ``{"query_text": ...}`` and each node fills in the keys it owns.
    """
    query_text: str                      # raw user query; the only key supplied by run_query
    utterance_type: UtterType            # set by classify_utterance_agent
    search_strategy: SearchStrategy      # set by strategy_agent
    keyword_hits: List[Dict[str, Any]]   # BM25 hits: id, score, title, content, year, citations
    semantic_hits: List[Dict[str, Any]]  # kNN hits, same dict layout as keyword_hits
    top_papers: List[Dict[str, Any]]     # RRF-fused hits, set by merge_and_select_agent
    answer: str                          # final text shown to the user

# LLM / embedding helpers
def call_llm_json(system_prompt: str, user_prompt: str) -> Dict[str, Any]:
    """Ask gpt-4o-mini for a JSON-object reply and return it parsed.

    Args:
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.

    Returns:
        The JSON-decoded content of the first completion choice.
    """
    import json

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        response_format={"type": "json_object"},
        messages=conversation,
    )
    first_choice = completion.choices[0]
    return json.loads(first_choice.message.content)

def call_llm_text(system_prompt: str, user_prompt: str) -> str:
    """Ask gpt-4o-mini for a free-text reply.

    Args:
        system_prompt: Content of the system message.
        user_prompt: Content of the user message.

    Returns:
        The message content of the first completion choice.
    """
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=conversation,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

def get_embedding(text: str) -> List[float]:
    """Embed ``text`` with text-embedding-3-large.

    Returns:
        The embedding vector of the first item in the API response.
    """
    response = client.embeddings.create(model="text-embedding-3-large", input=text)
    first_item = response.data[0]
    return first_item.embedding


In [37]:
# node 1 : classify_utterance 
def classify_utterance_agent(state: AgentState) -> AgentState:
    """Classify the user's query into one of the ``UtterType`` intents.

    Reads ``state["query_text"]``, asks the LLM (JSON mode) for a label and
    returns a partial state update ``{"utterance_type": <label>}``.

    The LLM reply is treated as untrusted: surrounding whitespace and letter
    case are normalised, and a missing, non-string, or unknown label falls
    back to ``"NL_TOPIC"`` so that only valid ``UtterType`` values ever reach
    the state.

    Raises:
        KeyError: if ``query_text`` is absent from ``state``.
    """
    from typing import get_args

    query = state["query_text"].strip()

    system_prompt = """
너는 논문 검색 에이전트의 인텐트 분류기다.
유저의 질의를 아래 세 유형 중 하나로 분류해라.

- KEYWORD_TOPIC: 키워드 위주의 짧은 검색어
- NL_TOPIC: 자연어 서술형 질문
- SPECIFIC_PAPER: 특정 논문을 가리키는 질문

반드시 JSON으로:
{"utterance_type": "..."}
"""
    user_prompt = f"질문: {query}"
    result = call_llm_json(system_prompt, user_prompt)

    # JSON mode only guarantees syntactically valid JSON, not this schema.
    raw_label = result.get("utterance_type") if isinstance(result, dict) else None
    label = raw_label.strip().upper() if isinstance(raw_label, str) else ""
    utter: UtterType = label if label in get_args(UtterType) else "NL_TOPIC"

    return {"utterance_type": utter}


In [38]:
# node2 : strategy_agent
def strategy_agent(state: AgentState) -> AgentState:
    """Pick the retrieval strategy from the classified utterance type.

    ``KEYWORD_TOPIC`` maps to ``"sparse"``; every other utterance type
    (including ``SPECIFIC_PAPER``) maps to ``"hybrid"``.

    Returns:
        A partial state update ``{"search_strategy": <strategy>}``.

    Raises:
        KeyError: if ``utterance_type`` is absent from ``state``.
    """
    is_keyword_query = state["utterance_type"] == "KEYWORD_TOPIC"
    chosen: SearchStrategy = "sparse" if is_keyword_query else "hybrid"
    return {"search_strategy": chosen}



In [39]:
# node 3 : keyword_search_agent (BM25)
def keyword_search_agent(state: AgentState) -> AgentState:
    """Run a BM25 ``multi_match`` search over the title and content fields.

    The search only runs when ``search_strategy`` is ``"sparse"`` or
    ``"hybrid"`` (default ``"hybrid"`` when the key is absent); otherwise an
    empty hit list is returned without contacting Elasticsearch.

    Returns:
        A partial state update ``{"keyword_hits": [...]}`` where each hit is a
        dict with ``id``, ``score``, ``title``, ``content``, ``year`` and
        ``citations`` (``citations`` defaults to 0 when missing).

    Raises:
        KeyError: if ``query_text`` is absent from ``state``.
    """
    query = state["query_text"]
    strategy = state.get("search_strategy", "hybrid")

    if strategy not in ("sparse", "hybrid"):
        return {"keyword_hits": []}

    body = {
        "size": TOP_K_SPARSE,
        # Fetch only the fields read below. The index mapping also holds a
        # 3072-dim `embedding` field that would otherwise presumably be
        # shipped back inside every hit's _source.
        "_source": [TITLE_FIELD, CONTENT_FIELD, YEAR_FIELD, CITATION_FIELD],
        "query": {
            "multi_match": {
                "query": query,
                "fields": [TITLE_FIELD, CONTENT_FIELD],
                "type": "best_fields",
            }
        },
    }
    resp = es.search(index=ES_INDEX, body=body)

    results = []
    for hit in resp["hits"]["hits"]:
        # A hit may carry no _source at all if none of the requested fields exist.
        src = hit.get("_source") or {}
        results.append({
            "id": hit["_id"],
            "score": hit["_score"],
            "title": src.get(TITLE_FIELD),
            "content": src.get(CONTENT_FIELD),
            "year": src.get(YEAR_FIELD),
            "citations": src.get(CITATION_FIELD, 0),
        })

    return {"keyword_hits": results}


In [30]:
# semantic_search_agent 
def semantic_search_agent(state: AgentState) -> AgentState:
    """Run an approximate kNN search on the embedding field.

    The search only runs when ``search_strategy`` is ``"dense"`` or
    ``"hybrid"`` (default ``"hybrid"`` when the key is absent); otherwise an
    empty hit list is returned without calling the embedding API or
    Elasticsearch.

    Returns:
        A partial state update ``{"semantic_hits": [...]}`` where each hit is a
        dict with ``id``, ``score``, ``title``, ``content``, ``year`` and
        ``citations`` (``citations`` defaults to 0 when missing).

    Raises:
        KeyError: if ``query_text`` is absent from ``state``.
    """
    query = state["query_text"]
    strategy = state.get("search_strategy", "hybrid")

    if strategy not in ("dense", "hybrid"):
        return {"semantic_hits": []}

    q_vec = get_embedding(query)
    body = {
        "size": TOP_K_DENSE,
        # Fetch only the fields read below so the stored embedding vector is
        # not returned inside every hit's _source.
        "_source": [TITLE_FIELD, CONTENT_FIELD, YEAR_FIELD, CITATION_FIELD],
        "knn": {
            "field": EMBED_FIELD,
            "query_vector": q_vec,
            "k": TOP_K_DENSE,
            # Elasticsearch rejects num_candidates < k, so keep the original
            # 100 as a floor but grow with TOP_K_DENSE if it is raised.
            "num_candidates": max(100, TOP_K_DENSE),
        },
    }
    resp = es.search(index=ES_INDEX, body=body)

    results = []
    for hit in resp["hits"]["hits"]:
        # A hit may carry no _source at all if none of the requested fields exist.
        src = hit.get("_source") or {}
        results.append({
            "id": hit["_id"],
            "score": hit["_score"],
            "title": src.get(TITLE_FIELD),
            "content": src.get(CONTENT_FIELD),
            "year": src.get(YEAR_FIELD),
            "citations": src.get(CITATION_FIELD, 0),
        })

    return {"semantic_hits": results}



In [31]:
def rrf_fusion(
    keyword_hits: List[Dict[str, Any]],
    semantic_hits: List[Dict[str, Any]],
    k: int = TOP_K_FINAL,
    k_rrf: int = 60,
) -> List[Dict[str, Any]]:
    """Merge two ranked hit lists with Reciprocal Rank Fusion.

    A document at 0-based position ``r`` in a list contributes
    ``1 / (k_rrf + r + 1)`` to its fused score; contributions from both lists
    are summed per document ``id``.

    Args:
        keyword_hits: Ranked hits from the keyword search (best first).
        semantic_hits: Ranked hits from the semantic search (best first).
        k: Maximum number of fused documents to return.
        k_rrf: RRF smoothing constant added to every rank.

    Returns:
        Up to ``k`` hit dicts ordered by descending fused score. When a
        document appears in both lists, the dict from ``keyword_hits`` is the
        one returned. Ties keep first-seen order.
    """
    fused_score = defaultdict(float)
    first_seen: Dict[str, Dict[str, Any]] = {}

    # Keyword hits go first so their dict wins when an id appears in both lists.
    for ranking in (keyword_hits, semantic_hits):
        for position, hit in enumerate(ranking):
            hit_id = hit["id"]
            fused_score[hit_id] += 1.0 / (k_rrf + position + 1)
            if hit_id not in first_seen:
                first_seen[hit_id] = hit

    ordered_ids = sorted(first_seen, key=lambda hit_id: fused_score[hit_id], reverse=True)
    return [first_seen[hit_id] for hit_id in ordered_ids][:k]

def merge_and_select_agent(state: AgentState) -> AgentState:
    """Fuse keyword and semantic hits, then have the LLM write the answer.

    The two hit lists are merged with ``rrf_fusion`` (top ``TOP_K_FINAL``).
    If nothing was retrieved, a fixed "no results" message is returned and
    the LLM is not called. Otherwise the fused papers are rendered into a
    numbered list (content truncated to 400 characters) and passed to the LLM
    together with the user's question.

    Returns:
        A partial state update ``{"top_papers": [...], "answer": "..."}``.
        The incoming ``state`` is not mutated; like the other nodes, this one
        returns only the keys it owns.

    Raises:
        KeyError: if ``query_text`` is absent from ``state``.
    """
    keyword_hits = state.get("keyword_hits") or []
    semantic_hits = state.get("semantic_hits") or []

    fused = rrf_fusion(keyword_hits, semantic_hits, k=TOP_K_FINAL)

    query = state["query_text"]

    if not fused:
        return {
            "top_papers": fused,
            "answer": "관련 논문을 찾지 못했어. 검색어를 조금 더 구체적으로 바꿔볼래?",
        }

    papers_text = []
    for i, p in enumerate(fused, start=1):
        # The search nodes store None when a document has no content field;
        # slicing None would raise TypeError.
        snippet = (p.get("content") or "")[:400]
        papers_text.append(
            f"{i}. 제목: {p.get('title')}\n"
            f"   연도: {p.get('year')}, 인용수: {p.get('citations')}\n"
            f"   내용: {snippet}..."
        )
    papers_block = "\n\n".join(papers_text)

    system_prompt = """
너는 논문 검색 어시스턴트다.
사용자의 질문과 후보 논문 목록을 보고,
- 중요한 3~5편을 골라 제목과 요약을 제시하고
- 왜 관련 있는지 코멘트를 달아라.
- 최신 논문이 있다면 우선 언급해라.

한국어로, 핵심 위주로 설명해라.
"""
    user_prompt = f"질문: {query}\n\n후보 논문 목록:\n{papers_block}"
    answer = call_llm_text(system_prompt, user_prompt)

    return {"top_papers": fused, "answer": answer}


In [32]:
# Debug cell: print the mapping of ES_INDEX to check field names and types.
# NOTE(review): this import rebinds `es` and `ES_INDEX`, which the setup cell
# at the top of the notebook already defines. The `config` module is not
# visible here — confirm both sources point at the same cluster and index.
from config import es, ES_INDEX
print(es.indices.get_mapping(index=ES_INDEX))


{'papers_index': {'mappings': {'properties': {'abstract': {'type': 'text'}, 'citations': {'type': 'integer'}, 'content': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}, 'embedding': {'type': 'dense_vector', 'dims': 3072, 'index': True, 'similarity': 'cosine', 'index_options': {'type': 'int8_hnsw', 'm': 16, 'ef_construction': 100}}, 'title': {'type': 'text'}, 'url': {'type': 'keyword'}, 'venue': {'type': 'keyword'}, 'year': {'type': 'integer'}}}}}


In [33]:
def build_graph():
    """Assemble and compile the paper-search graph.

    Flow: classify_utterance_agent -> strategy_agent -> (keyword_search_agent,
    semantic_search_agent) -> merge_and_select_agent. Both search nodes hang
    off strategy_agent and both feed the merge node, which is the finish point.

    Returns:
        The compiled graph.
    """
    node_table = {
        "classify_utterance_agent": classify_utterance_agent,
        "strategy_agent": strategy_agent,
        "keyword_search_agent": keyword_search_agent,
        "semantic_search_agent": semantic_search_agent,
        "merge_and_select_agent": merge_and_select_agent,
    }
    edge_list = [
        ("classify_utterance_agent", "strategy_agent"),
        ("strategy_agent", "keyword_search_agent"),
        ("strategy_agent", "semantic_search_agent"),
        ("keyword_search_agent", "merge_and_select_agent"),
        ("semantic_search_agent", "merge_and_select_agent"),
    ]

    builder = StateGraph(AgentState)
    for node_name, node_fn in node_table.items():
        builder.add_node(node_name, node_fn)

    builder.set_entry_point("classify_utterance_agent")
    for source, target in edge_list:
        builder.add_edge(source, target)
    builder.set_finish_point("merge_and_select_agent")

    return builder.compile()

graph = build_graph()


In [35]:
def run_query(q: str):
    """Run one query through the compiled graph and print the outcome.

    Prints the question, the classified utterance type, the chosen search
    strategy, the generated answer, and the list of selected papers.

    Args:
        q: The user's query text.
    """
    initial_state: AgentState = {"query_text": q}
    final_state = graph.invoke(initial_state)

    print("질문:", q)
    print("\n[질의 유형]:", final_state.get("utterance_type"))
    print("[검색 전략]:", final_state.get("search_strategy"))
    print("\n[답변]\n", final_state.get("answer"))

    print("\n[선택된 논문 리스트]")
    selected = final_state.get("top_papers", [])
    for idx, paper in enumerate(selected, start=1):
        title = paper.get('title')
        year = paper.get('year')
        cites = paper.get('citations')
        print(f"{idx}. {title} ({year}), citations={cites}")

# Example
run_query("최근 RAG retriever 설계 관련해서 중요한 논문 몇 개 알려줘")


AttributeError: 'list' object has no attribute 'strip'