In [8]:
import os
import sys
from typing import List, Dict, Any, TypedDict, Optional

from langchain_core.documents import Document
from langchain_core.language_models import BaseLanguageModel
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import json

In [None]:
db_path = "../data/db"
storage_path = "../data/processed"

In [3]:
def load_vectorstore(
    persist_directory: str,
    embedding_model_name: str = "BAAI/bge-m3"
) -> Chroma:
    """
    ChromaDB 벡터 저장소를 로드합니다.
    
    Args:
        persist_directory: 저장 디렉토리
        embedding_model_name: 임베딩 모델 이름
        
    Returns:
        ChromaDB 벡터 저장소
    """
    embeddings = HuggingFaceEmbeddings(
        model_name=embedding_model_name,
        model_kwargs={'device': 'cpu'},
        encode_kwargs={'normalize_embeddings': True}
    )
    
    vectorstore = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings
    )
    
    return vectorstore

print(f"\n벡터 저장소 로드 중: {db_path}")
vectorstore = load_vectorstore(db_path)
print("벡터 저장소 로드 완료")


벡터 저장소 로드 중: /Users/user/Documents/study/learning_langchain/data/db
벡터 저장소 로드 완료


  vectorstore = Chroma(


In [None]:
class Storage:
    
    def __init__(self, storage_path: str = "data/processed"):
        """
        Args:
            storage_path: JSON 파일 경로
        """
        self.storage_path = storage_path
        self.storage_dir = os.path.dirname(storage_path)
        if self.storage_dir:
            os.makedirs(self.storage_dir, exist_ok=True)
    
    def save_tables(self, tables: List[Dict[str, Any]]):
        """
        테이블 리스트를 JSON 파일로 저장합니다.
        
        Args:
            tables: 테이블 정보 리스트
        """
        data = {
            "tables": tables
        }

        tables_path = os.path.join(self.storage_path, "tables.json")
        
        try:
            with open(tables_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"저장 실패: {e}")
    
    def load_tables(self) -> List[Dict[str, Any]]:
        """
        JSON 파일에서 테이블 리스트를 로드합니다.
        
        Returns:
            테이블 정보 리스트
        """
        if not os.path.exists(self.storage_path):
            return []

        tables_path = os.path.join(self.storage_path, "tables.json")
        
        try:
            with open(tables_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                return data.get('tables', [])
        except Exception as e:
            print(f"로드 실패: {e}")
            return []
    
    def get_table(self, table_id: str) -> Optional[Dict[str, Any]]:
        """
        특정 테이블을 조회합니다.
        
        Args:
            table_id: 테이블 ID
            
        Returns:
            테이블 정보 dict 또는 None
        """
        tables = self.load_tables()
        for table in tables:
            if table.get('table_id') == table_id:
                return table
        return None
    
    def get_all_tables(self) -> Dict[str, Dict[str, Any]]:
        """
        모든 테이블을 dict 형태로 반환합니다.
        
        Returns:
            {table_id: table_data} 형태의 dict
        """
        tables = self.load_tables()
        return {table.get('table_id'): table for table in tables}
    
    def save_texts(self, texts: List[Dict[str, Any]]):
        """
        텍스트 리스트를 JSON 파일로 저장합니다.
        
        Args:
            texts: 텍스트 정보 리스트
        """
        data = {
            "texts": texts
        }
        
        text_storage_path = os.path.join(self.storage_path, "texts.json")
        
        try:
            with open(text_storage_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"저장 실패: {e}")
    
    def load_texts(self) -> List[Dict[str, Any]]:
        """
        JSON 파일에서 텍스트 리스트를 로드합니다.
        
        Returns:
            텍스트 정보 리스트
        """
        text_storage_path = os.path.join(self.storage_path, "texts.json")
        
        if not os.path.exists(text_storage_path):
            return []
        
        try:
            with open(text_storage_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
                return data.get('texts', [])
        except Exception as e:
            print(f"로드 실패: {e}")
            return []
    
    def get_text(self, text_id: str) -> Optional[Dict[str, Any]]:
        """
        특정 텍스트를 조회합니다.
        
        Args:
            text_id: 텍스트 ID
            
        Returns:
            텍스트 정보 dict 또는 None
        """
        texts = self.load_texts()
        for text in texts:
            if text.get('text_id') == text_id:
                return text
        return None

print(f"\nStorage 초기화 중: {storage_path}")
storage = Storage(storage_path)
print("Storage 초기화 완료")



Storage 초기화 중: /Users/user/Documents/study/learning_langchain/data/processed
Storage 초기화 완료


In [5]:
print("\nLLM 초기화 중...")
gpt_api_key = os.getenv("GPT_API_KEY")
llm = None

if gpt_api_key:
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        api_key=gpt_api_key
    )
    print("GPT LLM 초기화 완료")
else:
    print("GPT_API_KEY가 없어서 LLM을 사용하지 않습니다.")


LLM 초기화 중...
GPT LLM 초기화 완료


In [6]:
class RAGState(TypedDict):
    query: str
    retrieved_docs: List[Document]
    original_contents: List[str]
    context: str
    prompt: str
    answer: str


class RAGSystem:
    """LangGraph 기반 RAG 시스템"""
    
    def __init__(
        self,
        vectorstore: Chroma,
        storage: Storage,
        llm: Optional[BaseLanguageModel] = None,
        embedding_model_name: str = "BAAI/bge-m3"
    ):
        """
        Args:
            vectorstore: ChromaDB 벡터 저장소
            storage: Storage 인스턴스
            llm: LLM 모델 (None이면 검색만 수행)
            embedding_model_name: 임베딩 모델 이름
        """
        self.vectorstore = vectorstore
        self.storage = storage
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model_name,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )
        
        self.graph = self._build_graph()
    
    def _build_graph(self) -> StateGraph:
        """LangGraph 파이프라인을 구성합니다."""
        workflow = StateGraph(RAGState)
        
        workflow.add_node("search", self._search_documents)
        workflow.add_node("load_original_contents", self._load_original_contents)
        workflow.add_node("build_context", self._build_context)
        workflow.add_node("generate_answer", self._generate_answer)
        
        workflow.set_entry_point("search")
        workflow.add_edge("search", "load_original_contents")
        workflow.add_edge("load_original_contents", "build_context")
        workflow.add_edge("build_context", "generate_answer")
        workflow.add_edge("generate_answer", END)
        
        return workflow.compile()
    
    def _search_documents(self, state: RAGState) -> RAGState:
        """벡터 DB에서 유사한 문서를 검색합니다."""
        query = state["query"]
        
        docs = self.vectorstore.similarity_search(query, k=5)
        
        state["retrieved_docs"] = docs
        return state
    
    def _load_original_contents(self, state: RAGState) -> RAGState:
        """검색된 문서의 원본 내용을 로드합니다."""
        original_contents = []
        
        for doc in state.get("retrieved_docs", []):
            doc_type = doc.metadata.get('type', 'text')
            
            if doc_type == 'table':
                table_id = doc.metadata.get('table_id')
                if table_id:
                    table_data = self.storage.get_table(table_id)
                    if table_data:
                        markdown = table_data.get('markdown', '')
                        description = table_data.get('description', '')
                        original_content = f"{description}\n{markdown}" if description else markdown
                    else:
                        original_content = doc.page_content
                else:
                    original_content = doc.page_content
            else:
                chunk_id = doc.metadata.get('chunk_id')
                text_id = doc.metadata.get('text_id') or f'text_{chunk_id}' if chunk_id is not None else None
                
                if text_id:
                    text_data = self.storage.get_text(text_id)
                    if text_data:
                        original_content = text_data.get('original_text', doc.page_content)
                    else:
                        original_content = doc.page_content
                else:
                    original_content = doc.page_content
            
            original_contents.append(original_content)
        
        state["original_contents"] = original_contents
        return state
    
    def _build_context(self, state: RAGState) -> RAGState:
        """검색된 문서의 원본 내용을 컨텍스트로 구성합니다."""
        context_parts = []
        
        for i, doc in enumerate(state.get("retrieved_docs", []), 1):
            doc_type = doc.metadata.get('type', 'text')
            original_content = state.get("original_contents", [])[i-1] if i-1 < len(state.get("original_contents", [])) else ""
            
            if doc_type == 'table':
                table_id = doc.metadata.get('table_id', '')
                context_parts.append(f"[테이블 {i}: {table_id}]\n{original_content}")
            else:
                context_parts.append(f"[텍스트 {i}]\n{original_content}")
        
        state["context"] = "\n\n".join(context_parts)
        return state
    
    def _generate_answer(self, state: RAGState) -> RAGState:
        """LLM으로 최종 답변을 생성합니다."""
        if not self.llm:
            state["answer"] = "LLM이 설정되지 않아 답변을 생성할 수 없습니다."
            state["prompt"] = ""
            return state
        
        query = state["query"]
        context = state.get("context", "")
        
        prompt = f"""다음 문서들을 참고하여 질문에 답변해주세요.

참고 문서:
{context}

질문: {query}

답변:"""
        
        state["prompt"] = prompt
        
        try:
            response = self.llm.invoke(prompt)
            if hasattr(response, 'content'):
                answer = response.content
            elif hasattr(response, 'text'):
                answer = response.text
            else:
                answer = str(response)
            
            state["answer"] = answer
        except Exception as e:
            state["answer"] = f"답변 생성 중 오류 발생: {e}"
        
        return state
    
    def query(self, query: str) -> Dict[str, Any]:
        """
        질문에 대한 답변을 생성합니다.
        
        Args:
            query: 질문
            
        Returns:
            {
                'query': str,
                'answer': str,
                'prompt': str,
                'sources': List[Document],
                'original_contents': List[str],
                'context': str
            }
        """
        initial_state: RAGState = {
            "query": query,
            "retrieved_docs": [],
            "original_contents": [],
            "context": "",
            "prompt": "",
            "answer": ""
        }
        
        final_state = self.graph.invoke(initial_state)
        
        return {
            "query": final_state.get("query", query),
            "answer": final_state["answer"],
            "prompt": final_state.get("prompt", ""),
            "sources": final_state.get("retrieved_docs", []),
            "original_contents": final_state.get("original_contents", []),
            "context": final_state.get("context", "")
        }

rag_system = RAGSystem(vectorstore, storage, llm=llm)
print("\nRAG 시스템 초기화 완료")


RAG 시스템 초기화 완료


In [7]:
while True:
    query = input("\n질문: ").strip()
    sys.stdout.flush()
    
    if not query:
        continue
    
    if query.lower() in ['quit', 'exit', 'q']:
        print("종료합니다.")
        sys.stdout.flush()
        break
    
    print("\n검색 및 답변 생성 중...")
    sys.stdout.flush()
    
    result = rag_system.query(query)
    
    print(f"\n{'='*60}")
    print(f"질문: {result.get('query', query)}")
    print(f"{'='*60}")
    
    if result.get('prompt'):
        print(f"\n수정된 프롬프트:\n{result['prompt']}")
        print(f"\n{'-'*60}")
    
    print(f"\n답변: {result['answer']}")
    print(f"\n참고한 문서 수: {len(result['sources'])}")
    
    if result['original_contents']:
        print("\n원본 내용 미리보기:")
        for i, content in enumerate(result['original_contents'][:3], 1):
            preview = content[:200] + "..." if len(content) > 200 else content
            print(f"  [{i}] {preview}")
    
    sys.stdout.flush()


검색 및 답변 생성 중...
/Users/user/Documents/study/learning_langchain/data/processed/tables.json
/Users/user/Documents/study/learning_langchain/data/processed/texts.json
/Users/user/Documents/study/learning_langchain/data/processed/texts.json
/Users/user/Documents/study/learning_langchain/data/processed/texts.json
/Users/user/Documents/study/learning_langchain/data/processed/texts.json

질문: 하이

수정된 프롬프트:
다음 문서들을 참고하여 질문에 답변해주세요.

참고 문서:
[테이블 1: table_27]
Tesla 10‑Q (Q3 2025) fair‑value table of cash and cash equivalents shows certificates of deposit and time deposits at **$15.1 bn (Level I)** as of Sept 30 2025 versus **$12.8 bn** on Dec 31 2024; commercial paper totals **$3.0 bn (Level III)** versus **$3.9 bn**, U.S. government securities **$5.2 bn** versus **$3.6 bn**, corporate debt securities **$16 m** versus **$118 m**, money‑market funds **$1.2 bn** versus **$1.8 bn**, and digital assets **$1.3 bn** versus **$1.1 bn**. The overall fair‑value of cash‑equivalents is **$25.8 bn (Level I)*