# Mean Reversion Trading Strategies

This notebook covers mean reversion strategies that profit from price movements back to average:
- Bollinger Bands mean reversion
- Z-score trading
- RSI oversold/overbought
- Pairs trading basics
- Statistical arbitrage

Mean reversion assumes that prices tend to revert to their historical average over time.

In [None]:
# Setup
import sys
import os
sys.path.append(os.path.dirname(os.getcwd()))

import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')

# Import our standardized engine with interactive plotting
from engine.strategy_base import Strategy, Bar, register_strategy
from engine.context import Context
from engine.data import load_klines_csv
from engine.backtest import run_strategy_suite, StrategyOptimizer, StrategyComparison
from engine.plots import TradingPlots

# Initialize interactive plotting
plotter = TradingPlots(theme="plotly_white", width=1000, height=650)

print("✅ Standardized Qupy engine with interactive plotting loaded!")
print("📊 All mean reversion visualizations will be interactive")

In [None]:
## 1. Bollinger Bands Mean Reversion Strategy Implementation

@register_strategy("bollinger_mean_reversion")
class BollingerMeanReversionStrategy(Strategy):
    """
    🔄 Bollinger Bands Mean Reversion Strategy
    
    🔧 HYPERPARAMETERS TO CUSTOMIZE:
    
    • bb_period (10-50): Bollinger Bands calculation period
      - Lower values = more sensitive to recent price action, more signals
      - Higher values = smoother bands, fewer but stronger signals
      - Try: 15, 20, 25, 30
      
    • bb_std (1.0-3.0): Number of standard deviations for bands
      - Lower values = tighter bands, more signals
      - Higher values = wider bands, fewer false signals
      - Try: 1.5, 2.0, 2.5
      
    • entry_threshold (0.01-0.2): %B threshold for entries
      - Lower values = closer to bands, more aggressive entries
      - Higher values = more conservative, fewer signals
      - Try: 0.02, 0.05, 0.10
      
    • notional (100-50000): Position size in quote currency
      - Higher values = larger positions, more risk/reward
      - Try: 5000, 10000, 25000
      
    • time_stop (5-50): Maximum bars to hold position
      - Lower values = quick exits, less exposure
      - Higher values = allow more time for mean reversion
      - Try: 20, 30, 40
    
    📊 STRATEGY LOGIC:
    - BUY: %B < entry_threshold (price near lower band = oversold)
    - SELL: %B > (1 - entry_threshold) (price near upper band = overbought)
    - EXIT: Price returns to middle area OR time stop reached
    """
    
    name = "bollinger_mean_reversion"
    
    @classmethod
    def param_schema(cls):
        return {
            "bb_period": {"type": "int", "min": 10, "max": 50, "default": 20},
            "bb_std": {"type": "float", "min": 1.0, "max": 3.0, "default": 2.0},
            "entry_threshold": {"type": "float", "min": 0.01, "max": 0.2, "default": 0.05},
            "exit_threshold": {"type": "float", "min": 0.3, "max": 0.7, "default": 0.5},
            "notional": {"type": "float", "min": 100, "default": 10_000.0},
            "time_stop": {"type": "int", "min": 5, "default": 30}
        }
    
    def on_init(self, context):
        self.bb_period = int(self.params.get("bb_period", 20))
        self.bb_std = float(self.params.get("bb_std", 2.0))
        self.entry_threshold = float(self.params.get("entry_threshold", 0.05))
        self.exit_threshold = float(self.params.get("exit_threshold", 0.5))
        self.notional = float(self.params.get("notional", 10_000.0))
        self.time_stop = int(self.params.get("time_stop", 30))
        
        self.bars_held = 0
        
        context.log("info", f"Bollinger Mean Reversion initialized: period={self.bb_period}, "
                           f"std={self.bb_std}, entry_threshold={self.entry_threshold}")
    
    def calculate_bollinger_bands(self, closes: list) -> dict:
        """Calculate Bollinger Bands and %B."""
        if len(closes) < self.bb_period:
            return {"upper": 0, "middle": 0, "lower": 0, "pctb": 0.5}
        
        recent_closes = closes[-self.bb_period:]
        sma = np.mean(recent_closes)
        std = np.std(recent_closes)
        
        upper = sma + (self.bb_std * std)
        lower = sma - (self.bb_std * std)
        
        # %B calculation - where price is relative to bands
        current_price = closes[-1]
        if upper != lower:
            pctb = (current_price - lower) / (upper - lower)
        else:
            pctb = 0.5
        
        return {
            "upper": upper,
            "middle": sma,
            "lower": lower,
            "pctb": pctb
        }
    
    def on_bar(self, context, symbol: str, bar: Bar):
        # Need enough history for BB calculation
        if context.bar_index < self.bb_period:
            return
        
        closes = context.data.history(symbol, "close", self.bb_period)
        if len(closes) < self.bb_period:
            return
        
        bb = self.calculate_bollinger_bands(closes)
        current_pos = context.position.qty
        
        # Exit logic first (if we have a position)
        if current_pos != 0:
            self.bars_held += 1
            
            exit_signal = False
            exit_reason = None
            
            # Exit when price returns to middle area (mean reversion achieved)
            if self.exit_threshold * 0.8 <= bb["pctb"] <= self.exit_threshold * 1.2:
                exit_signal = True
                exit_reason = "Exit:Return_To_Middle"
            elif self.bars_held >= self.time_stop:
                exit_signal = True
                exit_reason = "Exit:Time_Stop"
            
            if exit_signal:
                context.close(reason=exit_reason)
                context.log("info", f"🔴 {exit_reason}: %B={bb['pctb']:.3f}, bars_held={self.bars_held}")
                self.bars_held = 0
                return
        
        # Entry logic (only if no position)
        if current_pos == 0:
            # Buy when oversold (price near lower band)
            if bb["pctb"] < self.entry_threshold:
                qty = context.size.from_notional(self.notional, bar.close)
                context.buy(qty, reason="Entry:Oversold", size_mode="qty")
                context.tag_trade("BB_MeanReversion_Long")
                
                context.log("info", f"🟢 LONG (oversold): %B={bb['pctb']:.3f}, price=${bar.close:.4f}")
                
            # Sell when overbought (price near upper band) 
            elif bb["pctb"] > (1 - self.entry_threshold):
                qty = context.size.from_notional(self.notional, bar.close)
                context.sell(qty, reason="Entry:Overbought", size_mode="qty")
                context.tag_trade("BB_MeanReversion_Short")
                
                context.log("info", f"🔴 SHORT (overbought): %B={bb['pctb']:.3f}, price=${bar.close:.4f}")
        
        # Record metrics for analysis
        context.record("bb_pctb", bb["pctb"])
        context.record("bb_upper", bb["upper"])
        context.record("bb_lower", bb["lower"])
        context.record("position", 1 if current_pos > 0 else -1 if current_pos < 0 else 0)
    
    def on_trade_open(self, context, trade):
        self.bars_held = 0
    
    def on_trade_close(self, context, trade):
        """Called when a trade closes"""
        if hasattr(trade, 'pnl_abs') and trade.pnl_abs is not None:
            context.log("info", f"💰 Trade closed: PnL=${trade.pnl_abs:.2f}")
            context.record("trade_pnl", trade.pnl_abs)

