# In [4]:
import pandas as pd
import numpy as np
import math

def calculate_channel_breakout(df, length=20):
    """Add rolling price-channel bounds to ``df``.

    ``Upper_Channel`` is the highest ``high`` and ``Lower_Channel`` the lowest
    ``low`` over the trailing ``length`` rows (current row included), so the
    first ``length - 1`` rows are NaN. ``df`` is modified in place and
    returned.
    """
    highest_high = df['high'].rolling(window=length).max()
    lowest_low = df['low'].rolling(window=length).min()

    df['Upper_Channel'] = highest_high
    df['Lower_Channel'] = lowest_low
    return df

def _wilder_smooth(values, period):
    """Wilder-smooth ``values``, whose element 0 is a placeholder.

    Index ``period`` is seeded with the simple mean of
    ``values[1:period + 1]``; each later index is
    ``(previous * (period - 1) + value) / period``. Indices before ``period``
    stay 0.0, and the whole result is 0.0 when the input is too short to seed.
    """
    smoothed = np.zeros(len(values), dtype=float)
    if len(values) <= period:
        return smoothed
    smoothed[period] = values[1:period + 1].mean()
    for i in range(period + 1, len(values)):
        smoothed[i] = (smoothed[i - 1] * (period - 1) + values[i]) / period
    return smoothed


