# 🚀 PAI Stock Chatbot - 전체 코드 흐름 분석

이 노트북은 **LangGraph**와 **FastAPI**를 기반으로 한 **멀티세션 주식 가격 조회 및 계산 챗봇**의 전체 코드를 **실제 파일 순서대로** 보여주고 실행 흐름을 설명합니다.

## 📋 코드 실행 흐름
```
1. 의존성 설치 (requirements.txt)
2. 환경 설정 (config.py)
3. 상태 정의 (state.py)
4. 도구 구현 (tools.py)
5. 노드 구현 (nodes.py)
6. 그래프 구성 (graph.py)
7. 세션 관리 (session_manager.py)
8. API 엔드포인트 (chat.py)
9. FastAPI 앱 (main.py)
10. Streamlit UI (ui_streamlit.py)
11. 테스트 코드 (test_*.py)
```

## 🎯 핵심 아키텍처
- **비동기 처리**: `asyncio.to_thread()`로 동기 라이브러리 통합
- **멀티세션 지원**: 세션별 독립적 메모리 및 동시성 제어
- **토큰 스트리밍**: 실시간 응답 스트리밍
- **LangChain Chain 패턴**: Runnable 인터페이스 활용한 파이프라인


## 1️⃣ 의존성 설치 - requirements.txt

프로젝트에 필요한 모든 패키지들을 정의합니다.


In [None]:
# requirements.txt
"""
fastapi
uvicorn[standard]
langchain
langgraph
langchain-openai
python-dotenv
httpx # 비동기 HTTP 요청
pytest
pytest-asyncio
yfinance
cachetools
numexpr
streamlit 
"""

print("📦 의존성 설치:")
print("pip install -r requirements.txt")
print("\n💡 주요 패키지 설명:")
print("• fastapi: 웹 API 프레임워크")
print("• langchain/langgraph: LLM 에이전트 구성")
print("• yfinance: 주가 데이터 조회")
print("• streamlit: 웹 UI")
print("• httpx: 비동기 HTTP 클라이언트")


## 2️⃣ 환경 설정 - app/core/config.py

애플리케이션의 기본 설정을 관리합니다.


In [None]:
# app/core/config.py
import os
from dotenv import load_dotenv

load_dotenv()

class Settings:
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY")

settings = Settings()

print("✅ 환경 설정 로드 완료")
print("💡 .env 파일에서 OPENAI_API_KEY를 읽어옵니다")
print("🔑 API Key 설정:", "✅" if settings.OPENAI_API_KEY else "❌")


## 3️⃣ 상태 정의 - app/agents/state.py

LangGraph 에이전트의 상태를 정의합니다. 멀티턴 대화를 위해 메시지 히스토리를 누적 저장합니다.


In [None]:
# app/agents/state.py
from typing import List, TypedDict, Annotated
from langchain_core.messages import BaseMessage
import operator

# Annotated와 operator.add를 사용하면,
# state 딕셔너리의 'messages' 키에 새로운 메시지가 추가될 때 기존 리스트에 덮어쓰지 않고 이어붙입니다.
class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], operator.add]

print("✅ AgentState 정의 완료")
print("💡 operator.add로 메시지 누적 저장 (멀티턴 대화 지원)")
print("🔄 새 메시지가 기존 리스트에 덮어쓰지 않고 추가됨")


## 4️⃣ 도구 정의 - app/agents/tools.py

주가 조회와 계산 기능을 제공하는 도구들을 정의합니다.


In [None]:
# app/agents/tools.py
import asyncio
import yfinance as yf
from langchain_core.tools import tool
import numexpr

def _get_current_stock_price(symbol: str) -> float | str:
    """yfinance 라이브러리를 호출하는 동기 함수"""
    try:
        ticker = yf.Ticker(symbol)
        # 하루 동안의 데이터를 가져와 마지막 종가를 확인
        hist = ticker.history(period="1d")
        if hist.empty:
            return f"'{symbol}'에 대한 데이터를 찾을 수 없습니다. 주식 심볼이 정확한지 확인해주세요."
        return round(hist['Close'].iloc[-1], 2)
    except Exception as e:
        return f"주가 조회 중 오류가 발생했습니다: {e}"