print("✅ Bollinger Bands Mean Reversion Strategy defined!")
print("📝 Key parameters to experiment with:")
print("   • bb_period: 15-30 (sensitivity)")  
print("   • bb_std: 1.5-2.5 (band width)")
print("   • entry_threshold: 0.02-0.10 (aggressiveness)")

In [None]:
## 2. Z-Score Mean Reversion Strategy Implementation

@register_strategy("zscore_mean_reversion")
class ZScoreMeanReversionStrategy(Strategy):
    """
    📊 Z-Score Mean Reversion Strategy
    
    🔧 HYPERPARAMETERS TO CUSTOMIZE:
    
    • lookback (20-200): Period for calculating mean and std
      - Lower values = more sensitive to recent changes
      - Higher values = more stable statistical baseline
      - Try: 30, 50, 75, 100
      
    • entry_zscore (1.5-3.0): Z-score threshold for entries
      - Lower values = more trades, earlier entries
      - Higher values = fewer trades, extreme deviations only
      - Try: 1.8, 2.0, 2.2, 2.5
      
    • exit_zscore (0.2-1.0): Z-score threshold for exits
      - Lower values = quick exits, less patience for reversion
      - Higher values = wait for stronger mean reversion
      - Try: 0.3, 0.5, 0.7
      
    • notional (100-50000): Position size in quote currency
      - Try: 5000, 10000, 25000
      
    • time_stop (10-60): Maximum bars to hold position
      - Try: 25, 40, 50
    
    📊 STRATEGY LOGIC:
    - BUY: Z-score < -entry_zscore (extremely oversold)
    - SELL: Z-score > +entry_zscore (extremely overbought)  
    - EXIT: |Z-score| < exit_zscore OR time stop reached
    - Uses rolling statistical normalization for robust signals
    """
    
    name = "zscore_mean_reversion"
    
    @classmethod
    def param_schema(cls):
        return {
            "lookback": {"type": "int", "min": 20, "max": 200, "default": 50},
            "entry_zscore": {"type": "float", "min": 1.5, "max": 3.0, "default": 2.0},
            "exit_zscore": {"type": "float", "min": 0.2, "max": 1.0, "default": 0.5},
            "notional": {"type": "float", "min": 100, "default": 10_000.0},
            "time_stop": {"type": "int", "min": 10, "default": 40}
        }
    
    def on_init(self, context):
        self.lookback = int(self.params.get("lookback", 50))
        self.entry_zscore = float(self.params.get("entry_zscore", 2.0))
        self.exit_zscore = float(self.params.get("exit_zscore", 0.5))
        self.notional = float(self.params.get("notional", 10_000.0))
        self.time_stop = int(self.params.get("time_stop", 40))
        
        self.bars_held = 0
        
        context.log("info", f"Z-Score Mean Reversion initialized: lookback={self.lookback}, "
                           f"entry_z={self.entry_zscore}, exit_z={self.exit_zscore}")
    
    def calculate_zscore(self, closes: list) -> float:
        """Calculate Z-score for the most recent price."""
        if len(closes) < self.lookback:
            return 0.0
        
        recent_closes = closes[-self.lookback:]
        mean_price = np.mean(recent_closes)
        std_price = np.std(recent_closes)
        
        if std_price == 0:
            return 0.0
        
        current_price = closes[-1]
        zscore = (current_price - mean_price) / std_price
        return zscore
    
    def on_bar(self, context, symbol: str, bar: Bar):
        # Need enough history for Z-score calculation
        if context.bar_index < self.lookback:
            return
        
        closes = context.data.history(symbol, "close", self.lookback)
        if len(closes) < self.lookback:
            return
        
        zscore = self.calculate_zscore(closes)
        current_pos = context.position.qty
        
        # Exit logic first (if we have a position)
        if current_pos != 0:
            self.bars_held += 1
            
            exit_signal = False
            exit_reason = None
            
            # Exit when Z-score returns to normal range (mean reversion achieved)
            if abs(zscore) < self.exit_zscore:
                exit_signal = True
                exit_reason = "Exit:ZScore_Normalized"
            elif self.bars_held >= self.time_stop:
                exit_signal = True
                exit_reason = "Exit:Time_Stop"
            
            if exit_signal:
                context.close(reason=exit_reason)
                context.log("info", f"🔴 {exit_reason}: Z-score={zscore:.3f}, bars_held={self.bars_held}")
                self.bars_held = 0
                return
        
        # Entry logic (only if no position)
        if current_pos == 0:
            if zscore < -self.entry_zscore:
                # Extremely oversold - buy
                qty = context.size.from_notional(self.notional, bar.close)
                context.buy(qty, reason="Entry:Extremely_Oversold", size_mode="qty")
                context.tag_trade("ZScore_MeanReversion_Long")
                
                context.log("info", f"🟢 LONG (oversold): Z-score={zscore:.3f}, price=${bar.close:.4f}")
                
            elif zscore > self.entry_zscore:
                # Extremely overbought - sell
                qty = context.size.from_notional(self.notional, bar.close)
                context.sell(qty, reason="Entry:Extremely_Overbought", size_mode="qty")
                context.tag_trade("ZScore_MeanReversion_Short")
                
                context.log("info", f"🔴 SHORT (overbought): Z-score={zscore:.3f}, price=${bar.close:.4f}")
        
        # Record metrics for analysis
        context.record("zscore", zscore)
        context.record("position", 1 if current_pos > 0 else -1 if current_pos < 0 else 0)
    
    def on_trade_open(self, context, trade):
        self.bars_held = 0
    
    def on_trade_close(self, context, trade):
        if hasattr(trade, 'pnl_abs') and trade.pnl_abs is not None:
            context.log("info", f"💰 Trade closed: PnL=${trade.pnl_abs:.2f}")
            context.record("trade_pnl", trade.pnl_abs)

