In [9]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tabulate import tabulate
from colorama import Fore, Style
from abc import ABC, abstractmethod
from typing import Callable, List, Optional, Dict, Tuple
from dataclasses import dataclass, field
import random

class SignalGenerator(ABC):
    @abstractmethod
    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        """
        data: OHLCV (and extra) up to t‑1
        returns: Series indexed like data of +1 (long), -1 (short), 0 (flat)
        """


class SimpleMAMomentum(SignalGenerator):
    """Long when price > its 24h SMA, exit when below 24h SMA"""
    def __init__(self, window: int = 24):
        self.window = window

    def generate_signals(self, data: pd.DataFrame) -> pd.Series:
        close = data['close']
        sma = close.rolling(self.window).mean()
        signal = pd.Series(0, index=data.index)
        signal[close > sma] = 1
        signal[close < sma] = -1
        return signal


class EnhancedMACDRSIStrategy(SignalGenerator):
    """Refined MACD+RSI strategy: includes trend filtering and volatility suppression"""
    def __init__(self,
                 macd_fast:int=12, macd_slow:int=26, macd_signal:int=9,
                 rsi_period:int=14, rsi_oversold:float=30, rsi_overbought:float=70,
                 trend_ema:int=100, min_volatility:float=0.005):
        self.fast = macd_fast
        self.slow = macd_slow
        self.signal = macd_signal
        self.rsi_period = rsi_period
        self.rsi_over = rsi_overbought
        self.rsi_under = rsi_oversold
        self.trend_ema = trend_ema
        self.min_vol = min_volatility

    def generate_signals(self, data:pd.DataFrame) -> pd.Series:
        df = data.copy()
        close = df['close']
        # MACD: fast EMA, slow EMA
        ema_fast = close.ewm(span=self.fast, adjust=False).mean()
        ema_slow = close.ewm(span=self.slow, adjust=False).mean()
        macd_line = ema_fast - ema_slow
        macd_signal = macd_line.ewm(span=self.signal, adjust=False).mean()
        # RSI: standard Wilder's RSI
        delta = close.diff()
        up = delta.clip(lower=0)
        down = -delta.clip(upper=0)
        ema_up = up.ewm(alpha=1/self.rsi_period, adjust=False).mean()
        ema_down = down.ewm(alpha=1/self.rsi_period, adjust=False).mean()
        rs = ema_up / ema_down
        rsi = 100 - (100 / (1 + rs))

        sig = pd.Series(0, index=df.index)

        # Volatility filter (relative daily range)
        atr = (df['high'] - df['low']) / df['close'].shift(1)
        low_vol = atr < self.min_vol

        # Trend filter (price must be above long-term EMA)
        trend = df['close'] > df['close'].ewm(span=self.trend_ema, adjust=False).mean()

        # bullish MACD crossover + RSI oversold + volatility/trend filter
        buy = (
            (macd_line > macd_signal) &
            (macd_line.shift() <= macd_signal.shift()) &
            (rsi < self.rsi_under) &
            trend &
            (~low_vol)
        )
        sell = (
            (macd_line < macd_signal) &
            (macd_line.shift() >= macd_signal.shift()) &
            (rsi > self.rsi_over)
        )

        sig[buy] = 1
        sig[sell] = -1
        return sig

In [33]:
import numpy as np
import pandas as pd
from tabulate import tabulate
from dataclasses import dataclass
from typing import Callable, List, Optional, Dict
import matplotlib.pyplot as plt

@dataclass
class BacktestConfig:
    start_equity: float = 10_000.0
    risk_per_trade: float = 0.01
    leverage: float = 1.0
    stop_loss: float = 0.02
    take_profit: float = 0.05
    entry_margin_ratio: float = 0.5
    maintenance_margin_ratio: float = 0.25
    min_qty: float = 0.001
    min_hold_bars: int = 1
    freq_hours: int = 1
    fill_threshold: float = 0.05
    spread: float = 0.0002
    cost_tiers: Optional[Dict[float, float]] = None

    def __post_init__(self):
        assert 0 < self.freq_hours <= 24, "freq_hours must be between 1 and 24"
        assert 0 <= self.fill_threshold < 1,  "fill_threshold must be in [0,1)"
        assert 0 < self.maintenance_margin_ratio < self.entry_margin_ratio <= 1.0, \
               "Require 0 < maintenance_margin_ratio < entry_margin_ratio <= 1"

class CostModel:
    def __init__(self, spread: float, tiers: Optional[Dict[float, float]] = None):
        self.base_spread = spread
        self.tiers = tiers or {}

    def cost_pct(self, price: float, qty: float) -> float:
        notional = price * qty
        for threshold, pct in sorted(self.tiers.items(), key=lambda x: x[0], reverse=True):
            if notional >= threshold:
                return pct
        return self.base_spread

    def adjust_price(self, price: float, side: int, qty: float) -> float:
        """Apply slippage/spread to the given price."""
        pct = self.cost_pct(price, qty)
        return price * (1 + side * pct)