@tool
async def get_stock_price(symbol: str) -> float | str:
    """
    주어진 주식 심볼(예: AAPL, GOOG)의 현재 가격을 조회합니다.
    yfinance 라이브러리를 사용하여 실시간 데이터를 가져옵니다.
    """
    print(f"--- TOOL: yfinance 주가 조회 ({symbol}) ---")
    
    # 동기 함수인 _get_current_stock_price를 별도 스레드에서 실행하여
    # 메인 이벤트 루프를 막지 않도록 합니다.
    price_or_error = await asyncio.to_thread(_get_current_stock_price, symbol)
    
    return price_or_error

@tool(parse_docstring=True)
def calculator(expression: str) -> str:
    """Calculate expression using Python's numexpr library.

    Args:
        expression (str): A single-line mathematical expression to evaluate. For example: "37593 * 67" or "37593**(1/5)".

    Returns:
        str: The result of the evaluated expression.
    """
    return str(numexpr.evaluate(expression))


stock_tools = [
    get_stock_price, 
    calculator
]

print("✅ 도구 정의 완료")
print("📊 get_stock_price: yfinance로 주가 조회")
print("🧮 calculator: numexpr로 수학 계산")
print("🚀 asyncio.to_thread()로 동기 함수를 비동기로 실행")


## 5️⃣ 노드 구현 - app/agents/nodes.py

LangGraph의 워크플로우를 구성하는 노드들을 정의합니다.


In [None]:
# app/agents/nodes.py
from langchain_core.messages import ToolMessage
from .state import AgentState
from .tools import stock_tools

# 도구를 실행하고 그 결과를 state에 추가하는 노드
async def tool_node(state: AgentState) -> dict:
    # 마지막 메시지에 담긴 tool_calls를 확인
    last_message = state["messages"][-1]
    
    # 각 tool_call을 실행
    tool_calls = last_message.tool_calls
    tool_outputs = []
    for tool_call in tool_calls:
        tool_name = tool_call["name"]
        # 일치하는 도구를 찾아 실행
        for tool in stock_tools:
            if tool.name == tool_name:
                # 비동기 함수이므로 await 사용
                result = await tool.ainvoke(tool_call["args"])
                tool_outputs.append(
                    ToolMessage(content=str(result), tool_call_id=tool_call["id"])
                )
                break
    
    return {"messages": tool_outputs}

# LLM을 호출하여 다음에 할 일을 결정하는 노드
def agent_node(state: AgentState, agent):
    result = agent(state)
    return {"messages": [result]}

print("✅ 노드 정의 완료")
print("🔧 tool_node: 도구 실행 및 결과 반환")
print("🤖 agent_node: LLM 호출 및 의사결정")


## 6️⃣ 에이전트 그래프 - app/agents/graph.py

LangChain Chain 패턴을 활용한 에이전트와 LangGraph 워크플로우를 구성합니다.


In [None]:
# app/agents/graph.py
from functools import partial
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage
from langchain_core.runnables import RunnableLambda
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
from .state import AgentState
from .nodes import agent_node, tool_node
from .tools import stock_tools
from ..core.config import settings
import threading
from typing import Dict, List
from datetime import datetime

# 스레드 로컬 저장소로 세션별 격리
_thread_local = threading.local()

def create_system_message() -> SystemMessage:
    """간단한 시스템 메시지 생성"""
    return SystemMessage(content="당신은 주가 계산을 도와주는 AI Agent입니다.")

def prepare_messages(state: AgentState) -> List:
    """메시지 준비"""
    messages = state["messages"]
    system_msg = create_system_message()
    return [system_msg] + messages

def create_agent():
    """Chain 기반으로 개선된 에이전트 생성"""
    
    # LLM 컴포넌트 생성
    llm = ChatOpenAI(
        model="gpt-4o-mini", 
        temperature=0.1,
        api_key=settings.OPENAI_API_KEY
    )
    
    # 도구 바인딩된 LLM
    llm_with_tools = llm.bind_tools(stock_tools)
    
    # 메시지 준비 Runnable
    message_preparer = RunnableLambda(prepare_messages)
    
    # Chain 구성 - 파이프라인으로 연결
    chain = (
        message_preparer      # AgentState -> 준비된 메시지들
        | llm_with_tools      # 메시지들 -> AI 응답
    )
    
    # AgentState 호환을 위한 래퍼
    def agent_wrapper(state: AgentState):
        """Chain을 AgentState와 호환시키는 래퍼"""
        result = chain.invoke(state)
        return result
    
    return agent_wrapper

