## State definition

In [27]:
from typing import TypedDict, List, Dict, Any

class WorkflowPortfolioState(TypedDict):
    """Shared state dictionary passed through the portfolio workflow graph.

    It is used as the state schema of the ``StateGraph`` built by
    ``SimplePortfolioMonitor`` and is created with empty defaults in
    ``monitor_portfolio``. In the code visible in this notebook only
    ``holdings`` is read, and only ``results``, ``messages`` and ``timestamp``
    are written by a node; the remaining fields are initialised but not yet
    populated.
    """
    # Positions to monitor. The notebook's examples use dicts with
    # "symbol", "quantity" and "avg_cost" keys.
    holdings: List[Dict[str, Any]]
    # Initialised to {}; not written by any node shown here.
    market_data: Dict[str, Any]
    # Initialised to []; not written by any node shown here.
    news_data: List[Dict[str, Any]]
    # Initialised to []; not written by any node shown here.
    alerts: List[Dict[str, Any]]
    # Initialised to {}; not written by any node shown here.
    analysis: Dict[str, Any]
    # ISO-8601 string set by the data-collection node when it finishes
    # ("" until then).
    timestamp: str
    messages: List[Any]  # For LangGraph agent communication
    current_task: str    # Track current workflow step
    results: Dict[str, Any]  # Store results from each agent

## Get stock price tool

In [28]:
from langchain_core.tools import tool
import yfinance as yf

@tool
def get_stock_information(symbol: str) -> Dict[str, Any]:
    """ Get current stock price and basic information for a symbol """
    # NOTE: the docstring above is what the `@tool` decorator exposes to the
    # model as the tool description, so it is deliberately left unchanged;
    # maintainer documentation lives in these comments instead.
    #
    # Args:
    #     symbol: Ticker symbol to look up (e.g. "AAPL").
    # Returns:
    #     On success, a dict with "symbol", "current_price", "previous_close",
    #     "volume", "market_cap", "day_change" and "day_change_percent".
    #     On any failure, a dict with a single "error" key; this function
    #     never raises, so the calling agent always gets a payload back.
    try:
        stock = yf.Ticker(symbol)
        hist = stock.history(period="1d")

        if hist.empty:
            return {"error": f"No data available for this symbol {symbol}"}

        # Fetch the metadata only once we know there is price data to report.
        info = stock.info

        # Cast to a builtin float: pandas hands back np.float64, which leaks
        # into the payload (repr shows `np.float64(...)`) and is inconsistent
        # with the other numeric fields.
        current_price = float(hist["Close"].iloc[-1])

        # Look the previous close up once. `or` also covers the case where the
        # key is present but its value is None/0, which would otherwise make
        # float() raise and turn a usable quote into an error.
        reported_previous_close = info.get("previousClose")
        previous_close = float(reported_previous_close or current_price)

        day_change = current_price - previous_close
        # Without a reported previous close the change is defined as 0; the
        # guard also protects the division when previous_close is 0.
        if reported_previous_close and previous_close:
            day_change_percent = (day_change / previous_close) * 100
        else:
            day_change_percent = 0

        return {
            "symbol": symbol,
            "current_price": current_price,
            "previous_close": previous_close,
            "volume": info.get('volume', 0),
            "market_cap": info.get('marketCap'),
            "day_change": day_change,
            "day_change_percent": day_change_percent
        }
    except Exception as e:
        # Best-effort by design: report the failure as data instead of
        # aborting the agent run.
        return {"error": f"Error fetching stock info for {symbol}: {str(e)}"}


## Calculate position value tool

In [29]:
@tool
def calculate_position_value(symbol: str, quantity: float, avg_cost: float, current_price: float) -> Dict[str, Any]:
    """Calculate the current value and P&L for a stock position"""
    # The docstring is kept verbatim because `@tool` publishes it to the model
    # as the tool description.
    #
    # Returns a dict echoing the inputs plus:
    #   market_value      = quantity * current_price
    #   cost_basis        = quantity * avg_cost
    #   gain_loss         = market_value - cost_basis
    #   gain_loss_percent = gain_loss as a percentage of cost_basis
    #                       (0 when cost_basis is not positive)
    value_now = quantity * current_price
    amount_paid = quantity * avg_cost
    profit = value_now - amount_paid

    if amount_paid > 0:
        profit_pct = (profit / amount_paid) * 100
    else:
        # Avoid dividing by a zero (or negative) cost basis.
        profit_pct = 0

    return dict(
        symbol=symbol,
        market_value=value_now,
        cost_basis=amount_paid,
        gain_loss=profit,
        gain_loss_percent=profit_pct,
        current_price=current_price,
        quantity=quantity,
        avg_cost=avg_cost,
    )

