# Load and Prepare data

In [1]:
import pandas as pd

# Load your data (replace with your file)
data = pd.read_csv('es_futures_data.csv', parse_dates=['Date'])
data.set_index('Date', inplace=True)

# Ensure data is sorted chronologically
data = data.sort_index()

Failed to get ticker 'ES=F' reason: Expecting value: line 1 column 1 (char 0)
[*********************100%***********************]  1 of 1 completed

1 Failed download:
['ES=F']: YFTzMissingError('$%ticker%: possibly delisted; no timezone found')


First few rows:
Empty DataFrame
Columns: [(Adj Close, ES=F), (Close, ES=F), (High, ES=F), (Low, ES=F), (Open, ES=F), (Volume, ES=F)]
Index: []

Last few rows:
Empty DataFrame
Columns: [(Adj Close, ES=F), (Close, ES=F), (High, ES=F), (Low, ES=F), (Open, ES=F), (Volume, ES=F)]
Index: []


# Step 2: Define In-Sample and Out-of-Sample Periods

In [None]:
# Example: Split at January 1, 2017
split_date = '2017-01-01'

in_sample = data[data.index < split_date]
out_sample = data[data.index >= split_date]


# Step 3: Integrate Split into the TrendStrategy Class

In [None]:
class TrendStrategy:
    def __init__(self, data):
        self.data = data.copy()
        self.optimize_params = None

    def calculate_indicators(self, ema_fast=50, ema_slow=200, rsi_window=14, atr_window=14):
        """Calculate technical indicators with parameterization"""
        self.data['EMA_Fast'] = self.data['Close'].ewm(span=ema_fast, adjust=False).mean()
        self.data['EMA_Slow'] = self.data['Close'].ewm(span=ema_slow, adjust=False).mean()
        
        delta = self.data['Close'].diff()
        gain = delta.where(delta > 0, 0)
        loss = -delta.where(delta < 0, 0)
        avg_gain = gain.rolling(rsi_window).mean()
        avg_loss = loss.rolling(rsi_window).mean()
        rs = avg_gain / avg_loss
        self.data['RSI'] = 100 - (100 / (1 + rs))
        
        high_low = self.data['High'] - self.data['Low']
        high_close = np.abs(self.data['High'] - self.data['Close'].shift())
        low_close = np.abs(self.data['Low'] - self.data['Close'].shift())
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        self.data['ATR'] = tr.rolling(atr_window).mean()
        
        lookback = 20
        self.data['Swing_High'] = self.data['High'].rolling(lookback).max()
        self.data['Swing_Low'] = self.data['Low'].rolling(lookback).min()
        self.data['Fib_Extension'] = self.data['Close'] + (self.data['Swing_High'] - self.data['Swing_Low']) * 0.272
        return self.data

    def generate_signals(self, rsi_threshold=50):
        self.data['EMA_Signal'] = np.where(self.data['EMA_Fast'] > self.data['EMA_Slow'], 1, -1)
        self.data['RSI_Filter'] = np.where(self.data['RSI'] > rsi_threshold, 1, -1)
        long_cond = (self.data['EMA_Signal'] == 1) & (self.data['RSI_Filter'] == 1)
        short_cond = (self.data['EMA_Signal'] == -1) & (self.data['RSI_Filter'] == -1)
        self.data['Signal'] = np.select([long_cond, short_cond], [1, -1], default=0)
        return self.data

    def backtest_strategy(self, atr_multiplier=1.5, risk_per_trade=0.02):
        self.data['Position'] = 0
        self.data['Entry_Price'] = np.nan
        self.data['Exit_Price'] = np.nan
        self.data['Stop_Loss'] = np.nan
        self.data['Profit_Target'] = np.nan
        self.data['PnL'] = 0.0
        
        position = 0
        entry_price = 0
        capital = 1_000_000  # Starting capital
        contract_size = 50  # S&P E-mini multiplier
        
        for i in range(1, len(self.data)):
            atr = self.data['ATR'].iloc[i]
            position_size = (capital * risk_per_trade) / (atr_multiplier * atr * contract_size)
            position_size = int(position_size)
            
            signal = self.data['Signal'].iloc[i]
            
            if signal != 0 and position == 0:
                position = signal * position_size
                entry_price = self.data['Close'].iloc[i]
                sl = entry_price - (signal * atr_multiplier * atr)
                pt = self.data['Fib_Extension'].iloc[i]
                
                self.data.at[self.data.index[i], 'Position'] = position
                self.data.at[self.data.index[i], 'Entry_Price'] = entry_price
                self.data.at[self.data.index[i], 'Stop_Loss'] = sl
                self.data.at[self.data.index[i], 'Profit_Target'] = pt
            
            if position != 0:
                current_low = self.data['Low'].iloc[i]
                current_high = self.data['High'].iloc[i]
                
                if position > 0:  # Long
                    if current_low <= self.data['Stop_Loss'].iloc[i]:
                        exit_price = self.data['Stop_Loss'].iloc[i]
                        pnl = (exit_price - entry_price) * position * contract_size
                        capital += pnl
                        position = 0
                    elif current_high >= self.data['Profit_Target'].iloc[i]:
                        exit_price = self.data['Profit_Target'].iloc[i]
                        pnl = (exit_price - entry_price) * position * contract_size
                        capital += pnl
                        position = 0
                else:  # Short
                    if current_high >= self.data['Stop_Loss'].iloc[i]:
                        exit_price = self.data['Stop_Loss'].iloc[i]
                        pnl = (entry_price - exit_price) * abs(position) * contract_size
                        capital += pnl
                        position = 0
                    elif current_low <= self.data['Profit_Target'].iloc[i]:
                        exit_price = self.data['Profit_Target'].iloc[i]
                        pnl = (entry_price - exit_price) * abs(position) * contract_size
                        capital += pnl
                        position = 0
                
                if position == 0:
                    self.data.at[self.data.index[i], 'Exit_Price'] = exit_price
                    self.data.at[self.data.index[i], 'PnL'] = pnl
        
        trades = self.data[self.data['PnL'] != 0].copy()
        trades['Returns'] = trades['PnL'] / capital
        cumulative_returns = (1 + trades['Returns']).cumprod()
        
        performance = {
            'total_return': trades['PnL'].sum(),
            'sharpe_ratio': self._calculate_sharpe(trades['Returns']),
            'max_drawdown': self._calculate_max_drawdown(cumulative_returns),
            'win_rate': len(trades[trades['PnL'] > 0]) / len(trades) if len(trades) > 0 else 0,
            'profit_factor': trades[trades['PnL'] > 0]['PnL'].sum() / abs(trades[trades['PnL'] < 0]['PnL'].sum()) if len(trades) > 0 else 0
        }
        return performance, self.data

    def _calculate_sharpe(self, returns, risk_free=0.0):
        excess_returns = returns - risk_free
        return excess_returns.mean() / excess_returns.std() if excess_returns.std() != 0 else 0

    def _calculate_max_drawdown(self, cumulative_returns):
        peak = cumulative_returns.expanding(min_periods=1).max()
        dd = (cumulative_returns - peak) / peak
        return dd.min()

    def optimize_parameters(self, ema_range=(30, 100), rsi_range=(10, 30), atr_mult_range=(1, 3)):
        """Parameter optimization using brute force method (on in-sample data)"""
        def objective(params):
            ema_fast, ema_slow, rsi_window, atr_mult = params
            self.calculate_indicators(int(ema_fast), int(ema_slow), int(rsi_window))
            self.generate_signals()
            perf, _ = self.backtest_strategy(atr_mult)
            return -perf['sharpe_ratio']  # Minimize negative Sharpe
        
        optimization = brute(
            objective,
            (ema_range, (150, 250), rsi_range, atr_mult_range),
            finish=None
        )
        
        self.optimize_params = {
            'ema_fast': int(optimization[0]),
            'ema_slow': int(optimization[1]),
            'rsi_window': int(optimization[2]),
            'atr_multiplier': optimization[3]
        }
        return self.optimize_params