# 조건부 엣지
def should_continue(state: AgentState):
    """도구 호출 여부를 결정하는 조건부 엣지"""
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

def create_memory_agent_executor():
    """세션별 독립적인 메모리를 가진 에이전트 실행기 생성"""
    # Chain 기반 에이전트 사용
    agent = create_agent() 
    
    # 각 세션마다 독립적인 메모리 생성
    memory = InMemorySaver()
    
    workflow = StateGraph(AgentState)
    
    # 노드 추가
    bound_agent_node = partial(agent_node, agent=agent)
    workflow.add_node("agent", bound_agent_node)
    workflow.add_node("tools", tool_node)
    
    # 엣지 설정
    workflow.set_entry_point("agent")
    workflow.add_conditional_edges("agent", should_continue)
    workflow.add_edge("tools", "agent")
    
    # 세션별 독립적인 컴파일된 실행기 반환
    return workflow.compile(checkpointer=memory)

print("✅ 에이전트 그래프 구성 완료")
print("⛓️ LangChain Chain 패턴 적용")
print("🧠 세션별 독립적 메모리 지원")
print("🔀 조건부 워크플로우: agent ⇄ tools")


## 7️⃣ 세션 매니저 - app/core/session_manager.py

멀티세션 지원과 동시성 제어를 위한 세션 매니저입니다.


In [None]:
# app/core/session_manager.py
import asyncio
from typing import Dict, Optional
from datetime import datetime, timedelta
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import InMemorySaver
from ..agents.graph import create_memory_agent_executor
import weakref
import threading

class SessionManager:
    """멀티세션을 위한 비동기 세션 매니저"""
    
    def __init__(self, session_timeout: int = 3600):  # 1시간 타임아웃
        self._sessions: Dict[str, dict] = {}
        self._session_locks: Dict[str, asyncio.Lock] = {}
        self._cleanup_lock = asyncio.Lock()
        self._session_timeout = session_timeout
        self._cleanup_task: Optional[asyncio.Task] = None
        
    async def get_session(self, thread_id: str) -> dict:
        """세션별 독립적인 agent_executor 반환"""
        
        # 세션별 락 생성 시 클린업 락으로 보호
        if thread_id not in self._session_locks:
            async with self._cleanup_lock: # 락 생성 시 동시성 제어
                if thread_id not in self._session_locks:
                    self._session_locks[thread_id] = asyncio.Lock()
        
        # 세션별 락으로 세션 데이터 보호
        async with self._session_locks[thread_id]: # 세션별 동시성 제어
            if thread_id not in self._sessions:
                # 새 세션 생성 (락으로 보호됨)
                self._sessions[thread_id] = {
                    'executor': create_memory_agent_executor(),
                    'created_at': datetime.now(),
                    'last_accessed': datetime.now(),
                    'message_count': 0
                }
                
                # 첫 번째 세션 생성 시 정리 작업 시작
                if self._cleanup_task is None:
                    self._cleanup_task = asyncio.create_task(self._cleanup_sessions())
            
            # 마지막 접근 시간 업데이트 (락으로 보호됨)
            self._sessions[thread_id]['last_accessed'] = datetime.now()
            return self._sessions[thread_id]
    
    async def stream_chat(self, message: str, thread_id: str):
        """세션별 독립적인 스트리밍 처리"""
        session = await self.get_session(thread_id)
        executor = session['executor']
        
        # 메시지 카운트 증가
        session['message_count'] += 1
        
        config = {"configurable": {"thread_id": thread_id}}
        
        try:
            async for event in executor.astream_events(
                {"messages": [HumanMessage(content=message)]},
                config=config,
                version="v1"
            ):
                kind = event["event"]
                if kind == "on_chat_model_stream":
                    content = event["data"]["chunk"].content
                    if content:
                        yield content
        except Exception as e:
            yield f"오류가 발생했습니다: {str(e)}"
    
    async def _cleanup_sessions(self):
        """비활성 세션 정리 (백그라운드 작업)"""
        while True:
            try:
                await asyncio.sleep(300)  # 5분마다 정리
                
                # 클린업 작업 시 전역 락 적용 
                async with self._cleanup_lock: # 클린업 중 다른 작업 차단
                    current_time = datetime.now() 
                    expired_sessions = []
                    
                    for thread_id, session in self._sessions.items():
                        if (current_time - session['last_accessed']).seconds > self._session_timeout:
                            expired_sessions.append(thread_id)
                    
                    # 만료된 세션 제거
                    for thread_id in expired_sessions:
                        del self._sessions[thread_id]
                        if thread_id in self._session_locks:
                            del self._session_locks[thread_id]
                        print(f"세션 {thread_id} 정리 완료")
                        
            except Exception as e:
                print(f"세션 정리 중 오류: {e}")
    
    async def get_session_info(self, thread_id: str) -> Optional[dict]:
        """세션 정보 조회 - 없으면 자동 생성"""
        # 세션이 없으면 자동으로 생성
        if thread_id not in self._sessions:
            await self.get_session(thread_id)  # 세션 자동 생성
        
        if thread_id in self._sessions:
            session = self._sessions[thread_id]
            return {
                'thread_id': thread_id,
                'created_at': session['created_at'].isoformat(),
                'last_accessed': session['last_accessed'].isoformat(),
                'message_count': session['message_count'],
                'active': True
            }
        return None
    
    async def close_session(self, thread_id: str) -> bool:
        # 세션 종료 시 클린업 락으로 보호
        async with self._cleanup_lock:
            if thread_id in self._sessions:
                del self._sessions[thread_id]
                if thread_id in self._session_locks:
                    del self._session_locks[thread_id]
                return True
        return False

