### 0. Install and import packages

`pip install -U crewai "crewai[tools]" pandas numpy matplotlib python-dotenv requests --no-cache-dir` (quote `"crewai[tools]"` so shells such as zsh don't treat the brackets as a glob pattern)

In [1]:
# Check that dependencies are importable (install them first with the pip command above)
import crewai  # noqa: F401
import pandas  # noqa: F401
import numpy  # noqa: F401
import matplotlib  # noqa: F401
import requests  # noqa: F401

### 1. Basic setup and helpers

In [2]:
# Setup & environment: stdlib imports, warning policy, and configuration read from
# the process environment (optionally populated from a local .env file).
import os, json, math, warnings, time
from datetime import datetime, timedelta
from typing import Dict, Any

# Silence library warnings to keep the agent logs readable.
# NOTE(review): this also hides pandas/crewai deprecation warnings.
warnings.filterwarnings('ignore')

from dotenv import load_dotenv

load_dotenv()  # load variables from a .env file (if present) into os.environ

# LLM used by the agents; override with the OPENAI_MODEL_NAME environment variable.
OPENAI_MODEL_NAME = os.environ.get('OPENAI_MODEL_NAME', 'gpt-4o-mini')

# The Alpha Vantage key comes from the environment only -- never hardcode it.
ALPHAVANTAGE_API_KEY = os.environ.get('ALPHAVANTAGE_API_KEY')
if ALPHAVANTAGE_API_KEY in (None, ''):
    print('⚠️  Set ALPHAVANTAGE_API_KEY in your environment for data fetching.')
print('Using model:', OPENAI_MODEL_NAME)

Using model: gpt-4o-mini


### 2. Alpha Vantage fetchers, pure-pandas indicators, risk, rule-based signal, plotting

In [3]:
# Tools
import pandas as pd
import numpy as np
import requests
import matplotlib.pyplot as plt
from crewai.tools import tool

# ---------- Helpers ----------

def _ensure_df(df: pd.DataFrame) -> pd.DataFrame:
    if not isinstance(df, pd.DataFrame) or df.empty:
        raise ValueError('Price DataFrame is empty. Check ticker/period/interval.')
    for needed in ['Open','High','Low','Close','Adj Close','Volume']:
        if needed not in df.columns:
            raise ValueError(f'Missing column: {needed}')
    return df

def _sanitize_ticker(ticker: str) -> str:
    # Consistent sanitization for filenames only
    return (str(ticker)
            .replace('/', '-').replace('\\', '-')
            .replace(' ', '').replace(':', '-').replace('.', '-'))

def _normalize_interval(interval: str) -> str:
    il = (interval or "").strip().lower()
    if il in {"1d", "d"}:
        return "1d"
    elif il in {"1wk", "wk", "w"}:
        return "1wk"
    elif il in {"1mo", "mo", "m"}:
        return "1mo"
    elif il.endswith("min"):
        x = il.replace("min", "")
        return f"{x if x in {'1','5','15','30','60'} else '60'}min"
    return "1d"

def _parse_period_to_rows(period: str, interval: str) -> int:
    # Rough mapping to slice recent rows if API returns larger history
    approx_days = {
        '1mo': 22, '3mo': 66, '6mo': 126, '1y': 252, '2y': 504, '5y': 1260
    }.get(period, 126)
    if interval.endswith('wk'):
        return max(approx_days // 5, 50)
    if interval.endswith('mo'):
        return max(approx_days // 21, 24)
    if interval.endswith('min'):
        # cap to avoid huge payloads for intraday
        return min(2000, approx_days * (390 // max(1, int(interval.replace('min','')))))
    return approx_days  # daily

def _av_get(url: str, params: dict) -> dict:
    """GET an Alpha Vantage endpoint and return the decoded JSON payload.

    The API key is added to a copy of ``params`` (the caller's dict is untouched).
    Transport-level HTTP errors surface through ``raise_for_status()``. Alpha Vantage
    also reports errors, throttling and plan notices inside a 2xx body under
    'Error Message', 'Note' or 'Information'; those become a RuntimeError here.
    """
    query = {**params, 'apikey': ALPHAVANTAGE_API_KEY}
    resp = requests.get(url, params=query, timeout=30)
    resp.raise_for_status()
    payload = resp.json()
    flagged = any(field in payload for field in ('Error Message', 'Note', 'Information'))
    if flagged:
        raise RuntimeError(f"Alpha Vantage error/rate limit/info: {payload}")
    return payload

def _alpha_to_df(data: dict, key: str, adjust_close_field: str | None = None) -> pd.DataFrame:
    """Convert one Alpha Vantage time-series payload into a sorted OHLCV DataFrame.

    Parameters
    ----------
    data : dict
        Decoded JSON payload (as returned by ``_av_get``).
    key : str
        Name of the time-series section, e.g. 'Time Series (Daily)'.
    adjust_close_field : str | None
        Raw column holding the adjusted close (e.g. '5. adjusted close'). When it is
        None or not present, 'Adj Close' is a copy of 'Close'.

    Returns
    -------
    DataFrame indexed by timestamp (ascending) with columns
    Open, High, Low, Close, Adj Close, Volume.

    Raises
    ------
    ValueError
        If the section is missing/empty or a required column cannot be built.
    """
    ts = data.get(key, {})
    if not ts:
        raise ValueError(f"No time series data found under key: {key}")
    # Outer keys are timestamps, inner dicts are the per-bar fields.
    df = pd.DataFrame.from_dict(ts, orient='index')
    df.index = pd.to_datetime(df.index)

    # Adjusted endpoints put volume in '6. volume'; plain endpoints use '5. volume'.
    rename_map = {
        '1. open': 'Open',
        '2. high': 'High',
        '3. low': 'Low',
        '4. close': 'Close',
        '6. volume': 'Volume',
        '5. volume': 'Volume',
    }
    df = df.rename(columns=rename_map)

    # The API delivers every value as a string.
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    if adjust_close_field and adjust_close_field in df.columns:
        df['Adj Close'] = pd.to_numeric(df[adjust_close_field], errors='coerce')
    elif 'Close' in df.columns:
        df['Adj Close'] = df['Close']
    # Otherwise leave 'Adj Close' absent; _ensure_df reports the missing 'Close'.

    wanted = ['Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
    # Select only existing columns so a schema change surfaces as _ensure_df's
    # descriptive ValueError rather than a bare KeyError from the selection.
    present = [c for c in wanted if c in df.columns]
    return _ensure_df(df.sort_index()[present])

def _read_prices(csv_path: str) -> pd.DataFrame:
    if not os.path.isabs(csv_path):
        # For safety, convert relative -> absolute in current WD
        csv_path = os.path.abspath(csv_path)
    if not os.path.isfile(csv_path):
        raise FileNotFoundError(f"CSV not found: {csv_path}")
    df = pd.read_csv(csv_path, low_memory=False)
    date_col = 'Date' if 'Date' in df.columns else df.columns[0]
    df[date_col] = pd.to_datetime(df[date_col], errors='coerce', utc=False)
    df = df.dropna(subset=[date_col]).set_index(date_col).sort_index()
    return _ensure_df(df)

# --- Indicator helpers ---
def ema(series: pd.Series, span: int) -> pd.Series:
    """Exponential moving average (alpha = 2 / (span + 1)), recursive form (adjust=False)."""
    smoother = series.ewm(span=span, adjust=False)
    return smoother.mean()

def rsi(series: pd.Series, period: int = 14) -> pd.Series:
    """Relative Strength Index with Wilder-style smoothing (EWM, alpha = 1/period).

    Values stay NaN until ``period`` price changes have been observed (min_periods).
    A 1e-12 epsilon in the denominator avoids division by zero when the window has
    no losses.
    """
    change = series.diff()
    ups = change.clip(lower=0)
    downs = -change.clip(upper=0)

    def _smooth(values: pd.Series) -> pd.Series:
        return values.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

    strength = _smooth(ups) / (_smooth(downs) + 1e-12)
    return 100 - (100 / (1 + strength))

def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9):
    """MACD line (fast EMA minus slow EMA) and its signal line (EMA of the MACD line).

    Returns a ``(macd_line, signal_line)`` tuple of Series aligned with ``series``.
    """
    line = ema(series, fast) - ema(series, slow)
    return line, ema(line, signal)

def bollinger(series: pd.Series, period: int = 20, std_mult: float = 2.0):
    """Bollinger Bands: rolling mean +/- ``std_mult`` rolling standard deviations.

    Uses pandas' default sample standard deviation. Returns ``(lower, middle, upper)``;
    entries before a full ``period`` window are NaN.
    """
    window = series.rolling(window=period)
    middle = window.mean()
    width = std_mult * window.std()
    return middle - width, middle, middle + width

def _compute_indicators_core_from_df(df: pd.DataFrame) -> dict:
    """Compute the latest indicator snapshot and same-bar crossover/breakout events.

    Expects a validated OHLCV frame indexed by date, oldest first. Indicators are
    computed on 'Close': EMA20, EMA50, MACD(12, 26, 9), Bollinger(20, 2) and RSI(14).
    Events compare the last two rows, so a cross is reported only on the bar where
    it happens.

    Returns:
        dict of floats (close, ema20, ema50, macd, macd_signal, rsi, bb_lower,
        bb_middle, bb_upper) plus ``events`` (list of str). Values can be NaN when
        the history is shorter than an indicator's warm-up window.

    Raises:
        ValueError: if fewer than 2 rows are supplied.
    """
    if len(df) < 2:
        raise ValueError("Not enough rows to compute crossovers (need ≥ 2 rows).")

    df = df.copy()  # never mutate the caller's frame
    close = df['Close']
    df['EMA20'] = ema(close, 20)
    df['EMA50'] = ema(close, 50)
    df['MACD'], df['MACD_SIG'] = macd(close)
    df['BB_L'], df['BB_M'], df['BB_U'] = bollinger(close, 20, 2)
    df['RSI'] = rsi(close, 14)

    latest = df.iloc[-1]
    prev = df.iloc[-2]

    def crossed_above(a: str, b: str) -> bool:
        # a was below b on the previous bar and is above b on the latest bar.
        return prev[a] < prev[b] and latest[a] > latest[b]

    def crossed_below(a: str, b: str) -> bool:
        # Mirror of crossed_above; both sides compare same-bar values.
        return prev[a] > prev[b] and latest[a] < latest[b]

    events = []
    if crossed_above('EMA20', 'EMA50'):
        events.append('Bullish EMA20/50 golden cross today')
    if crossed_below('EMA20', 'EMA50'):
        events.append('Bearish EMA20/50 death cross today')
    if crossed_above('MACD', 'MACD_SIG'):
        events.append('Bullish MACD cross above signal')
    if crossed_below('MACD', 'MACD_SIG'):
        events.append('Bearish MACD cross below signal')
    if latest['Close'] < latest['BB_L']:
        events.append('Price closed below lower Bollinger band (oversold)')
    if latest['Close'] > latest['BB_U']:
        events.append('Price closed above upper Bollinger band (overbought)')

    return {
        'close': float(latest['Close']),
        'ema20': float(latest['EMA20']),
        'ema50': float(latest['EMA50']),
        'macd': float(latest['MACD']),
        'macd_signal': float(latest['MACD_SIG']),
        'rsi': float(latest['RSI']),
        'bb_lower': float(latest['BB_L']),
        'bb_middle': float(latest['BB_M']),
        'bb_upper': float(latest['BB_U']),
        'events': events,
    }

# ---------- TOOLS ----------

@tool("Fetch OHLCV price history")
def fetch_ohlcv(ticker: str, period: str = '6mo', interval: str = '1d') -> str:
    """
    Fetch OHLCV using Alpha Vantage. Supports daily/weekly/monthly and basic intraday.
    Returns JSON:
      {
        csv_path (absolute), rows_count, start, end, last_close,
        period, interval (canonical), ticker, sanitized_ticker
      }
    """
    # The docstring above doubles as the tool description shown to the LLM, so it is
    # kept short and schema-focused.
    # `tempfile` is not imported at notebook level; without this import the CSV write
    # below fails with NameError after a successful download.
    import tempfile

    if not ALPHAVANTAGE_API_KEY:
        raise RuntimeError('ALPHAVANTAGE_API_KEY not set')

    base = 'https://www.alphavantage.co/query'
    canon_interval = _normalize_interval(interval)
    # How many of the most recent rows to keep for the requested look-back.
    rows_target = _parse_period_to_rows(period, canon_interval)

    def _fetch_daily():
        """Daily bars: try the adjusted endpoint first, then the raw one."""
        # NOTE: any failure of the adjusted call (including a rate-limit reply) triggers
        # the fallback, which spends a second request from the Alpha Vantage quota.
        try:
            data = _av_get(base, {
                'function': 'TIME_SERIES_DAILY_ADJUSTED',
                'symbol': ticker,
                'outputsize': 'full'
            })
            return _alpha_to_df(data, 'Time Series (Daily)', adjust_close_field='5. adjusted close')
        except Exception:
            # Fallback to non-adjusted daily
            data2 = _av_get(base, {
                'function': 'TIME_SERIES_DAILY',
                'symbol': ticker,
                'outputsize': 'full'
            })
            return _alpha_to_df(data2, 'Time Series (Daily)', adjust_close_field=None)

    if canon_interval == '1d':
        df = _fetch_daily()
    elif canon_interval == '1wk':
        data = _av_get(base, {'function': 'TIME_SERIES_WEEKLY_ADJUSTED', 'symbol': ticker})
        df = _alpha_to_df(data, 'Weekly Adjusted Time Series', adjust_close_field='5. adjusted close')
    elif canon_interval == '1mo':
        data = _av_get(base, {'function': 'TIME_SERIES_MONTHLY_ADJUSTED', 'symbol': ticker})
        df = _alpha_to_df(data, 'Monthly Adjusted Time Series', adjust_close_field='5. adjusted close')
    else:
        # Intraday: canon_interval is '<N>min'.
        intr = canon_interval.replace('min', '')
        data = _av_get(base, {
            'function': 'TIME_SERIES_INTRADAY',
            'symbol': ticker,
            'interval': f'{intr}min',
            'outputsize': 'full'
            # 'adjusted' param is not consistently honored; omit to avoid confusion
        })
        ts_key = f'Time Series ({intr}min)'
        df = _alpha_to_df(data, ts_key, adjust_close_field=None)

    df = df.dropna(subset=['Close'])
    if df.empty:
        raise RuntimeError(f"Fetched empty dataframe for {ticker} ({canon_interval}, {period}).")

    # Trim to the look-back window (keep the most recent rows).
    if rows_target and rows_target > 0 and len(df) > rows_target:
        df = df.iloc[-rows_target:]

    # Write to an absolute temp path; downstream tools receive this path verbatim.
    safe_ticker = _sanitize_ticker(ticker)
    tmp_dir = tempfile.gettempdir()
    csv_path = os.path.join(tmp_dir, f"{safe_ticker}_{period}_{canon_interval}.csv")
    df.to_csv(csv_path, index=True)

    info = {
        'csv_path': os.path.abspath(csv_path),
        'rows_count': int(df.shape[0]),
        'start': str(pd.to_datetime(df.index[0]).date()),
        'end': str(pd.to_datetime(df.index[-1]).date()),
        'last_close': float(df['Close'].iloc[-1]),
        'period': period,
        'interval': canon_interval,
        'ticker': ticker,
        'sanitized_ticker': safe_ticker
    }
    return json.dumps(info)


@tool("Fetch recent news")
def fetch_news(ticker: str, limit: int = 10) -> str:
    """
    Fetch recent news using Alpha Vantage NEWS_SENTIMENT endpoint (subject to plan limits).
    Returns JSON list of {title, publisher, link, time}.
    """
    # Best-effort tool: a missing key, a network error or an API notice yields "[]" so
    # the agent can continue without headlines instead of failing the whole task.
    if not ALPHAVANTAGE_API_KEY:
        return json.dumps([])

    # Clamp once and use the same bound for the request and for the slice below.
    # Slicing with the raw value let a negative limit (e.g. -3) drop items from the end
    # instead of bounding the result.
    n_items = min(50, max(1, int(limit)))

    url = 'https://www.alphavantage.co/query'
    params = {
        'function': 'NEWS_SENTIMENT',
        'tickers': ticker,
        'sort': 'LATEST',
        'limit': n_items,
        'apikey': ALPHAVANTAGE_API_KEY
    }
    try:
        r = requests.get(url, params=params, timeout=30)
        r.raise_for_status()
        data = r.json()
    except Exception:
        return json.dumps([])

    # Rate-limit / error replies carry no 'feed'; treat anything unexpected as empty.
    feed = (data.get('feed') if isinstance(data, dict) else None) or []
    items = []
    for article in feed[:n_items]:
        authors = article.get('authors') or []
        if isinstance(authors, list):
            authors_str = ", ".join(str(a) for a in authors if a)
        else:
            authors_str = str(authors) if authors else ""
        items.append({
            'title': article.get('title'),
            # Prefer the outlet name; fall back to the author list.
            'publisher': article.get('source') or authors_str,
            'link': article.get('url'),
            'time': article.get('time_published'),  # e.g., "20240101T130000"
        })
    return json.dumps(items)


def _compute_indicators_core(csv_path: str) -> dict:
    """Load prices from ``csv_path`` and return the indicator snapshot plus the absolute CSV path."""
    snapshot = _compute_indicators_core_from_df(_read_prices(csv_path))
    snapshot['csv_path'] = os.path.abspath(csv_path)
    return snapshot


@tool("Compute technical indicators")
def compute_indicators(csv_path: str) -> str:
    """
    Compute EMA20/EMA50, MACD, RSI, and Bollinger Bands from OHLCV CSV data.
    Detects crossovers and band breakouts, returning latest values and events as JSON.
    """
    # Thin JSON wrapper so the agent receives a string. The docstring above is the
    # tool description the LLM sees, so keep it in sync with the returned fields.
    return json.dumps(_compute_indicators_core(csv_path))


def _periods_per_year(index: pd.Index) -> float:
    """Estimate how many bars make up one trading year from the timestamps' spacing.

    Daily -> 252, weekly -> 52, monthly -> 12. Intraday -> 252 x (median bars per
    calendar date), which also accounts for extended-hours sessions.
    """
    stamps = pd.DatetimeIndex(index)
    if len(stamps) < 2:
        return 252.0
    step_days = pd.Series(stamps).diff().dropna().median() / pd.Timedelta(days=1)
    if step_days >= 20:
        return 12.0
    if step_days >= 4:
        return 52.0
    if step_days >= 0.5:   # daily bars (weekends only affect the mean, not the median)
        return 252.0
    bars_per_day = pd.Series(stamps.normalize()).value_counts().median()
    return 252.0 * float(bars_per_day)


@tool("Compute risk metrics")
def compute_risk(csv_path: str) -> str:
    """
    Compute annualized volatility, max drawdown, and 95% historical VaR from OHLCV CSV data.
    Returns use the adjusted close, and annualization is inferred from the bar spacing
    (daily, weekly, monthly or intraday); hist_VaR_1d_95 is the 1-bar VaR (1-day for daily bars).
    Returns JSON with risk metrics and number of observations, plus conservative risk plan.
    """
    df = _read_prices(csv_path)
    # Adjusted close keeps splits/dividends from showing up as huge one-bar moves;
    # fall back to Close wherever the adjusted value is missing.
    prices = df['Adj Close'].fillna(df['Close'])
    rets = prices.pct_change().dropna()

    if len(rets) < 2:  # a standard deviation needs at least two observations
        raise ValueError("Not enough return observations to compute risk metrics.")

    periods_per_year = _periods_per_year(df.index)
    bar_vol = float(rets.std())
    vol_ann = bar_vol * math.sqrt(periods_per_year)
    # Per-bar volatility rescaled to a one-trading-day horizon (factor 1.0 for daily data).
    daily_vol = bar_vol * math.sqrt(periods_per_year / 252.0)

    cum = (1 + rets).cumprod()
    dd = (cum / cum.cummax()) - 1.0
    max_dd = float(dd.min())
    var95 = float(np.percentile(rets, 5))  # 1-bar historical VaR

    # Conservative risk suggestions:
    # stop ~1.5x daily vol, target ~2.5x daily vol,
    # position size ~ 1 / (daily vol in %), clamped to [0.5%, 5%].
    stop_loss_pct = round(1.5 * daily_vol, 4)
    take_profit_pct = round(2.5 * daily_vol, 4)
    pos_size_pct = round(max(0.5, min(5.0, 1.0 / (daily_vol * 100 + 1e-9))), 2)

    out = {
        'vol_annualized': vol_ann,
        'max_drawdown': max_dd,
        'hist_VaR_1d_95': var95,
        'n_days': int(len(rets)),  # number of return observations (bars)
        'periods_per_year': periods_per_year,
        'suggested': {
            'stop_loss_pct': stop_loss_pct,
            'take_profit_pct': take_profit_pct,
            'position_size_pct': pos_size_pct
        }
    }
    return json.dumps(out)


@tool("Rule-based technical signal")
def rule_based_signal(csv_path: str) -> str:
    """
    Generate a BUY/SELL/HOLD signal using simple rules on EMA, MACD, RSI, and Bollinger Bands.
    Returns JSON with signal, score, reasons, and indicator snapshot.
    """
    ind = _compute_indicators_core(csv_path)

    # Each rule contributes (score delta, human-readable reason), evaluated in order.
    checks = []
    # Trend and momentum always vote +/-1.
    if ind['ema20'] > ind['ema50']:
        checks.append((1, 'EMA20 > EMA50 (uptrend)'))
    else:
        checks.append((-1, 'EMA20 < EMA50 (downtrend)'))
    if ind['macd'] > ind['macd_signal']:
        checks.append((1, 'MACD > signal (bullish momentum)'))
    else:
        checks.append((-1, 'MACD < signal (bearish momentum)'))
    # Mean-reversion hints only vote +/-0.5, and only at the extremes.
    if ind['rsi'] < 30:
        checks.append((0.5, 'RSI < 30 (oversold)'))
    elif ind['rsi'] > 70:
        checks.append((-0.5, 'RSI > 70 (overbought)'))
    if ind['close'] < ind['bb_lower']:
        checks.append((0.5, 'Below lower band (mean-reversion up)'))
    if ind['close'] > ind['bb_upper']:
        checks.append((-0.5, 'Above upper band (mean-reversion down)'))

    score = 0.0
    for delta, _ in checks:
        score += delta
    reasons = [why for _, why in checks]

    if score >= 1.5:
        signal = 'BUY'
    elif score <= -1.5:
        signal = 'SELL'
    else:
        signal = 'HOLD'

    return json.dumps({
        'signal': signal,
        'score': score,
        'reasons': reasons,
        'indicators': ind
    })


@tool("Plot price & indicators")
def plot_price_and_indicators(csv_path: str) -> str:
    """
    Plot closing price with EMA20/50 and Bollinger Bands, saving to PNG.
    Returns the absolute file path of the generated chart.
    """
    df = _read_prices(csv_path).copy()
    close = df['Close']
    df['EMA20'] = ema(close, 20)
    df['EMA50'] = ema(close, 50)
    df['BB_L'], df['BB_M'], df['BB_U'] = bollinger(close, 20, 2)

    # Chart is written next to the CSV: <csv stem>_chart.png
    out_path = os.path.splitext(os.path.abspath(csv_path))[0] + "_chart.png"

    # Explicit figure handle plus `finally`: the figure is released even if plotting
    # or saving raises, so repeated tool calls don't accumulate open figures.
    fig, ax = plt.subplots(figsize=(12, 6))
    try:
        series_labels = (
            ('Close', 'Close'),
            ('EMA20', 'EMA20'),
            ('EMA50', 'EMA50'),
            ('BB_U', 'BB Upper'),
            ('BB_L', 'BB Lower'),
        )
        for col, label in series_labels:
            df[col].plot(ax=ax, label=label)
        ax.set_title('Price with EMAs & Bollinger Bands')
        ax.set_xlabel('Date')
        ax.set_ylabel('Price')
        ax.legend()
        fig.tight_layout()
        fig.savefig(out_path)
    finally:
        plt.close(fig)
    return os.path.abspath(out_path)

### 3. Define Agents and Tasks

In [4]:
from crewai import Agent, Task, Crew, Process

# ---------- Agents ----------
# Each agent can call only the tools listed here. The {ticker} placeholders are
# interpolated from the `inputs` passed to crew.kickoff().

# Fetches prices (to a temp CSV) and headlines; downstream tasks reuse its csv_path.
market_data_analyst = Agent(
    role="Market Data Analyst",
    goal=("Gather and validate OHLCV & headlines for {ticker}. Summarize trend and anomalies."),
    backstory=("Meticulous about data quality, you verify timeframes and note gaps/splits."),
    tools=[fetch_ohlcv, fetch_news],
    allow_delegation=False,
    verbose=True,
)

# Computes indicators / rule-based signal / chart from the CSV produced above.
technical_strategist = Agent(
    role="Technical Strategist",
    goal=("Transform price data into signals using EMA/RSI/MACD/Bollinger and explain rationale."),
    backstory=("Disciplined technician balancing momentum and mean-reversion; you state both sides."),
    tools=[compute_indicators, rule_based_signal, plot_price_and_indicators],
    allow_delegation=False,
    verbose=True,
)

# Computes volatility / drawdown / VaR and the suggested stop, target and size.
risk_manager = Agent(
    role="Risk Manager",
    goal=("Quantify risk (vol, drawdown, VaR) and propose a conservative risk plan."),
    backstory=("Capital preservation first; you recommend sensible stops, targets, and sizing."),
    tools=[compute_risk],
    allow_delegation=False,
    verbose=True,
)

# Makes the final call purely from the context of the earlier tasks (no tools).
portfolio_manager = Agent(
    role="Portfolio Manager",
    goal=("Integrate data/signals/risk and decide: BUY/SELL/HOLD for {ticker} now, with confidence."),
    backstory=("Accountable decision-maker who weighs conflicting evidence and avoids bravado."),
    tools=[],
    # Delegation disabled, matching the sequential crew's "no delegation tool needed"
    # intent: delegating back to the data analyst could re-fetch data and spend extra
    # calls from the small Alpha Vantage daily quota.
    allow_delegation=False,
    verbose=True,
)

# ---------- Tasks ----------
# Downstream tasks MUST reuse the exact csv_path produced by the Market Data task:
# every indicator/risk/plot tool reads the CSV that fetch_ohlcv wrote there.
# {ticker}/{period}/{interval} are filled from crew.kickoff(inputs=...).
# NOTE(review): the JSON "schemas" in braces below (e.g. {csv_path, rows_count, ...}) are
# not plain identifiers, so crewai's input interpolation should leave them untouched --
# confirm when upgrading crewai.

# Step 1: fetch prices + headlines. fetch_ohlcv returns 'start'/'end'; the agent is
# asked to report them as date_start/date_end.
task_collect = Task(
    description=(
        "For ticker {ticker} with period {period} and interval {interval}:\n"
        "1) Use *Fetch OHLCV price history* and capture its returned `csv_path` (absolute), rows_count, date range, and last_close.\n"
        "2) Use *Fetch recent news* (top 10).\n"
        "3) Summarize trend (up/down/sideways) and any data anomalies.\n\n"
        "**Return** JSON: {csv_path, rows_count, date_start, date_end, last_close, headlines[]}"
    ),
    expected_output="JSON object with csv_path, rows_count, date_start, date_end, last_close, headlines[]",
    agent=market_data_analyst,
)

# Step 2: technical view. `context` hands task_collect's output (including csv_path)
# to this task.
task_tech = Task(
    description=(
        "Use the `csv_path` from the Market Data task output.\n"
        "- Call *Compute technical indicators* with that `csv_path`.\n"
        "- Call *Rule-based technical signal* with the same `csv_path`.\n"
        "- Optionally, call *Plot price & indicators* with the same `csv_path` to produce a chart.\n\n"
        "**Return** JSON: {close, ema20, ema50, macd, macd_signal, rsi, bb_lower, bb_middle, bb_upper, "
        "events[], rule_signal{signal, score, reasons[]}, chart_path?}"
    ),
    expected_output="JSON with indicators, notable events, a rule-based signal, and optional chart path",
    context=[task_collect],
    agent=technical_strategist,
)

# Step 3: risk view, also driven by task_collect's csv_path.
task_risk = Task(
    description=(
        "Use the `csv_path` from the Market Data task output.\n"
        "- Call *Compute risk metrics* with that `csv_path`.\n"
        "- Propose stop_loss %, take_profit %, position_size % based on volatility and drawdown (conservative).\n\n"
        "**Return** JSON: {vol_annualized, max_drawdown, hist_VaR_1d_95, n_days, suggested:{stop_loss_pct, "
        "take_profit_pct, position_size_pct}}"
    ),
    expected_output="JSON with risk metrics and a conservative risk plan",
    context=[task_collect],
    agent=risk_manager,
)

# Step 4: final decision, synthesizing the outputs of all three earlier tasks.
task_decide = Task(
    description=(
        "Synthesize all prior outputs into a single recommendation for {ticker}. Choose one: BUY / SELL / HOLD now. "
        "Provide 3–5 reasons (include indicators & risk), confidence (0–1), and restate the risk plan.\n\n"
        "**Return** FINAL JSON: {action, confidence, price, reasons[], key_signals[], "
        "risk{stop_loss_pct, take_profit_pct, position_size_pct}}"
    ),
    expected_output="A clean JSON object with action, confidence, reasons, key_signals, and risk plan",
    context=[task_collect, task_tech, task_risk],
    agent=portfolio_manager,
)

# Sequential process: tasks run in list order, and each one receives the outputs of
# the tasks named in its `context`. No manager/delegation tool is involved.
crew = Crew(
    process=Process.sequential,
    agents=[
        market_data_analyst,
        technical_strategist,
        risk_manager,
        portfolio_manager,
    ],
    tasks=[
        task_collect,
        task_tech,
        task_risk,
        task_decide,
    ],
    verbose=True,
)

In [5]:
# Run the crew for a given ticker.
# Each run makes several Alpha Vantage requests (the free tier allows 25/day), so
# change parameters deliberately instead of re-running repeatedly.
TICKER = 'MSTR'     # change me
PERIOD = '1mo'      # look-back: '1mo', '3mo', '6mo', '1y', '2y', '5y' (other values fall back to ~6mo)
INTERVAL = '5min'   # bar size: '1d', '1wk', '1mo', or intraday '1min', '5min', '15min', '30min', '60min'

result = crew.kickoff(inputs={
    'ticker': TICKER,
    'period': PERIOD,
    'interval': INTERVAL,
})
print('\n===== FINAL RECOMMENDATION =====\n')
print(result)

Output()

Output()

Output()

[91m 

I encountered an error while trying to use the tool. This was the error: Alpha Vantage error/rate limit/info: {'Information': 'We have detected your API key as <REDACTED> and our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly remove all daily rate limits.'}.
 Tool Fetch OHLCV price history accepts these inputs: Tool Name: Fetch OHLCV price history
Tool Arguments: {'ticker': {'description': None, 'type': 'str'}, 'period': {'description': None, 'type': 'str'}, 'interval': {'description': None, 'type': 'str'}}
Tool Description: 
    Fetch OHLCV using Alpha Vantage. Supports daily/weekly/monthly and basic intraday.
    Returns JSON:
      {
        csv_path (absolute), rows_count, start, end, last_close,
        period, interval (canonical), ticker, sanitized_ticker
      }
    
[00m


Output()

Output()

Output()

Output()

[91m 

I encountered an error while trying to use the tool. This was the error: Alpha Vantage error/rate limit/info: {'Information': 'We have detected your API key as <REDACTED> and our standard API rate limit is 25 requests per day. Please subscribe to any of the premium plans at https://www.alphavantage.co/premium/ to instantly remove all daily rate limits.'}.
 Tool Fetch OHLCV price history accepts these inputs: Tool Name: Fetch OHLCV price history
Tool Arguments: {'ticker': {'description': None, 'type': 'str'}, 'period': {'description': None, 'type': 'str'}, 'interval': {'description': None, 'type': 'str'}}
Tool Description: 
    Fetch OHLCV using Alpha Vantage. Supports daily/weekly/monthly and basic intraday.
    Returns JSON:
      {
        csv_path (absolute), rows_count, start, end, last_close,
        period, interval (canonical), ticker, sanitized_ticker
      }
    
[00m


Output()

Output()

Output()

KeyboardInterrupt: 