In [1]:
# API KEY를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API KEY 정보로드
load_dotenv()

True

In [2]:
from langchain_teddynote import logging

# 프로젝트 이름을 입력합니다.
logging.langsmith("주식분석")

LangSmith 추적을 시작합니다.
[프로젝트명]
주식분석


In [3]:
################################ 문서 검색 RAG 정의##############################################
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma

# --- 문서 로드 및 전처리 ---
loader = PyMuPDFLoader("stock_report/[삼성전자]분기보고서(2024.11.14).pdf")
docs = loader.load()

## : 문서 분할(Split Documents) <-----------추후 문서 제목 단위 분할로 변경 필요
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
split_documents = text_splitter.split_documents(docs)

## 단계 3: 임베딩(Embedding) 생성
embeddings = OpenAIEmbeddings()

# 벡터스토어 생성
vectorstore = Chroma.from_documents(documents=split_documents, embedding=embeddings, persist_directory="stock_report/chroma_db")

# 5. 검색기(Retriever) 생성
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

# 문서 검색 도구 생성
from langchain.tools.retriever import create_retriever_tool

def rag_tool(input: str) -> str:
    docs = retriever.invoke(input)
    return "\n".join([doc.page_content for doc in docs])

retriever_tool = create_retriever_tool(
    retriever,
    name="retriever_tool",   # 툴의 이름 (Agent가 호출할 때 사용)
    description="반기보고서 내용을 검색하여 제공합니다."  # 툴의 용도 설명
)

In [4]:
################################# 웹 검색 및 날짜 확인 도구 ######################################
from tavily import TavilyClient
from langchain_community.tools.tavily_search import TavilySearchResults
from datetime import datetime
from langchain.agents import Tool
import os

# 웹검색 도구 생성
web_search = TavilySearchResults(max_results=3)
# 해당 함수의 경우에는 재검색이 요청된 경우에 사용하도록 한다.
web_search_retry = TavilySearchResults(max_results=5)

# 오늘 날짜 확인
def get_today_tool():
    return datetime.today().strftime('%Y-%m-%d')

today_tool = Tool(
    name="Get Today",
    func=get_today_tool,
    description="Returns today's date in YYYY-MM-DD format",
    verbose=True
)

# 생성된 도구 목록
tools = [web_search, web_search_retry, today_tool, retriever_tool]

In [5]:
# 노드 1-1 생성된 툴들에 대하여 툴노드 정의
from langgraph.prebuilt import ToolNode

tool_nodes = ToolNode(tools)

In [6]:
####################################### STATE ##########################################
from typing import Annotated, List, Dict
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

class StockvalueStateCal(TypedDict):
    user_input: str
    messages: Annotated[list, add_messages]
    analysis_methods: list[str] = ["DCF", "DDM", "PER", "PBR"]
    current_price: str
    retrieve_check: bool
    retrieval_msg: str
    rewrite_query: str
    tools_call_switch: bool = True
    report_data: Dict[str, str]
    web_data: Dict[str, str]
    valuation: Dict[str, str]

In [7]:
############# llm 모델 정의 ############################
# 3. 모델(LLM) 생성
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    temperature=0.1,
    max_tokens=4000,
    model_name="gpt-4o-mini"  # 사용할 GPT 모델 설정
)

In [8]:
################################# 노드 정의 ########################################
# 노드 1-2 주식 가치 판단을 위해 필요한 정보를 분석하는 stock value calculation 노드
from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate

# 툴 2개가 bind된 stock value cal 노드
stockvalueCal_model_with_tools = llm.bind_tools(tools)