# 전역 세션 매니저 인스턴스
session_manager = SessionManager()

print("✅ SessionManager 정의 완료")
print("🔐 세션별 독립적 락 시스템")
print("🌊 토큰 단위 스트리밍 지원")
print("🧹 자동 세션 정리 기능")


## 8️⃣ API 엔드포인트 - app/api/v1/chat.py

RESTful API 엔드포인트를 정의합니다.


In [None]:
# app/api/v1/chat.py
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
from ...core.session_manager import session_manager

router = APIRouter()

class ChatRequest(BaseModel):
    message: str
    thread_id: str

class SessionInfoResponse(BaseModel):
    thread_id: str
    created_at: str
    last_accessed: str
    message_count: int
    active: bool

@router.post("/stream")
async def chat_stream(request: ChatRequest):
    """개선된 멀티세션 스트리밍 엔드포인트"""
    try:
        return StreamingResponse(
            session_manager.stream_chat(request.message, request.thread_id),
            media_type="text/event-stream"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"스트리밍 오류: {str(e)}")

@router.get("/session/{thread_id}")
async def get_session_info(thread_id: str) -> Optional[SessionInfoResponse]:
    """세션 정보 조회"""
    info = await session_manager.get_session_info(thread_id)
    if info:
        return SessionInfoResponse(**info)
    raise HTTPException(status_code=404, detail="세션을 찾을 수 없습니다")

@router.delete("/session/{thread_id}")
async def close_session(thread_id: str):
    """세션 종료"""
    success = await session_manager.close_session(thread_id)
    if success:
        return {"message": f"세션 {thread_id}이 종료되었습니다"}
    raise HTTPException(status_code=404, detail="세션을 찾을 수 없습니다")

@router.get("/sessions/active")
async def get_active_sessions():
    """활성 세션 목록 조회"""
    active_sessions = []
    for thread_id in session_manager._sessions.keys():
        info = await session_manager.get_session_info(thread_id)
        if info:
            active_sessions.append(info)
    return {"active_sessions": active_sessions, "count": len(active_sessions)}

print("✅ API 라우터 정의 완료")
print("🌊 /stream: 스트리밍 채팅")
print("📊 /session/{id}: 세션 정보")
print("🗑️ DELETE /session/{id}: 세션 종료")
print("📋 /sessions/active: 활성 세션 목록")


## 9️⃣ FastAPI 메인 애플리케이션 - app/main.py

FastAPI 애플리케이션의 진입점입니다.


In [None]:
# app/main.py
from fastapi import FastAPI
from .api.v1 import chat

app = FastAPI(
    title="Stock Agent API",
    description="A streaming chatbot for stock prices using LangGraph and FastAPI.",
    version="1.0.0"
)

# /api/v1 접두사와 함께 chat 라우터 포함
app.include_router(chat.router, prefix="/api/v1")