print("✅ Z-Score Mean Reversion Strategy defined!")
print("📝 Key parameters to experiment with:")
print("   • lookback: 30-100 (statistical window)")
print("   • entry_zscore: 1.8-2.5 (extremeness threshold)")
print("   • exit_zscore: 0.3-0.7 (normalization threshold)")

In [None]:
## 3. RSI Mean Reversion Strategy Implementation

@register_strategy("rsi_mean_reversion")
class RSIMeanReversionStrategy(Strategy):
    """
    📈 RSI Mean Reversion Strategy
    
    🔧 HYPERPARAMETERS TO CUSTOMIZE:
    
    • rsi_period (5-30): RSI calculation period
      - Lower values = more sensitive, more signals
      - Higher values = smoother, fewer false signals
      - Try: 10, 14, 18, 21
      
    • oversold (15-35): RSI level considered oversold
      - Lower values = more extreme oversold condition required
      - Higher values = earlier oversold entries
      - Try: 25, 30, 35
      
    • overbought (65-85): RSI level considered overbought
      - Lower values = earlier overbought entries  
      - Higher values = more extreme overbought required
      - Try: 70, 75, 80
      
    • exit_neutral (45-55): RSI level for neutral exits
      - Near 50 = quick exits when RSI normalizes
      - Try: 48, 50, 52
      
    • volume_filter (True/False): Require volume confirmation
      - True = more reliable signals, fewer trades
      - False = more trades, potentially more false signals
      
    • time_stop (10-40): Maximum bars to hold position
      - Try: 20, 25, 30
    
    📊 STRATEGY LOGIC:
    - BUY: RSI < oversold (with optional volume confirmation)
    - SELL: RSI > overbought (with optional volume confirmation)
    - EXIT: RSI returns to neutral zone OR time stop reached
    """
    
    name = "rsi_mean_reversion"
    
    @classmethod
    def param_schema(cls):
        return {
            "rsi_period": {"type": "int", "min": 5, "max": 30, "default": 14},
            "oversold": {"type": "float", "min": 15, "max": 35, "default": 30},
            "overbought": {"type": "float", "min": 65, "max": 85, "default": 70},
            "exit_neutral": {"type": "float", "min": 45, "max": 55, "default": 50},
            "notional": {"type": "float", "min": 100, "default": 10_000.0},
            "time_stop": {"type": "int", "min": 10, "default": 25},
            "volume_filter": {"type": "bool", "default": True}
        }
    
    def on_init(self, context):
        self.rsi_period = int(self.params.get("rsi_period", 14))
        self.oversold = float(self.params.get("oversold", 30))
        self.overbought = float(self.params.get("overbought", 70))
        self.exit_neutral = float(self.params.get("exit_neutral", 50))
        self.notional = float(self.params.get("notional", 10_000.0))
        self.time_stop = int(self.params.get("time_stop", 25))
        self.volume_filter = bool(self.params.get("volume_filter", True))
        
        self.bars_held = 0
        
        context.log("info", f"RSI Mean Reversion initialized: RSI period={self.rsi_period}, "
                           f"oversold={self.oversold}, overbought={self.overbought}, "
                           f"volume_filter={self.volume_filter}")
    
    def calculate_rsi(self, closes: list) -> float:
        """Calculate RSI using traditional method."""
        if len(closes) < self.rsi_period + 1:
            return 50.0  # Neutral RSI
        
        deltas = np.diff(closes)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        
        # Calculate average gains and losses
        avg_gain = np.mean(gains[-self.rsi_period:])
        avg_loss = np.mean(losses[-self.rsi_period:])
        
        # Handle edge cases
        if avg_loss == 0:
            return 100.0
        if avg_gain == 0:
            return 0.0
        
        rs = avg_gain / avg_loss
        rsi = 100 - (100 / (1 + rs))
        return rsi
    
    def check_volume_confirmation(self, context, symbol: str) -> bool:
        """Check if current volume confirms the signal."""
        if not self.volume_filter:
            return True
        
        if context.bar_index < 20:
            return True
        
        volumes = context.data.history(symbol, "volume", 20)
        if len(volumes) < 20:
            return True
        
        current_volume = volumes[-1]
        avg_volume = np.mean(volumes[:-1])
        
        # Require above-average volume for confirmation
        return current_volume > avg_volume * 1.2
    
    def on_bar(self, context, symbol: str, bar: Bar):
        # Need enough history for RSI calculation
        if context.bar_index < self.rsi_period + 1:
            return
        
        closes = context.data.history(symbol, "close", self.rsi_period + 1)
        if len(closes) < self.rsi_period + 1:
            return
        
        rsi = self.calculate_rsi(closes)
        volume_confirmed = self.check_volume_confirmation(context, symbol)
        current_pos = context.position.qty
        
        # Exit logic first (if we have a position)
        if current_pos != 0:
            self.bars_held += 1
            
            exit_signal = False
            exit_reason = None
            
            # Exit based on position direction and RSI returning to neutral
            if current_pos > 0 and rsi > self.exit_neutral:
                exit_signal = True
                exit_reason = "Exit:RSI_Neutral_Long"
            elif current_pos < 0 and rsi < self.exit_neutral:
                exit_signal = True
                exit_reason = "Exit:RSI_Neutral_Short"
            elif self.bars_held >= self.time_stop:
                exit_signal = True
                exit_reason = "Exit:Time_Stop"
            
            if exit_signal:
                context.close(reason=exit_reason)
                context.log("info", f"🔴 {exit_reason}: RSI={rsi:.2f}, bars_held={self.bars_held}")
                self.bars_held = 0
                return
        
        # Entry logic (only if no position and volume confirmed)
        if current_pos == 0 and volume_confirmed:
            if rsi < self.oversold:
                # Buy oversold condition
                qty = context.size.from_notional(self.notional, bar.close)
                context.buy(qty, reason="Entry:RSI_Oversold", size_mode="qty")
                context.tag_trade("RSI_MeanReversion_Long")
                
                context.log("info", f"🟢 LONG (oversold): RSI={rsi:.2f}, price=${bar.close:.4f}")
                
            elif rsi > self.overbought:
                # Sell overbought condition
                qty = context.size.from_notional(self.notional, bar.close)
                context.sell(qty, reason="Entry:RSI_Overbought", size_mode="qty")
                context.tag_trade("RSI_MeanReversion_Short")
                
                context.log("info", f"🔴 SHORT (overbought): RSI={rsi:.2f}, price=${bar.close:.4f}")
        
        # Record metrics for analysis
        context.record("rsi", rsi)
        context.record("volume_confirmed", volume_confirmed)
        context.record("position", 1 if current_pos > 0 else -1 if current_pos < 0 else 0)
    
    def on_trade_open(self, context, trade):
        self.bars_held = 0
    
    def on_trade_close(self, context, trade):
        if hasattr(trade, 'pnl_abs') and trade.pnl_abs is not None:
            context.log("info", f"💰 Trade closed: PnL=${trade.pnl_abs:.2f}")
            context.record("trade_pnl", trade.pnl_abs)

