# Small-Cap Kalman Hybrid Aggressive Optimizer

**Target:** >20% return on small-cap stocks whose latest close is below $25/share

Uses:
- GPU-accelerated Kalman filter (CuPy)
- Massive 1-minute flat files (gzipped CSV), aggregated to 5-minute bars
- NVIDIA Nemotron for parameter analysis

In [None]:
# Configuration
N_TRIALS = 50  # Random-search trials run per symbol (passed to run_optimization as n_trials)
AGGREGATE_MINS = 5  # Bar width in minutes; 1-minute rows are resampled to this width when loading
TARGET_RETURN = 0.20  # 20% target, as a fraction. NOTE(review): the visible cells hard-code 0.20 / 20 instead of reading this — confirm intended use

In [None]:
import os
import sys
import gzip
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, field

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# GPU support
# CuPy is optional. Fall back to NumPy when the package is missing AND when it
# imports but the device probe fails (no CUDA device, driver mismatch, ...);
# the probe raises a CuPy/CUDA runtime error rather than ImportError, which
# would otherwise abort this cell.
try:
    import cupy as cp
    _gpu_name = cp.cuda.runtime.getDeviceProperties(0)["name"].decode()
    GPU_AVAILABLE = True
    print(f"GPU: {_gpu_name}")
except Exception as exc:  # broad on purpose: any probe failure means "no GPU"
    GPU_AVAILABLE = False
    cp = np  # lets code written against `cp` run unchanged on the CPU
    print("GPU: Not available, using CPU")
    if not isinstance(exc, ImportError):
        # Surface why an installed CuPy could not be used.
        print(f"GPU: CuPy probe failed: {type(exc).__name__}: {exc}")

# Paths
# Input flat files and the backtest output folder both live under the user's
# home directory; the output folder is created on first run.
MASSIVE_DATA_DIR = Path.home().joinpath("projects/ordinis/data/massive")
OUTPUT_DIR = Path.home().joinpath("projects/ordinis/data/backtest_results/small_cap_kalman")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print(f"Data dir: {MASSIVE_DATA_DIR}")
print(f"Output dir: {OUTPUT_DIR}")

In [None]:
# Small-cap sectors with aggressive volatility focus.
# Maps a sector label to its candidate tickers; ALL_SMALL_CAPS flattens the
# mapping in sector order, then ticker order within each sector.
SMALL_CAP_SECTORS = dict(
    crypto_mining=["RIOT", "MARA", "CLSK", "BITF", "HUT", "CIFR", "WULF"],
    biotech_speculative=["BNGO", "SNDL", "TLRY", "CGC", "ACB", "XXII", "AGRX"],
    ev_clean_energy=["PLUG", "FCEL", "BE", "CHPT", "BLNK", "EVGO", "GOEV"],
    fintech_disruptors=["SOFI", "HOOD", "AFRM", "UPST", "OPEN", "LMND"],
    retail_consumer=["GME", "AMC", "BBIG", "WKHS", "WISH", "CLOV", "EXPR"],
    industrial_materials=["CLF", "X", "AA", "BTU", "ARCH", "HCC"],
    tech_software=["AI", "SOUN", "IONQ", "RGTI", "QUBT", "KULR"],
    real_estate_reits=["MPW", "AGNC", "NLY", "ABR", "TWO", "IVR"],
)
ALL_SMALL_CAPS = [
    ticker
    for sector_tickers in SMALL_CAP_SECTORS.values()
    for ticker in sector_tickers
]
print(f"Total small-cap symbols: {len(ALL_SMALL_CAPS)}")

