# AI Stock Agent - Exercise 1

This notebook demonstrates an AI-powered stock analysis agent that can:
- Fetch real-time stock prices from Alpha Vantage API
- Analyze stock history and trends
- Use OpenAI's GPT models for intelligent stock recommendations
- Provide an interactive command-line interface

## Features
- Real-time stock price fetching
- Historical data analysis
- AI-powered buy/sell recommendations
- Interactive command interface
- Support for multiple stock symbols


## Setup and Imports

First, let's import all the necessary libraries and set up environment variables.


In [18]:
import os
import json
import requests
import argparse
import shlex
from typing import Optional, List, Dict, Tuple
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

print("✅ All imports successful!")
print("📋 Required environment variables:")
print("   - ALPHA_VANTAGE_KEY: For stock data API")
print("   - OPENAI_API_KEY: For AI analysis")
print("   - STOCK_SYMBOLS: Comma-separated stock symbols (optional)")
print("   - INTERACTIVE: Set to '1' for interactive mode (optional)")


✅ All imports successful!
📋 Required environment variables:
   - ALPHA_VANTAGE_KEY: For stock data API
   - OPENAI_API_KEY: For AI analysis
   - STOCK_SYMBOLS: Comma-separated stock symbols (optional)
   - INTERACTIVE: Set to '1' for interactive mode (optional)


## Utility Functions

Let's define some helper functions for environment variable management and API interactions.
    