print("✅ RSI Mean Reversion Strategy defined!")
print("📝 Key parameters to experiment with:")
print("   • rsi_period: 10-21 (sensitivity)")
print("   • oversold/overbought: 25-35/70-80 (thresholds)")
print("   • volume_filter: True/False (signal quality vs quantity)")

In [None]:
## 4. Combined Mean Reversion Strategy Implementation

@register_strategy("combined_mean_reversion")
class CombinedMeanReversionStrategy(Strategy):
    """
    🎯 Combined Mean Reversion Strategy (Ensemble Approach)
    
    Uses ensemble voting from multiple mean reversion indicators:
    - Bollinger Bands %B
    - Z-Score  
    - RSI overbought/oversold
    
    🔧 HYPERPARAMETERS TO CUSTOMIZE:
    
    • bb_period (15-30): Bollinger Bands period
    • bb_std (1.5-2.5): BB standard deviations
    • zscore_lookback (30-80): Z-score calculation window
    • zscore_threshold (1.5-2.5): Z-score entry threshold
    • rsi_period (10-20): RSI calculation period
    • rsi_oversold (20-35): RSI oversold level
    • rsi_overbought (65-80): RSI overbought level
    • min_votes (2-3): Minimum indicators agreeing for entry
    • notional (100-50000): Position size
    • time_stop (15-50): Maximum holding period
    
    📊 STRATEGY LOGIC:
    - Each indicator votes: -1 (sell), 0 (neutral), +1 (buy)
    - ENTRY: When min_votes indicators agree on direction
    - EXIT: When majority vote changes OR time stop reached
    - More robust than single indicators, fewer false signals
    """
    
    name = "combined_mean_reversion"
    
    @classmethod
    def param_schema(cls):
        return {
            "bb_period": {"type": "int", "min": 15, "max": 30, "default": 20},
            "bb_std": {"type": "float", "min": 1.5, "max": 2.5, "default": 2.0},
            "zscore_lookback": {"type": "int", "min": 30, "max": 80, "default": 50},
            "zscore_threshold": {"type": "float", "min": 1.5, "max": 2.5, "default": 2.0},
            "rsi_period": {"type": "int", "min": 10, "max": 20, "default": 14},
            "rsi_oversold": {"type": "float", "min": 20, "max": 35, "default": 30},
            "rsi_overbought": {"type": "float", "min": 65, "max": 80, "default": 70},
            "min_votes": {"type": "int", "min": 2, "max": 3, "default": 2},
            "notional": {"type": "float", "min": 100, "default": 10_000.0},
            "time_stop": {"type": "int", "min": 15, "default": 35}
        }
    
    def on_init(self, context):
        self.bb_period = int(self.params.get("bb_period", 20))
        self.bb_std = float(self.params.get("bb_std", 2.0))
        self.zscore_lookback = int(self.params.get("zscore_lookback", 50))
        self.zscore_threshold = float(self.params.get("zscore_threshold", 2.0))
        self.rsi_period = int(self.params.get("rsi_period", 14))
        self.rsi_oversold = float(self.params.get("rsi_oversold", 30))
        self.rsi_overbought = float(self.params.get("rsi_overbought", 70))
        self.min_votes = int(self.params.get("min_votes", 2))
        self.notional = float(self.params.get("notional", 10_000.0))
        self.time_stop = int(self.params.get("time_stop", 35))
        
        self.bars_held = 0
        
        context.log("info", f"Combined Mean Reversion initialized with ensemble voting (min_votes={self.min_votes})")
    
    def get_bb_signal(self, closes: list) -> int:
        """Get Bollinger Bands signal (-1, 0, 1)."""
        if len(closes) < self.bb_period:
            return 0
        
        recent_closes = closes[-self.bb_period:]
        sma = np.mean(recent_closes)
        std = np.std(recent_closes)
        
        upper = sma + (self.bb_std * std)
        lower = sma - (self.bb_std * std)
        
        current_price = closes[-1]
        if upper != lower:
            pctb = (current_price - lower) / (upper - lower)
        else:
            return 0
        
        if pctb < 0.05:  # Near lower band (oversold)
            return 1  # Buy signal
        elif pctb > 0.95:  # Near upper band (overbought)
            return -1  # Sell signal
        else:
            return 0  # Neutral
    
    def get_zscore_signal(self, closes: list) -> int:
        """Get Z-score signal (-1, 0, 1)."""
        if len(closes) < self.zscore_lookback:
            return 0
        
        recent_closes = closes[-self.zscore_lookback:]
        mean_price = np.mean(recent_closes)
        std_price = np.std(recent_closes)
        
        if std_price == 0:
            return 0
        
        current_price = closes[-1]
        zscore = (current_price - mean_price) / std_price
        
        if zscore < -self.zscore_threshold:
            return 1  # Buy signal (extremely oversold)
        elif zscore > self.zscore_threshold:
            return -1  # Sell signal (extremely overbought)
        else:
            return 0
    
    def get_rsi_signal(self, closes: list) -> int:
        """Get RSI signal (-1, 0, 1)."""
        if len(closes) < self.rsi_period + 1:
            return 0
        
        deltas = np.diff(closes)
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        
        avg_gain = np.mean(gains[-self.rsi_period:])
        avg_loss = np.mean(losses[-self.rsi_period:])
        
        if avg_loss == 0:
            rsi = 100.0
        elif avg_gain == 0:
            rsi = 0.0
        else:
            rs = avg_gain / avg_loss
            rsi = 100 - (100 / (1 + rs))
        
        if rsi < self.rsi_oversold:
            return 1  # Buy signal (oversold)
        elif rsi > self.rsi_overbought:
            return -1  # Sell signal (overbought)
        else:
            return 0
    
    def on_bar(self, context, symbol: str, bar: Bar):
        # Need enough history for all indicators
        max_lookback = max(self.bb_period, self.zscore_lookback, self.rsi_period + 1)
        if context.bar_index < max_lookback:
            return
        
        closes = context.data.history(symbol, "close", max_lookback)
        if len(closes) < max_lookback:
            return
        
        # Get individual indicator signals
        bb_signal = self.get_bb_signal(closes)
        zscore_signal = self.get_zscore_signal(closes)
        rsi_signal = self.get_rsi_signal(closes)
        
        # Ensemble voting
        vote_sum = bb_signal + zscore_signal + rsi_signal
        
        # Determine ensemble signal based on minimum votes required
        if vote_sum >= self.min_votes:
            ensemble_signal = 1  # Long signal
        elif vote_sum <= -self.min_votes:
            ensemble_signal = -1  # Short signal  
        else:
            ensemble_signal = 0  # Neutral (insufficient consensus)
        
        current_pos = context.position.qty
        
        # Exit logic first (if we have a position)
        if current_pos != 0:
            self.bars_held += 1
            
            exit_signal = False
            exit_reason = None
            
            # Exit if ensemble signal changes (consensus lost)
            if (current_pos > 0 and ensemble_signal != 1) or \
               (current_pos < 0 and ensemble_signal != -1):
                exit_signal = True
                exit_reason = "Exit:Consensus_Lost"
            elif self.bars_held >= self.time_stop:
                exit_signal = True
                exit_reason = "Exit:Time_Stop"
            
            if exit_signal:
                context.close(reason=exit_reason)
                context.log("info", f"🔴 {exit_reason}: votes=(BB:{bb_signal}, Z:{zscore_signal}, RSI:{rsi_signal}), "
                                   f"sum={vote_sum}, bars_held={self.bars_held}")
                self.bars_held = 0
                return
        
        # Entry logic (only if no position)
        if current_pos == 0:
            if ensemble_signal == 1:
                # Long signal with sufficient consensus
                qty = context.size.from_notional(self.notional, bar.close)
                context.buy(qty, reason="Entry:Ensemble_Long", size_mode="qty")
                context.tag_trade("Combined_MeanReversion_Long")
                
                context.log("info", f"🟢 LONG (ensemble): votes=(BB:{bb_signal}, Z:{zscore_signal}, RSI:{rsi_signal}), "
                                   f"sum={vote_sum}, price=${bar.close:.4f}")
                
            elif ensemble_signal == -1:
                # Short signal with sufficient consensus
                qty = context.size.from_notional(self.notional, bar.close)
                context.sell(qty, reason="Entry:Ensemble_Short", size_mode="qty")
                context.tag_trade("Combined_MeanReversion_Short")
                
                context.log("info", f"🔴 SHORT (ensemble): votes=(BB:{bb_signal}, Z:{zscore_signal}, RSI:{rsi_signal}), "
                                   f"sum={vote_sum}, price=${bar.close:.4f}")
        
        # Record metrics for analysis
        context.record("bb_signal", bb_signal)
        context.record("zscore_signal", zscore_signal)
        context.record("rsi_signal", rsi_signal)
        context.record("vote_sum", vote_sum)
        context.record("ensemble_signal", ensemble_signal)
        context.record("position", 1 if current_pos > 0 else -1 if current_pos < 0 else 0)
    
    def on_trade_open(self, context, trade):
        self.bars_held = 0
    
    def on_trade_close(self, context, trade):
        if hasattr(trade, 'pnl_abs') and trade.pnl_abs is not None:
            context.log("info", f"💰 Trade closed: PnL=${trade.pnl_abs:.2f}")
            context.record("trade_pnl", trade.pnl_abs)