In [None]:
@dataclass
class AggressiveConfig:
    """Parameter set for one aggressive backtest run targeting >20% returns.

    The per-field comments describe how the backtest code in this file reads
    each value; fields it does not read are flagged for review.
    """
    initial_capital: float = 100_000.0  # Starting cash of the simulated account
    position_size_pct: float = 0.15  # Fraction of current capital committed to each new position
    max_positions: int = 3  # NOTE(review): not referenced by the backtest visible in this file (it holds one position at a time)
    process_noise_q: float = 1e-4  # Kalman process-noise term q handed to the filter
    observation_noise_r: float = 5e-3  # Kalman observation-noise term r handed to the filter
    residual_z_entry: float = 0.75  # Go long when residual z < -entry, short when residual z > +entry
    residual_z_exit: float = 0.1  # Profitable long exits when z > -exit; profitable short exits when z < +exit
    trend_slope_min: float = 0.0  # NOTE(review): not referenced by the backtest visible in this file
    stop_loss_pct: float = 0.05  # Exit when the position's return falls below -stop_loss_pct
    take_profit_pct: float = 0.08  # Exit when the position's return reaches take_profit_pct
    trailing_stop_pct: float = 0.025  # Give-back allowed from the best price since entry; only checked once the position is up more than 2%
    aggregate_mins: int = 5  # Bar width in minutes of the data being backtested

In [None]:
def load_massive_symbol(symbol: str, aggregate_mins: int = 5) -> pd.DataFrame | None:
    """Load all Massive data for a symbol and aggregate."""
    dfs = []
    for gz_file in sorted(MASSIVE_DATA_DIR.glob("*.csv.gz")):
        try:
            with gzip.open(gz_file, "rt") as f:
                df = pd.read_csv(f)
            sym_df = df[df["ticker"] == symbol].copy()
            if sym_df.empty:
                continue
            sym_df["datetime"] = pd.to_datetime(sym_df["window_start"], unit="ns", utc=True)
            sym_df = sym_df.set_index("datetime")
            sym_df = sym_df[["open", "high", "low", "close", "volume"]]
            dfs.append(sym_df)
        except Exception:
            continue
    if not dfs:
        return None
    combined = pd.concat(dfs).sort_index()
    combined = combined[~combined.index.duplicated(keep="first")]
    if aggregate_mins > 1:
        combined = combined.resample(f"{aggregate_mins}min").agg({
            "open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"
        }).dropna()
    return combined if len(combined) >= 100 else None

In [None]:
def get_available_small_caps() -> list[tuple[str, float]]:
    """Get small-cap symbols available in Massive data with current prices."""
    gz_files = list(MASSIVE_DATA_DIR.glob("*.csv.gz"))
    if not gz_files:
        return []
    latest = sorted(gz_files)[-1]
    with gzip.open(latest, "rt") as f:
        df = pd.read_csv(f)
    available = []
    for sym in ALL_SMALL_CAPS:
        sym_df = df[df["ticker"] == sym]
        if not sym_df.empty:
            last_price = sym_df["close"].iloc[-1]
            if last_price < 25:
                available.append((sym, last_price))
    return sorted(available, key=lambda x: x[1])

# Probe the data directory once and preview what the universe looks like.
available = get_available_small_caps()
print(f"Found {len(available)} small-cap symbols under $25:")
# The list comes back sorted by price ascending, so these are the ten cheapest.
for sym, price in available[:10]:
    print(f"  {sym}: ${price:.2f}")
# Summarize the rest rather than printing every symbol.
if len(available) > 10:
    print(f"  ... and {len(available) - 10} more")