@dataclass
class Position:
    entry_bar: int
    entry_price: float
    qty: float
    side: int
    entry_fee_pct: float         # renamed from entry_cost
    locked_margin: float
    exit_bar: Optional[int] = None
    exit_price: float = 0.0
    exit_cost: float = 0.0
    pnl: float = 0.0
    exit_reason: Optional[str] = None

class PositionManager:
    def __init__(self, cfg: BacktestConfig, cost_model: CostModel, data: pd.DataFrame):
        self.cfg = cfg
        self.cm = cost_model
        self.data = data
        self.position: Optional[Position] = None
        self.trades: List[Position] = []
        self.reserved_cash: float = 0.0
        self._last_entry_cost: float = 0.0

    def _enter(self, i: int, sig: int, equity: float) -> None:
        o = self.data['open'].iat[i]
        available = equity - self.reserved_cash
        raw_qty = available * self.cfg.risk_per_trade / (self.cfg.stop_loss * o)
        qty = min(raw_qty * self.cfg.leverage, available * self.cfg.leverage / o)
        qty = min(qty, self.data['volume'].iat[i] * self.cfg.fill_threshold)
        qty = np.floor(qty / self.cfg.min_qty) * self.cfg.min_qty
        if qty <= 0:
            return

        # 2) compute fee pct on mid price BEFORE slippage
        fee_pct = self.cm.cost_pct(o, qty)

        # then adjust price by that pct
        price = o * (1 + sig * fee_pct)

        entry_fee = fee_pct * qty * price
        locked = qty * price * self.cfg.entry_margin_ratio

        self.reserved_cash += locked
        self.position = Position(i, price, qty, sig, fee_pct, locked)
        self._last_entry_cost = locked + entry_fee

    def _exit(self, pos: Position, i: int, fill_price: float, cost_pct: float, reason: str) -> None:
        entry_fee = pos.entry_fee_pct * pos.qty * pos.entry_price
        exit_fee  = cost_pct       * pos.qty * fill_price
        pnl = pos.side * pos.qty * (fill_price - pos.entry_price) - entry_fee - exit_fee

        self.reserved_cash -= pos.locked_margin
        pos.exit_bar    = i
        pos.exit_price  = fill_price
        pos.exit_cost   = cost_pct
        pos.pnl         = pnl
        pos.exit_reason = reason
        self.trades.append(pos)

    def on_bar(self, i: int, sig: int, equity: float) -> float:
        self._last_entry_cost = 0.0
        cash_change = 0.0

        # 1) Margin-call exit
        if self.position:
            available = equity - self.reserved_cash
            maint_req = (self.position.qty *
                         self.position.entry_price *
                         self.cfg.maintenance_margin_ratio)
            if available < maint_req:
                fill = self.cm.adjust_price(self.data['close'].iat[i],
                                            -self.position.side,
                                            self.position.qty)
                pct  = self.cm.cost_pct(fill, self.position.qty)
                self._exit(self.position, i, fill, pct, 'MarginCall')
                cash_change += self.trades[-1].locked_margin + self.trades[-1].pnl
                self.position = None
                return cash_change

        # 2) No position → entry
        if self.position is None:
            if sig != 0:
                self._enter(i, sig, equity)
                cash_change -= self._last_entry_cost
            return cash_change

        # 3) Enforce minimum hold
        if i - self.position.entry_bar < self.cfg.min_hold_bars:
            return cash_change

        # 4) Intra-bar exit checks (stop-loss first, then take-profit)
        pos = self.position
        o, h, l = (self.data[col].iat[i] for col in ['open', 'high', 'low'])
        slp = pos.entry_price * (1 - pos.side * self.cfg.stop_loss)
        tpp = pos.entry_price * (1 + pos.side * self.cfg.take_profit)

        if (pos.side > 0 and l <= slp) or (pos.side < 0 and h >= slp):
            reason, target = 'SL', slp
        elif (pos.side > 0 and h >= tpp) or (pos.side < 0 and l <= tpp):
            reason, target = 'TP', tpp
        elif sig == 0 or np.sign(sig) != pos.side:
            reason, target = 'Signal', o
        else:
            return cash_change

        # exit here
        fill = self.cm.adjust_price(target, -pos.side, pos.qty)
        pct  = self.cm.cost_pct(fill, pos.qty)
        self._exit(pos, i, fill, pct, reason)
        cash_change += pos.locked_margin + pos.pnl
        self.position = None

        # 5) allow immediate re‑entry on same bar
        if sig != 0:
            self._enter(i, sig, equity + cash_change)  # equity now includes pnl
            cash_change -= self._last_entry_cost

        return cash_change