# Step 4: Run In-Sample, Optimize, and Out-of-Sample Test

In [None]:
# Initialize in-sample strategy
in_sample_strategy = TrendStrategy(in_sample)

# Optimize parameters on in-sample data
opt_params = in_sample_strategy.optimize_parameters()
print(f"Optimized Parameters: {opt_params}")

# Run backtest on in-sample with optimized parameters
in_sample_strategy.calculate_indicators(**opt_params)
in_sample_strategy.generate_signals()
in_sample_perf, in_sample_results = in_sample_strategy.backtest_strategy(opt_params['atr_multiplier'])
print("\nIn-Sample Performance:")
for k, v in in_sample_perf.items():
    print(f"{k:15}: {v:.4f}")

# Initialize out-of-sample strategy with optimized parameters
out_sample_strategy = TrendStrategy(out_sample)
out_sample_strategy.calculate_indicators(**opt_params)
out_sample_strategy.generate_signals()
out_sample_perf, out_sample_results = out_sample_strategy.backtest_strategy(opt_params['atr_multiplier'])
print("\nOut-of-Sample Performance:")
for k, v in out_sample_perf.items():
    print(f"{k:15}: {v:.4f}")


# Step 5: Plot Results

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(14, 7))
plt.title('In-Sample vs Out-of-Sample Equity Curves')
plt.plot(in_sample_results.index, (1 + in_sample_results['PnL'].cumsum() / 1_000_000), label='In-Sample')
plt.plot(out_sample_results.index, (1 + out_sample_results['PnL'].cumsum() / 1_000_000), label='Out-of-Sample')
plt.legend()
plt.show()