In [20]:
def require_env(var_name: str) -> str:
    """Require an environment variable to be set, raise error if missing."""
    value = os.getenv(var_name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {var_name}")
    return value

# Test environment variables
try:
    alpha_key = require_env("ALPHA_VANTAGE_KEY")
    openai_key = require_env("OPENAI_API_KEY")
    print("✅ All required environment variables are set!")
    print(f"🔑 Alpha Vantage API Key: ...{alpha_key[-5:] if alpha_key else 'None'}")
    print(f"🤖 OpenAI API Key: ...{openai_key[-5:] if openai_key else 'None'}")
except RuntimeError as e:
    print(f"❌ {e}")
    print("Please set the required environment variables in your .env file")


✅ All required environment variables are set!
🔑 Alpha Vantage API Key: ...06QQT
🤖 OpenAI API Key: ...tolMA


## Stock Data Fetching Functions

These functions handle fetching real-time stock prices and historical data from the Alpha Vantage API.


In [29]:
def fetch_stock_price(symbol: str) -> Optional[Dict]:
    """Fetch latest stock price and change percent from Alpha Vantage."""
    api_key = require_env("ALPHA_VANTAGE_KEY")
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "GLOBAL_QUOTE",
        "symbol": symbol,
        "apikey": api_key,
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    quote = data.get("Global Quote") or data.get("GlobalQuote")
    if not quote:
        return None
    try:
        price = float(quote["05. price"]) if "05. price" in quote else float(quote["price"])  # type: ignore[index]
        change_percent_str = quote.get("10. change percent") or quote.get("change_percent") or "0%"
        change_percent = float(str(change_percent_str).replace("%", ""))
        return {"symbol": symbol, "price": price, "change_percent": change_percent}
    except Exception:
        return None

# Test the function with a sample stock
print("🧪 Testing stock price fetch...")
test_symbol = "AAPL"
price_data = fetch_stock_price(test_symbol)
if price_data:
    print(f"✅ Successfully fetched {test_symbol}: ${price_data['price']:.2f} ({price_data['change_percent']:+.2f}%)")
else:
    print(f"❌ Failed to fetch data for {test_symbol}")


🧪 Testing stock price fetch...
✅ Successfully fetched AAPL: $236.70 (+1.12%)


In [35]:
def fetch_stock_history(symbol: str, days: int = 7) -> Optional[List[Dict]]:
    """Fetch recent daily closes for a symbol (last N trading days).

    Returns a list of dicts sorted descending by date (most recent first):
    [{"date": "YYYY-MM-DD", "close": 123.45, "change_percent": -0.4}, ...]
    """
    api_key = require_env("ALPHA_VANTAGE_KEY")
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": api_key,
        "outputsize": "compact",
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    series = data.get("Time Series (Daily)")
    if not series or not isinstance(series, dict):
        return None

    # Sort dates descending (most recent first)
    dates = sorted(series.keys(), reverse=True)
    records: List[Dict] = []
    prev_close: Optional[float] = None
    for date in dates[: max(days + 1, 2)]:  # fetch a bit extra to compute change
        entry = series.get(date, {})
        if not entry:
            continue
        close_str = entry.get("4. close") or entry.get("5. adjusted close")
        if not close_str:
            continue
        try:
            close_val = float(close_str)
        except Exception:
            continue
        change_pct = 0.0
        if prev_close is not None and prev_close != 0:
            change_pct = ((close_val - prev_close) / prev_close) * 100.0
        records.append({"date": date, "close": close_val, "change_percent": round(change_pct, 4)})
        prev_close = close_val

    return records[:days]

# Test the function with a sample stock
print("🧪 Testing stock history fetch...")
test_symbol = "AAPL"
history_data = fetch_stock_history(test_symbol, days=5)
if history_data:
    print(f"✅ Successfully fetched {len(history_data)} days of history for {test_symbol}:")
    for record in history_data:
        print(f"   {record['date']}: ${record['close']:.2f} ({record['change_percent']:+.2f}%)")
else:
    print(f"❌ Failed to fetch history for {test_symbol}")


🧪 Testing stock history fetch...
✅ Successfully fetched 5 days of history for AAPL:
   2025-09-15: $236.70 (+0.00%)
   2025-09-12: $234.07 (-1.11%)
   2025-09-11: $230.03 (-1.73%)
   2025-09-10: $226.79 (-1.41%)
   2025-09-09: $234.35 (+3.33%)


## Data Analysis Functions

These functions compute features from historical stock data to help with analysis.


In [40]:
def compute_history_features(history: List[Dict]) -> Dict:
    """Compute simple features from history: n-day return, avg change, last close."""
    if not history:
        return {"n_day_return_pct": 0.0, "avg_daily_change_pct": 0.0, "last_close": None}
    closes = [h["close"] for h in history]
    last_close = closes[0]
    end_close = closes[-1]
    n_day_return = 0.0
    if end_close:
        n_day_return = ((last_close - end_close) / end_close) * 100.0
    daily_changes = [h.get("change_percent", 0.0) for h in history]
    avg_daily_change = sum(daily_changes) / len(daily_changes) if daily_changes else 0.0
    return {
        "n_day_return_pct": round(n_day_return, 4),
        "avg_daily_change_pct": round(avg_daily_change, 4),
        "last_close": last_close,
    }

# Test the function with sample history data
if history_data:
    features = compute_history_features(history_data)
    print("🧪 Testing feature computation...")
    print(f"✅ Computed features: {features}")
else:
    print("⚠️ No history data available for feature testing")


🧪 Testing feature computation...
✅ Computed features: {'n_day_return_pct': 1.0028, 'avg_daily_change_pct': -0.1824, 'last_close': 236.7}


## AI Analysis Functions

These functions use OpenAI's GPT models to analyze stock data and provide intelligent recommendations.


In [44]:
def get_openai_client():
    """Create and return an OpenAI client instance."""
    from openai import OpenAI
    api_key = require_env("OPENAI_API_KEY")
    return OpenAI(api_key=api_key)

def analyze_with_llm(stocks: List[Dict]) -> str:
    """Call OpenAI Chat Completions to classify stocks into buy/sell, returning JSON string."""
    client = get_openai_client()

    stock_lines = [
        f"{s['symbol']}: Price={s['price']}, Change%={s['change_percent']}" for s in stocks
    ]
    stock_summary = "\n".join(stock_lines)

    prompt = (
        "You are a stock screening assistant.\n"
        "Here is the stock data:\n\n"
        f"{stock_summary}\n\n"
        "Task:\n"
        "- Decide if each stock goes into BUY or SELL list.\n"
        "- SELL list should be ordered by priority (biggest negative change first).\n"
        "- Give short reasons for each.\n"
        "Return result in JSON with keys: buy_list, sell_list.\n"
        "Ensure JSON is valid and parsable."
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    content = completion.choices[0].message.content or "{}"
    print(content)
    return content

# Test the function with sample data
print("🧪 Testing AI analysis...")
if price_data:
    test_stocks = [price_data]
    try:
        result = analyze_with_llm(test_stocks)
        print("✅ AI analysis completed successfully!")
        print("📊 Analysis result:")
        print(result)
    except Exception as e:
        print(f"❌ AI analysis failed: {e}")
else:
    print("⚠️ No price data available for AI analysis testing")


🧪 Testing AI analysis...
```json
{
  "buy_list": [
    {
      "ticker": "AAPL",
      "price": 236.7,
      "change_percentage": 1.1236,
      "reason": "Positive change indicates potential for growth."
    }
  ],
  "sell_list": []
}
```
✅ AI analysis completed successfully!
📊 Analysis result:
```json
{
  "buy_list": [
    {
      "ticker": "AAPL",
      "price": 236.7,
      "change_percentage": 1.1236,
      "reason": "Positive change indicates potential for growth."
    }
  ],
  "sell_list": []
}
```


In [45]:
def analyze_with_llm_with_history(stocks: List[Dict], histories: Dict[str, List[Dict]], features: Dict[str, Dict], days: int) -> str:
    """LLM call that includes N-day history and simple features per symbol."""
    client = get_openai_client()

    lines: List[str] = []
    for s in stocks:
        sym = s["symbol"]
        feat = features.get(sym, {})
        hist = histories.get(sym, [])
        hist_str = "; ".join([f"{h['date']}: {h['close']} ({h['change_percent']}%)" for h in hist])
        lines.append(
            (
                f"{sym}: Price={s['price']}, 1d%={s['change_percent']}; "
                f"{days}d_return%={feat.get('n_day_return_pct')}, avg_daily%={feat.get('avg_daily_change_pct')}\n"
                f"  history: {hist_str}"
            )
        )
    summary = "\n".join(lines)

    prompt = (
        "You are a stock screening assistant.\n"
        f"Use each symbol's last {days} trading days to decide BUY or SELL.\n\n"
        f"Data:\n{summary}\n\n"
        "Task:\n"
        "- Create BUY and SELL lists with short reasons.\n"
        "- Consider both short-term momentum and recent trend.\n"
        "- SELL should be ordered by biggest downside risk first.\n"
        "Return valid JSON with keys: buy_list, sell_list."
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return completion.choices[0].message.content or "{}"

def answer_question_with_llm(question: str, stocks: List[Dict]) -> str:
    """Answer an arbitrary question with optional stock context."""
    client = get_openai_client()

    stock_lines = [
        f"{s['symbol']}: Price={s['price']}, Change%={s['change_percent']}" for s in stocks
    ]
    stock_summary = "\n".join(stock_lines) if stocks else "(no context)"

    prompt = (
        "You are a helpful stock assistant.\n"
        "If stock context is provided, use it. Otherwise, answer generally.\n\n"
        f"Context:\n{stock_summary}\n\n"
        f"Question: {question}\n"
    )

    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return completion.choices[0].message.content or ""

print("✅ Advanced AI analysis functions defined!")


✅ Advanced AI analysis functions defined!


## Interactive Mode Functions

The interactive loop provides a command-line interface for real-time stock analysis.


In [46]:
def interactive_loop(initial_symbols: List[str]) -> None:
    """Interactive command-line interface for stock analysis."""
    tracked: List[str] = list(dict.fromkeys([s.upper() for s in initial_symbols]))

    def print_help() -> None:
        print(
            "Commands:\n"
            "  help                 Show this help\n"
            "  list                 Show tracked symbols\n"
            "  add TICKER [..]      Add one or more tickers\n"
            "  remove TICKER [..]   Remove one or more tickers\n"
            "  price TICKER         Fetch latest price for one\n"
            "  screen [days]        Fetch all + LLM buy/sell with N-day history (default 7)\n"
            "  history TICKER [d]   Show last d days history (default 7)\n"
            "  ask QUESTION         Ask any question\n"
            "  exit                 Quit\n"
        )

    print("Interactive mode. Type 'help' for commands. Ctrl+C to exit.")
    while True:
        try:
            line = input("agent> ").strip()
        except (EOFError, KeyboardInterrupt):
            print()
            break
        if not line:
            continue
        try:
            parts = shlex.split(line)
        except ValueError:
            print("Could not parse input")
            continue
        cmd = parts[0].lower()
        args = parts[1:]

        if cmd in {"exit", "quit", "q"}:
            break
        if cmd == "help":
            print_help()
            continue
        if cmd == "list":
            print(", ".join(tracked) if tracked else "(none)")
            continue
        if cmd == "add" and args:
            for sym in args:
                symu = sym.upper()
                if symu not in tracked:
                    tracked.append(symu)
            print("Tracked:", ", ".join(tracked))
            continue
        if cmd in {"remove", "rm"} and args:
            remove_set = {a.upper() for a in args}
            tracked[:] = [s for s in tracked if s not in remove_set]
            print("Tracked:", ", ".join(tracked) if tracked else "(none)")
            continue
        if cmd == "price" and args:
            sym = args[0].upper()
            data = fetch_stock_price(sym)
            if data:
                print(data)
            else:
                print(f"No data for {sym}")
            continue
        if cmd == "screen":
            # optional arg: days
            days = 7
            if args:
                try:
                    days = max(2, int(args[0]))
                except Exception:
                    pass
            stock_data: List[Dict] = []
            for sym in tracked:
                data = fetch_stock_price(sym)
                if data:
                    stock_data.append(data)
            if not stock_data:
                print("No stock data; check API key or rate limits.")
                continue
            # histories + features
            histories: Dict[str, List[Dict]] = {}
            feats: Dict[str, Dict] = {}
            for s in stock_data:
                sym = s["symbol"]
                hist = fetch_stock_history(sym, days=days) or []
                histories[sym] = hist
                feats[sym] = compute_history_features(hist)
            print("Fetched Stock Data:")
            for s in stock_data:
                print(s)
            print("\nLLM Recommendation (with history):")
            print(analyze_with_llm_with_history(stock_data, histories, feats, days))
            continue
        if cmd == "history" and args:
            sym = args[0].upper()
            days = 7
            if len(args) > 1:
                try:
                    days = max(2, int(args[1]))
                except Exception:
                    pass
            hist = fetch_stock_history(sym, days=days)
            if not hist:
                print(f"No history for {sym}")
                continue
            for h in hist:
                print(h)
            continue
        if cmd == "ask":
            question = line.partition(" ")[2].strip()
            if not question:
                print("Usage: ask QUESTION")
                continue
            # Provide current context
            context_data: List[Dict] = []
            for sym in tracked[:5]:  # limit to reduce rate-limit impact
                data = fetch_stock_price(sym)
                if data:
                    context_data.append(data)
            print(answer_question_with_llm(question, context_data))
            continue

        print("Unknown command. Type 'help'.")

print("✅ Interactive mode functions defined!")
print("💡 To start interactive mode, run: interactive_loop(['AAPL', 'TSLA'])")


✅ Interactive mode functions defined!
💡 To start interactive mode, run: interactive_loop(['AAPL', 'TSLA'])