## Data collection agent

In [31]:
from langgraph.prebuilt import create_react_agent

def create_data_agent(llm):
    """Create an agent focused on collecting stock market data.

    Args:
        llm: Chat model instance handed to ``create_react_agent``.

    Returns:
        The agent built by ``create_react_agent`` with the
        ``get_stock_information`` and ``calculate_position_value`` tools and
        the system prompt below.
    """

    # The tool names mentioned in the prompt must match the names of the tools
    # registered below; the prompt previously referred to a non-existent
    # "get_stock_price" tool.
    system_message = """You are a Stock Data Collection Agent. Your job is to:
    1. Take a list of stock holdings and collect current market data for each
    2. Use the get_stock_information tool to fetch current prices
    3. Use the calculate_position_value tool to determine current position values
    4. Return a summary of all collected data
    
    Always be precise with numbers and clearly indicate if any data collection failed.
    Format your final response as a clear summary of the portfolio's current state.
    """

    tools = [get_stock_information, calculate_position_value]
    # NOTE(review): newer langgraph releases appear to prefer `prompt=` over
    # `state_modifier=`; kept as-is to match the version this notebook ran on.
    return create_react_agent(llm, tools, state_modifier=system_message)

## Portfolio monitor - workflow orchestrator

In [32]:
from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage
from typing import List, Dict, Any
from langchain_core.messages import HumanMessage
from datetime import datetime