def calculate_momentum_indicators(df, period=14):
    """Add RSI, ATR, a placeholder ADX and a 20-row SMA to ``df``.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns. It is
            modified in place.
        period: Look-back used for the Wilder smoothing of RSI and ATR and
            for the ADX placeholder window (default 14).

    Returns:
        The same DataFrame with these columns added:

        * ``RSI`` - Wilder RSI; 0.0 for the first ``period`` rows, 100.0 when
          the average loss is zero.
        * ``ATR`` - Wilder average true range; 0.0 for the first ``period``
          rows.
        * ``ADX`` - NOT a true ADX: the simple mean of the last ``period``
          true ranges divided by ATR, times 100. It is 0.0 for the first
          ``2 * period`` rows and wherever ATR is zero.
        * ``SMA20`` - 20-row simple moving average of ``close``.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be at least 1")

    # Work on float copies so integer price columns are not truncated.
    close = df['close'].to_numpy(dtype=float)
    high = df['high'].to_numpy(dtype=float)
    low = df['low'].to_numpy(dtype=float)
    n = len(close)

    # --- RSI -------------------------------------------------------------
    # delta[i] is the change INTO bar i (close[i] - close[i-1]); element 0 has
    # no predecessor. Aligning it this way keeps the indicator at row i free
    # of any information from row i + 1.
    delta = np.zeros(n)
    delta[1:] = close[1:] - close[:-1]
    gain = np.where(delta > 0, delta, 0.0)
    loss = np.where(delta < 0, -delta, 0.0)

    avg_gain = _wilder_smooth(gain, period)
    avg_loss = _wilder_smooth(loss, period)

    rsi = np.zeros(n)
    if n > period:
        seeded_gain = avg_gain[period:]
        seeded_loss = avg_loss[period:]
        # Division by a zero average loss is expected; those rows are
        # overwritten with 100 below.
        with np.errstate(divide='ignore', invalid='ignore'):
            rs = seeded_gain / seeded_loss
        rsi[period:] = np.where(seeded_loss == 0, 100.0, 100.0 - 100.0 / (1.0 + rs))
    df['RSI'] = rsi

    # --- True range and ATR ------------------------------------------------
    tr = np.zeros(n)
    if n > 1:
        prev_close = close[:-1]
        tr[1:] = np.maximum.reduce([
            high[1:] - low[1:],
            np.abs(high[1:] - prev_close),
            np.abs(low[1:] - prev_close),
        ])
    atr = _wilder_smooth(tr, period)
    df['ATR'] = atr

    # --- Placeholder "ADX" -------------------------------------------------
    # Ratio of the simple to the Wilder-smoothed true range, scaled to 100.
    # Kept for compatibility with the strategy's thresholds; it carries no
    # directional information (+DI / -DI are never computed).
    adx = np.zeros(n)
    for i in range(period * 2, n):
        if atr[i] > 0:
            adx[i] = tr[i - period + 1:i + 1].mean() / atr[i] * 100
    df['ADX'] = adx

    # --- 20-row SMA --------------------------------------------------------
    df['SMA20'] = df['close'].rolling(window=20).mean()

    return df

def calculate_technical_ratings(df):
    """Label each row with a coarse technical rating.

    The ``Technical_Rating`` column is set to:

    * ``"Strong Buy"`` when RSI > 50 and ADX > 30,
    * ``"Strong Sell"`` when RSI < 50 and ADX > 30,
    * ``"Neutral"`` in every other case (including NaN inputs and RSI == 50).

    ``df`` is modified in place and returned.
    """
    trending = (df['ADX'] > 30).to_numpy()
    bullish = (df['RSI'] > 50).to_numpy() & trending
    bearish = (df['RSI'] < 50).to_numpy() & trending

    # The first matching condition wins, mirroring a nested if / elif / else.
    df['Technical_Rating'] = np.select(
        [bullish, bearish],
        ['Strong Buy', 'Strong Sell'],
        default='Neutral',
    )
    return df

def calculate_price_change(df, period=14):
    """Add the ``period``-row price change and its row-to-row difference.

    ``Price_Change`` is ``close`` minus the close ``period`` rows earlier;
    ``Price_Change_Direction`` is the one-row difference of ``Price_Change``.
    ``df`` is modified in place and returned.
    """
    close = df['close']
    change = close - close.shift(period)

    df['Price_Change'] = change
    df['Price_Change_Direction'] = change - change.shift(1)
    return df

def calculate_levy_signal(df, window=14, kurt_threshold=1.5):
    """Flag fat-tailed return windows and the side their skew leans to.

    Adds ``returns`` (percentage change of ``close``), ``rolling_kurtosis``
    and ``rolling_skew`` (both over ``window`` rows of returns) and
    ``levy_signal``: +1 when kurtosis exceeds ``kurt_threshold`` with positive
    skew, -1 when it exceeds the threshold with negative skew, 0 otherwise.
    ``df`` is modified in place and returned.
    """
    returns = df['close'].pct_change()
    df['returns'] = returns

    windowed = returns.rolling(window=window)
    kurtosis = windowed.kurt()
    skewness = windowed.skew()
    df['rolling_kurtosis'] = kurtosis
    df['rolling_skew'] = skewness

    # NaN statistics compare False on every test, so warm-up rows stay at 0.
    fat_tailed = kurtosis > kurt_threshold
    leans_up = (fat_tailed & (skewness > 0)).astype('int64')
    leans_down = (fat_tailed & (skewness < 0)).astype('int64')
    df['levy_signal'] = leans_up - leans_down
    return df

def calculate_obv(df):
    """Add On-Balance Volume (``OBV``) and its one-row change (``OBV_Change``).

    OBV starts at 0 on the first row. On each later row the bar's ``volume``
    is added when ``close`` rose versus the previous row, subtracted when it
    fell, and ignored when it is unchanged or not comparable (NaN).
    An empty frame simply gains two empty columns.

    Args:
        df: DataFrame with ``close`` and ``volume`` columns. It is modified
            in place.

    Returns:
        The same DataFrame with ``OBV`` and ``OBV_Change`` added.
    """
    close = df['close'].to_numpy()
    volume = df['volume'].to_numpy()

    # +1 / -1 / 0 per row; row 0 has no predecessor and stays 0.
    direction = np.zeros(len(close), dtype=np.int64)
    rose = close[1:] > close[:-1]
    fell = close[1:] < close[:-1]
    direction[1:] = rose.astype(np.int64) - fell.astype(np.int64)

    # Mask rather than multiply by zero so that a NaN volume on an unchanged
    # bar does not poison the running total.
    signed_volume = np.where(direction != 0, direction * volume, 0)

    df['OBV'] = np.cumsum(signed_volume)
    df['OBV_Change'] = df['OBV'].diff()
    return df

def calculate_keltner_channels(df, length=20, atr_multiplier=1.5):
    """Add Keltner-style bands built from an SMA midline and the ``ATR`` column.

    Adds ``KC_MA`` (``length``-row simple moving average of ``close``),
    ``KC_ATR`` (``ATR`` scaled by ``atr_multiplier``) and the bands
    ``KC_Upper`` / ``KC_Lower`` = ``KC_MA`` +/- ``KC_ATR``. Requires an
    existing ``ATR`` column. ``df`` is modified in place and returned.
    """
    midline = df['close'].rolling(window=length).mean()
    band_width = df['ATR'] * atr_multiplier

    df['KC_MA'] = midline
    df['KC_ATR'] = band_width
    df['KC_Upper'] = midline + band_width
    df['KC_Lower'] = midline - band_width
    return df

def hawkes_process(data: pd.Series, kappa: float):
    """Return an exponentially decaying running sum of ``data``, scaled by ``kappa``.

    The first element of the result is always NaN (the first input value is
    not used). Each later element is the previous one multiplied by
    ``exp(-kappa)`` plus the current input; whenever the previous element is
    NaN the sum restarts from the current input. The result keeps the index
    of ``data``.
    """
    decay = np.exp(-kappa)
    values = data.to_numpy()
    intensity = np.full(len(data), np.nan)

    for idx in range(1, len(data)):
        carried = intensity[idx - 1]
        if np.isnan(carried):
            # Nothing to decay yet (start of series or after a NaN input).
            intensity[idx] = values[idx]
        else:
            intensity[idx] = carried * decay + values[idx]

    return pd.Series(intensity, index=data.index) * kappa

def _volatility_regime(adx_value):
    """Map an ADX reading to ``(atr_multiplier, risk_percentage)``.

    ADX / 100 is used as a rough volatility proxy: below 0.3 gets the widest
    stop and the largest risk budget, above 0.7 the tightest stop and the
    smallest budget. NaN falls through to the middle bucket.
    """
    level = adx_value / 100
    if level < 0.3:
        return 2.0, 0.02
    if level > 0.7:
        return 1.2, 0.01
    return 1.5, 0.015


def _close_position(capital, direction, size, entry_price, exit_price, fee_rate):
    """Realise the P&L of a position and charge the exit fee.

    ``direction`` is 1 for a long and -1 for a short. Returns
    ``(new_capital, fee)``.
    """
    fee = size * exit_price * fee_rate
    pnl = direction * size * (exit_price - entry_price)
    return capital + pnl - fee, fee


def trading_strategy(df, initial_capital=1000.0, fee_percentage=0.1):
    """
    Backtest a long/short strategy built on Channel Breakout, Momentum
    Indicators, Technical Ratings, Levy Signal, OBV, Keltner Channels and a
    Hawkes intensity, with ATR-based trailing stops and risk-based sizing.

    Decisions for bar ``i`` use indicator values from bar ``i - 1`` (and the
    channel from bar ``i - 2``); entries and reversals fill at the open of
    bar ``i``. Stops are checked against the high/low of bar ``i`` before any
    new entry, and a bar that stops out does not open a new position.

    Args:
        df: DataFrame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns. It is not modified.
        initial_capital: Starting capital amount
        fee_percentage: Fee percentage (e.g., 0.1 for 0.1%), charged on the
            traded notional of every entry and exit.

    Returns:
        A copy of ``df`` with all indicator columns plus:

        * ``signals`` - 1 / -1 open long / short (a stop-out is recorded as
          -1 for a long and 1 for a short), 2 / -2 reversal into long / short,
          0 otherwise.
        * ``trade_type`` - ``'long'``, ``'short'``, ``'stop_loss'`` or ``''``.
        * ``position``, ``position_size``, ``stop_loss`` - state at the end
          of each bar (zeros while flat).
        * ``capital`` - realised capital at the end of each bar.
        * ``fees_paid`` - cumulative fees at the end of each bar.
        * ``equity`` - capital plus open P&L marked at the bar's close.
        * ``returns`` - percentage change of ``equity``.

        Any position still open on the last bar is liquidated at that bar's
        close; the result is reflected in the last row's ``capital``,
        ``equity`` and ``fees_paid`` (signals are left untouched).
    """
    # Convert fee percentage to rate (0.1% -> 0.001)
    fee_rate = fee_percentage / 100.0

    # Work on a copy so the caller's frame is left untouched.
    df_strategy = df.copy()

    # Calculate all required indicators
    df_strategy = calculate_channel_breakout(df_strategy)
    df_strategy = calculate_momentum_indicators(df_strategy)
    df_strategy = calculate_technical_ratings(df_strategy)
    df_strategy = calculate_price_change(df_strategy)
    df_strategy = calculate_levy_signal(df_strategy)
    df_strategy = calculate_obv(df_strategy)
    df_strategy = calculate_keltner_channels(df_strategy)

    # Hawkes intensity of the (signed) returns
    df_strategy['returns'] = df_strategy['close'].pct_change()
    df_strategy['hawkes_intensity'] = hawkes_process(df_strategy['returns'].fillna(0), kappa=3)

    # Signal thresholds
    hawkes_threshold = 0.007
    adx_entry_threshold = 35

    # Bars before this index are skipped so every indicator has warmed up.
    start_idx = 52

    # Pull columns into arrays once; the loop below is purely scalar.
    n = len(df_strategy)
    open_ = df_strategy['open'].to_numpy(dtype=float)
    high = df_strategy['high'].to_numpy(dtype=float)
    low = df_strategy['low'].to_numpy(dtype=float)
    close = df_strategy['close'].to_numpy(dtype=float)
    atr = df_strategy['ATR'].to_numpy(dtype=float)
    adx = df_strategy['ADX'].to_numpy(dtype=float)
    sma20 = df_strategy['SMA20'].to_numpy(dtype=float)
    upper = df_strategy['Upper_Channel'].to_numpy(dtype=float)
    lower = df_strategy['Lower_Channel'].to_numpy(dtype=float)
    price_change = df_strategy['Price_Change'].to_numpy(dtype=float)
    price_change_dir = df_strategy['Price_Change_Direction'].to_numpy(dtype=float)
    levy = df_strategy['levy_signal'].to_numpy(dtype=float)
    obv_change = df_strategy['OBV_Change'].to_numpy(dtype=float)
    hawkes = df_strategy['hawkes_intensity'].to_numpy(dtype=float)
    rating = df_strategy['Technical_Rating'].to_numpy()

    # Per-bar bookkeeping, written back to the frame after the loop.
    signals = np.zeros(n, dtype=np.int64)
    trade_type = [''] * n
    position_col = np.zeros(n, dtype=np.int64)
    size_col = np.zeros(n)
    stop_col = np.zeros(n)
    capital_col = np.full(n, float(initial_capital))
    equity_col = np.full(n, float(initial_capital))
    fees_col = np.zeros(n)

    # Running state
    current_position = 0  # 0: no position, 1: long, -1: short
    position_size = 0.0
    entry_price = 0.0
    trailing_stop = 0.0
    capital = initial_capital
    total_fees_paid = 0.0

    for i in range(start_idx, n):
        prev = i - 1  # decisions use the previous bar to avoid look-ahead
        atr_prev = atr[prev]
        atr_multiplier, risk_percentage = _volatility_regime(adx[prev])

        # 1. Stop-loss check against this bar's range.
        stopped_out = False
        if current_position == 1 and low[i] <= trailing_stop:
            # A bar that opens below the stop fills at the open, not at a
            # stop price that never traded. (min keeps the stop if open is NaN.)
            exit_price = min(trailing_stop, open_[i])
            stopped_out = True
        elif current_position == -1 and high[i] >= trailing_stop:
            exit_price = max(trailing_stop, open_[i])
            stopped_out = True

        if stopped_out:
            capital, fee = _close_position(
                capital, current_position, position_size, entry_price, exit_price, fee_rate
            )
            total_fees_paid += fee
            signals[i] = -current_position
            trade_type[i] = 'stop_loss'
            current_position = 0
            position_size = 0.0

        # 2. Ratchet the trailing stop in the favourable direction only.
        if current_position == 1:
            candidate = close[i] - atr_multiplier * atr_prev
            if candidate > trailing_stop:
                trailing_stop = candidate
        elif current_position == -1:
            candidate = close[i] + atr_multiplier * atr_prev
            if candidate < trailing_stop or trailing_stop == 0:
                trailing_stop = candidate

        # 3. Entry / reversal conditions (all from the previous bar).
        regime_ok = hawkes[prev] > hawkes_threshold and adx[prev] > adx_entry_threshold
        long_condition = (
            regime_ok and
            (
                (high[prev] > upper[i - 2]) or
                (close[prev] > sma20[prev] and price_change[prev] > 0 and price_change_dir[prev] > 0)
            ) and
            rating[prev] == 'Strong Buy' and
            levy[prev] >= 0 and
            obv_change[prev] > 0
        )
        # Mirror image of the long condition.
        short_condition = (
            regime_ok and
            (
                (low[prev] < lower[i - 2]) or
                (close[prev] < sma20[prev] and price_change[prev] < 0 and price_change_dir[prev] < 0)
            ) and
            rating[prev] == 'Strong Sell' and
            levy[prev] <= 0 and
            obv_change[prev] < 0
        )
        if long_condition:
            target = 1
        elif short_condition:
            target = -1
        else:
            target = 0

        # 4. Act on the signal unless this bar already stopped out or we are
        #    already positioned in the target direction.
        if not stopped_out and target != 0 and target != current_position:
            fill_price = open_[i]
            stop_price = fill_price - target * atr_multiplier * atr_prev
            risk_per_unit = abs(fill_price - stop_price)

            # A zero / NaN ATR gives no usable stop distance, so the position
            # cannot be sized; skip the signal instead of dividing by zero.
            if np.isfinite(risk_per_unit) and risk_per_unit > 0:
                reversing = current_position != 0
                if reversing:
                    capital, fee = _close_position(
                        capital, current_position, position_size, entry_price, fill_price, fee_rate
                    )
                    total_fees_paid += fee

                # Size so that hitting the initial stop loses risk_percentage
                # of current capital.
                position_size = capital * risk_percentage / risk_per_unit
                fee = position_size * fill_price * fee_rate
                total_fees_paid += fee
                capital -= fee

                entry_price = fill_price
                trailing_stop = stop_price
                current_position = target
                signals[i] = 2 * target if reversing else target
                trade_type[i] = 'long' if target == 1 else 'short'

        # 5. Record end-of-bar state so every processed row is consistent.
        position_col[i] = current_position
        size_col[i] = position_size
        capital_col[i] = capital
        fees_col[i] = total_fees_paid
        if current_position != 0:
            stop_col[i] = trailing_stop
            equity_col[i] = capital + current_position * position_size * (close[i] - entry_price)
        else:
            equity_col[i] = capital

    # Liquidate anything still open at the final close and surface the result
    # on the last row (an open position implies the loop processed that row).
    if current_position != 0:
        capital, fee = _close_position(
            capital, current_position, position_size, entry_price, close[-1], fee_rate
        )
        total_fees_paid += fee
        capital_col[-1] = capital
        equity_col[-1] = capital
        fees_col[-1] = total_fees_paid

    df_strategy['signals'] = signals
    df_strategy['trade_type'] = trade_type
    df_strategy['position'] = position_col
    df_strategy['position_size'] = size_col
    df_strategy['stop_loss'] = stop_col
    df_strategy['capital'] = capital_col
    df_strategy['equity'] = equity_col
    df_strategy['fees_paid'] = fees_col

    # Per-bar strategy returns derived from the equity curve
    df_strategy['returns'] = df_strategy['equity'].pct_change()

    return df_strategy

def main(input_path='BTC_2019_2023_2d.csv', output_path='trading_results_2d_str6.csv'):
    """Run the backtest on a CSV of OHLCV bars and export the signals.

    Args:
        input_path: CSV file to read. It must provide the ``open``, ``high``,
            ``low``, ``close`` and ``volume`` columns; a ``datetime`` column
            is parsed when present.
        output_path: CSV file to write with the price columns plus
            ``signals`` and ``trade_type``.

    Returns:
        The full result DataFrame produced by ``trading_strategy``.
    """
    # Read the data
    df = pd.read_csv(input_path)

    # Convert datetime to standard format if needed
    if 'datetime' in df.columns:
        df['datetime'] = pd.to_datetime(df['datetime'])

    # Run the trading strategy with fees (0.1%) and initial capital
    result_df = trading_strategy(df, initial_capital=1000.0, fee_percentage=0.1)

    # Save the results. 'datetime' is optional on input, so only export the
    # columns that actually exist instead of failing with a KeyError.
    wanted_columns = ['datetime', 'open', 'high', 'low', 'close', 'volume', 'signals', 'trade_type']
    output_columns = [name for name in wanted_columns if name in result_df.columns]
    result_df[output_columns].to_csv(output_path, index=False)

    return result_df

# Run the backtest only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()