def stock_value_calculation_node(state: StockvalueStateCal) :
    prompt_with_tools = ChatPromptTemplate.from_messages([
        ("system","""
        당신은 유능한 금융전문가 입니다.\n
        user input 을 받아서 관련 주식명을 확인하고, 해당 주식의 가치를 평가하여 답변하여야 합니다. \n
        최종 답변에는 PER 분석 결과와 해석이 들어가 있어야 합니다. \n
        분석을 완료한 후 당신의 개인적인 의견을 마지막에 요약해주세요. \n
        주식 가치 평가는 반드시 검색된 정확한 수치로만 평가되어야 합니다. \n
        검색된 정확한 수치가 없다면 당신은 retriever_tool 을 활용하여 반기보고서에서 필요한 정보를 검색할 수 있습니다. \n
        반기보고서에서 확인이 어렵거나 실시간 데이터 확인이 필요하다면 web_search 를 활용하여 웹 검색을 할 수 있습니다.
        """),
        ("human", "Input: {human_input}\n Retrieve: {context}"),
    ])
    messages_list = state['messages']
    last_human_message = next((msg for msg in reversed(messages_list) if isinstance(msg, HumanMessage)), None).content
    last_msg = state['messages'][-1].content
    
    if last_human_message == last_msg:
        last_msg = ""
        print(f"==================================== INPUT ====================================\nHuman Input: {last_human_message}")
    else:
        try:
            last_msg_data = json.loads(state['messages'][-1].content)
            last_msg = "\n\n".join([d["content"] for d in last_msg_data])
        except:
            ...
        print(f"==================================== INPUT ====================================\nHuman Input: {last_human_message}\nContext: {last_msg}")
    
    if state['tools_call_switch']:
        chain_with_tools = prompt_with_tools | stockvalueCal_model_with_tools
        response = chain_with_tools.invoke({"human_input": last_human_message, "context": last_msg})

        if hasattr(response, "tool_calls") and len(response.tool_calls) > 0 and (response.tool_calls[0]["name"]) == "tavily_search_results_json":
            print("================================ Search Online ================================")
            tool_switch = False
        elif hasattr(response, "tool_calls") and len(response.tool_calls) > 0 and (response.tool_calls[0]["name"]) == "retrieve_trends":
            print("=============================== Search Retrieval ===============================")
            tool_switch = False
        else:
            print("============================= Stock Cal Information =============================")
            tool_switch = False
            print(response.content)
    else:
        print("에러")
        tool_switch = False

    return {"messages": [response], "user_input": last_human_message, "tools_call_switch": tool_switch}

In [9]:
# 노드 1-3. RAG 검증노드
# 노드 1-2의 Tools Output을 받아서, User Input에 잘 맞는지 검증해서 Yes Or No로 대답함.
# 만약 Yes라면 그대로 다시 Character Make Node로 보내서 최종 답변을 생성하도록 하고
# 아니라면 검색을 진행하고 새로운 값을 받아서 보낼거임.

from pydantic import BaseModel, Field

class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""
    binary_score:str = Field(..., description="Documents are relevant to the question, 'yes' or 'no'", enum=['yes', 'no'])

rag_check_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
rag_check_model = rag_check_model.with_structured_output(GradeDocuments)

