<a href="https://colab.research.google.com/github/ray-islam/Quant-Research/blob/main/agentic_ai_tiny_gpt2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===============================
# Agentic Trading Copilot — AI Edition (LLM + ML Predictor)
# ===============================
#!pip -q install yfinance gradio scikit-learn transformers accelerate --upgrade

In [2]:
import warnings; warnings.filterwarnings("ignore")
import math, numpy as np, pandas as pd, matplotlib.pyplot as plt, yfinance as yf
from math import sqrt, log
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler

# -------- settings --------
USE_LLM = False   # set True to enable tiny LLM planner (downloads a small model)
LLM_MODEL = "sshleifer/tiny-gpt2"  # tiny demo model; swap to TinyLlama for better quality

# -------- utils: force 1-D --------
def ravel1d(x): return np.asarray(x).ravel()
def assert_1d_all(df):
    bad = {c: v.shape for c,v in ((c, df[c].to_numpy()) for c in df.columns) if v.ndim != 1}
    if bad: raise ValueError(f"Non-1D columns found: {bad}")

# -------- data loader (intraday defaults for more actions) --------
def load_data(ticker="SPY", start=None, period="60d", interval="15m"):
    if period is not None and start is None:
        df = yf.download(ticker, period=period, interval=interval, auto_adjust=True, progress=False, prepost=True)
    else:
        start = pd.to_datetime(start)
        start_eff = (start - pd.Timedelta(days=60)).strftime("%Y-%m-%d")
        end_eff   = (pd.Timestamp.today().normalize() + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
        df = yf.download(ticker, start=start_eff, end=end_eff, interval=interval, auto_adjust=True, progress=False, prepost=True)

    if df is None or len(df) == 0:
        raise ValueError(f"No data for {ticker}.")
    df = df.rename(columns=str.lower).replace([np.inf, -np.inf], np.nan).dropna(how="any")
    base_cols = ["open","high","low","close","volume"]
    base = pd.DataFrame({c: pd.to_numeric(df[c].to_numpy().ravel(), errors="coerce")
                         for c in base_cols if c in df.columns}, index=df.index).dropna()
    if len(base) < 100:
        df2 = yf.download(ticker, period="90d", interval=interval, auto_adjust=True, progress=False, prepost=True).rename(columns=str.lower)
        base = pd.DataFrame({c: pd.to_numeric(df2[c].to_numpy().ravel(), errors="coerce")
                             for c in base_cols if c in df2.columns}, index=df2.index).dropna()
    if len(base) < 100:
        raise ValueError(f"Not enough data for {ticker} at {interval}. Got {len(base)} rows.")
    return base

# -------- indicators (manual, no 'ta') --------
def sma(s, w): return pd.Series(s.rolling(w).mean().to_numpy().ravel(), index=s.index, name=f"sma{w}")
def rsi(s, w=14):
    d = s.diff()
    up = pd.Series(np.where(d > 0, d, 0.0).ravel(), index=s.index)
    dn = pd.Series(np.where(d < 0, -d, 0.0).ravel(), index=s.index)
    gain = up.ewm(alpha=1/w, adjust=False).mean()
    loss = dn.ewm(alpha=1/w, adjust=False).mean()
    rs = gain / (loss + 1e-12)
    return pd.Series((100 - (100/(1+rs))).to_numpy().ravel(), index=s.index, name="rsi")
def donchian_high(h, w=20): return pd.Series(h.rolling(w).max().to_numpy().ravel(), index=h.index, name="don_high")
def donchian_low(l, w=20):  return pd.Series(l.rolling(w).min().to_numpy().ravel(), index=l.index, name="don_low")
def atr(h, l, c, w=14):
    pc = c.shift(1)
    tr = np.maximum.reduce([
        (h - l).to_numpy().ravel(),
        (h - pc).abs().to_numpy().ravel(),
        (l - pc).abs().to_numpy().ravel()
    ])
    tr = pd.Series(tr, index=h.index)
    return pd.Series(tr.ewm(alpha=1/w, adjust=False).mean().to_numpy().ravel(), index=h.index, name="atr")

def add_features(base):
    idx = base.index
    feats = pd.DataFrame({
        "sma10":    sma(base["close"], 10).to_numpy().ravel(),
        "sma30":    sma(base["close"], 30).to_numpy().ravel(),
        "rsi":      rsi(base["close"], 14).to_numpy().ravel(),
        "don_high": donchian_high(base["high"], 20).to_numpy().ravel(),
        "don_low":  donchian_low(base["low"], 20).to_numpy().ravel(),
        "atr":      atr(base["high"], base["low"], base["close"], 14).to_numpy().ravel(),
        "ret":      base["close"].pct_change().to_numpy().ravel(),
    }, index=idx)
    d = pd.concat([base, feats], axis=1).replace([np.inf, -np.inf], np.nan).dropna()
    assert_1d_all(d)
    return d

# -------- classic signals --------
def sig_momentum(d):
    s = pd.Series(0, index=d.index, dtype=int)
    s[d["sma10"] > d["sma30"]] = 1
    return s.astype(int)

def sig_meanrev(d):
    s = pd.Series(0, index=d.index, dtype=int)
    s[d["rsi"] < 35] = 1
    s[d["rsi"] > 60] = 0
    return s.ffill().fillna(0).astype(int)

def sig_breakout(d):
    up = d["don_high"].shift(1)
    dn = d["don_low"].shift(1)
    s = pd.Series(0, index=d.index, dtype=int)
    s[d["close"] > up] = 1
    s[d["close"] < dn] = 0
    return s.ffill().fillna(0).astype(int)

# -------- ML predictor signal (online SGD) --------
def sig_ml(d, warmup=200, threshold=0.52):
    X = pd.DataFrame({
        "sma_spread": (d["sma10"] - d["sma30"]) / (d["sma30"] + 1e-9),
        "atr_pct":    d["atr"] / (d["close"] + 1e-9),
        "rsi":        d["rsi"],
        "don_range": (d["don_high"] - d["don_low"]) / (d["close"] + 1e-9),
        "ret_lag1":   d["ret"].shift(1).fillna(0.0),
    }, index=d.index).replace([np.inf, -np.inf], np.nan).fillna(0.0)
    y = (d["ret"].shift(-1) > 0).astype(int).fillna(0).astype(int)

    scal = StandardScaler()
    clf  = SGDClassifier(loss="log_loss", alpha=1e-4, random_state=0)
    probs = np.zeros(len(X))

    warm = min(max(warmup, 50), len(X)-1)
    scal.partial_fit(X.iloc[:warm])
    clf.partial_fit(scal.transform(X.iloc[:warm]), y.iloc[:warm], classes=[0,1])

    for t in range(warm, len(X)):
        p = clf.predict_proba(scal.transform(X.iloc[t:t+1]))[0,1]
        probs[t] = p
        scal.partial_fit(X.iloc[t:t+1])
        clf.partial_fit(scal.transform(X.iloc[t:t+1]), y.iloc[t:t+1])

    return pd.Series((probs > threshold).astype(int), index=d.index, name="sig_ml")

# -------- backtester (budget-capped + min_shares) --------
def backtest(d, sig, risk_perc=0.02, sl_atr=1.5, min_shares=1, commission_bps=0):
    """
    - Caps shares by both risk and cash budget so entries don't fail on expensive tickers.
    - Ensures at least `min_shares` if affordable.
    """
    sig = pd.Series(sig.to_numpy().ravel(), index=sig.index).reindex(d.index).ffill().fillna(0).astype(int)

    cash, pos, equity = 100_000.0, 0, []
    entry_px, stop, shares = None, None, 0

    for i in range(1, len(d)):
        row = d.iloc[i]
        desired = int(sig.iloc[i])

        # trail stop
        if pos == 1 and stop is not None:
            stop = max(stop, row["close"] - sl_atr * row["atr"])

        # stop-out
        if pos == 1 and stop is not None and row["low"] < stop:
            px = stop
            cash += shares * px * (1 - commission_bps/10000.0)
            pos, shares, entry_px, stop = 0, 0, None, None

        # entries / exits
        if desired == 1 and pos == 0:
            px = row["close"]
            risk_dollars   = cash * risk_perc
            per_share_risk = max(0.01, sl_atr * row["atr"])

            # --- budget cap fix ---
            shares_risk   = int(risk_dollars // per_share_risk)
            shares_budget = int(cash // (px * (1 + commission_bps/10000.0)))
            shares = max(min_shares, min(shares_risk, shares_budget))

            if shares > 0:
                cost = shares * px * (1 + commission_bps/10000.0)
                if cost <= cash:
                    entry_px = px
                    cash -= cost
                    stop = entry_px - sl_atr * row["atr"]
                    pos = 1

        elif desired == 0 and pos == 1:
            px = row["close"]
            cash += shares * px * (1 - commission_bps/10000.0)
            pos, shares, entry_px, stop = 0, 0, None, None

        # mark-to-market
        mark = cash + (shares * row["close"] if pos == 1 else 0.0)
        equity.append(mark)

    eq = pd.Series(ravel1d(equity), index=d.index[1:], name="equity")
    retn = eq.pct_change().fillna(0.0)
    roll20 = retn.rolling(20)
    # rough intraday annualization for 15m bars
    sharpe20 = (roll20.mean() / (roll20.std() + 1e-9)) * np.sqrt(252*6.5*60/15)
    peak = eq.cummax(); dd = (eq/peak - 1.0)
    return {"equity": eq, "retn": retn, "sharpe20": sharpe20, "dd": dd, "final": float(eq.iloc[-1])}

# -------- regime + LLM planner (optional) --------
def summarize_regime(d):
    vol20 = float(d["ret"].rolling(20).std().iloc[-1] * np.sqrt(252))
    trend = float((d["sma10"].iloc[-1] - d["sma30"].iloc[-1]) / (d["sma30"].iloc[-1] + 1e-9))
    rng   = float((d["don_high"].iloc[-1] - d["don_low"].iloc[-1]) / (d["close"].iloc[-1] + 1e-9))
    return {"vol20": round(vol20,4), "trend": round(trend,4), "range": round(rng,4)}

def heuristic_choice(reg):
    if abs(reg["trend"]) < 0.004 and reg["range"] > 0.02: return "MEANREV", 0.02, 1.5, "Heuristic: flat trend, wide range"
    if reg["range"] > 0.03 and reg["trend"] > 0:          return "BREAKOUT", 0.02, 1.5, "Heuristic: wide range, uptrend"
    return "MOMENTUM", 0.02, 1.5, "Heuristic: trend dominant"

def llm_plan(regime, last_perf):
    if not USE_LLM:
        return heuristic_choice(regime)
    try:
        from transformers import AutoModelForCausalLM, AutoTokenizer
        tok = AutoTokenizer.from_pretrained(LLM_MODEL)
        lm  = AutoModelForCausalLM.from_pretrained(LLM_MODEL, device_map="auto")
        prompt = f"""Return EXACTLY one line:
STRATEGY=<MOMENTUM|MEANREV|BREAKOUT>; RISK=0.02; SL_ATR=1.5; WHY=<short reason>
Regime: {regime} Recent: {last_perf}"""
        x = tok(prompt, return_tensors="pt").to(lm.device)
        y = lm.generate(**x, max_new_tokens=64, do_sample=True, temperature=0.3)
        raw = tok.decode(y[0], skip_special_tokens=True).splitlines()[-1]
        up = raw.upper()
        strat = "MOMENTUM"
        if "MEAN" in up: strat = "MEANREV"
        if "BREAK" in up: strat = "BREAKOUT"
        import re
        m_r = re.search(r"RISK\s*=\s*([0-9.]+)", up); m_s = re.search(r"SL_ATR\s*=\s*([0-9.]+)", up)
        risk = float(m_r.group(1)) if m_r else 0.02
        sl   = float(m_s.group(1)) if m_s else 1.5
        # safety clamps
        risk = min(max(risk, 0.005), 0.03)
        sl   = min(max(sl,   1.0),   4.0)
        return strat, risk, sl, raw
    except Exception as e:
        print("LLM disabled:", e)
        return heuristic_choice(regime)

# -------- bandit (UCB1) --------
class UCB1:
    def __init__(self, arms, c=1.2):
        self.arms = list(arms); self.c = c
        self.n = {a: 0 for a in arms}; self.mu = {a: 0.0 for a in arms}; self.t = 0
    def select(self, bias_arm=None, bias_bonus=0.0):
        self.t += 1
        for a in self.arms:
            if self.n[a] == 0: return a
        scores = {}
        for a in self.arms:
            bonus = self.c * sqrt(log(self.t)/self.n[a])
            scores[a] = self.mu[a] + bonus + (bias_bonus if a == bias_arm else 0.0)
        return max(scores, key=scores.get)
    def update(self, arm, reward):
        self.n[arm] += 1
        self.mu[arm] += (reward - self.mu[arm]) / self.n[arm]

# -------- agentic selection (with ML arm) --------
def agentic_signal(d, rebalance=20, lookback=80, bandit_c=1.2, llm_bias=0.05):
    sigs = {
        "MOMENTUM": sig_momentum(d),
        "MEANREV":  sig_meanrev(d),
        "BREAKOUT": sig_breakout(d),
        "ML":       sig_ml(d, warmup=max(200, lookback), threshold=0.52),
    }
    strat_ret = {k: (sigs[k].shift(1).fillna(0).to_numpy() * d["ret"].to_numpy()) for k in sigs}
    comp = np.zeros(len(d), dtype=int)
    bandit = UCB1(sigs.keys(), c=bandit_c)
    decisions = []

    # initial plan (for bias toward LLM/heuristic suggestion)
    reg = summarize_regime(d.iloc[:max(lookback, 100)])
    init_strat, risk_perc, sl_atr, rationale = llm_plan(reg, last_perf={"sharpe":"n/a"})

    for start in range(max(lookback, 100), len(d), rebalance):
        end = min(start + rebalance, len(d))
        past_start = max(0, start - lookback)
        window_rewards = {k: float(np.nan_to_num(strat_ret[k][past_start:start]).mean()) for k in sigs}
        arm = bandit.select(bias_arm=init_strat, bias_bonus=llm_bias)
        comp[start:end] = sigs[arm].iloc[start:end].to_numpy()
        realized = float(np.nan_to_num(strat_ret[arm][start:end]).mean())
        bandit.update(arm, realized)
        decisions.append((d.index[start], arm, realized, window_rewards))

    return pd.Series(comp, index=d.index, name="sig_agentic"), {"risk_perc": risk_perc, "sl_atr": sl_atr, "rationale": rationale, "decisions": decisions}

# -------- main agent --------
def run_agent(ticker="SPY", period="60d", interval="15m",
              rebalance=20, lookback=80,
              risk_perc=None, sl_atr=None,
              commission_bps=0, min_shares=1):
    base = load_data(ticker, period=period, interval=interval)
    d    = add_features(base)

    comp_sig, meta = agentic_signal(d, rebalance=rebalance, lookback=lookback, bandit_c=1.2, llm_bias=0.05)
    # Use planner-proposed risk/stop unless overridden
    r = meta["risk_perc"] if risk_perc is None else float(risk_perc)
    s = meta["sl_atr"]    if sl_atr    is None else float(sl_atr)

    res = backtest(d, comp_sig, risk_perc=r, sl_atr=s, min_shares=min_shares, commission_bps=commission_bps)
    sharpe = float(0.0 if pd.isna(res["sharpe20"].iloc[-1]) else res["sharpe20"].iloc[-1])
    maxdd  = float(res["dd"].min()) if len(res["dd"]) else 0.0
    summary = {
        "ticker": ticker,
        "final_equity": f"${res['final']:,.0f}",
        "rolling_sharpe_20d": round(sharpe, 2),
        "max_drawdown": f"{round(maxdd*100, 2)}%",
        "risk_perc": r, "sl_atr": s,
        "planner_rationale": meta["rationale"],
        "decisions_sample": [(str(t), a, round(rv,4)) for t,a,rv,_ in meta["decisions"][:8]]
    }
    return d, comp_sig, res, summary

# -------- run once (sanity) --------
d, sig_comp, res, summary = run_agent()  # SPY, 60d, 15m defaults
print("Summary:", summary)

# -------- UI --------
import gradio as gr
def app(ticker, period, interval, rebalance, lookback, risk_perc, sl_atr, use_llm):
    global USE_LLM; USE_LLM = bool(use_llm)
    d, sig_comp, res, summary = run_agent(
        ticker=ticker, period=period, interval=interval,
        rebalance=int(rebalance), lookback=int(lookback),
        risk_perc=float(risk_perc) if risk_perc>0 else None,
        sl_atr=float(sl_atr) if sl_atr>0 else None
    )
    eq = res["equity"].rename("Equity").to_frame()
    ax = eq.plot(figsize=(8,4), title="Agentic Equity Curve (AI Edition)")
    fig = ax.get_figure(); plt.close(fig)
    stats = (
        f"**Ticker**: {summary['ticker']}\n"
        f"**Final Equity**: {summary['final_equity']}\n"
        f"**Rolling Sharpe (20d)**: {summary['rolling_sharpe_20d']}\n"
        f"**Max Drawdown**: {summary['max_drawdown']}\n"
        f"**Risk %/Trade**: {summary['risk_perc']}\n"
        f"**Stop (ATR)**: {summary['sl_atr']}\n"
        f"**Planner rationale**: {summary['planner_rationale']}\n"
        f"**First decisions**: {summary['decisions_sample']}\n"
    )
    return stats, fig

demo = gr.Interface(
    fn=app,
    inputs=[gr.Textbox(value="SPY", label="Ticker"),
            gr.Textbox(value="60d", label="Lookback Period (e.g., 60d, 90d)"),
            gr.Dropdown(choices=["15m","30m","1h","1d"], value="15m", label="Interval"),
            gr.Slider(5, 60, value=20, step=1, label="Rebalance (bars)"),
            gr.Slider(40, 200, value=80, step=5, label="Bandit Lookback (bars)"),
            gr.Slider(0.5, 3.0, value=2.0, step=0.1, label="Risk % per trade (0 = use planner)"),
            gr.Slider(0.5, 4.0, value=1.5, step=0.1, label="Stop (ATR multiples) (0 = use planner)"),
            gr.Checkbox(value=False, label="Use LLM planner (tiny)")],
    outputs=[gr.Markdown(label="Summary"), gr.Plot(label="Equity Curve")],
    title="Agentic Trading Copilot — AI Edition (Budget-Capped)",
    description="Bandit selects among Momentum / MeanRev / Breakout / ML predictor. Optional LLM planner."
)
demo.launch(share=False)

Summary: {'ticker': 'SPY', 'final_equity': '$82,673', 'rolling_sharpe_20d': -20.61, 'max_drawdown': '-17.47%', 'risk_perc': 0.02, 'sl_atr': 1.5, 'planner_rationale': 'Heuristic: flat trend, wide range', 'decisions_sample': [('2025-05-23 08:45:00+00:00', 'MOMENTUM', 0.0), ('2025-05-23 14:00:00+00:00', 'MEANREV', 0.0), ('2025-05-23 19:00:00+00:00', 'BREAKOUT', 0.0), ('2025-05-27 08:00:00+00:00', 'ML', 0.0), ('2025-05-27 13:00:00+00:00', 'MEANREV', 0.0), ('2025-05-27 18:15:00+00:00', 'MOMENTUM', 0.0), ('2025-05-27 23:15:00+00:00', 'BREAKOUT', -0.0), ('2025-05-28 12:15:00+00:00', 'ML', -0.0)]}
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.
* To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>