@app.get("/")
def read_root():
    return {"message": "Welcome to the Stock Agent API"}

print("✅ FastAPI 애플리케이션 설정 완료")
print("📡 API 접두사: /api/v1")
print("🏠 루트: /")
print("\n▶️ 실행 방법:")
print("uvicorn app.main:app --reload")


## 🔟 Streamlit UI - app/ui_streamlit.py

멀티세션을 지원하는 웹 UI입니다.


In [None]:
# app/ui_streamlit.py
import streamlit as st
import httpx
import uuid

st.set_page_config(
    page_title="Stock-Bot",
    layout="wide"
)

API_URL = "http://127.0.0.1:8000/api/v1"

# 세션 상태 관리
if "sessions" not in st.session_state:
    st.session_state.sessions = {}

def stream_api_response(prompt: str, thread_id: str):
    try:
        with httpx.stream(
            "POST",
            f"{API_URL}/stream",
            json={"message": prompt, "thread_id": thread_id},
            timeout=60.0,
        ) as response:
            if response.status_code == 200:
                for chunk in response.iter_text():
                    if chunk.strip():
                        yield chunk
            else:
                yield f"API 오류: {response.status_code}"
    except Exception as e:
        yield f"연결 오류: {str(e)}"

# 사이드바
with st.sidebar:
    st.header("세션 관리")
    
    # 새 세션 추가
    new_session_name = st.text_input("세션 이름")
    if st.button("세션 추가"):
        if new_session_name and new_session_name not in st.session_state.sessions:
            st.session_state.sessions[new_session_name] = {
                "thread_id": str(uuid.uuid4()),
                "messages": [{"role": "assistant", "content": "안녕하세요! 주식 가격이나 계산을 도와주는 에이전트입니다."}]
            }
            st.rerun()
    
    # 기존 세션 목록
    for session_name in list(st.session_state.sessions.keys()):
        col1, col2 = st.columns([3, 1])
        with col1:
            st.text(session_name)
        with col2:
            if st.button("삭제", key=f"delete_{session_name}"):
                del st.session_state.sessions[session_name]
                st.rerun()

st.title("주가 계산 챗봇")

# 세션이 있으면 탭으로 표시
if st.session_state.sessions:
    session_names = list(st.session_state.sessions.keys())
    tabs = st.tabs(session_names)
    
    for tab, session_name in zip(tabs, session_names):
        with tab:
            session_data = st.session_state.sessions[session_name]
            
            # 세션 정보
            st.text(f"세션 ID: {session_data['thread_id'][:8]}...")
            
            # 입력창을 가장 위에 고정
            prompt = st.chat_input("주식에 대해 물어보세요", key=f"input_{session_name}")
            
            # 메시지 처리 (입력 처리)
            if prompt:
                # 사용자 메시지 추가
                session_data["messages"].append({"role": "user", "content": prompt})
                
                # AI 응답 생성
                with st.spinner("생각 중..."):
                    full_response_chunks = []
                    for chunk in stream_api_response(prompt, session_data["thread_id"]):
                        full_response_chunks.append(chunk)
                    full_response = "".join(full_response_chunks)
                
                # 응답 저장
                if full_response:
                    session_data["messages"].append({"role": "assistant", "content": full_response})
                
                st.rerun()
            
            # 스크롤 가능한 대화내용 영역
            st.markdown("### 대화 내용")
            chat_container = st.container(height=500)
            
            with chat_container:
                for message in session_data["messages"]:
                    with st.chat_message(message["role"]):
                        st.markdown(message["content"])

else:
    st.info("좌측에서 새 세션을 추가해주세요")