def retrieve_check_node(state: StockvalueStateCal):
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", """
            You are a consultation expert who provides appropriate information in response to user input.
            Return 'yes' or 'no' if you can provide an accurate answer to the user's question from the given documentation.
            If you can't provide a clear answer, be sure to return NO.
            """),
            ("human", "Retrieved document: \n\n {document} \n\n User's input: {question}"),
        ]
    )

    retrieval_msg = state['messages'][-1].content
    human_msg = state['user_input']
    retrieval_grader = prompt | rag_check_model
    response = retrieval_grader.invoke({"document": retrieval_msg, "question": human_msg})
    retrieve_handle = response.binary_score
    retrieve_check = False
    
    if retrieve_handle == "no":
        print("=============================== Need to Check ===============================")
        retrieve_check = True
    if retrieve_handle == "yes":
        print("============================== No Need to Check =============================")
        
    return {"retrieve_check": retrieve_check, "retrieval_msg": retrieval_msg}

In [10]:
# 노드 1-4. 쿼리 재-작성 노드
# 노드 1-2에서 산출된 retrieve가 입력값과 적절하게 매치되지 않는 경우, 입력값을 수정하게 됨.
# state User_input 이용
# 이는 노드 1-3에서 yes를 반환하는 경우에 실행됨.

class Rewrite_Output(TypedDict):
    """
    Sturctured_output을 생성하기위한 클래스
    """
    query: Annotated[str, ..., "Rewritten query to find appropriate material on the web"]

rewrite_model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
rewrite_model = rewrite_model.with_structured_output(Rewrite_Output)

def rewrite_node(state: StockvalueStateCal):
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", """
            You're an expert in improving search relevance.\n
            Look at previously entered search queries and rewrite them to better find that information on the internet.
            """),
            ("human", "Previously entered search queries: \n{user_input}"),
        ]
    )
    
    user_input = state['user_input']
    rewrite_chain = prompt | rewrite_model
    response = rewrite_chain.invoke({"user_input": user_input})
    rewrited_query = response['query']
    print(f"================================ Rewrited Query ================================\nRewritted Query: {rewrited_query}")

    return {"rewrite_query": rewrited_query}

In [11]:
# 노드 1-5. 재작성된 쿼리를 이용해서 인터넷 검색하는 노드

def rewrite_search_node(state: StockvalueStateCal):
    print("================================ Search Web ================================")
    docs = web_search_retry.invoke({"query": state['rewrite_query']})
    web_results = "\n\n".join([d["content"] for d in docs])
    web_results = web_results + "\n\n" + state['retrieval_msg']
    # print(web_results)

    new_messages = [ToolMessage(content=web_results, tool_call_id="tavily_search_results_json")]
            
    return {"messages": new_messages}

In [12]:
# 라우팅 함수를 수정해주자.
# 검색이 필요한 것인지, 아니면 RAG가 필요한 것인지 탐색!
def simple_route(state: StockvalueStateCal):
    """
    Simplery Route Tools or Next or retrieve
    """
    if isinstance(state, list):
        ai_message = state[-1]
    elif messages := state.get("messages", []):
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0 and ai_message.tool_calls[0]["name"] == "tavily_search_results_json":
        # print("Tavily Search Tool Call")
        return "tools"
    elif hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0 and ai_message.tool_calls[0]["name"] == "retrieve_trends":
        # print("Retrieve Call")
        return "retrieve"

    return "next"

# 여기서는 RAG가 괜찮은지 검증하여 반환.
def retrieve_route(state: StockvalueStateCal):
    """
    RAG Need Check?
    """
    if state['retrieve_check']:
        return "rewrite"

    return "return"

In [32]:
from langgraph.graph import StateGraph, START, END

# Pass나 ... 을 입력해 class를 정의하면 에러가 발생한다.
class InputState(TypedDict):
    start_input: str

def user_input_node(state: InputState):
    print("================================= Make Persona =================================")
    print("주식가치를 계산합니다. 확인하고 싶은 주식명을 말씀해주세요.")
    # time.sleep(1)
    user_input = input("User: ")
    
    return {"user_input": user_input}
    
# 그래프를 만드는 builder를 정의. input을 지정해주지 않으면
# OverallState를 START에서 Input으로 요구하게 됨.
graph_builder = StateGraph(StockvalueStateCal, input=InputState)

# 마지막으로 지금까지 만든 노드를 모두 넣어준다.
graph_builder.add_node("User Input", user_input_node)
graph_builder.add_node("stock_value_calculation", stock_value_calculation_node)
graph_builder.add_node("Rewrite Tool", rewrite_node)
graph_builder.add_node("Rewrite-Search", rewrite_search_node)

graph_builder.add_edge(START, "User Input")
graph_builder.add_edge("User Input", "stock_value_calculation")
graph_builder.add_edge("tool_nodes", "stock_value_calculation")
graph_builder.add_edge("RAG Tool", "retrieve_check_node")
graph_builder.add_edge("Rewrite Tool", "Rewrite-Search")
graph_builder.add_edge("Rewrite-Search", "stock_value_calculation")

graph_builder.add_conditional_edges(
    "stock_value_calculation_node",
    simple_route,
    {"tools": "tool_nodes", "next": "stock_value_calculation", "retrieve": "RAG Tool"}
)
graph_builder.add_conditional_edges(
    "retrieve_check_node", 
    retrieve_route, 
    {"rewrite": "Rewrite Tool", "return": "stock_value_calculation_node"}
)

compiled_graph = graph_builder.compile()
print(compiled_graph)

ValueError: Found edge starting at unknown node 'retrieve_check_node'