In [None]:
def kalman_filter_gpu(prices: np.ndarray, q: float, r: float) -> dict:
    """Smooth a price series with a scalar random-walk Kalman filter.

    The state is a single "level" that is predicted to stay put (variance
    grows by ``q``) and is then pulled toward each observed price by the
    Kalman gain ``p_pred / (p_pred + r)``. The level starts at the first
    price with variance 1.0.

    The recursion is inherently sequential (each step needs the previous
    state), so it is evaluated on the host with NumPy: running it element by
    element on a device array costs a host/device round trip per bar. The
    function name is kept so existing callers keep working; the return value
    is a dict of NumPy arrays, as before.

    Args:
        prices: 1-D sequence of observed prices.
        q: Process-noise term added to the state variance at every step.
        r: Observation-noise term used in the Kalman gain.

    Returns:
        Dict of NumPy float64 arrays, each the same length as ``prices``:
        ``levels`` (filtered level), ``residuals`` (price minus level),
        ``residual_z`` (residual standardized by the mean/std of the 50
        residuals *before* it; zero for the first 50 bars) and ``slopes``
        (first difference of the level; zero at index 0). All arrays are
        empty when ``prices`` is empty.
    """
    observed = np.asarray(prices, dtype=np.float64)
    n = len(observed)
    levels = np.zeros(n, dtype=np.float64)
    residual_z = np.zeros(n, dtype=np.float64)
    if n == 0:
        # Nothing to filter; avoid indexing the first price.
        return {"levels": levels, "residuals": levels.copy(),
                "residual_z": residual_z, "slopes": levels.copy()}

    x = float(observed[0])
    p = 1.0
    for i, obs in enumerate(observed.tolist()):
        p_pred = p + q                 # predict: level unchanged, variance grows
        k = p_pred / (p_pred + r)      # Kalman gain
        x = x + k * (obs - x)          # update level toward the observation
        p = (1 - k) * p_pred           # posterior variance
        levels[i] = x

    residuals = observed - levels
    slopes = np.diff(levels, prepend=levels[0])

    lookback = 50
    if n > lookback:
        # Row j of the view is residuals[j : j + lookback]; dropping the last
        # row leaves exactly the window that precedes each bar i >= lookback,
        # so the bar being scored is never part of its own statistics.
        windows = np.lib.stride_tricks.sliding_window_view(residuals, lookback)[:-1]
        trailing_mean = windows.mean(axis=1)
        trailing_std = windows.std(axis=1) + 1e-10  # epsilon guards a flat window
        residual_z[lookback:] = (residuals[lookback:] - trailing_mean) / trailing_std

    return {"levels": levels, "residuals": residuals, "residual_z": residual_z, "slopes": slopes}