print("✅ Combined Mean Reversion Strategy defined!")
print("📝 Key parameters to experiment with:")
print("   • min_votes: 2-3 (consensus requirement)")
print("   • Individual indicator thresholds")
print("   • time_stop: 20-40 (patience for reversion)")

In [None]:
# Load data and run mean reversion strategy suite
df, freq_hint = load_klines_csv('../data/CAKEUSDT.csv')

print(f"📊 Loaded {len(df)} bars with frequency: {freq_hint}")
print(f"📅 Date range: {df['dt_open'].iloc[0]} to {df['dt_open'].iloc[-1]}")

# Configure mean reversion strategies with different parameter sets
mean_reversion_strategies = [
    ("Bollinger Bands (Conservative)", BollingerMeanReversionStrategy, {
        "bb_period": 25, 
        "bb_std": 2.5,
        "entry_threshold": 0.02,
        "exit_threshold": 0.5,
        "time_stop": 40,
        "notional": 10000
    }),
    ("Bollinger Bands (Aggressive)", BollingerMeanReversionStrategy, {
        "bb_period": 15, 
        "bb_std": 1.8,
        "entry_threshold": 0.08,
        "exit_threshold": 0.5,
        "time_stop": 25,
        "notional": 10000
    }),
    ("Z-Score (Conservative)", ZScoreMeanReversionStrategy, {
        "lookback": 75,
        "entry_zscore": 2.5,
        "exit_zscore": 0.3,
        "time_stop": 45,
        "notional": 10000
    }),
    ("Z-Score (Aggressive)", ZScoreMeanReversionStrategy, {
        "lookback": 30,
        "entry_zscore": 1.8,
        "exit_zscore": 0.7,
        "time_stop": 30,
        "notional": 10000
    }),
    ("RSI Mean Reversion (Standard)", RSIMeanReversionStrategy, {
        "rsi_period": 14,
        "oversold": 25,
        "overbought": 75,
        "exit_neutral": 50,
        "time_stop": 25,
        "volume_filter": True,
        "notional": 10000
    }),
    ("RSI Mean Reversion (No Volume Filter)", RSIMeanReversionStrategy, {
        "rsi_period": 18,
        "oversold": 30,
        "overbought": 70,
        "exit_neutral": 50,
        "time_stop": 20,
        "volume_filter": False,
        "notional": 10000
    }),
    ("Combined Mean Reversion (Majority)", CombinedMeanReversionStrategy, {
        "bb_period": 20,
        "bb_std": 2.0,
        "zscore_lookback": 50,
        "zscore_threshold": 2.0,
        "rsi_period": 14,
        "rsi_oversold": 30,
        "rsi_overbought": 70,
        "min_votes": 2,
        "time_stop": 35,
        "notional": 10000
    }),
    ("Combined Mean Reversion (Unanimous)", CombinedMeanReversionStrategy, {
        "bb_period": 20,
        "bb_std": 2.0,
        "zscore_lookback": 50,
        "zscore_threshold": 2.0,
        "rsi_period": 14,
        "rsi_oversold": 30,
        "rsi_overbought": 70,
        "min_votes": 3,
        "time_stop": 35,
        "notional": 10000
    })
]

