# 📊 Agora Stock Research Demo with Alpha Vantage

This notebook demonstrates:
- ✅ **Agora's `@agora_node` decorator** for simple workflow creation
- ✅ **Built-in telemetry** with Traceloop/OpenTelemetry
- ✅ **Alpha Vantage API** for real-time stock data
- ✅ **Gradio interface** for interactive stock research
- ✅ **Efficient async processing** with TracedAsyncFlow

## 🎯 Workflow:
1. User enters stock symbol
2. Fetch real-time quote from Alpha Vantage
3. Fetch company overview & fundamentals
4. Fetch historical price data
5. AI analysis with OpenAI GPT
6. Display results with telemetry insights

## 📦 Step 1: Install Dependencies

In [None]:
# Install Agora from GitHub
!pip install -q git+https://github.com/JerzyKultura/Agora.git

# Install required packages
!pip install -q openai gradio requests pandas matplotlib
!pip install -q traceloop-sdk opentelemetry-api opentelemetry-sdk

print("✅ All dependencies installed!")

## 🔑 Step 2: Configure API Keys

In [None]:
import os

# Option 1: Use Colab secrets (recommended)
# Go to the key icon on the left sidebar and add:
# - OPENAI_API_KEY
# - ALPHA_VANTAGE_KEY

try:
    # Imported inside the try block so the manual fallback also works outside Colab
    from google.colab import userdata

    os.environ["OPENAI_API_KEY"] = userdata.get('OPENAI_API_KEY')
    os.environ["ALPHA_VANTAGE_KEY"] = userdata.get('ALPHA_VANTAGE_KEY')
    print("✅ API keys loaded from Colab secrets")
except Exception:
    # Option 2: Manual entry (less secure)
    print("⚠️  Colab secrets not found. Please enter manually:")
    os.environ["OPENAI_API_KEY"] = input("Enter OpenAI API key: ").strip()
    os.environ["ALPHA_VANTAGE_KEY"] = input("Enter Alpha Vantage API key: ").strip()
    print("✅ API keys configured")
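
If a key is missing or blank, the failure otherwise only surfaces later as an authentication error from the API. A minimal sketch of an early check, assuming a hypothetical helper named `require_env` (it is not part of Agora or Colab):

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable's value, or raise if it is unset or blank.

    `require_env` is a hypothetical helper for this notebook, not a library function.
    """
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Example usage with a placeholder value (not a real key)
os.environ["DEMO_KEY"] = "  abc123  "
print(require_env("DEMO_KEY"))
```

Calling `require_env("OPENAI_API_KEY")` right after the configuration cell would fail fast with a clear message instead of a later HTTP 401.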

## 🚀 Step 3: Initialize Agora with Telemetry

In [None]:
import asyncio
import requests
import json
from datetime import datetime
from openai import AsyncOpenAI

# Import Agora components
from agora.agora_tracer import init_traceloop, agora_node, TracedAsyncFlow

# Initialize telemetry
init_traceloop(
    app_name="stock_research_demo",
    export_to_console=True,  # Show telemetry in console
    disable_content_logging=True  # Keep API calls private
)

# Initialize OpenAI client
openai_client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])

print("✅ Agora telemetry initialized!")
print("📊 All node executions will be traced automatically")

## 🔧 Step 4: Alpha Vantage API Helper Functions

In [None]:
ALPHA_VANTAGE_BASE = "https://www.alphavantage.co/query"
API_KEY = os.environ["ALPHA_VANTAGE_KEY"]

def fetch_alpha_vantage(function, symbol=None, **kwargs):
    """
    Efficient Alpha Vantage API caller with error handling.
    """
    params = {
        "function": function,
        "apikey": API_KEY,
        **kwargs
    }
    
    if symbol:
        params["symbol"] = symbol
    
    try:
        response = requests.get(ALPHA_VANTAGE_BASE, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        
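        # Check for API errors
        if "Error Message" in data:
            return {"error": data["Error Message"]}
        # Rate-limit notices arrive under "Note"; newer responses may use
        # "Information" instead (an assumption), so check both
        if "Note" in data or "Information" in data:
            return {"error": "API rate limit reached. Please wait a minute."}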
        
        return data
    except Exception as e:
        return {"error": str(e)}

print("✅ Alpha Vantage helper functions ready")
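
The error checks in the helper can be exercised without any network access by pulling them into a pure function. A sketch under stated assumptions: `classify_payload` is a hypothetical name, and treating an `"Information"` key as a limit notice is an assumption about newer responses, not something confirmed by this notebook.

```python
def classify_payload(data: dict) -> dict:
    """Return the payload unchanged, or an {"error": ...} dict if it signals a problem.

    The keys checked mirror the helper above; "Information" is an assumption.
    """
    if "Error Message" in data:
        return {"error": data["Error Message"]}
    for key in ("Note", "Information"):
        if key in data:
            return {"error": f"API notice (possibly a rate limit): {data[key]}"}
    return data


# Made-up payloads for illustration
print(classify_payload({"Error Message": "Invalid API call."}))
print(classify_payload({"Global Quote": {"05. price": "100.00"}}))
```

Keeping this logic separate from `requests.get` makes it easy to test each branch with small hand-written dictionaries.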

## 🎯 Step 5: Define Stock Research Nodes with @agora_node Decorator

Each node is a simple async function wrapped with the `@agora_node` decorator.
Telemetry is automatic!

In [None]:
# ============================================================================
# NODE 1: Validate and prepare stock symbol
# ============================================================================

@agora_node(name="ValidateSymbol")
async def validate_symbol(shared):
    """
    Validate the stock symbol from user input.
    """
    symbol = shared.get("symbol", "").strip().upper()
    
    if not symbol:
        shared["error"] = "Please enter a stock symbol"
        return "error"
    
    if len(symbol) > 10 or not symbol.isalpha():
        shared["error"] = f"Invalid symbol: {symbol}"
        return "error"
    
    shared["symbol"] = symbol
    shared["timestamp"] = datetime.now().isoformat()
    return "fetch_quote"


# ============================================================================
# NODE 2: Fetch real-time quote
# ============================================================================

@agora_node(name="FetchQuote", max_retries=2, wait=1)
async def fetch_quote(shared):
    """
    Fetch real-time stock quote from Alpha Vantage.
    """
    symbol = shared["symbol"]
    
    # Use asyncio.to_thread for sync API call
    data = await asyncio.to_thread(
        fetch_alpha_vantage, 
        "GLOBAL_QUOTE", 
        symbol=symbol
    )
    
    if "error" in data:
        shared["error"] = data["error"]
        return "error"
    
    quote = data.get("Global Quote", {})
    if not quote:
        shared["error"] = f"No quote data found for {symbol}"
        return "error"
    
    shared["quote"] = {
        "price": quote.get("05. price", "N/A"),
        "change": quote.get("09. change", "N/A"),
        "change_percent": quote.get("10. change percent", "N/A"),
        "volume": quote.get("06. volume", "N/A"),
        "high": quote.get("03. high", "N/A"),
        "low": quote.get("04. low", "N/A")
    }
    
    return "fetch_overview"


# ============================================================================
# NODE 3: Fetch company overview
# ============================================================================

@agora_node(name="FetchOverview", max_retries=2, wait=1)
async def fetch_overview(shared):
    """
    Fetch company fundamentals and overview.
    """
    symbol = shared["symbol"]
    
    data = await asyncio.to_thread(
        fetch_alpha_vantage,
        "OVERVIEW",
        symbol=symbol
    )
    
    if "error" in data:
        # Non-critical error - continue without overview
        shared["overview"] = {"error": data["error"]}
    else:
        shared["overview"] = {
            "name": data.get("Name", "N/A"),
            "sector": data.get("Sector", "N/A"),
            "industry": data.get("Industry", "N/A"),
            "market_cap": data.get("MarketCapitalization", "N/A"),
            "pe_ratio": data.get("PERatio", "N/A"),
            "dividend_yield": data.get("DividendYield", "N/A"),
            "52week_high": data.get("52WeekHigh", "N/A"),
            "52week_low": data.get("52WeekLow", "N/A"),
            "description": data.get("Description", "N/A")[:500]  # Truncate
        }
    
    return "fetch_historical"


# ============================================================================
# NODE 4: Fetch historical data
# ============================================================================

@agora_node(name="FetchHistorical", max_retries=2, wait=1)
async def fetch_historical(shared):
    """
    Fetch recent daily price history.
    """
    symbol = shared["symbol"]
    
    data = await asyncio.to_thread(
        fetch_alpha_vantage,
        "TIME_SERIES_DAILY",
        symbol=symbol,
        outputsize="compact"  # Last 100 days
    )
    
    if "error" in data:
        shared["historical"] = {"error": data["error"]}
    else:
        time_series = data.get("Time Series (Daily)", {})
        # Keep the 10 most recent days (ISO dates sort chronologically as strings)
        recent = dict(sorted(time_series.items(), reverse=True)[:10])
        shared["historical"] = recent
    
    return "analyze"


# ============================================================================
# NODE 5: AI Analysis with OpenAI
# ============================================================================

@agora_node(name="AnalyzeStock", max_retries=2, wait=1)
async def analyze_stock(shared):
    """
    Generate AI-powered stock analysis using OpenAI.
    """
    symbol = shared["symbol"]
    quote = shared.get("quote", {})
    overview = shared.get("overview", {})
    
    # Build analysis prompt
    prompt = f"""Analyze the following stock data for {symbol}:

**Current Quote:**
- Price: ${quote.get('price', 'N/A')}
- Change: {quote.get('change', 'N/A')} ({quote.get('change_percent', 'N/A')})
- Volume: {quote.get('volume', 'N/A')}
- Day Range: ${quote.get('low', 'N/A')} - ${quote.get('high', 'N/A')}

**Company Overview:**
- Name: {overview.get('name', 'N/A')}
- Sector: {overview.get('sector', 'N/A')}
- Industry: {overview.get('industry', 'N/A')}
- Market Cap: {overview.get('market_cap', 'N/A')}
- P/E Ratio: {overview.get('pe_ratio', 'N/A')}
- Dividend Yield: {overview.get('dividend_yield', 'N/A')}
- 52-Week Range: ${overview.get('52week_low', 'N/A')} - ${overview.get('52week_high', 'N/A')}

Provide a brief analysis (3-4 paragraphs) covering:
1. Current price action and momentum
2. Valuation metrics (P/E, market cap)
3. Key strengths and potential risks
4. Overall outlook (neutral, avoid specific buy/sell recommendations)

Keep it concise and professional."""
    
    try:
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=600
        )
        
        analysis = response.choices[0].message.content
        shared["analysis"] = analysis
        shared["success"] = True
        
    except Exception as e:
        shared["analysis"] = f"Analysis failed: {str(e)}"
        shared["success"] = True  # Still show results
    
    return "complete"


# ============================================================================
# ERROR NODE: Handle errors gracefully
# ============================================================================

@agora_node(name="HandleError")
async def handle_error(shared):
    """
    Handle any errors that occurred during the workflow.
    """
    error = shared.get("error", "Unknown error")
    shared["result"] = f"❌ Error: {error}"
    return "complete"


print("✅ All stock research nodes defined with @agora_node decorator")
print("📊 Telemetry will automatically track all node executions")

## 🔄 Step 6: Build the Workflow with TracedAsyncFlow

In [None]:
def build_stock_research_flow():
    """
    Build the complete stock research workflow.
    """
    # Create flow with telemetry
    flow = TracedAsyncFlow("StockResearch")
    
    # Set starting node
    flow.start(validate_symbol)
    
    # Build the flow graph
    validate_symbol - "fetch_quote" >> fetch_quote
    validate_symbol - "error" >> handle_error
    
    fetch_quote - "fetch_overview" >> fetch_overview
    fetch_quote - "error" >> handle_error
    
    fetch_overview - "fetch_historical" >> fetch_historical
    
    fetch_historical - "analyze" >> analyze_stock
    
    # All paths lead to completion
    # (flow ends when no edges are defined for the returned action)
    
    return flow

print("✅ Stock research workflow defined")
print("\n📊 Workflow Structure:")
print("  1. ValidateSymbol → checks input")
print("  2. FetchQuote → real-time price")
print("  3. FetchOverview → company fundamentals")
print("  4. FetchHistorical → price history")
print("  5. AnalyzeStock → AI analysis")
print("  (All steps are traced automatically!)")
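
The edges in the flow route on the action string that each node returns. As a rough mental model only (this is not Agora's implementation), the dispatch can be sketched with a plain dictionary. `run_flow`, `TRANSITIONS`, and the stub nodes are hypothetical names, and no API calls are made.

```python
import asyncio


async def stub_validate(shared):
    # Route on whether a symbol is present
    return "fetch_quote" if shared.get("symbol") else "error"


async def stub_quote(shared):
    shared["quote"] = {"price": "0.00"}  # placeholder data, no network call
    return "complete"


async def stub_error(shared):
    shared["result"] = "error handled"
    return "complete"


# (node, action) -> next node; a missing entry ends the flow
TRANSITIONS = {
    (stub_validate, "fetch_quote"): stub_quote,
    (stub_validate, "error"): stub_error,
}


async def run_flow(start, shared):
    """Follow action strings from node to node until no edge matches."""
    node, visited = start, []
    while node is not None:
        visited.append(node.__name__)
        action = await node(shared)
        node = TRANSITIONS.get((node, action))
    return visited


# In a notebook, use `await run_flow(...)` instead of asyncio.run
print(asyncio.run(run_flow(stub_validate, {"symbol": "AAPL"})))
```

Both stub paths end on `"complete"`, which has no outgoing edge, so the loop stops there. This matches the comment in `build_stock_research_flow` about flows ending when no edge is defined for the returned action.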

## 🖥️ Step 7: Create Gradio Interface

In [None]:
import gradio as gr
import pandas as pd

async def research_stock(symbol):
    """
    Main function called by Gradio.
    """
    # Create shared state
    shared = {"symbol": symbol}
    
    # Build and run flow
    flow = build_stock_research_flow()
    
    try:
        await flow.run_async(shared)
    except Exception as e:
        return f"❌ Workflow error: {str(e)}", None, None
    
    # Check for errors
    if "error" in shared and not shared.get("success"):
        return f"❌ {shared['error']}", None, None
    
    # Format results
    quote = shared.get("quote", {})
    overview = shared.get("overview", {})
    analysis = shared.get("analysis", "No analysis available")
    
    # Build quote summary
    quote_text = f"""## üìà {symbol} - Live Quote

**Price:** ${quote.get('price', 'N/A')}  
**Change:** {quote.get('change', 'N/A')} ({quote.get('change_percent', 'N/A')})  
**Volume:** {quote.get('volume', 'N/A')}  
**Day Range:** ${quote.get('low', 'N/A')} - ${quote.get('high', 'N/A')}
"""
    
    # Build company overview
    overview_text = f"""## üè¢ Company Overview

**Name:** {overview.get('name', 'N/A')}  
**Sector:** {overview.get('sector', 'N/A')}  
**Industry:** {overview.get('industry', 'N/A')}  
**Market Cap:** ${overview.get('market_cap', 'N/A')}  
**P/E Ratio:** {overview.get('pe_ratio', 'N/A')}  
**Dividend Yield:** {overview.get('dividend_yield', 'N/A')}  
**52-Week Range:** ${overview.get('52week_low', 'N/A')} - ${overview.get('52week_high', 'N/A')}

---

{overview.get('description', '')[:300]}...
"""
    
    # Build AI analysis
    analysis_text = f"""## ü§ñ AI Analysis

{analysis}

---

*Analysis generated by GPT-4o-mini using Alpha Vantage data*
"""
    
    return quote_text, overview_text, analysis_text


# Create Gradio interface
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft(), title="Stock Research with Agora") as demo:
        gr.Markdown("""
        # 📊 Stock Research Demo
        ### Powered by Agora + Alpha Vantage + OpenAI
        
        Enter a stock symbol (e.g., AAPL, MSFT, TSLA) to get:
        - Real-time quote and price data
        - Company fundamentals and overview
        - AI-powered analysis
        
        *All operations are traced with OpenTelemetry!*
        """)
        
        with gr.Row():
            symbol_input = gr.Textbox(
                label="Stock Symbol",
                placeholder="Enter symbol (e.g., AAPL)",
                value="AAPL"
            )
            submit_btn = gr.Button("üîç Research Stock", variant="primary")
        
        with gr.Row():
            with gr.Column():
                quote_output = gr.Markdown(label="Quote")
            with gr.Column():
                overview_output = gr.Markdown(label="Overview")
        
        with gr.Row():
            analysis_output = gr.Markdown(label="Analysis")
        
        gr.Markdown("""
        ---
        **Note:** This demo uses Alpha Vantage's free tier (5 API calls/minute, 100/day).  
        Check the console for telemetry traces!
        """)
        
        # Wire up the button
        submit_btn.click(
            fn=research_stock,
            inputs=[symbol_input],
            outputs=[quote_output, overview_output, analysis_output]
        )
    
    return demo

print("✅ Gradio interface ready")
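
The overview panel prints `MarketCapitalization` as a raw digit string, which is hard to read for large companies. A small formatting helper could be applied before building `overview_text`. `format_market_cap` is a hypothetical name, and the thresholds and two-decimal rounding are a presentation choice rather than anything the notebook prescribes.

```python
def format_market_cap(raw) -> str:
    """Format a numeric string such as "2500000000000" as "$2.50T".

    Returns "N/A" for input that cannot be parsed as a number.
    """
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return "N/A"
    for threshold, suffix in ((1e12, "T"), (1e9, "B"), (1e6, "M")):
        if value >= threshold:
            return f"${value / threshold:.2f}{suffix}"
    return f"${value:,.0f}"


print(format_market_cap("2500000000000"))
print(format_market_cap("N/A"))
```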

## 🚀 Step 8: Launch the Application!

In [None]:
# Create and launch the interface
demo = create_interface()

# Launch with public sharing (optional)
demo.launch(
    share=True,  # Set to False for local-only
    debug=True   # Show detailed logs
)

print("\n" + "="*60)
print("üéâ Stock Research App is running!")
print("üìä Check the console for telemetry traces")
print("="*60)

## 📊 Step 9: View Telemetry Insights

After running a stock research query, check the console output above to see:

- ⏱️ **Execution times** for each node
- 🔄 **Retry attempts** and failures
- 📝 **Span traces** with OpenTelemetry
- 🔍 **Node transitions** and routing

This is all automatic thanks to the `@agora_node` decorator and `TracedAsyncFlow`!
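
Conceptually, per-node timing amounts to recording a start and end time around each call. The stdlib-only sketch below illustrates that idea with a hypothetical `timed` decorator. Agora's tracer emits OpenTelemetry spans rather than using this code, so this is an analogy and not its implementation.

```python
import asyncio
import functools
import time

TIMINGS = {}  # function name -> last duration in seconds


def timed(func):
    """Record how long an async function takes, keyed by its name."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on success and on failure, so errors are timed too
            TIMINGS[func.__name__] = time.perf_counter() - start
    return wrapper


@timed
async def slow_node(shared):
    await asyncio.sleep(0.05)  # stand-in for real work
    return "complete"


# In a notebook, use `await slow_node({})` instead of asyncio.run
result = asyncio.run(slow_node({}))
print(result, f"{TIMINGS['slow_node']:.3f}s")
```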

## 🧪 Step 10: Test the Workflow Directly (Optional)

You can run the full workflow without the Gradio interface to inspect the shared state it produces:

In [None]:
# Test the workflow with a specific symbol
async def test_workflow(symbol="AAPL"):
    shared = {"symbol": symbol}
    flow = build_stock_research_flow()
    
    print(f"\nüîç Testing workflow for {symbol}...\n")
    
    await flow.run_async(shared)
    
    print("\n" + "="*60)
    print("✅ WORKFLOW COMPLETE")
    print("="*60)
    
    if "error" in shared and not shared.get("success"):
        print(f"❌ Error: {shared['error']}")
    else:
        print(f"\n📊 Quote: ${shared['quote'].get('price', 'N/A')}")
        print(f"🏢 Company: {shared['overview'].get('name', 'N/A')}")
        print(f"\n🤖 Analysis:\n{shared.get('analysis', 'N/A')[:200]}...")
    
    return shared

# Run test
# result = await test_workflow("TSLA")

print("\nüí° Uncomment the line above to test the workflow")

## 📚 Key Features Demonstrated

### 1. **@agora_node Decorator**
```python
@agora_node(name="FetchQuote", max_retries=2, wait=1)
async def fetch_quote(shared):
    # Your code here
    return "next_action"
```
- Wraps any function into a TracedAsyncNode
- Automatic retry logic with wait times
- Automatic telemetry tracking
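
To make the `max_retries` and `wait` parameters concrete, here is a rough sketch of retry-with-wait as a plain asyncio decorator. It is not Agora's implementation. The exact meaning of `max_retries` in Agora is not confirmed here, so the sketch uses its own parameter, `max_attempts`, meaning total attempts. `with_retries` is a hypothetical name.

```python
import asyncio
import functools


def with_retries(max_attempts: int = 2, wait: float = 0.0):
    """Retry an async function up to max_attempts times, sleeping `wait` seconds between tries."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    await asyncio.sleep(wait)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_attempts=3, wait=0.01)
async def flaky(shared):
    # Fails twice, then succeeds on the third attempt
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "next_action"


# In a notebook, use `await flaky({})` instead of asyncio.run
outcome = asyncio.run(flaky({}))
print(outcome, calls["count"])
```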

### 2. **Built-in Telemetry**
- Powered by Traceloop + OpenTelemetry
- Automatic span tracking for all nodes
- Console and file export options
- Zero instrumentation code required

### 3. **Efficient Async Processing**
- All nodes run asynchronously
- Non-blocking API calls
- Graceful error handling
- Automatic retry mechanisms
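
Because `asyncio.to_thread` moves each blocking call onto a worker thread, independent fetches could in principle run concurrently with `asyncio.gather` rather than one after another. A sketch with a stand-in blocking function (`slow_fetch` is hypothetical). With a rate-limited API key, firing requests concurrently may hit the limit sooner, so this is a trade-off, not a recommendation.

```python
import asyncio
import time


def slow_fetch(name: str) -> str:
    """Stand-in for a blocking HTTP call."""
    time.sleep(0.2)
    return f"{name}-data"


async def fetch_both():
    # Both blocking calls run in worker threads at the same time;
    # gather returns results in the order the awaitables were passed
    return await asyncio.gather(
        asyncio.to_thread(slow_fetch, "overview"),
        asyncio.to_thread(slow_fetch, "historical"),
    )


start = time.perf_counter()
results = asyncio.run(fetch_both())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

In the notebook's flow the nodes run sequentially by design. Running them in parallel would require restructuring the graph, which is outside what this demo shows.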

### 4. **Alpha Vantage Integration**
- Real-time quotes (GLOBAL_QUOTE)
- Company fundamentals (OVERVIEW)
- Historical data (TIME_SERIES_DAILY)
- Error handling for rate limits

### 5. **AI-Powered Analysis**
- OpenAI GPT-4o-mini integration
- Contextual stock analysis
- Professional financial insights

---

## 🎯 Next Steps

Extend this demo:
- Add technical indicators (SMA, RSI, MACD)
- Create price charts with matplotlib
- Add news sentiment analysis
- Compare multiple stocks
- Export analysis reports
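
As a starting point for the first item, a simple moving average can be computed directly from the daily closes. The sketch assumes the `"4. close"` field name from the `TIME_SERIES_DAILY` payload and uses made-up prices. `simple_moving_average` is a hypothetical helper.

```python
def simple_moving_average(closes: list[float], window: int) -> list[float]:
    """Return the SMA for each full window, oldest first."""
    if window <= 0:
        raise ValueError("window must be positive")
    return [
        sum(closes[i - window:i]) / window
        for i in range(window, len(closes) + 1)
    ]


# Made-up sample shaped like a "Time Series (Daily)" payload
time_series = {
    "2024-01-05": {"4. close": "13.0"},
    "2024-01-04": {"4. close": "12.0"},
    "2024-01-03": {"4. close": "11.0"},
    "2024-01-02": {"4. close": "10.0"},
}

# ISO dates sort chronologically as strings, so sorted() gives oldest first
closes = [float(time_series[day]["4. close"]) for day in sorted(time_series)]
sma = simple_moving_average(closes, window=3)
print(sma)  # [11.0, 12.0]
```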

## 📖 Resources

- [Agora Documentation](https://github.com/JerzyKultura/Agora)
- [Alpha Vantage API Docs](https://www.alphavantage.co/documentation/)
- [OpenTelemetry Python](https://opentelemetry.io/docs/languages/python/)
- [Gradio Documentation](https://www.gradio.app/docs/)

---

*Built with ❤️ using Agora, Alpha Vantage, OpenAI, and Gradio*