# 독립성 테스트
if len(st.session_state.sessions) >= 2:
    st.markdown("---")
    if st.button("독립성 테스트"):
        st.subheader("테스트 결과")
        
        sessions = list(st.session_state.sessions.items())
        session1_name, session1 = sessions[0]
        session2_name, session2 = sessions[1]
        
        col1, col2 = st.columns(2)
        
        with col1:
            st.write(f"**{session1_name}**: {session1_name} 주가 알려줘")
            with st.spinner("생각 중..."):
                response1_chunks = []
                for chunk in stream_api_response(f"{session1_name} 주가 알려줘", session1["thread_id"]):
                    response1_chunks.append(chunk)
                response1 = "".join(response1_chunks)
            st.write(response1)
        
        with col2:
            st.write(f"**{session2_name}**: 방금 조회한 {session1_name} 주가가 얼마였지?")
            with st.spinner("생각 중..."):
                response2_chunks = []
                for chunk in stream_api_response(f"방금 조회한 {session1_name} 주가가 얼마였지?", session2["thread_id"]):
                    response2_chunks.append(chunk)
                response2 = "".join(response2_chunks)
            st.write(response2)
            
            if "모르" in response2 or "없" in response2:
                st.success("세션이 독립적으로 동작합니다")
            else:
                st.warning("세션 독립성에 문제가 있을 수 있습니다")

print("✅ Streamlit UI 구성 완료")
print("🎭 멀티세션 지원")
print("🌊 실시간 스트리밍")
print("🧪 독립성 테스트 기능")
print("\n▶️ 실행 방법:")
print("streamlit run app/ui_streamlit.py")


## 1️⃣1️⃣ 테스트 코드 - tests/test_agent.py

기본 에이전트 기능을 테스트합니다.


In [None]:
# tests/test_agent.py
import pytest
from langchain_core.messages import HumanMessage
from app.agents.graph import create_memory_agent_executor

@pytest.mark.asyncio
async def test_stock_price_query():
    """주식 가격 쿼리 테스트 - 멀티세션 방식"""
    # 각 테스트마다 독립적인 executor 생성
    agent_executor = create_memory_agent_executor()
    
    query = "엔비디아 5주랑 아마존 8주를 두명이 돈을 모아 사려고 하는데, 각자 얼마나 돈 챙겨야 해?"
    config = {"configurable": {"thread_id": "test-thread-1"}}
    
    final_state = await agent_executor.ainvoke(
        {"messages": [HumanMessage(content=query)]},
        config=config 
    )
    
    last_message = final_state["messages"][-1]
    content = last_message.content
    
    print(f"\n테스트 질문: {query}")
    print(f"최종 답변: {content}")
    
    assert any(char.isdigit() for char in content)

@pytest.mark.asyncio
async def test_simple_stock_query():
    """간단한 주식 조회 테스트"""
    agent_executor = create_memory_agent_executor()  # 독립적인 executor
    
    query = "애플 주가 알려줘"
    config = {"configurable": {"thread_id": "test-thread-2"}}
    
    final_state = await agent_executor.ainvoke(
        {"messages": [HumanMessage(content=query)]},
        config=config
    )
    
    last_message = final_state["messages"][-1]
    content = last_message.content
    
    assert len(content) > 0
    assert any(char.isdigit() for char in content)

@pytest.mark.asyncio
async def test_calculation_query():
    """계산 관련 쿼리 테스트"""
    agent_executor = create_memory_agent_executor()  # 독립적인 executor
    
    query = "테슬라 10주 사려면 얼마나 필요해?"
    config = {"configurable": {"thread_id": "test-thread-3"}}
    
    final_state = await agent_executor.ainvoke(
        {"messages": [HumanMessage(content=query)]},
        config=config
    )
    
    last_message = final_state["messages"][-1]
    content = last_message.content
    
    assert any(char.isdigit() for char in content)

@pytest.mark.asyncio
async def test_agent_memory_sequence():
    """에이전트 기억능력 순차 테스트"""
    agent_executor = create_memory_agent_executor()  # 독립적인 executor
    
    thread_id = "memory-sequence-test"
    config = {"configurable": {"thread_id": thread_id}}
    
    conversations = [
        "테슬라 주가 알려줘",
        "그 주가로 5주 사려면 얼마나 필요해?",
        "아까 조회한 테슬라 주가가 얼마였지?",
        "테슬라 3주와 아까 조회한 주가로 계산해줘"
    ]
    
    responses = []
    
    for i, query in enumerate(conversations, 1):
        final_state = await agent_executor.ainvoke(
            {"messages": [HumanMessage(content=query)]},
            config=config
        )
        response = final_state["messages"][-1].content
        responses.append(response)
        
        print(f"\n{i}단계 질문: {query}")
        print(f"{i}단계 응답: {response}")
    
    # 검증
    last_response = responses[-1]
    assert "테슬라" in last_response or "Tesla" in last_response or "TSLA" in last_response
    assert any(char.isdigit() for char in last_response)
    
    second_response = responses[1]
    assert any(char.isdigit() for char in second_response)