In [None]:
@dataclass
class BacktestResult:
    """Outcome of one backtest run for one symbol and one parameter set."""

    symbol: str
    config: dict
    total_return: float = 0.0
    sharpe_ratio: float = 0.0
    max_drawdown: float = 0.0
    win_rate: float = 0.0
    profit_factor: float = 0.0
    num_trades: int = 0
    avg_trade_pnl: float = 0.0
    trades: list = field(default_factory=list)
    equity_curve: list = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the scalar summary as a plain dict.

        ``trades`` and ``equity_curve`` are left out of the summary.
        """
        summary_fields = (
            "symbol",
            "config",
            "total_return",
            "sharpe_ratio",
            "max_drawdown",
            "win_rate",
            "profit_factor",
            "num_trades",
            "avg_trade_pnl",
        )
        return {name: getattr(self, name) for name in summary_fields}

In [None]:
def run_aggressive_backtest(df: pd.DataFrame, symbol: str, config: AggressiveConfig) -> BacktestResult:
    """Simulate the Kalman-residual mean-reversion strategy on one symbol.

    The close series is filtered with ``kalman_filter_gpu``; the residual
    z-score drives entries (long below ``-residual_z_entry``, short above
    ``+residual_z_entry``). One position is held at a time and is closed, in
    priority order, by stop loss, take profit, trailing stop (only once the
    position is up more than 2%), or a mean-reversion exit while profitable.
    All fills happen at the close of the bar that produced the signal; no
    costs or slippage are modelled. A position closed on a bar may be
    re-opened on that same bar if the entry condition still holds.

    Args:
        df: Bars with a ``close`` column, in time order.
        symbol: Ticker, copied into the result.
        config: Strategy, filter and sizing parameters.

    Returns:
        A ``BacktestResult``. With fewer than 100 bars, a sentinel result
        with an empty config and ``total_return=-1.0`` is returned. The
        result keeps at most the first 50 trades and a down-sampled equity
        curve (roughly 100 points).
    """
    n = len(df)
    if n < 100:
        # Sentinel: callers treat total_return == -1 as "not enough data".
        return BacktestResult(symbol=symbol, config={}, total_return=-1.0)

    prices = df["close"].values
    kalman = kalman_filter_gpu(prices, config.process_noise_q, config.observation_noise_r)
    z_scores = kalman["residual_z"]

    capital = config.initial_capital
    position = 0.0      # signed share count: > 0 long, < 0 short
    entry_price = 0.0
    max_price = 0.0     # best price since entry: highest for longs, lowest for shorts
    trades = []
    equity_curve = [capital]
    warmup = 60         # skip the bars where the rolling z-score is not yet populated

    for i in range(warmup, n):
        current_price = prices[i]
        residual_z = z_scores[i]

        if position != 0:
            # Mark the open position to market before deciding on an exit.
            equity_curve.append(capital + position * (current_price - entry_price))

            if position > 0:
                pnl_pct = (current_price - entry_price) / entry_price
                max_price = max(max_price, current_price)
            else:
                pnl_pct = (entry_price - current_price) / entry_price
                max_price = min(max_price, current_price) if max_price > 0 else current_price

            exit_signal = False
            exit_reason = ""
            if pnl_pct < -config.stop_loss_pct:
                exit_signal = True
                exit_reason = "stop_loss"
            elif pnl_pct >= config.take_profit_pct:
                exit_signal = True
                exit_reason = "take_profit"
            elif pnl_pct > 0.02:
                # Trailing stop is armed only after a 2% gain.
                if position > 0:
                    trailing = max_price * (1 - config.trailing_stop_pct)
                    if current_price <= trailing:
                        exit_signal = True
                        exit_reason = "trailing_stop"
                else:
                    trailing = max_price * (1 + config.trailing_stop_pct)
                    if current_price >= trailing:
                        exit_signal = True
                        exit_reason = "trailing_stop"

            if not exit_signal:
                reverted = (position > 0 and residual_z > -config.residual_z_exit) or \
                           (position < 0 and residual_z < config.residual_z_exit)
                # Only take the mean-reversion exit when it locks in a gain.
                if reverted and pnl_pct > 0:
                    exit_signal = True
                    exit_reason = "mean_reversion"

            if exit_signal:
                pnl = position * (current_price - entry_price)
                capital += pnl
                trades.append({"pnl": pnl, "pnl_pct": pnl_pct, "reason": exit_reason})
                position = 0.0
                max_price = 0.0
        else:
            equity_curve.append(capital)

        if position == 0:
            signal_long = residual_z < -config.residual_z_entry
            signal_short = residual_z > config.residual_z_entry
            if signal_long or signal_short:
                risk_amount = capital * config.position_size_pct
                shares = risk_amount / current_price
                position = shares if signal_long else -shares
                entry_price = current_price
                max_price = current_price

    if position != 0:
        # Force-close whatever is still open at the last bar.
        final_price = prices[-1]
        pnl = position * (final_price - entry_price)
        pnl_pct = (final_price - entry_price) / entry_price * np.sign(position)
        capital += pnl
        trades.append({"pnl": pnl, "pnl_pct": pnl_pct, "reason": "end_of_data"})
        equity_curve.append(capital)

    equity_arr = np.array(equity_curve)
    total_return = (capital - config.initial_capital) / config.initial_capital
    if len(equity_arr) > 1:
        returns = np.diff(equity_arr) / equity_arr[:-1]
        # Annualize from the configured bar width instead of assuming 5-minute
        # bars: 390 regular-session minutes per day, 252 sessions per year.
        # NOTE(review): if the data includes extended-hours bars, 390 understates
        # the real bar count — confirm against the data source.
        bars_per_session = 390 / max(1, config.aggregate_mins)
        sharpe = np.mean(returns) / (np.std(returns) + 1e-10) * np.sqrt(252 * bars_per_session)
    else:
        sharpe = 0.0
    peak = np.maximum.accumulate(equity_arr)
    drawdown = (peak - equity_arr) / (peak + 1e-10)
    max_dd = float(np.max(drawdown))
    if trades:
        winners = [t for t in trades if t["pnl"] > 0]
        losers = [t for t in trades if t["pnl"] <= 0]
        win_rate = len(winners) / len(trades)
        gross_profit = sum(t["pnl"] for t in winners) if winners else 0
        # NOTE(review): with no losing trades the divisor is 1, so profit_factor
        # equals gross profit in currency units rather than a ratio. Left as is
        # because changing it alters saved results — confirm the intended value.
        gross_loss = abs(sum(t["pnl"] for t in losers)) if losers else 1
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else 0
        avg_pnl = np.mean([t["pnl_pct"] for t in trades])
    else:
        win_rate = profit_factor = avg_pnl = 0

    return BacktestResult(
        symbol=symbol,
        # Record every tuned parameter, including trailing stop and position
        # size, so a reported result can be reproduced from its config.
        config={"q": config.process_noise_q, "r": config.observation_noise_r,
                "z_entry": config.residual_z_entry, "z_exit": config.residual_z_exit,
                "stop_loss": config.stop_loss_pct, "take_profit": config.take_profit_pct,
                "trailing_stop": config.trailing_stop_pct,
                "position_size": config.position_size_pct},
        total_return=total_return, sharpe_ratio=sharpe, max_drawdown=max_dd,
        win_rate=win_rate, profit_factor=profit_factor, num_trades=len(trades),
        avg_trade_pnl=avg_pnl, trades=trades[:50], equity_curve=equity_curve[::max(1, len(equity_curve)//100)])

In [None]:
def run_optimization(
    df: pd.DataFrame,
    symbol: str,
    n_trials: int = 50,
    seed: int | None = 42,
    aggregate_mins: int = 5,
) -> tuple:
    """Random-search the strategy parameters for one symbol.

    Each trial draws one value per parameter (with replacement, so repeated
    combinations are possible), runs ``run_aggressive_backtest`` and scores
    the result as ``0.6 * return + 0.3 * sharpe + 0.1 * win_rate``. Trials
    with fewer than 3 trades or a max drawdown above 35% are never selected.

    Args:
        df: Bars for the symbol, as expected by ``run_aggressive_backtest``.
        symbol: Ticker, forwarded to the backtest.
        n_trials: Number of random parameter draws.
        seed: Seed for the parameter draws. The default (42) reproduces the
            same draw sequence on every call; pass ``None`` for fresh
            entropy.
        aggregate_mins: Bar width in minutes of ``df``; stored on each
            trial's ``AggressiveConfig.aggregate_mins``.

    Returns:
        ``(best_params, best_result, all_results)``. ``best_params`` and
        ``best_result`` are ``None`` when no trial qualified;
        ``all_results`` has one summary dict per trial.
    """
    # Insertion order matters: values are drawn in this order, which keeps the
    # default seed's draw sequence stable.
    param_grid = {
        "process_noise_q": [1e-5, 5e-5, 1e-4, 5e-4, 1e-3],
        "observation_noise_r": [1e-4, 5e-4, 1e-3, 5e-3, 1e-2],
        "residual_z_entry": [0.5, 0.75, 1.0, 1.25, 1.5],
        "residual_z_exit": [0.05, 0.1, 0.15, 0.2, 0.3],
        "stop_loss_pct": [0.03, 0.05, 0.07, 0.10],
        "take_profit_pct": [0.05, 0.08, 0.10, 0.15, 0.20],
        "trailing_stop_pct": [0.015, 0.02, 0.025, 0.03],
        "position_size_pct": [0.10, 0.15, 0.20],
    }
    rng = np.random.default_rng(seed)
    best_result = None
    best_params = None
    best_score = -np.inf
    all_results = []

    for trial in range(n_trials):
        sampled = {name: rng.choice(values) for name, values in param_grid.items()}
        config = AggressiveConfig(aggregate_mins=aggregate_mins, **sampled)
        result = run_aggressive_backtest(df, symbol, config)
        all_results.append({"trial": trial, "config": result.config, "return": result.total_return,
                            "sharpe": result.sharpe_ratio, "trades": result.num_trades})

        # Too few trades or too deep a drawdown disqualifies the trial outright.
        if result.num_trades < 3 or result.max_drawdown > 0.35:
            score = -np.inf
        else:
            score = result.total_return * 0.6 + result.sharpe_ratio * 0.3 + result.win_rate * 0.1

        # Strict comparison: a disqualified trial (-inf) can never become "best".
        if score > best_score:
            best_score = score
            best_result = result
            best_params = result.config
            print(f"  [Trial {trial:2d}] NEW BEST: {result.total_return*100:+7.2f}%, "
                  f"Sharpe={result.sharpe_ratio:5.2f}, WR={result.win_rate*100:4.1f}%, "
                  f"Trades={result.num_trades}")

    return best_params, best_result, all_results

## Run Optimization on All Small-Caps

In [None]:
%%time

# Driver: optimize every available symbol and collect the best result per
# symbol plus the raw trial records.
print("=" * 80)
print("SMALL-CAP KALMAN HYBRID AGGRESSIVE OPTIMIZER")
print(f"Target: >{TARGET_RETURN:.0%} return on small-cap stocks (<$25)")
print(f"Trials per symbol: {N_TRIALS}")
print(f"Aggregation: {AGGREGATE_MINS}-minute bars")
print("=" * 80)

available = get_available_small_caps()
symbols = [ticker for ticker, _last_price in available]
print(f"\nFound {len(available)} small-cap symbols in Massive data")

symbol_results = {}
all_optimization_results = []

for sym in symbols:
    print(f"\n{'='*60}")
    print(f"[{sym}]")
    print("=" * 60)

    df = load_massive_symbol(sym, aggregate_mins=AGGREGATE_MINS)
    if df is None:
        print("  No data available")
        continue

    print(f"  Data: {len(df)} bars ({AGGREGATE_MINS}-min)")
    best_params, best_result, trials = run_optimization(df, sym, n_trials=N_TRIALS)

    # best_result is None when no trial qualified; total_return == -1 is the
    # backtest's "not enough data" sentinel.
    if best_result is not None and best_result.total_return > -1:
        symbol_results[sym] = best_result.to_dict()
        # Tag each trial with its symbol: trial numbers restart at 0 for every
        # symbol, so the pooled list is otherwise impossible to attribute.
        all_optimization_results.extend({"symbol": sym, **trial} for trial in trials)
        print("\n  BEST RESULT:")
        print(f"    Return: {best_result.total_return*100:+.2f}%")
        print(f"    Sharpe: {best_result.sharpe_ratio:.2f}")
        print(f"    MaxDD: {best_result.max_drawdown*100:.1f}%")
        print(f"    Win Rate: {best_result.win_rate*100:.1f}%")
        print(f"    Trades: {best_result.num_trades}")

## Summary & Analysis

In [None]:
# Aggregate the per-symbol best results. Note these are the best of N in-sample
# trials per symbol, not out-of-sample performance.
if symbol_results:
    returns = [r["total_return"] for r in symbol_results.values()]
    avg_return = np.mean(returns)
    # Count with plain ints so the values stay JSON-friendly downstream.
    winners = sum(1 for r in returns if r > 0)
    # Use the configured target rather than a second hard-coded 0.20.
    high_return = sum(1 for r in returns if r > TARGET_RETURN)
    n_symbols = len(symbol_results)

    print("=" * 80)
    print("OPTIMIZATION SUMMARY")
    print("=" * 80)
    print(f"Symbols tested: {n_symbols}")
    print(f"Profitable: {winners}/{n_symbols} ({100*winners/n_symbols:.1f}%)")
    print(f">{TARGET_RETURN:.0%} return: {high_return}/{n_symbols}")
    print(f"Average return: {avg_return*100:+.2f}%")

    sorted_results = sorted(symbol_results.items(), key=lambda x: x[1]["total_return"], reverse=True)
    print("\nTop 10 performers:")
    for sym, r in sorted_results[:10]:
        print(f"  {sym}: {r['total_return']*100:+.2f}%, Sharpe={r['sharpe_ratio']:.2f}, WR={r['win_rate']*100:.1f}%")

In [None]:
# Visualize results: returns per symbol (left) and Sharpe vs return (right).
if symbol_results:
    # Derive the target line from the configured target instead of a literal 20.
    target_pct = TARGET_RETURN * 100
    target_label = f"{TARGET_RETURN:.0%} target"

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    # Bar chart of returns, best performer first in the sorted order.
    sorted_items = sorted(symbol_results.items(), key=lambda x: x[1]["total_return"], reverse=True)
    symbols_sorted = [x[0] for x in sorted_items]
    returns_sorted = [x[1]["total_return"] * 100 for x in sorted_items]
    # Green: beat the target; blue: profitable; red: flat or losing.
    colors = ['green' if r > target_pct else 'blue' if r > 0 else 'red' for r in returns_sorted]

    axes[0].barh(symbols_sorted, returns_sorted, color=colors)
    axes[0].axvline(x=target_pct, color='gold', linestyle='--', label=target_label)
    axes[0].set_xlabel('Return (%)')
    axes[0].set_title('Returns by Symbol')
    axes[0].legend()

    # Sharpe vs Return scatter
    sharpes = [r["sharpe_ratio"] for r in symbol_results.values()]
    returns_pct = [r["total_return"] * 100 for r in symbol_results.values()]
    axes[1].scatter(returns_pct, sharpes, c=returns_pct, cmap='RdYlGn', s=100)
    axes[1].axvline(x=target_pct, color='gold', linestyle='--', label=target_label)
    axes[1].set_xlabel('Return (%)')
    axes[1].set_ylabel('Sharpe Ratio')
    axes[1].set_title('Risk-Adjusted Performance')
    # Without this call the target line's label is never drawn on this axis.
    axes[1].legend()

    plt.tight_layout()
    plt.show()

In [None]:
# Save results
if symbol_results:
    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    output_file = OUTPUT_DIR / f"small_cap_optimization_{timestamp}.json"

    # Recompute the summary here so this cell does not depend on the summary
    # cell having been run first.
    saved_returns = [r["total_return"] for r in symbol_results.values()]
    profitable_count = sum(1 for r in saved_returns if r > 0)
    high_return_count = sum(1 for r in saved_returns if r > TARGET_RETURN)

    with open(output_file, "w", encoding="utf-8") as f:
        json.dump({
            "timestamp": timestamp,
            "n_trials": N_TRIALS,
            "aggregate_mins": AGGREGATE_MINS,
            "symbols": list(symbol_results.keys()),
            "results": symbol_results,
            # Only the first 500 trial records are stored; the total is
            # recorded so the truncation is visible in the file.
            "all_trials": all_optimization_results[:500],
            "all_trials_total": len(all_optimization_results),
            "summary": {
                "avg_return": float(np.mean(saved_returns)),
                "profitable_pct": profitable_count / len(symbol_results),
                "high_return_count": high_return_count,
            },
        # default=str is a last-resort fallback: any value json cannot encode
        # natively is written as its string form.
        }, f, indent=2, default=str)

    print(f"Results saved: {output_file}")