class MetricsTracker:
    def __init__(self, cfg: BacktestConfig):
        self.cfg = cfg
        self.times: List[pd.Timestamp] = []
        self.eq:    List[float]      = []

    def record(self, time: pd.Timestamp, equity: float):
        self.times.append(time)
        self.eq.append(equity)

    def compute(self, trades: List[Position]) -> Dict[str, float]:
        # Build equity series and bar returns
        eqs = pd.Series(self.eq, index=self.times)
        rtn = eqs.pct_change().dropna()

        # Optionally focus bar‐level Sharpe on in‐trade bars:
        in_trade_idx = set()
        for t in trades:
            # bars from entry_bar+1 up to & including exit_bar
            in_trade_idx.update(range(t.entry_bar + 1, t.exit_bar + 1))
        rtn_in = rtn.iloc[sorted(in_trade_idx)] if in_trade_idx else rtn

        ann = np.sqrt(365 * 24 / self.cfg.freq_hours)
        total_return = eqs.iloc[-1] / eqs.iloc[0] - 1
        years = len(eqs) / (365 * 24 / self.cfg.freq_hours)
        cagr = (1 + total_return) ** (1/years) - 1
        vol   = rtn_in.std() * ann
        sharpe = (rtn_in.mean() / rtn_in.std() * ann) if vol > 0 else np.nan
        sortino = (rtn_in.mean() / rtn_in[rtn_in < 0].std() * ann) if any(rtn_in < 0) else np.nan

        # Drawdown
        cum = (1 + rtn).cumprod()
        dd = cum / cum.cummax() - 1
        max_dd = dd.min()
        calmar = cagr / abs(max_dd) if max_dd < 0 else np.nan

        # Trade‐level stats
        wins  = [t.pnl for t in trades if t.pnl > 0]
        losses= [t.pnl for t in trades if t.pnl < 0]

        # Trade‐level Sharpe
        trade_returns = []
        for t in trades:
            # locked_margin is the initial margin posted
            entry_cap = t.locked_margin
            exit_cap  = t.locked_margin + t.pnl
            trade_returns.append(exit_cap/entry_cap - 1)
        if len(trade_returns) > 1:
            tr_mean = np.mean(trade_returns)
            tr_std  = np.std(trade_returns, ddof=1)
            trade_sharpe = tr_mean / tr_std
        else:
            trade_sharpe = np.nan

        stats = {
            'TotalReturn': total_return,
            'CAGR':         cagr,
            'Sharpe':       sharpe,
            'Sortino':      sortino,
            'Volatility':   vol,
            'MaxDrawdown':  max_dd,
            'Calmar':       calmar,
            'WinRate':      len(wins)/len(trades) if trades else np.nan,
            'AvgWin':       np.mean(wins)  if wins  else np.nan,
            'AvgLoss':      np.mean(losses) if losses else np.nan,
            'ProfitFactor': sum(wins)/abs(sum(losses)) if losses else np.nan,
            'TradeSharpe':  trade_sharpe
        }

        # Count exits by reason
        reasons = pd.Series([t.exit_reason for t in trades])
        for r in ['SL','TP','Signal','Forced','MarginCall']:
            stats[f'{r}_Exits'] = (reasons == r).sum()

        return stats