class SimplePortfolioMonitor:
    """Single-agent portfolio monitor driven by a LangGraph workflow.

    The workflow has one node ("collect_data") that asks the data-collection
    agent to price every holding and summarise the portfolio, then ends.
    """

    def __init__(self, llm):
        """Create the data agent and compile the workflow.

        Args:
            llm: Chat model instance passed on to ``create_data_agent``.
        """
        self.llm = llm
        self.data_agent = create_data_agent(llm)

        # Compiled once here and reused by every monitor_portfolio() call.
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build and compile the one-node workflow.

        Returns:
            The compiled graph produced by ``StateGraph.compile()`` (the
            runnable object, not the ``StateGraph`` builder itself).
        """
        workflow = StateGraph(WorkflowPortfolioState)

        # Add the data collection node
        workflow.add_node("collect_data", self._data_collection_node)

        # Simple linear flow: entry -> collect_data -> END
        workflow.set_entry_point("collect_data")
        workflow.add_edge("collect_data", END)

        return workflow.compile()

    async def _data_collection_node(self, state: WorkflowPortfolioState) -> Dict[str, Any]:
        """Run the data collection agent and return the resulting state update.

        Args:
            state: Current workflow state; only ``holdings``, ``results`` and
                ``messages`` are read.

        Returns:
            A partial state update with new ``results`` (adds the
            ``"data_collection"`` entry), ``messages`` (existing messages
            followed by the agent's messages) and ``timestamp`` values.
            The incoming ``state`` and its containers are not mutated.
        """
        holdings = state["holdings"]

        # Create a task message for the agent
        task_message = f"""
        Please collect current market data for the following stock holdings:
        {holdings}
        
        For each holding, get the current stock price and calculate the position value.
        Provide a summary of the entire portfolio's current state.
        """

        print(f"🤖 Data Agent: Starting data collection for {len(holdings)} holdings...")

        # Run the agent
        result = await self.data_agent.ainvoke({
            "messages": [HumanMessage(content=task_message)]
        })

        # Build fresh containers instead of editing the caller's dict/list in
        # place, so the node has no side effects on the state it was handed.
        update = {
            "results": {**state["results"], "data_collection": result},
            "messages": [*state["messages"], *result["messages"]],
            "timestamp": datetime.now().isoformat(),
        }

        print("✅ Data Agent: Collection complete!")

        return update

    async def monitor_portfolio(self, holdings: List[Dict[str, Any]]) -> WorkflowPortfolioState:
        """Run the portfolio monitoring workflow for the given holdings.

        Args:
            holdings: Positions to monitor; the notebook's examples use dicts
                with "symbol", "quantity" and "avg_cost" keys.

        Returns:
            The final workflow state returned by the compiled graph.
        """
        initial_state = WorkflowPortfolioState(
            holdings=holdings,
            market_data={},
            news_data=[],
            alerts=[],
            analysis={},
            timestamp="",
            messages=[],
            current_task="data_collection",
            results={}
        )

        print("🚀 Starting Portfolio Monitoring Workflow...")
        result = await self.workflow.ainvoke(initial_state)
        print("🎯 Workflow completed!")

        return result

## Usage example

In [33]:
from langchain_openai import ChatOpenAI
from langchain_core.rate_limiters import InMemoryRateLimiter


# Sample portfolio: each holding has a ticker symbol, a share count and the
# average purchase cost per share.
# NOTE(review): GOOGL's avg_cost of 2500 is far above the ~192 price in the
# recorded output, so this position shows a large loss; possibly a pre-split
# cost. Confirm the figure is intended.
sample_holdings = [
    {"symbol": "AAPL", "quantity": 100, "avg_cost": 150.0},
    {"symbol": "GOOGL", "quantity": 50, "avg_cost": 2500.0},
    {"symbol": "MSFT", "quantity": 75, "avg_cost": 300.0}
]

print("=== STEP-BY-STEP PORTFOLIO MONITORING ===")
print("Step 1: Setting up agents and tools...")

# Build the chat model. This is a real ChatOpenAI client, not a mock, wrapped
# in a client-side rate limiter (2 requests/second, bucket of 5).
rate_limiter = InMemoryRateLimiter(requests_per_second=2, check_every_n_seconds=0.2, max_bucket_size=5)

# Alternative local model, left disabled (ChatOllama is not imported in this cell):
#llm = ChatOllama(model="qwen2.5:32b")
llm = ChatOpenAI(model="gpt-4o-mini", rate_limiter=rate_limiter)
# The monitor is only constructed here; this cell never runs its workflow.
monitor = SimplePortfolioMonitor(llm)

print("Step 2: Running data collection...")

# Exercise the two tools directly, without going through the agent or the LLM.
print("\nTesting tools directly:")
for holding in sample_holdings:
    symbol = holding["symbol"]
    print(f"\nGetting data for {symbol}...")
    
    # Test the stock information tool
    price_data = get_stock_information.invoke({"symbol": symbol})
    print(f"Price data: {price_data}")
    
    # The tool reports failures as a dict with an "error" key; skip those.
    if "error" not in price_data:
        # Test the position calculation tool
        position_data = calculate_position_value.invoke({
            "symbol": symbol,
            "quantity": holding["quantity"],
            "avg_cost": holding["avg_cost"],
            "current_price": price_data["current_price"]
        })
        print(f"Position value: {position_data}")


=== STEP-BY-STEP PORTFOLIO MONITORING ===
Step 1: Setting up agents and tools...
Step 2: Running data collection...

Testing tools directly:

Getting data for AAPL...
Price data: {'symbol': 'AAPL', 'current_price': np.float64(213.75999450683594), 'previous_close': 214.15, 'volume': 45773373, 'market_cap': 3192676417536, 'day_change': -0.3900054931640682, 'day_change_percent': -0.18211790481628212}
Position value: {'symbol': 'AAPL', 'market_value': 21375.999450683594, 'cost_basis': 15000.0, 'gain_loss': 6375.999450683594, 'gain_loss_percent': 42.506663004557296, 'current_price': 213.75999450683594, 'quantity': 100.0, 'avg_cost': 150.0}

Getting data for GOOGL...
Price data: {'symbol': 'GOOGL', 'current_price': np.float64(192.1699981689453), 'previous_close': 190.23, 'volume': 74506564, 'market_cap': 2338997141504, 'day_change': 1.9399981689453227, 'day_change_percent': 1.0198171523657273}
Position value: {'symbol': 'GOOGL', 'market_value': 9608.499908447266, 'cost_basis': 125000.0, 'gai

## Orchestrator example

In [26]:
async def run_orchestrator_example():
    """Run the monitoring workflow end to end on a demo portfolio.

    Prints a step-by-step report (holdings, cost basis, workflow summary and
    the agent's last message). Returns the final workflow state, or ``None``
    when anything inside the workflow step raises.
    """
    banner = "=" * 60
    divider = "-" * 40

    # Demo positions: symbol, share count, average cost per share.
    demo_holdings = [
        {"symbol": "AAPL", "quantity": 100, "avg_cost": 150.0},
        {"symbol": "MSFT", "quantity": 75, "avg_cost": 300.0},
        {"symbol": "GOOGL", "quantity": 50, "avg_cost": 2500.0},
        {"symbol": "TSLA", "quantity": 25, "avg_cost": 200.0}
    ]

    print(banner)
    print("🔥 PORTFOLIO MONITORING ORCHESTRATOR DEMO")
    print(banner)

    # Step 1: rate-limited chat model + monitor
    print("\n📋 Step 1: Initializing Portfolio Monitor...")
    limiter = InMemoryRateLimiter(requests_per_second=2, check_every_n_seconds=0.2, max_bucket_size=5)
    chat_model = ChatOpenAI(model="gpt-4o-mini", rate_limiter=limiter, temperature=0)

    portfolio_monitor = SimplePortfolioMonitor(chat_model)
    print("✅ Monitor initialized with LangGraph workflow")

    # Step 2: show what we are about to monitor, accumulating the total cost
    print("\n📊 Step 2: Initial Portfolio Holdings:")
    running_total = 0
    for position in demo_holdings:
        position_cost = position["quantity"] * position["avg_cost"]
        running_total += position_cost
        print(f"   • {position['symbol']}: {position['quantity']} shares @ ${position['avg_cost']:.2f} (Cost: ${position_cost:,.2f})")
    print(f"   💰 Total Cost Basis: ${running_total:,.2f}")

    # Step 3: run the workflow
    print("\n🤖 Step 3: Running Orchestrator Workflow...")
    print(divider)

    try:
        outcome = await portfolio_monitor.monitor_portfolio(demo_holdings)

        print(divider)
        print("✅ Workflow execution completed!")

        # Step 4: headline numbers from the final state
        print("\n📈 Step 4: Workflow Results Summary:")
        print(f"   🕐 Timestamp: {outcome['timestamp']}")
        print(f"   💬 Total Messages: {len(outcome['messages'])}")
        print(f"   🔧 Results Keys: {list(outcome['results'].keys())}")

        # Step 5: the last message is the agent's final answer
        conversation = outcome['messages']
        if conversation:
            last_message = conversation[-1]
            print("\n🎯 Step 5: Agent's Final Response:")
            print(divider)
            print(last_message.content)
            print(divider)

        return outcome

    except Exception as exc:
        # Any failure in the workflow step is reported and mapped to None.
        print(f"❌ Error during workflow execution: {exc!s}")
        print("💡 Make sure you have set your OPENAI_API_KEY environment variable")
        return None

# Run the async function.
# Top-level `await` relies on the notebook kernel running cells inside an
# event loop; in a plain script, wrap the call with asyncio.run(...) instead.
# `result` is read by the analysis cell further down.
print("🏃‍♂️ Starting orchestrator execution...")
result = await run_orchestrator_example()


🏃‍♂️ Starting orchestrator execution...
🔥 PORTFOLIO MONITORING ORCHESTRATOR DEMO

📋 Step 1: Initializing Portfolio Monitor...
✅ Monitor initialized with LangGraph workflow

📊 Step 2: Initial Portfolio Holdings:
   • AAPL: 100 shares @ $150.00 (Cost: $15,000.00)
   • MSFT: 75 shares @ $300.00 (Cost: $22,500.00)
   • GOOGL: 50 shares @ $2500.00 (Cost: $125,000.00)
   • TSLA: 25 shares @ $200.00 (Cost: $5,000.00)
   💰 Total Cost Basis: $167,500.00

🤖 Step 3: Running Orchestrator Workflow...
----------------------------------------
🚀 Starting Portfolio Monitoring Workflow...
🤖 Data Agent: Starting data collection for 4 holdings...
✅ Data Agent: Collection complete!
🎯 Workflow completed!
----------------------------------------
✅ Workflow execution completed!

📈 Step 4: Workflow Results Summary:
   🕐 Timestamp: 2025-07-24T22:32:19.609996
   💬 Total Messages: 12
   🔧 Results Keys: ['data_collection']

🎯 Step 5: Agent's Final Response:
----------------------------------------
Here is the summ

In [19]:
def analyze_orchestrator_results(workflow_result):
    """Print a structured report about a finished workflow run.

    Args:
        workflow_result: Final workflow state (a mapping with at least
            ``timestamp``, ``holdings``, ``results`` and ``messages``), or
            ``None`` if the run failed.

    Returns:
        ``workflow_result`` unchanged, or ``None`` when there is nothing to
        analyze.
    """

    if workflow_result is None:
        print("❌ No results to analyze - workflow execution failed")
        return None

    print("\n" + "=" * 60)
    print("📊 DETAILED RESULTS ANALYSIS")
    print("=" * 60)

    # 1. Workflow Metadata
    print("\n🔍 Workflow Metadata:")
    print(f"   • Execution Time: {workflow_result['timestamp']}")
    print(f"   • Holdings Processed: {len(workflow_result['holdings'])}")
    print(f"   • Workflow Steps: {list(workflow_result['results'].keys())}")

    # 2. Message Analysis
    print("\n💬 Agent Communication:")
    for i, message in enumerate(workflow_result['messages']):
        message_type = message.__class__.__name__
        # Message content is not always a plain string (it can be a list of
        # content blocks); stringify first so slicing and "+" always work.
        content_text = message.content if isinstance(message.content, str) else str(message.content)
        content_preview = content_text[:100] + "..." if len(content_text) > 100 else content_text
        print(f"   {i+1}. {message_type}: {content_preview}")

    # 3. Data Collection Results
    if 'data_collection' in workflow_result['results']:
        data_result = workflow_result['results']['data_collection']
        agent_messages = data_result.get('messages', [])
        print("\n🤖 Data Collection Agent Performance:")
        print(f"   • Messages Generated: {len(agent_messages)}")

        # Collect every distinct tool the agent called, in first-use order.
        # Scanning all messages (rather than stopping at the first one with
        # tool calls) is what makes later tools show up in the report.
        tools_used = []
        for msg in agent_messages:
            for call in getattr(msg, 'tool_calls', None) or []:
                if call['name'] not in tools_used:
                    tools_used.append(call['name'])
        if tools_used:
            print(f"   • Tools Used: {tools_used}")

    # 4. Current Portfolio State
    print("\n📈 Current Portfolio Holdings:")
    for holding in workflow_result['holdings']:
        print(f"   • {holding['symbol']}: {holding['quantity']} shares @ ${holding['avg_cost']:.2f}")

    # 5. State Structure Analysis
    print("\n🏗️  State Structure:")
    for key, value in workflow_result.items():
        if key != 'messages':  # Skip messages as we analyzed them above
            value_type = type(value).__name__
            if isinstance(value, (list, dict)):
                size = len(value)
                print(f"   • {key}: {value_type} (size: {size})")
            else:
                print(f"   • {key}: {value_type}")

    print("\n✅ Analysis Complete!")

    return workflow_result

# Analyze the results if we have them.
# `result` is produced by the orchestrator cell above; the `locals()` check
# keeps this cell from raising NameError when it is run before that cell
# (at notebook top level, locals() is the module namespace).
if 'result' in locals() and result is not None:
    analyzed_results = analyze_orchestrator_results(result)
else:
    print("🔄 Run the orchestrator example first to see analysis results")



📊 DETAILED RESULTS ANALYSIS

🔍 Workflow Metadata:
   • Execution Time: 2025-07-24T22:30:18.964210
   • Holdings Processed: 4
   • Workflow Steps: ['data_collection']

💬 Agent Communication:
   1. HumanMessage: 
        Please collect current market data for the following stock holdings:
        [{'symbol': 'A...
   2. AIMessage: 
   3. ToolMessage: {"symbol": "AAPL", "current_price": 213.75999450683594, "previous_close": 214.15, "volume": 45773373...
   4. ToolMessage: {"symbol": "MSFT", "current_price": 510.8800048828125, "previous_close": 505.87, "volume": 16041656,...
   5. ToolMessage: {"symbol": "GOOGL", "current_price": 192.1699981689453, "previous_close": 190.23, "volume": 74506564...
   6. ToolMessage: {"symbol": "TSLA", "current_price": 305.29998779296875, "previous_close": 332.56, "volume": 15435005...
   7. AIMessage: 
   8. ToolMessage: {"symbol": "AAPL", "market_value": 21375.999450683594, "cost_basis": 15000.0, "gain_loss": 6375.9994...
   9. ToolMessage: {"symbol": "MSF