# Run the entire mean reversion strategy suite with standardized reporting
mean_reversion_results = run_strategy_suite(
    data=df,
    strategies_config=mean_reversion_strategies,
    symbol="CAKEUSDT",
    initial_cash=100_000,
    fee_bps=10,     # 0.10% fees
    slippage_bps=2  # 0.02% slippage
)

print("\n💡 Detailed JSON reports available for analysis:")
print("   Access via: mean_reversion_results['strategy_name']")
print("   Example: mean_reversion_results['Bollinger Bands (Conservative)']['show_all_plots']()")

In [None]:
## 6. Interactive Results Analysis Dashboard 📊

# Create comprehensive interactive dashboard for mean reversion strategies
from engine.backtest import StrategyComparison

if mean_reversion_results:
    # Interactive strategy comparison dashboard
    comparison_plots = StrategyComparison.plot_strategy_suite(mean_reversion_results)
    
    print("🚀 Interactive Mean Reversion Analysis Dashboard Created!")
    print("=" * 70)
    
    # Show individual interactive components
    print("\n📊 1. STRATEGY PERFORMANCE COMPARISON")
    comparison_plots['comparison_bar'].show()
    
    print("\n📈 2. RISK-RETURN ANALYSIS")
    comparison_plots['comparison_scatter'].show()
    
    print("\n🕸️ 3. MULTI-METRIC RADAR ANALYSIS")
    comparison_plots['comparison_radar'].show()
    
    print("\n💰 4. INTERACTIVE EQUITY CURVES")
    comparison_plots['equity_curves'].show()
    
    if 'returns_distribution' in comparison_plots:
        print("\n📊 5. RETURNS DISTRIBUTION COMPARISON")
        comparison_plots['returns_distribution'].show()
    
    # Create standardized comparison table
    comparison_df = StrategyComparison.create_comparison_table(mean_reversion_results)
    
    print("\n📋 MEAN REVERSION STRATEGIES PERFORMANCE TABLE:")
    print("=" * 90)
    print(comparison_df.round(3).to_string(index=False))
    
    # Interactive insights and analysis
    print(f"\n🎯 INTERACTIVE MEAN REVERSION INSIGHTS:")
    best_sharpe = comparison_df.loc[comparison_df['Sharpe'].idxmax()]
    best_return = comparison_df.loc[comparison_df['Return %'].idxmax()]
    lowest_dd = comparison_df.loc[comparison_df['Max DD %'].idxmin()]
    
    print(f"🏆 Best Risk-Adjusted: {best_sharpe['Strategy']} (Sharpe: {best_sharpe['Sharpe']:.3f})")
    print(f"💰 Highest Return: {best_return['Strategy']} ({best_return['Return %']:.1f}%)")
    print(f"🛡️  Lowest Drawdown: {lowest_dd['Strategy']} ({lowest_dd['Max DD %']:.1f}%)")
    print(f"✅ Beat Buy & Hold: {len(comparison_df[comparison_df['vs B&H %'] > 0])} out of {len(comparison_df)} strategies")
    
    profitable_strategies = len(comparison_df[comparison_df['Return %'] > 0])
    print(f"📈 Profitable Strategies: {profitable_strategies} out of {len(comparison_df)}")
    
    # Mean reversion specific analysis
    avg_win_rate = comparison_df['Win Rate %'].mean()
    print(f"📊 Average Win Rate: {avg_win_rate:.1f}% (typical for mean reversion)")
    
    print(f"\n🔍 INTERACTIVE FEATURES:")
    print("   • Hover over chart elements for detailed strategy information")
    print("   • Zoom and pan on equity curves to examine specific periods")
    print("   • Click legend items to hide/show specific strategies") 
    print("   • Scatter plot shows risk-return profile with strategy tooltips")
    print("   • Export interactive charts for presentations")
    
    # Deep dive analysis on best strategy
    if not comparison_df.empty:
        print(f"\n🔬 DETAILED ANALYSIS: {best_sharpe['Strategy']}")
        print("="*60)
        
        best_result = mean_reversion_results.get(best_sharpe['Strategy'])
        if best_result:
            # Interactive deep dive equity curve
            detailed_fig = plotter.equity_curve(
                [best_result], 
                title=f"Detailed Analysis: {best_sharpe['Strategy']}"
            )
            detailed_fig.show()
            
            # Interactive returns distribution with risk metrics
            if 'returns_series' in best_result and best_result['returns_series']:
                returns_fig = plotter.returns_distribution(
                    {best_sharpe['Strategy']: pd.Series(best_result['returns_series'])},
                    add_var_lines=True,
                    confidence_levels=[0.95, 0.99]
                )
                returns_fig.show()
                
                print(f"✅ Created detailed interactive analysis for: {best_sharpe['Strategy']}")
                print("   • Comprehensive equity curve with drawdown periods")
                print("   • Returns distribution with Value-at-Risk analysis")
    
    print(f"\n💡 MEAN REVERSION INTERACTIVE OBSERVATIONS:")
    print("• Combined strategies show enhanced stability in interactive equity curves")
    print("• Risk-return scatter reveals optimal parameter combinations")
    print("• Hover over drawdown periods to identify challenging market conditions")
    print("• Interactive comparison helps identify most robust mean reversion approaches")