class Backtester:
    def __init__(self, data: pd.DataFrame, strategy: Callable, cfg: BacktestConfig):
        self.df = data.resample(f'{cfg.freq_hours}h').agg(
            {'open': 'first', 'high': 'max', 'low': 'min',
             'close': 'last',  'volume': 'sum'}
        ).dropna()
        self.cfg = cfg
        cm = CostModel(cfg.spread, cfg.cost_tiers)
        self.pm = PositionManager(cfg, cm, self.df)
        self.mt = MetricsTracker(cfg)
        self.strategy = strategy
        self.equity_curve: List[float] = []

    def position_unreal(self, i: int) -> float:
        pos = self.pm.position
        return pos.side * pos.qty * (self.df['close'].iat[i] - pos.entry_price)

    def _simulate(self):
        cash = self.cfg.start_equity
        self.equity_curve = []
        # record initial equity including reserved_cash=0
        init_equity = cash + self.pm.reserved_cash
        self.equity_curve.append(init_equity)
        self.mt.times, self.mt.eq = [], []
        self.mt.record(self.df.index[0], init_equity)

        sig = np.sign(
            self.strategy(self.df)
                .shift(1)
                .fillna(0)
                .clip(-1, 1)
        )

        for i in range(1, len(self.df)):
            # unrealized PnL before bar
            unreal_pre = self.position_unreal(i) if self.pm.position else 0.0
            equity_pre = cash + self.pm.reserved_cash + unreal_pre

            # process entry/exit, get cash delta
            delta = self.pm.on_bar(i, int(sig.iat[i]), equity_pre)
            cash += delta

            # unrealized PnL after bar
            unreal_post = self.position_unreal(i) if self.pm.position else 0.0
            # true equity includes reserved margin
            equity_post = cash + self.pm.reserved_cash + unreal_post

            # record
            self.equity_curve.append(equity_post)
            self.mt.record(self.df.index[i], equity_post)

    def _force_close(self):
        if not self.pm.position:
            return

        i   = len(self.df) - 1
        pos = self.pm.position

        fill = self.pm.cm.adjust_price(
            self.df['close'].iat[i],
            -pos.side,
            pos.qty
        )
        pct = self.pm.cm.cost_pct(fill, pos.qty)

        entry_fee = pos.entry_fee_pct * pos.qty * pos.entry_price
        exit_fee  = pct             * pos.qty * fill
        pnl       = pos.side * pos.qty * (fill - pos.entry_price) - entry_fee - exit_fee

        # release margin
        self.pm.reserved_cash -= pos.locked_margin

        # compute final equity
        last_equity = self.equity_curve[-1] + pos.locked_margin + pnl

        # append and record (instead of mutating)
        self.equity_curve.append(last_equity)
        self.mt.record(self.df.index[i], last_equity)

        # finalize position
        pos.exit_bar    = i
        pos.exit_price  = fill
        pos.exit_cost   = pct
        pos.pnl         = pnl
        pos.exit_reason = 'Forced'
        self.pm.trades.append(pos)
        self.pm.position = None

    def run(self) -> Dict[str, float]:
        self._simulate()
        self._force_close()
        stats = self.mt.compute(self.pm.trades)
        print(tabulate(
            [[k, f"{v:.4f}"] for k, v in stats.items()],
            headers=['Metric','Value'],
            tablefmt='fancy_grid'
        ))
        return stats




In [35]:
import pandas as pd

df = pd.read_csv("ohlcv.csv", dtype={"start_time": int})
df["start_time"] = pd.to_datetime(df["start_time"], unit="ms")
df.set_index("start_time", inplace=True)
df.sort_index(inplace=True)

# ensure lowercase column names
df.columns = df.columns.str.lower()

# verify required OHLCV columns exist
required = {"open", "high", "low", "close", "volume"}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {missing}")

# 2) Configure backtest
cfg = BacktestConfig(
    start_equity=10_000,
    risk_per_trade=0.01,
    leverage=1.0,
    stop_loss=0.02,
    take_profit=0.05,
    entry_margin_ratio=0.5,
    maintenance_margin_ratio=0.25,
    freq_hours=1,
    fill_threshold=0.05,
    spread=0.0002,
    cost_tiers=None
)

# 3) Run SMA‑momentum strategy
sma_strat = SimpleMAMomentum(window=24)
bt_sma = Backtester(df, sma_strat.generate_signals, cfg)
print("=== SMA Momentum Results ===")
res_sma = bt_sma.run()

# 4) Run MACD+RSI strategy
macd_strat = EnhancedMACDRSIStrategy()
bt_macd = Backtester(df, macd_strat.generate_signals, cfg)
print("\n=== MACD+RSI Results ===")
res_macd = bt_macd.run()

=== SMA Momentum Results ===
╒══════════════════╤══════════╕
│ Metric           │    Value │
╞══════════════════╪══════════╡
│ TotalReturn      │  -0.9655 │
├──────────────────┼──────────┤
│ CAGR             │  -1      │
├──────────────────┼──────────┤
│ Sharpe           │   9.0415 │
├──────────────────┼──────────┤
│ Sortino          │  10.7026 │
├──────────────────┼──────────┤
│ Volatility       │  19.3922 │
├──────────────────┼──────────┤
│ MaxDrawdown      │  -0.9841 │
├──────────────────┼──────────┤
│ Calmar           │  -1.0161 │
├──────────────────┼──────────┤
│ WinRate          │   0.449  │
├──────────────────┼──────────┤
│ AvgWin           │  23.7176 │
├──────────────────┼──────────┤
│ AvgLoss          │ -34.4403 │
├──────────────────┼──────────┤
│ ProfitFactor     │   0.5612 │
├──────────────────┼──────────┤
│ TradeSharpe      │   0.0031 │
├──────────────────┼──────────┤
│ SL_Exits         │ 608      │
├──────────────────┼──────────┤
│ TP_Exits         │ 515      │
├──────────

  trade_sharpe = tr_mean / tr_std
