In [1]:

# imports
import os
import sys
import types
import json

# figure size/format
fig_width = 7
fig_height = 5
fig_format = 'retina'
fig_dpi = 96
interactivity = ''
is_shiny = False
is_dashboard = False
plotly_connected = True

# matplotlib defaults / format
try:
  import matplotlib.pyplot as plt
  plt.rcParams['figure.figsize'] = (fig_width, fig_height)
  plt.rcParams['figure.dpi'] = fig_dpi
  plt.rcParams['savefig.dpi'] = fig_dpi
  from IPython.display import set_matplotlib_formats
  set_matplotlib_formats(fig_format)
except Exception:
  pass

# plotly use connected mode
try:
  import plotly.io as pio
  if plotly_connected:
    pio.renderers.default = "notebook_connected"
  else:
    pio.renderers.default = "notebook"
  for template in pio.templates.keys():
    pio.templates[template].layout.margin = dict(t=30,r=0,b=0,l=0)
except Exception:
  pass

# disable itables paging for dashboards
if is_dashboard:
  try:
    from itables import options
    options.dom = 'fiBrtlp'
    options.maxBytes = 1024 * 1024
    options.language = dict(info = "Showing _TOTAL_ entries")
    options.classes = "display nowrap compact"
    options.paging = False
    options.searching = True
    options.ordering = True
    options.info = True
    options.lengthChange = False
    options.autoWidth = False
    options.responsive = True
    options.keys = True
    options.buttons = []
  except Exception:
    pass
  
  try:
    import altair as alt
    # By default, dashboards will have container sized
    # vega visualizations which allows them to flow reasonably
    theme_sentinel = '_quarto-dashboard-internal'
    def make_theme(name):
        nonTheme = alt.themes._plugins[name]    
        def patch_theme(*args, **kwargs):
            existingTheme = nonTheme()
            if 'height' not in existingTheme:
              existingTheme['height'] = 'container'
            if 'width' not in existingTheme:
              existingTheme['width'] = 'container'

            if 'config' not in existingTheme:
              existingTheme['config'] = dict()
            
            # Configure the default font sizes
            title_font_size = 15
            header_font_size = 13
            axis_font_size = 12
            legend_font_size = 12
            mark_font_size = 12
            tooltip = False

            config = existingTheme['config']

            # The Axis
            if 'axis' not in config:
              config['axis'] = dict()
            axis = config['axis']
            if 'labelFontSize' not in axis:
              axis['labelFontSize'] = axis_font_size
            if 'titleFontSize' not in axis:
              axis['titleFontSize'] = axis_font_size  

            # The legend
            if 'legend' not in config:
              config['legend'] = dict()
            legend = config['legend']
            if 'labelFontSize' not in legend:
              legend['labelFontSize'] = legend_font_size
            if 'titleFontSize' not in legend:
              legend['titleFontSize'] = legend_font_size  

            # The header
            if 'header' not in config:
              config['header'] = dict()
            header = config['header']
            if 'labelFontSize' not in header:
              header['labelFontSize'] = header_font_size
            if 'titleFontSize' not in header:
              header['titleFontSize'] = header_font_size    

            # Title
            if 'title' not in config:
              config['title'] = dict()
            title = config['title']
            if 'fontSize' not in title:
              title['fontSize'] = title_font_size

            # Marks
            if 'mark' not in config:
              config['mark'] = dict()
            mark = config['mark']
            if 'fontSize' not in mark:
              mark['fontSize'] = mark_font_size

            # Mark tooltips
            if tooltip and 'tooltip' not in mark:
              mark['tooltip'] = dict(content="encoding")

            return existingTheme
            
        return patch_theme

    # We can only do this once per session
    if theme_sentinel not in alt.themes.names():
      for name in alt.themes.names():
        alt.themes.register(name, make_theme(name))
      
      # register a sentinel theme so we only do this once
      alt.themes.register(theme_sentinel, make_theme('default'))
      alt.themes.enable('default')

  except Exception:
    pass

# enable pandas latex repr when targeting pdfs
try:
  import pandas as pd
  if fig_format == 'pdf':
    pd.set_option('display.latex.repr', True)
except Exception:
  pass

# interactivity
if interactivity:
  from IPython.core.interactiveshell import InteractiveShell
  InteractiveShell.ast_node_interactivity = interactivity

# NOTE: the kernel_deps code is repeated in the cleanup.py file
# (we can't easily share this code b/c of the way it is run).
# If you edit this code also edit the same code in cleanup.py!