else:
    print("❌ No valid mean reversion results to display. Check strategy implementations.")

In [None]:
## Additional Interactive Analysis Dashboard 🎨

# Enhanced interactive analysis using modern engine capabilities
if mean_reversion_results:
    print("🎨 Enhanced Mean Reversion Analysis")
    print("="*50)
    
    # Create advanced comparison table with more metrics
    comparison_df = StrategyComparison.create_comparison_table(mean_reversion_results)
    
    print("\n📊 COMPREHENSIVE STRATEGY PERFORMANCE TABLE:")
    print(comparison_df.round(3).to_string(index=False))
    
    # Interactive best strategy deep dive
    if not comparison_df.empty:
        best_sharpe_idx = comparison_df['Sharpe'].idxmax()
        best_strategy_name = comparison_df.iloc[best_sharpe_idx]['Strategy']
        best_result = mean_reversion_results[best_strategy_name]
        
        print(f"\n🏆 DEEP DIVE: {best_strategy_name}")
        print("="*60)
        
        # Show detailed performance report
        if 'show_all_plots' in best_result:
            print("📈 Interactive comprehensive analysis for best strategy:")
            best_result['show_all_plots']()
        
        # Interactive comparison with buy & hold
        if 'plot_vs_strategy' in best_result:
            print("\n⚖️ Interactive comparison vs Buy & Hold:")
            comparison_fig = best_result['plot_vs_strategy'](
                benchmark_strategy='buy_hold', 
                title=f'{best_strategy_name} vs Buy & Hold Performance'
            )
            if comparison_fig:
                comparison_fig.show()
    
    # Summary insights based on analysis
    print(f"\n🎯 KEY INSIGHTS FROM MEAN REVERSION ANALYSIS:")
    
    # Find strategies with different characteristics
    best_return_strategy = comparison_df.loc[comparison_df['Return %'].idxmax(), 'Strategy']
    best_sharpe_strategy = comparison_df.loc[comparison_df['Sharpe'].idxmax(), 'Strategy']
    lowest_dd_strategy = comparison_df.loc[comparison_df['Max DD %'].idxmin(), 'Strategy']
    highest_winrate = comparison_df.loc[comparison_df['Win Rate %'].idxmax(), 'Strategy']
    
    print(f"💰 Highest Return: {best_return_strategy} ({comparison_df.loc[comparison_df['Return %'].idxmax(), 'Return %']:.1f}%)")
    print(f"🏆 Best Risk-Adjusted: {best_sharpe_strategy} (Sharpe: {comparison_df.loc[comparison_df['Sharpe'].idxmax(), 'Sharpe']:.3f})")
    print(f"🛡️  Lowest Drawdown: {lowest_dd_strategy} ({comparison_df.loc[comparison_df['Max DD %'].idxmin(), 'Max DD %']:.1f}%)")
    print(f"🎯 Highest Win Rate: {highest_winrate} ({comparison_df.loc[comparison_df['Win Rate %'].idxmax(), 'Win Rate %']:.1f}%)")
    
    # Strategy type analysis
    bb_strategies = comparison_df[comparison_df['Strategy'].str.contains('Bollinger')]
    zscore_strategies = comparison_df[comparison_df['Strategy'].str.contains('Z-Score')]
    rsi_strategies = comparison_df[comparison_df['Strategy'].str.contains('RSI')]
    combined_strategies = comparison_df[comparison_df['Strategy'].str.contains('Combined')]
    
    print(f"\n📊 STRATEGY TYPE ANALYSIS:")
    if len(bb_strategies) > 0:
        print(f"   • Bollinger Bands avg Sharpe: {bb_strategies['Sharpe'].mean():.3f}")
    if len(zscore_strategies) > 0:
        print(f"   • Z-Score avg Sharpe: {zscore_strategies['Sharpe'].mean():.3f}")
    if len(rsi_strategies) > 0:
        print(f"   • RSI avg Sharpe: {rsi_strategies['Sharpe'].mean():.3f}")
    if len(combined_strategies) > 0:
        print(f"   • Combined avg Sharpe: {combined_strategies['Sharpe'].mean():.3f}")
    
    # Performance insights
    profitable_count = len(comparison_df[comparison_df['Return %'] > 0])
    beat_buyhold_count = len(comparison_df[comparison_df['vs B&H %'] > 0])
    
    print(f"\n✅ PERFORMANCE SUMMARY:")
    print(f"   • Profitable strategies: {profitable_count}/{len(comparison_df)} ({profitable_count/len(comparison_df)*100:.0f}%)")
    print(f"   • Beat Buy & Hold: {beat_buyhold_count}/{len(comparison_df)} ({beat_buyhold_count/len(comparison_df)*100:.0f}%)")
    print(f"   • Average Win Rate: {comparison_df['Win Rate %'].mean():.1f}%")
    print(f"   • Average Sharpe: {comparison_df['Sharpe'].mean():.3f}")
    
    print(f"\n💡 MEAN REVERSION OBSERVATIONS:")
    print("• Conservative parameters (longer periods, wider thresholds) show more stability")
    print("• Combined ensemble strategies reduce risk through diversification") 
    print("• Volume confirmation improves signal quality but reduces trade frequency")
    print("• Time stops are essential - mean reversion can fail in strong trends")
    print("• Interactive analysis reveals optimal parameter combinations")

else:
    print("❌ No results available - check strategy execution")

In [None]:
# Additional Parameter Optimization Example
print("🔧 BONUS: Parameter Optimization Example")
print("="*60)

# Example optimization for Bollinger Bands strategy
optimizer = StrategyOptimizer(df, "CAKEUSDT", fee_bps=10, slippage_bps=2)

# Optimize Bollinger Bands parameters
param_ranges = {
    "bb_period": [15, 20, 25, 30],
    "bb_std": [1.5, 2.0, 2.5],
    "entry_threshold": [0.02, 0.05, 0.08],
    "time_stop": [25, 35, 45],
    "notional": [10000]  # Keep notional fixed
}

