In [1]:
# Enable IPython's autoreload extension. Mode 2 reloads imported modules
# before each cell executes, so edits to local .py files are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import os
from dotenv import load_dotenv, find_dotenv

# Locate a .env file with find_dotenv() and load it with load_dotenv().
# The return value is assigned to `_` so the cell does not echo it.
# NOTE(review): presumably this supplies the API key that ChatOpenAI needs — confirm.
_ = load_dotenv(find_dotenv())

# Creating the investment analysis crew

## Planner and Replanner

In [23]:
from langchain_openai import ChatOpenAI
# Chat model that backs the planner chain built further down.
planner_llm = ChatOpenAI(
    model="gpt-5-nano",
    temperature=0,
)

In [25]:
from typing import TypedDict, List, Literal
from pydantic import BaseModel, Field

class Step(BaseModel):
    # One entry of a plan: its position, the agent that handles it, and the
    # query sent to that agent.
    # Notes are kept as `#` comments rather than a docstring because pydantic
    # copies a model docstring into the JSON schema used for structured output.

    # Fields declared via Field(...) without a default are required.
    step_number: int = Field(description="The step number in the plan")
    agent_to_use: Literal[
        "technical_analysis_agent",
        "fundamental_analysis_agent",
        "just_reply",
        "consolidate_and_reply",
    ] = Field(
        description=(
            "The name of the agent to use for this step. "
            "Use 'just_reply' to have the agent respond directly without invoking another agent. "
            "Use 'consolidate_and_reply' to have the agent consolidate information from previous steps and respond."
        ),
    )
    query_to_ask: str = Field(description="The query to ask this agent")

class Plan(BaseModel):
    # Container for the list of Step objects; used below as the
    # structured-output schema of the planner and replanner chains.
    # Notes are `#` comments (not a docstring) so the JSON schema is unchanged.
    steps: List[Step] = Field(
        description="The list of steps in the plan",
    )

In [28]:
from langchain_core.prompts import ChatPromptTemplate

# Prompt asking the model to break the user's objective into ordered steps.
# The step names mentioned in the <important> section must match the
# `agent_to_use` literals declared on Step (e.g. `consolidate_and_reply`),
# otherwise the model is told to emit a value the schema rejects.
planner_prompt = ChatPromptTemplate.from_template(
    """For a given objective, come up with a simple step by step plan to achieve the objective.
    
    The plan should involve individual tasks that if executed correctly will yield the final outcome.
    
    The result of the final step should be the final answer to the user's objective.
    
    <important>
        Invoke the fundamental_analysis_agent and technical_analysis_agent only once per company of interest.
        Only use `just_reply` if you don't have to invoke any other agents.
        Whenever you invoke more than one agent, always have a `consolidate_and_reply` step at the end.
    </important>
    
    This means that if the user asks for analysis over 2 companies, you can invoke each agent twice, once per company.
    
    The user's question is: {objective}
    """
)

# Planner chain: render the prompt, then have the model return a Plan instance.
llm_planner = planner_prompt | planner_llm.with_structured_output(Plan)

# Pass the template variable by name. A bare string inside braces is a Python
# set literal, whose repr (braces and quotes included) would end up in the prompt.
plan = llm_planner.invoke({
    "objective": "Based on the past 5 years of stock market data, recommend whether to buy, hold, or sell the stock of Apple."
})

In [29]:
# Display the steps of the plan produced by the planner chain.
plan.steps

[Step(step_number=1, agent_to_use='fundamental_analysis_agent', query_to_ask='Please perform a fundamental analysis of Apple Inc. (AAPL) using the past 5 years of fundamental data. Provide an assessment of intrinsic value versus price based on revenue growth, earnings quality, operating margins, return metrics, balance sheet strength, cash flow, dividends and buybacks, and management guidance. Highlight key drivers of long-term value and indicate whether the stock appears undervalued, fairly valued, or overvalued.'),
 Step(step_number=2, agent_to_use='technical_analysis_agent', query_to_ask='Please perform a technical analysis of Apple Inc. (AAPL) using the past 5 years of stock market data. Analyze price trends, momentum, volatility, moving averages (e.g., 50-day and 200-day), RSI, MACD, volume, and identify support/resistance levels and common chart patterns. Provide a buy/hold/sell recommendation with rationale for the near-to-medium term and note any caveats.'),
 Step(step_number=3

### Replanner

In [14]:
# Prompt used to revise a plan once some steps have been carried out.
# Template variables: `input` (the objective), `plan` (the original plan)
# and `past_steps` (the steps already completed).
replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)

# Replanner chain: same chat model and structured-output schema as the planner.
# `planner_llm` is the model defined earlier in this notebook; the name `llm`
# is not defined anywhere above, so it fails on a fresh kernel (Run All).
llm_replanner = replanner_prompt | planner_llm.with_structured_output(Plan)

In [None]:
class State(TypedDict):
    """Typed dictionary holding the data carried through the analysis workflow.

    NOTE(review): the field meanings below are inferred from their names and
    from the Step/Plan models — confirm against the code that fills them in.
    """
    query: str  # presumably the user's original question
    tickers: List[str]  # presumably ticker symbols of the companies of interest
    plan: Plan  # a Plan instance (its `steps` field is a list of Step objects)
    fundamental_analysis_agent_reply: str  # presumably the reply of fundamental_analysis_agent
    technical_analysis_agent_reply: str  # presumably the reply of technical_analysis_agent
    # NOTE(review): no "search_agent" exists among the Step.agent_to_use
    # literals — confirm whether this field is still needed.
    search_agent_reply: str
    steps_done: List[Step]  # presumably the steps that have already been executed