# output kernel dependencies
kernel_deps = dict()
for module in list(sys.modules.values()):
  # Some modules play games with sys.modules (e.g. email/__init__.py
  # in the standard library), and occasionally this can cause strange
  # failures in getattr.  Just ignore anything that's not an ordinary
  # module.
  if not isinstance(module, types.ModuleType):
    continue
  path = getattr(module, "__file__", None)
  if not path:
    continue
  if path.endswith(".pyc") or path.endswith(".pyo"):
    path = path[:-1]
  if not os.path.exists(path):
    continue
  kernel_deps[path] = os.stat(path).st_mtime
print(json.dumps(kernel_deps))

# set run_path if requested
if r'C:\Users\kmkim\Desktop\projects\blog\docs\blog\posts\Agent\16-Agent':
  os.chdir(r'C:\Users\kmkim\Desktop\projects\blog\docs\blog\posts\Agent\16-Agent')

# reset state
%reset

# shiny
# Checking for shiny by using False directly because we're after the %reset. We don't want
# to set a variable that stays in global scope.
if False:
  try:
    import htmltools as _htmltools
    import ast as _ast

    _htmltools.html_dependency_render_mode = "json"

    # This decorator will be added to all function definitions
    def _display_if_has_repr_html(x):
      try:
        # IPython 7.14 preferred import
        from IPython.display import display, HTML
      except:
        from IPython.core.display import display, HTML

      if hasattr(x, '_repr_html_'):
        display(HTML(x._repr_html_()))
      return x

    # ideally we would undo the call to ast_transformers.append
    # at the end of this block whenver an error occurs, we do 
    # this for now as it will only be a problem if the user 
    # switches from shiny to not-shiny mode (and even then likely
    # won't matter)
    import builtins
    builtins._display_if_has_repr_html = _display_if_has_repr_html

    class _FunctionDefReprHtml(_ast.NodeTransformer):
      def visit_FunctionDef(self, node):
        node.decorator_list.insert(
          0,
          _ast.Name(id="_display_if_has_repr_html", ctx=_ast.Load())
        )
        return node

      def visit_AsyncFunctionDef(self, node):
        node.decorator_list.insert(
          0,
          _ast.Name(id="_display_if_has_repr_html", ctx=_ast.Load())
        )
        return node

    ip = get_ipython()
    ip.ast_transformers.append(_FunctionDefReprHtml())

  except:
    pass

def ojs_define(**kwargs):
  import json
  try:
    # IPython 7.14 preferred import
    from IPython.display import display, HTML
  except:
    from IPython.core.display import display, HTML

  # do some minor magic for convenience when handling pandas
  # dataframes
  def convert(v):
    try:
      import pandas as pd
    except ModuleNotFoundError: # don't do the magic when pandas is not available
      return v
    if type(v) == pd.Series:
      v = pd.DataFrame(v)
    if type(v) == pd.DataFrame:
      j = json.loads(v.T.to_json(orient='split'))
      return dict((k,v) for (k,v) in zip(j["index"], j["data"]))
    else:
      return v

  v = dict(contents=list(dict(name=key, value=convert(value)) for (key, value) in kwargs.items()))
  display(HTML('<script type="ojs-define">' + json.dumps(v) + '</script>'), metadata=dict(ojs_define = True))
globals()["ojs_define"] = ojs_define


In [2]:
# API 키를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv

# API 키 정보 로드
load_dotenv()

In [3]:
# LangSmith 추적을 설정한다. https://smith.langchain.com
# !pip install -qU langchain-teddynote
from langchain_teddynote import logging

# 프로젝트 이름을 입력한다.
logging.langsmith("CH16-Plan-Execute-Agent")

In [4]:
from langchain_openai import ChatOpenAI
from langchain_teddynote.tools.tavily import TavilySearch
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

# 메모리 설정 (계획 및 실행 이력 저장)
memory = MemorySaver()

# 모델 설정
# Planner: 복잡한 추론이 필요하므로 GPT-4o 사용
planner_model = ChatOpenAI(model_name="gpt-4o", temperature=0)

# Executor: 비용 절감을 위해 GPT-4o-mini 사용
executor_model = ChatOpenAI(model_name="gpt-4o-mini")

In [5]:
from typing import List
from pydantic import BaseModel, Field

class Task(BaseModel):
    """수행할 작업의 단위"""
    id: int = Field(description="작업 ID (1부터 시작)")
    title: str = Field(description="작업 제목")
    description: str = Field(description="작업 상세 설명")
    required_tools: List[str] = Field(description="필요한 도구 목록")
    dependencies: List[int] = Field(
        default_factory=list, 
        description="선행 작업 ID 목록"
    )
    
    def __str__(self) -> str:
        return f"Task {self.id}: {self.title}"

class Plan(BaseModel):
    """전체 작업 계획"""
    objective: str = Field(description="최종 목표")
    tasks: List[Task] = Field(description="수행할 작업 리스트")
    
    def __str__(self) -> str:
        task_str = "\n".join(f"  {task}" for task in self.tasks)
        return f"Goal: {self.objective}\n\nTasks:\n{task_str}"