print("✅ 기본 테스트 정의 완료")
print("📊 주가 쿼리 테스트")
print("🧠 메모리 순차 테스트")
print("\n▶️ 실행 방법:")
print("pytest tests/test_agent.py -v")


## 1️⃣2️⃣ 멀티세션 테스트 - tests/test_multiSession.py

멀티세션 독립성과 동시성을 테스트합니다.


In [None]:
# tests/test_multiSession.py
import pytest
import asyncio
from langchain_core.messages import HumanMessage
from app.core.session_manager import SessionManager

@pytest.mark.asyncio
async def test_multi_session_isolation():
    """여러 세션이 독립적으로 동작하는지 테스트"""
    session_manager = SessionManager()
    
    # 두 개의 독립적인 세션 생성
    session1_id = "test-session-1"
    session2_id = "test-session-2"
    
    # 동시에 두 세션에서 다른 작업 수행
    async def session1_task():
        responses = []
        async for chunk in session_manager.stream_chat("애플 5주 계산해서 알려줘", session1_id):
            responses.append(chunk)
        return "".join(responses)
    
    async def session2_task():
        responses = []
        async for chunk in session_manager.stream_chat("테슬라 3주 계산해서 알려줘", session2_id):
            responses.append(chunk)
        return "".join(responses)
    
    # 동시 실행
    result1, result2 = await asyncio.gather(session1_task(), session2_task())
    
    # 각 세션이 독립적인 결과를 가져야 함
    assert "애플" in result1 or "Apple" in result1 or "AAPL" in result1
    assert "테슬라" in result2 or "Tesla" in result2 or "TSLA" in result2
    
    # 세션 정보 확인
    info1 = await session_manager.get_session_info(session1_id)
    info2 = await session_manager.get_session_info(session2_id)
    
    assert info1['thread_id'] == session1_id
    assert info2['thread_id'] == session2_id
    assert info1['message_count'] >= 1
    assert info2['message_count'] >= 1

@pytest.mark.asyncio
async def test_concurrent_sessions():
    """동시 다중 세션 처리 테스트"""
    session_manager = SessionManager()
    
    async def create_session_task(session_id: str, query: str):
        responses = []
        async for chunk in session_manager.stream_chat(query, session_id):
            responses.append(chunk)
        return session_id, "".join(responses)
    
    # 10개의 동시 세션 생성
    tasks = []
    for i in range(10):
        session_id = f"concurrent-test-{i}"
        query = f"애플 주가 {i}번째 조회"
        tasks.append(create_session_task(session_id, query))
    
    # 모든 작업 동시 실행
    results = await asyncio.gather(*tasks)
    
    # 모든 세션이 성공적으로 처리되었는지 확인
    assert len(results) == 10
    
    for session_id, response in results:
        assert len(response) > 0
        info = await session_manager.get_session_info(session_id)
        assert info is not None
        assert info['thread_id'] == session_id

print("✅ 멀티세션 테스트 정의 완료")
print("🔒 세션 독립성 테스트")
print("🔥 동시 세션 처리 테스트")
print("\n▶️ 실행 방법:")
print("pytest tests/test_multiSession.py -v")


## 1️⃣3️⃣ 실행 방법 및 명령어

### 🚀 환경 설정 및 실행


In [None]:
# 🔧 환경 설정
setup_commands = """
# 1. 가상환경 생성 및 활성화
python -m venv stock_chat
source stock_chat/bin/activate  # Linux/Mac
# 또는
stock_chat\\Scripts\\activate     # Windows

# 2. 의존성 설치
pip install -r requirements.txt

# 3. 환경변수 설정 (.env 파일 생성)
echo "OPENAI_API_KEY=your_openai_api_key_here" > .env
"""

# 🚀 서버 실행
server_commands = """
# 1. FastAPI 서버 실행
uvicorn app.main:app --reload
# → http://127.0.0.1:8000

# 2. Streamlit UI 실행 (새 터미널)
streamlit run app/ui_streamlit.py
# → http://localhost:8501
"""