print("🔍 Running optimization for Bollinger Bands Mean Reversion...")
optimization_results = optimizer.optimize_parameters(
    BollingerMeanReversionStrategy,
    param_ranges,
    optimization_metric="sharpe_ratio",
    n_best=5,
    verbose=True
)

if not optimization_results.empty:
    # Show results with pretty formatting
    optimizer.show_optimization_results(optimization_results, pretty_results=1)
    
    # Create interactive heatmap
    heatmap_fig = optimizer.plot_optimization_heatmap(
        optimization_results,
        param1="bb_period",
        param2="bb_std", 
        metric="sharpe_ratio"
    )
    if heatmap_fig:
        print("\n📊 Interactive parameter heatmap:")
        heatmap_fig.show()
    
    # Show top strategies equity curves
    strategies_fig = optimizer.plot_optimization_strategies(
        optimization_results,
        BollingerMeanReversionStrategy,
        n_strategies=3,
        title="Top 3 Bollinger Bands Parameter Combinations"
    )
    if strategies_fig:
        print("\n📈 Interactive equity curves for top parameter sets:")
        strategies_fig.show()
        
else:
    print("❌ No valid optimization results found")

print("✅ Parameter optimization complete!")

In [None]:
# Advanced Parameter Optimization for Z-Score Strategy
print("🎯 Z-Score Strategy Parameter Optimization")
print("="*50)

# Optimize Z-Score parameters using modern engine
param_ranges = {
    "lookback": [30, 50, 75, 100],
    "entry_zscore": [1.5, 2.0, 2.5, 3.0],
    "exit_zscore": [0.3, 0.5, 0.7],
    "time_stop": [25, 35, 45],
    "notional": [10000]  # Keep notional fixed
}

print("🔍 Running comprehensive Z-Score parameter optimization...")
zscore_optimization = optimizer.optimize_parameters(
    ZScoreMeanReversionStrategy,
    param_ranges,
    optimization_metric="sharpe_ratio",
    n_best=10,
    verbose=False
)

if not zscore_optimization.empty:
    print("\n📊 Top 10 Z-Score Parameter Combinations:")
    print("="*60)
    
    # Display top results in a clean format
    top_results = zscore_optimization.head(10)[['lookback', 'entry_zscore', 'exit_zscore', 'time_stop', 
                                                'sharpe_ratio', 'total_return_pct', 'max_drawdown_pct', 'num_trades']]
    print(top_results.round(3).to_string(index=False))
    
    # Show interactive heatmap for key parameters
    print("\n📈 Interactive Z-Score Parameter Analysis:")
    
    # Heatmap for lookback vs entry_zscore
    zscore_heatmap = optimizer.plot_optimization_heatmap(
        zscore_optimization,
        param1="lookback",
        param2="entry_zscore", 
        metric="sharpe_ratio",
        title="Z-Score Strategy: Lookback vs Entry Threshold"
    )
    if zscore_heatmap:
        zscore_heatmap.show()
    
    # Show best parameter combination details
    best = zscore_optimization.iloc[0]
    print(f"\n🏆 BEST Z-SCORE PARAMETERS:")
    print(f"   • Lookback: {int(best['lookback'])} periods")
    print(f"   • Entry Z-Score: {best['entry_zscore']:.1f}")
    print(f"   • Exit Z-Score: {best['exit_zscore']:.1f}")
    print(f"   • Time Stop: {int(best['time_stop'])} bars")
    print(f"   • Expected Sharpe: {best['sharpe_ratio']:.3f}")
    print(f"   • Expected Return: {best['total_return_pct']:.1f}%")
    print(f"   • Max Drawdown: {best['max_drawdown_pct']:.1f}%")
    
else:
    print("❌ Z-Score optimization failed - check parameters and data")

print("\n✅ Z-Score parameter optimization complete!")

In [None]:
# Grid search for Z-score parameters
lookbacks = [20, 30, 50, 75, 100]
entry_thresholds = [1.5, 2.0, 2.5, 3.0]
exit_thresholds = [0.0, 0.5, 1.0]

results = []

for lb in lookbacks:
    z = zscore(data['close'].values, lb)
    
    for entry_z in entry_thresholds:
        for exit_z in exit_thresholds:
            if exit_z >= entry_z:
                continue
            
            # Simulate strategy
            position = 0
            positions = []
            
            for i in range(len(z)):
                if pd.isna(z[i]):
                    positions.append(0)
                    continue
                
                if position == 0:
                    if z[i] > entry_z:
                        position = -1
                    elif z[i] < -entry_z:
                        position = 1
                elif position == 1 and z[i] > -exit_z:
                    position = 0
                elif position == -1 and z[i] < exit_z:
                    position = 0
                
                positions.append(position)
            
            # Calculate returns
            positions = pd.Series(positions, index=data.index)
            strat_returns = positions.shift(1) * returns
            
            # Store results
            sharpe = strat_returns.mean() / strat_returns.std() * np.sqrt(365*24) if strat_returns.std() > 0 else 0
            
            results.append({
                'lookback': lb,
                'entry_z': entry_z,
                'exit_z': exit_z,
                'sharpe': sharpe,
                'total_return': (1 + strat_returns).prod() - 1,
                'num_trades': (positions.diff() != 0).sum()
            })

# Convert to DataFrame and find best
results_df = pd.DataFrame(results)
results_df = results_df.sort_values('sharpe', ascending=False)

print("Top 10 Z-Score Parameter Combinations:")
print(results_df.head(10))

## Key Takeaways

### Mean Reversion Characteristics:

1. **Works best in ranging markets** - Avoid strong trends
2. **Risk management crucial** - Failed mean reversions can lead to large losses
3. **Multiple timeframes** - What's overbought on one timeframe may be oversold on another
4. **Combine signals** - Multiple confirmations reduce false signals
5. **Volume confirmation** - High volume at extremes often marks reversals

### Best Practices:

- **Use stops** - Mean reversion can fail catastrophically
- **Scale in/out** - Don't put full position at first signal
- **Time stops** - Exit if reversion doesn't happen within expected time
- **Regime detection** - Turn off in trending markets
- **Diversify** - Trade multiple uncorrelated instruments

### Common Pitfalls:

- Catching falling knives in strong downtrends
- Ignoring regime changes
- Over-optimizing parameters
- Not accounting for transaction costs
- Fighting the trend too early