In [6]:
from langchain_core.prompts import ChatPromptTemplate

planner_prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        """당신은 전문적인 작업 계획가(Project Manager)입니다.
        
사용자의 요청을 분석하여 다음 규칙에 따라 상세한 작업 계획을 수립하세요:

1. **작업 분해**: 복잡한 요청을 논리적인 소 작업들로 분해
2. **순차성**: 선행 작업이 필요한 경우 dependencies에 명시
3. **도구 식별**: 각 작업에 필요한 도구 명시 (web_search, file_management, pdf_retriever 등)
4. **명확성**: 각 작업의 목표가 명확하고 측정 가능하도록 작성

사용 가능한 도구:
- web_search: 웹 검색 (최신 정보 필요시)
- file_write: 파일 작성
- file_read: 파일 읽기
- pdf_retriever: PDF 문서 검색
- data_analysis: 데이터 분석

응답 형식은 다음 JSON 스키마를 따르세요:
```json
{{
    "objective": "최종 목표",
    "tasks": [
        {{
            "id": 1,
            "title": "작업 제목",
            "description": "작업 상세 설명",
            "required_tools": ["tool1", "tool2"],
            "dependencies": []
        }}
    ]
}}
```""",
    ),
    ("human", "{input}"),
])

In [7]:
from langchain_teddynote.tools.tavily import TavilySearch

# 웹 검색 도구 생성
web_search = TavilySearch(
    topic="general",
    max_results=5,
    include_answer=False,
    include_raw_content=False,
)

web_search.name = "web_search"
web_search.description = (
    "웹에서 정보를 검색합니다. 최신 정보, 뉴스, 통계 등이 필요할 때 사용합니다."
)

In [8]:
from langchain_community.agent_toolkits import FileManagementToolkit

working_directory = "tmp"

file_management_tools = FileManagementToolkit(
    root_dir=str(working_directory),
).get_tools()

# 도구 이름과 설명 업데이트
for tool in file_management_tools:
    if tool.name == "file_write":
        tool.description = "파일을 작성하거나 수정합니다."
    elif tool.name == "file_read":
        tool.description = "파일의 내용을 읽습니다."
    elif tool.name == "list_directory":
        tool.description = "디렉토리의 파일 목록을 조회합니다."

In [9]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
from langchain.document_loaders import PDFPlumberLoader
from langchain_core.tools.retriever import create_retriever_tool
from langchain_core.prompts import PromptTemplate

# PDF 로드 및 벡터화
loader = PDFPlumberLoader("data/sample_document.pdf")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
split_docs = loader.load_and_split(text_splitter)
vector = FAISS.from_documents(split_docs, OpenAIEmbeddings())
pdf_retriever = vector.as_retriever()

# Retriever 도구 생성
retriever_tool = create_retriever_tool(
    pdf_retriever,
    "pdf_retriever",
    "내부 문서에서 정보를 검색합니다. 회사 정책, 보고서, 기술 문서 등을 조회할 수 있습니다.",
    document_prompt=PromptTemplate.from_template(
        "<document><context>{page_content}</context><metadata><source>{source}</source><page>{page}</page></metadata></document>"
    ),
)

In [10]:
# 도구 목록 구성
tools = [web_search, *file_management_tools, retriever_tool]

In [11]:
from langgraph.prebuilt import create_react_agent

# Executor 에이전트: 개별 작업 수행
executor = create_react_agent(
    executor_model,
    tools,
    checkpointer=memory,
)

In [12]:
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import BaseMessage
from typing import Annotated
from langgraph.graph.message import add_messages

class PlanExecuteState(BaseModel):
    """계획 및 실행 상태"""
    input: str = Field(description="사용자 입력")
    plan: Plan = Field(default=None, description="수립된 계획")
    past_steps: Annotated[List[tuple], add_messages] = Field(
        default_factory=list,
        description="완료된 작업과 결과"
    )
    response: str = Field(default="", description="최종 응답")

def plan_step(state: PlanExecuteState) -> PlanExecuteState:
    """계획 수립 단계"""
    # Planner가 계획 수립
    planner_chain = planner_prompt | planner_model.with_structured_output(Plan)
    plan = planner_chain.invoke({"input": state.input})
    
    return {**state, "plan": plan}