# 🧪 테스트 실행
test_commands = """
# 1. 모든 테스트 실행
pytest

# 2. 상세 출력으로 실행
pytest -v

# 3. 특정 테스트 파일만 실행
pytest tests/test_agent.py -v
pytest tests/test_multiSession.py -v

# 4. 비동기 테스트 디버깅
pytest --asyncio-mode=auto -s
"""

# 🔍 API 테스트
api_test_commands = """
# curl을 사용한 API 테스트

# 1. 기본 확인
curl http://127.0.0.1:8000/

# 2. 스트리밍 테스트
curl -X POST http://127.0.0.1:8000/api/v1/stream \\
  -H "Content-Type: application/json" \\
  -d '{"message": "애플 주가 알려줘", "thread_id": "test-123"}'

# 3. 세션 정보 조회
curl http://127.0.0.1:8000/api/v1/session/test-123

# 4. 활성 세션 목록
curl http://127.0.0.1:8000/api/v1/sessions/active
"""

print("🔧 환경 설정:")
print(setup_commands)
print("\n🚀 서버 실행:")
print(server_commands)
print("\n🧪 테스트 실행:")
print(test_commands)
print("\n🔍 API 테스트:")
print(api_test_commands)


## 🎉 완료! - 프로젝트 특징 요약

### 🎯 핵심 기능과 아키텍처


In [None]:
# 🎯 프로젝트 핵심 특징 요약
features = {
    "🔄 비동기 처리": [
        "asyncio.to_thread()로 동기 라이브러리(yfinance) 통합",
        "FastAPI StreamingResponse로 실시간 스트리밍",
        "LangGraph astream_events로 토큰 단위 스트리밍",
        "httpx.stream으로 클라이언트 스트리밍"
    ],
    
    "🎭 멀티세션 지원": [
        "세션별 독립적인 메모리 관리 (InMemorySaver)",
        "asyncio.Lock을 통한 동시성 제어",
        "자동 세션 정리 (타임아웃 기반)",
        "세션별 독립적인 agent_executor"
    ],
    
    "⛓️ LangChain Chain 패턴": [
        "Runnable 인터페이스 활용한 파이프라인",
        "| 연산자로 컴포넌트 연결",
        "message_preparer | llm_with_tools",
        "모듈화된 재사용 가능한 설계"
    ],
    
    "🛠️ 도구 통합": [
        "yfinance: 실시간 주가 조회",
        "numexpr: 수학적 계산",
        "LangGraph: 도구 호출 워크플로우",
        "비동기 도구 실행"
    ],
    
    "🌊 스트리밍 아키텍처": [
        "사용자 입력 → Streamlit → httpx → FastAPI",
        "FastAPI → SessionManager → LangGraph",
        "LangGraph → OpenAI → 토큰 스트림",
        "실시간 UI 업데이트"
    ],
    
    "🧪 테스트 커버리지": [
        "기본 에이전트 기능 테스트",
        "멀티턴 대화 메모리 테스트",
        "멀티세션 독립성 테스트",
        "동시 세션 처리 테스트"
    ]
}

print("🎯 PAI Stock Chatbot 핵심 특징:")
print("="*50)

for category, items in features.items():
    print(f"\n{category}:")
    for item in items:
        print(f"  • {item}")

print("\n" + "="*50)
print("🚀 프로덕션 레디 아키텍처")
print("💡 확장 가능한 멀티세션 설계")
print("⚡ 높은 성능의 비동기 처리")
print("🔒 안정적인 동시성 제어")

print("\n📁 전체 파일 구조:")
print("""
assignment_1/
├── 📦 requirements.txt          # 의존성
├── ⚙️ app/core/config.py        # 설정
├── 🧠 app/agents/state.py       # 상태 관리
├── 🛠️ app/agents/tools.py       # 도구 정의
├── 🔗 app/agents/nodes.py       # 노드
├── ⛓️ app/agents/graph.py       # 에이전트 그래프
├── 🎭 app/core/session_manager.py # 세션 관리
├── 🌐 app/api/v1/chat.py        # API 엔드포인트
├── 🚀 app/main.py               # FastAPI 앱
├── 🎨 app/ui_streamlit.py       # Streamlit UI
├── 🧪 tests/test_agent.py       # 기본 테스트
└── 🎭 tests/test_multiSession.py # 멀티세션 테스트
""")

print("\n🎉 이제 이 노트북을 통해 전체 프로젝트의 코드 흐름을 이해하고 실행할 수 있습니다!")