def execute_step(state: PlanExecuteState) -> PlanExecuteState:
    """작업 실행 단계"""
    plan = state.plan
    past_steps = state.past_steps
    
    # 완료되지 않은 작업 찾기
    completed_task_ids = {step[0] for step in past_steps}
    next_task = None
    
    for task in plan.tasks:
        # 의존성이 모두 완료되었고, 아직 수행되지 않은 작업
        if (task.id not in completed_task_ids and 
            all(dep_id in completed_task_ids for dep_id in task.dependencies)):
            next_task = task
            break
    
    if next_task is None:
        # 모든 작업 완료
        return state
    
    # 작업 실행
    task_instruction = f"""
    다음 작업을 수행하세요:
    
    제목: {next_task.title}
    설명: {next_task.description}
    필요 도구: {', '.join(next_task.required_tools)}
    
    이전 작업 결과:
    {chr(10).join(f"Task {task_id}: {result}" for task_id, result in past_steps)}
    """
    
    # Executor가 작업 수행
    result = executor.invoke(
        {"messages": [("human", task_instruction)]},
        {"configurable": {"thread_id": f"task_{next_task.id}"}}
    )
    
    # 결과 저장
    past_steps.append((next_task.id, result["messages"][-1].content))
    
    return {**state, "past_steps": past_steps}

def generate_response_step(state: PlanExecuteState) -> PlanExecuteState:
    """최종 응답 생성 단계"""
    plan = state.plan
    past_steps = state.past_steps
    
    # 모든 작업이 완료되었는지 확인
    if len(past_steps) < len(plan.tasks):
        return state
    
    # 결과 통합
    summary = f"""
    ## 작업 완료 보고서
    
    **목표**: {plan.objective}
    
    ### 수행 작업
    """
    
    for task_id, result in past_steps:
        task = next(t for t in plan.tasks if t.id == task_id)
        summary += f"\n\n**Task {task_id}: {task.title}**\n{result}\n"
    
    return {**state, "response": summary}

# 그래프 구성
workflow = StateGraph(PlanExecuteState)

workflow.add_node("plan", plan_step)
workflow.add_node("execute", execute_step)
workflow.add_node("respond", generate_response_step)

workflow.add_edge(START, "plan")
workflow.add_edge("plan", "execute")
workflow.add_edge("execute", "execute")  # 모든 작업 완료까지 반복
workflow.add_edge("execute", "respond")
workflow.add_edge("respond", END)

# 컴파일
plan_execute_agent = workflow.compile(checkpointer=memory)

In [13]:
from langchain_teddynote.messages import stream_graph

def run_plan_execute_agent(instruction: str, thread_id: str = "main"):
    """계획 및 실행 에이전트 실행"""
    config = {"configurable": {"thread_id": thread_id}}
    inputs = {"messages": [("human", instruction)]}
    
    stream_graph(plan_execute_agent, inputs, config)

In [14]:
instruction = """
AI 에이전트 시장에 대한 종합 조사 보고서를 작성해주세요.

다음 내용이 포함되어야 합니다:
1. 현재 AI 에이전트 시장 규모 및 성장률
2. 주요 플레이어 및 기술 트렌드
3. 시장 기회 및 위험 요소 분석
4. 향후 6개월 전망

보고서는 마크다운 형식으로 작성하고 파일로 저장해주세요.
"""

run_plan_execute_agent(instruction)

In [15]:
instruction = """
회사의 분기별 판매 데이터를 분석하고 시각화하는 프로젝트를 진행해주세요.

다음을 순서대로 수행하세요:
1. sales_data.csv 파일 읽기
2. 분기별 판매액, 증감율, 주요 제품 분석
3. 분석 결과를 정리한 요약 테이블 작성
4. 주요 통찰력을 markdown 형식으로 정리
5. 최종 분석 보고서를 analysis_report.md로 저장
"""

run_plan_execute_agent(instruction)

In [16]:
instruction = """
주요 경쟁사 3곳에 대한 경쟁 분석 리포트를 작성해주세요.

1. 경쟁사1, 경쟁사2, 경쟁사3의 최신 뉴스와 제품 정보 수집
2. 각 경쟁사의 강점, 약점, 기회, 위협(SWOT) 분석
3. 우리 회사와의 차별화 전략 도출
4. 비교 테이블 작성
5. 최종 보고서를 competitor_analysis.md로 저장

각 경쟁사별 분석은 병렬로 수행 가능합니다.
"""

run_plan_execute_agent(instruction)

In [17]:
def should_execute_task(task: Task, previous_results: dict) -> bool:
    """특정 조건에서만 Task 실행"""
    
    # 예: 검색 결과가 충분하면 분석 Task 스킵
    if task.id == 4 and len(previous_results.get("search_results", [])) < 3:
        return False
    
    # 예: 파일이 존재하면 생성 Task 스킵
    if task.id == 5 and "file_exists" in previous_results:
        return False
    
    return True

In [18]:
from functools import lru_cache

@lru_cache(maxsize=128)
def execute_task_cached(task_id: int, instruction: str) -> str:
    """동일한 Task는 캐시된 결과 사용"""
    # 실행 로직
    pass

In [19]:
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def execute_task_with_retry(task: Task) -> str:
    """실패 시 지수 백오프로 재시도"""
    # 실행 로직
    pass