### Data Loading and Preprocessing

This section loads the aggregated open interest data from a pickle file. It then processes the data by:
1.  Concatenating all snapshots into a single DataFrame.
2.  Converting relevant columns to datetime objects.
3.  Filtering for delivery blocks of at least one hour.
4.  Exploding the DataFrame to have one row per delivery hour for each contract.
5.  Setting a multi-level index for efficient lookups.

In [3]:
import pandas as pd
from datetime import timedelta

# Snapshots of aggregated open interest, loaded from a local pickle file.
snapshots: dict = pd.read_pickle('../pickles/1min/all_agg_open_interest.pkl')

# NOTE(review): these two lags are not used anywhere in this cell —
# presumably consumed by a later cell; confirm before removing.
min_lag = pd.Timedelta(minutes=30)
max_lag = pd.Timedelta(hours=2)


def _parse_timestamps(frame):
    """Convert the three timestamp columns to datetimes; unparseable values become NaT."""
    parsed = {
        name: pd.to_datetime(frame[name], errors='coerce')
        for name in ('DeliveryStart', 'DeliveryEnd', 'Trading_time')
    }
    return frame.assign(**parsed)


def _keep_hourly_blocks(frame):
    """Keep only rows whose delivery period lasts at least one hour."""
    duration = frame['DeliveryEnd'] - frame['DeliveryStart']
    return frame[duration >= timedelta(hours=1)]


def _add_delivery_hours(frame):
    """Attach, per row, the hourly timestamps from the floored start up to (excluding) the floored end."""
    starts = frame['DeliveryStart'].dt.floor('h')
    ends = frame['DeliveryEnd'].dt.floor('h')
    hours = [
        pd.date_range(start=first, end=last, freq='h', inclusive='left')
        for first, last in zip(starts, ends)
    ]
    return frame.assign(Delivery_Hour=hours)


# Stack all snapshots, then reshape to one row per (trading time, delivery hour, price).
joint = (
    pd.concat(snapshots.values(), axis=0)
    .pipe(_parse_timestamps)
    .pipe(_keep_hourly_blocks)
    .pipe(_add_delivery_hours)
    .explode('Delivery_Hour')
    .drop(columns=['DeliveryStart', 'DeliveryEnd', 'BlockVolume'])
    .set_index(['Trading_time', 'Delivery_Hour', 'Price'])
)

print("Data processing complete. The final DataFrame 'joint' is ready.")

  pd.concat(snapshots.values(), axis=0)


Data processing complete. The final DataFrame 'joint' is ready.


### Inspect Processed Data

Display the first 100 rows of the processed `joint` DataFrame to verify the data transformation steps.

In [77]:
# Lift pandas' row/column truncation and widen the console so the preview prints in full.
full_display = ('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000)
with pd.option_context(*full_display):
    print(joint.head(100))

                                                     Side   Price  Volume
Trading_time              Delivery_Hour                                  
2021-06-25 13:00:00+00:00 2021-06-25 22:00:00+00:00   BUY -222.00     1.0
                          2021-06-25 22:00:00+00:00  SELL  500.00     4.0
                          2021-06-25 22:00:00+00:00  SELL  499.99     4.0
                          2021-06-25 23:00:00+00:00   BUY -222.00     1.0
                          2021-06-25 23:00:00+00:00  SELL  499.99     4.0
                          2021-06-25 23:00:00+00:00  SELL  500.00     4.0
                          2021-06-26 00:00:00+00:00  SELL  499.99     4.0
                          2021-06-26 00:00:00+00:00  SELL  500.00     4.0
                          2021-06-26 00:00:00+00:00   BUY -222.00     1.0
                          2021-06-26 01:00:00+00:00  SELL  499.99     2.0
                          2021-06-26 01:00:00+00:00   BUY -222.00     1.0
                          2021-06-26 0

### Strategy and Backtesting Functions

This cell defines the core logic for the trading strategy and backtesting:
-   `dual_thrust`: Implements the Dual Thrust trading strategy. It calculates upper and lower price bands and generates buy/sell signals based on price movements relative to these bands within a specified trading window.
-   `backtest_delivery_hour`: Simulates the execution of trades based on the generated signals for a single delivery hour and calculates the resulting profit or loss (PnL).

In [4]:
def dual_thrust(data: pd.DataFrame, n: int, k1: float, k2: float, delivery_hour: pd.Timestamp, trading_window_open: timedelta, trading_window_close: timedelta) -> tuple[pd.Series, pd.Series, pd.Series]:
    """
    Calculates dual thrust trading signals and the bands that trigger them.

    Every reference quantity (rolling high, rolling low, reference mid) is
    shifted by one row, so the bands for a row are built only from rows that
    precede it. A buy is signalled when the best bid exceeds the upper band
    and a sell when the best ask drops below the lower band.

    :param data: A pandas DataFrame indexed by trading time (a datetime-like
        index, as required by the time-based rolling window) with
        'best_bid', 'best_ask' and 'mid' columns.
    :param n: The lookback period for range calculation in minutes.
    :param k1: The coefficient for the upper band.
    :param k2: The coefficient for the lower band.
    :param delivery_hour: The delivery start the trading window is measured back from.
    :param trading_window_open: How long before the delivery hour the trading window opens.
    :param trading_window_close: How long before the delivery hour the trading window closes.
    :return: A tuple ``(signals, upper_band, lower_band)`` of pandas Series,
        each restricted to the trading window (both ends inclusive).
        ``signals`` holds 1 for buy, -1 for sell and NaN where no signal fired.
    """
    window = f'{n}min'

    # Shift by one row so the current row's own quotes never feed its bands.
    highest_high = data['best_ask'].rolling(window=window).max().shift(1)
    lowest_low = data['best_bid'].rolling(window=window).min().shift(1)
    # The previous mid serves both as the "close" for the range and as the
    # "open" the bands are centred on.
    prev_mid = data['mid'].shift(1)

    # Range is the larger distance from the previous mid to either extreme.
    range_val = pd.concat(
        [(highest_high - prev_mid).abs(), (prev_mid - lowest_low).abs()],
        axis=1,
    ).max(axis=1)

    upper_band = prev_mid + k1 * range_val
    lower_band = prev_mid - k2 * range_val

    # Explicit float dtype: NaN means "no signal" and the dtype does not
    # depend on how pandas infers a value-less Series.
    signals = pd.Series(float('nan'), index=data.index, dtype='float64')
    signals[data['best_bid'] > upper_band] = 1
    signals[data['best_ask'] < lower_band] = -1

    # All three outputs share data's index, so one mask filters them all.
    trading_start = delivery_hour - trading_window_open
    trading_end = delivery_hour - trading_window_close
    in_window = (data.index >= trading_start) & (data.index <= trading_end)

    return signals[in_window], upper_band[in_window], lower_band[in_window]

def backtest_delivery_hour(prices, signals, logging=False, return_trade_data=False):
    """
    Replays a signal series against quotes for one delivery hour and sums the PnL.

    Signals are forward-filled into a held position, and the last timestamp is
    forced flat so any open position is closed. Longs are opened at the best
    ask and closed at the best bid; shorts are opened at the best bid and
    closed at the best ask.

    :param prices: A pandas DataFrame with 'best_bid' and 'best_ask' columns, indexed by Trading_time.
    :param signals: A pandas Series of trading signals (1 buy, -1 sell, NaN for no signal).
    :param logging: Whether to print a line for every position change.
    :param return_trade_data: Whether to also return the per-trade records.
    :return: The total PnL, or ``(total_pnl, trade_series)`` when
        ``return_trade_data`` is true; ``trade_series`` is a Series of dicts
        with 'price', 'pnl' (cumulative) and 'position' keys, indexed by trade time.
    """
    def _package(pnl, trades):
        # Shape the return value according to the caller's request.
        return (pnl, trades) if return_trade_data else pnl

    if signals.dropna().empty:
        return _package(0.0, pd.Series(dtype='object'))

    # Carry each signal forward until the next one; flat before the first signal.
    position = signals.ffill().fillna(0)
    if not position.empty:
        position.iloc[-1] = 0  # Force flat on the final timestamp

    # Quote column hit when leaving / entering a position of the given sign.
    exit_quote = {1: 'best_bid', -1: 'best_ask'}
    entry_quote = {1: 'best_ask', -1: 'best_bid'}

    total_pnl = 0
    held = 0
    entry_price = 0
    fills = []  # One record per executed leg

    for time, target in position.items():
        if target == held:
            continue

        # Leg 1: unwind whatever is currently held.
        if held in exit_quote:
            exit_price = prices.loc[time, exit_quote[held]]
            if held == 1:
                total_pnl += exit_price - entry_price
            else:
                total_pnl += entry_price - exit_price
            fills.append({'time': time, 'price': exit_price, 'pnl': total_pnl, 'position': 0})

        # Leg 2: establish the new position, if any.
        if target in entry_quote:
            entry_price = prices.loc[time, entry_quote[target]]
            fills.append({'time': time, 'price': entry_price, 'pnl': total_pnl, 'position': 1 if target == 1 else -1})

        if logging:
            print(time, "| prev", held, "| curr", target, "| pnl", total_pnl)

        held = target

    if not fills:
        return _package(total_pnl, pd.Series(dtype='object'))

    trade_df = pd.DataFrame(fills).set_index('time')
    trade_series = trade_df.apply(lambda row: {'price': row['price'], 'pnl': row['pnl'], 'position': row['position']}, axis=1)

    return _package(total_pnl, trade_series)

### Prepare Data for Backtesting

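This cell computes the best bid, best ask, and mid-price, together with the total buy and sell volumes, for each trading time and delivery hour. Rows with any missing value (for example, when one side of the book is empty) are dropped. The resulting `test_data` DataFrame is structured for use in the backtesting functions.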

In [5]:
def _book_side(side):
    """Group the rows of one order side by (trading time, delivery hour)."""
    one_side = joint[joint['Side'] == side].reset_index()
    return one_side.groupby(['Trading_time', 'Delivery_Hour'])


buy_side = _book_side('BUY')
sell_side = _book_side('SELL')

# Top of book: highest bid and lowest ask per (trading time, delivery hour).
bids = buy_side['Price'].max()
asks = sell_side['Price'].min()
mids = (bids + asks) / 2

# Total volume per side.
buy_volumes = buy_side['Volume'].sum()
sell_volumes = sell_side['Volume'].sum()

# Combine on the shared index and drop rows where any column is missing.
test_data = pd.DataFrame(
    {
        'best_bid': bids,
        'best_ask': asks,
        'mid': mids,
        'buy_volumes': buy_volumes,
        'sell_volumes': sell_volumes,
    }
).dropna()

### Test Backtesting Function

This cell runs a test of the `backtest_delivery_hour` function using the `dual_thrust` strategy for a single, specific delivery hour. The `logging=True` parameter provides detailed output of the simulated trades.

In [7]:
# Smoke-test the backtester on a single delivery hour using Dual Thrust signals.
test_delivery_hour = pd.Timestamp('2021-06-26 08:00:00+00:00')
price_data = test_data.xs(test_delivery_hour, level='Delivery_Hour')

# Only the signal series (first element of the returned tuple) is needed here.
test_signals = dual_thrust(
    price_data,
    n=10,
    k1=0.5,
    k2=0.5,
    delivery_hour=test_delivery_hour,
    trading_window_open=timedelta(hours=1),
    trading_window_close=timedelta(minutes=15),
)[0]

# Kept as the cell's final bare expression so the notebook displays the PnL.
backtest_delivery_hour(price_data, test_signals, logging=True)

2021-06-26 07:13:00+00:00 | prev 0 | curr -1.0 | pnl 0
2021-06-26 07:39:00+00:00 | prev -1.0 | curr 1.0 | pnl 30.509999999999998
2021-06-26 07:40:00+00:00 | prev 1.0 | curr -1.0 | pnl 61.019999999999996
2021-06-26 07:45:00+00:00 | prev -1.0 | curr 0.0 | pnl 115.52


np.float64(115.52)

### Grid Search: Per-Hour Optimization

This section performs a grid search to find the optimal `dual_thrust` parameters (`n`, `k1`, `k2`, etc.) for each delivery hour individually. The goal is to maximize the PnL for each hour, and the results are saved to a pickle file.

In [None]:
from tqdm.notebook import tqdm
import numpy as np

def _asymmetric_grid():
    """Yield (n, k1, k2, window_open, window_close) for every grid point, n varying slowest."""
    for lookback in range(5, 25, 5):
        for upper_k in np.arange(0.1, 1.5, 0.1):
            for lower_k in np.arange(0.1, 1.5, 0.1):
                for opens in (timedelta(hours=1), timedelta(hours=2), timedelta(hours=3)):
                    for closes in (timedelta(minutes=15), timedelta(minutes=30)):
                        yield lookback, upper_k, lower_k, opens, closes


results = {}
for delivery_hour in tqdm(test_data.index.get_level_values('Delivery_Hour').unique()):
    prices = test_data.xs(delivery_hour, level='Delivery_Hour')
    best_pnl, best_params = -float('inf'), None

    # Exhaustive search; the first combination reaching the top PnL is kept.
    for n, k1, k2, trading_window_open, trading_window_close in _asymmetric_grid():
        signals, _, _ = dual_thrust(
            prices,
            n=n,
            k1=k1,
            k2=k2,
            delivery_hour=delivery_hour,
            trading_window_open=trading_window_open,
            trading_window_close=trading_window_close,
        )
        pnl = backtest_delivery_hour(prices[['best_bid', 'best_ask']], signals)

        if pnl > best_pnl:
            best_pnl = pnl
            # NOTE: coefficients are stored rounded to 3 decimals, while the
            # search itself used the raw np.arange values.
            best_params = (n, round(k1, 3), round(k2, 3), trading_window_open, trading_window_close)

    results[delivery_hour] = {'best_pnl': best_pnl, 'best_params': best_params}

# Persist the per-hour winners, then print the full table.
results_df = pd.DataFrame.from_dict(results, orient='index')
results_df.to_pickle('../pickles/grid_search_results.pkl')
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000):
    print(results_df)

  0%|          | 0/24 [00:00<?, ?it/s]

                           best_pnl                       best_params
2021-06-25 22:00:00+00:00      0.00   (5, 1.0, 0.1, 1:00:00, 0:15:00)
2021-06-25 23:00:00+00:00      0.00   (5, 0.8, 0.1, 1:00:00, 0:15:00)
2021-06-26 00:00:00+00:00      2.11   (5, 0.1, 0.3, 3:00:00, 0:15:00)
2021-06-26 01:00:00+00:00      2.64   (5, 0.6, 0.8, 3:00:00, 0:15:00)
2021-06-26 02:00:00+00:00      8.13   (5, 0.7, 0.1, 2:00:00, 0:15:00)
2021-06-26 03:00:00+00:00      0.00   (5, 1.0, 1.0, 1:00:00, 0:15:00)
2021-06-26 04:00:00+00:00      0.00   (5, 0.3, 0.1, 1:00:00, 0:15:00)
2021-06-26 05:00:00+00:00      0.00   (5, 0.8, 0.1, 1:00:00, 0:15:00)
2021-06-26 06:00:00+00:00      5.24   (5, 0.7, 0.1, 3:00:00, 0:15:00)
2021-06-26 07:00:00+00:00    259.30   (5, 0.5, 1.0, 2:00:00, 0:15:00)
2021-06-26 08:00:00+00:00    346.63  (20, 0.1, 0.8, 2:00:00, 0:15:00)
2021-06-26 09:00:00+00:00     19.56  (10, 1.0, 0.1, 3:00:00, 0:15:00)
2021-06-26 10:00:00+00:00      0.00   (5, 0.4, 0.1, 1:00:00, 0:15:00)
2021-06-26 11:00:00+

### Grid Search: Per-Hour Optimization with Symmetric Bands

This is a variation of the previous grid search, with the constraint that `k1` must equal `k2`. This tests a symmetric version of the Dual Thrust strategy.

In [None]:
from tqdm.notebook import tqdm
import numpy as np

def _symmetric_grid():
    """Yield (n, k, k, window_open, window_close): the grid with both band coefficients equal."""
    for lookback in range(5, 25, 5):
        for band_k in np.arange(0.1, 1.5, 0.1):
            for opens in (timedelta(hours=1), timedelta(hours=2), timedelta(hours=3)):
                for closes in (timedelta(minutes=15), timedelta(minutes=30)):
                    yield lookback, band_k, band_k, opens, closes


results = {}
for delivery_hour in tqdm(test_data.index.get_level_values('Delivery_Hour').unique()):
    prices = test_data.xs(delivery_hour, level='Delivery_Hour')
    best_pnl, best_params = -float('inf'), None

    # Exhaustive search; the first combination reaching the top PnL is kept.
    for n, k1, k2, trading_window_open, trading_window_close in _symmetric_grid():
        signals, _, _ = dual_thrust(
            prices,
            n=n,
            k1=k1,
            k2=k2,
            delivery_hour=delivery_hour,
            trading_window_open=trading_window_open,
            trading_window_close=trading_window_close,
        )
        pnl = backtest_delivery_hour(prices[['best_bid', 'best_ask']], signals)

        if pnl > best_pnl:
            best_pnl = pnl
            # NOTE: coefficients are stored rounded to 3 decimals, while the
            # search itself used the raw np.arange values.
            best_params = (n, round(k1, 3), round(k2, 3), trading_window_open, trading_window_close)

    results[delivery_hour] = {'best_pnl': best_pnl, 'best_params': best_params}

# Persist the per-hour winners, then print the full table.
results_df = pd.DataFrame.from_dict(results, orient='index')
results_df.to_pickle('../pickles/grid_search_results_k1-eq-k2.pkl')
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000):
    print(results_df)

  0%|          | 0/24 [00:00<?, ?it/s]

                           best_pnl                       best_params
2021-06-25 22:00:00+00:00      0.00   (5, 1.0, 1.0, 1:00:00, 0:15:00)
2021-06-25 23:00:00+00:00      0.00   (5, 0.8, 0.8, 1:00:00, 0:15:00)
2021-06-26 00:00:00+00:00      0.81   (5, 0.5, 0.5, 3:00:00, 0:15:00)
2021-06-26 01:00:00+00:00      2.64   (5, 0.8, 0.8, 3:00:00, 0:15:00)
2021-06-26 02:00:00+00:00      2.50   (5, 0.1, 0.1, 1:00:00, 0:15:00)
2021-06-26 03:00:00+00:00      0.00   (5, 1.0, 1.0, 1:00:00, 0:15:00)
2021-06-26 04:00:00+00:00      0.00   (5, 0.3, 0.3, 1:00:00, 0:15:00)
2021-06-26 05:00:00+00:00      0.00   (5, 0.8, 0.8, 1:00:00, 0:15:00)
2021-06-26 06:00:00+00:00      0.00   (5, 0.4, 0.4, 1:00:00, 0:15:00)
2021-06-26 07:00:00+00:00     37.00   (5, 0.6, 0.6, 2:00:00, 0:15:00)
2021-06-26 08:00:00+00:00    129.58  (10, 0.9, 0.9, 3:00:00, 0:15:00)
2021-06-26 09:00:00+00:00      0.00  (10, 1.2, 1.2, 1:00:00, 0:15:00)
2021-06-26 10:00:00+00:00      0.00   (5, 0.4, 0.4, 1:00:00, 0:15:00)
2021-06-26 11:00:00+

### Grid Search: Global Optimization Across All Hours

This grid search aims to find a single set of optimal parameters that maximizes the total PnL across all available delivery hours. This approach seeks a more generalized strategy instead of one tuned to each specific hour. To improve performance, data for each hour is pre-sliced and stored in a dictionary.

In [None]:
from tqdm.notebook import tqdm
import numpy as np

from itertools import product

# Running best over the whole grid: total PnL, its parameters, and the
# per-hour PnL breakdown achieved with those parameters.
best_pnl = -float('inf')
best_params = None
best_hourly_pnl = None
all_delivery_hours = test_data.index.get_level_values('Delivery_Hour').unique()

# Pre-slice data for each delivery hour to improve efficiency
prices_by_hour = {
    hour: test_data.xs(hour, level='Delivery_Hour')
    for hour in all_delivery_hours
}
# The backtester only needs the two quote columns; slice them once per hour
# instead of once per parameter combination.
quotes_by_hour = {
    hour: frame[['best_bid', 'best_ask']]
    for hour, frame in prices_by_hour.items()
}

k_grid = np.arange(0.1, 1.5, 0.1)
window_opens = [timedelta(hours=1), timedelta(hours=2), timedelta(hours=3)]
window_closes = [timedelta(minutes=15), timedelta(minutes=30)]

# Grid search for parameters. `n` stays the outer loop so the progress bar
# advances once per lookback; product() walks the remaining axes in the same
# order as nested loops would (k1 slowest, trading_window_close fastest).
for n in tqdm(range(5, 25, 5), desc="n"):
    for k1, k2, trading_window_open, trading_window_close in product(k_grid, k_grid, window_opens, window_closes):
        total_pnl = 0
        hourly_pnl = {}
        # Coefficients are recorded rounded to 3 decimals; the raw values drive the search.
        current_params = (n, round(k1, 3), round(k2, 3), trading_window_open, trading_window_close)

        for delivery_hour in all_delivery_hours:
            signals, _, _ = dual_thrust(
                prices_by_hour[delivery_hour],
                n=n,
                k1=k1,
                k2=k2,
                delivery_hour=delivery_hour,
                trading_window_open=trading_window_open,
                trading_window_close=trading_window_close,
            )
            pnl = backtest_delivery_hour(quotes_by_hour[delivery_hour], signals)
            total_pnl += pnl
            hourly_pnl[delivery_hour] = pnl

        # Strict comparison: the first combination reaching the top total is kept.
        if total_pnl > best_pnl:
            best_pnl = total_pnl
            best_params = current_params
            best_hourly_pnl = hourly_pnl

results = {
    'best_pnl_total': best_pnl,
    'best_params': best_params,
    'hourly_pnl': best_hourly_pnl
}

# Persist the raw results dict, then build display tables from it.
pd.to_pickle(results, '../pickles/grid_search_results_across_all.pkl')

summary_df = pd.DataFrame({
    'value': {
        'best_pnl_total': results['best_pnl_total'],
        'best_params': results['best_params']
    }
})

hourly_pnl_df = pd.DataFrame.from_dict(results['hourly_pnl'], orient='index', columns=['pnl'])

with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', 1000):
    print("--- Summary ---")
    print(summary_df)
    print("\n--- Hourly PnL for Best Parameters ---")
    print(hourly_pnl_df)

n:   0%|          | 0/4 [00:00<?, ?it/s]

--- Summary ---
                                           value
best_pnl_total                            363.04
best_params     (15, 0.8, 1.0, 1:00:00, 0:15:00)

--- Hourly PnL for Best Parameters ---
                              pnl
2021-06-25 22:00:00+00:00  -14.98
2021-06-25 23:00:00+00:00    0.00
2021-06-26 00:00:00+00:00   -5.00
2021-06-26 01:00:00+00:00    0.00
2021-06-26 02:00:00+00:00    1.25
2021-06-26 03:00:00+00:00  -33.38
2021-06-26 04:00:00+00:00    0.00
2021-06-26 05:00:00+00:00    0.00
2021-06-26 06:00:00+00:00    0.00
2021-06-26 07:00:00+00:00  200.15
2021-06-26 08:00:00+00:00  208.00
2021-06-26 09:00:00+00:00    0.00
2021-06-26 10:00:00+00:00    0.00
2021-06-26 11:00:00+00:00    0.00
2021-06-26 12:00:00+00:00    0.00
2021-06-26 13:00:00+00:00    7.00
2021-06-26 14:00:00+00:00    0.00
2021-06-26 15:00:00+00:00    0.00
2021-06-26 16:00:00+00:00    0.00
2021-06-26 17:00:00+00:00    0.00
2021-06-26 18:00:00+00:00    0.00
2021-06-26 19:00:00+00:00    0.00
2021-06-26 20:0

### Visualize Trading Strategy

This cell generates a plot to visualize the performance of the Dual Thrust strategy for a specific delivery hour. It shows:
-   Best bid, best ask, and mid-prices.
-   The calculated upper and lower bands.
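-   Black markers at each simulated trade execution, with hover text showing the fill price, the resulting position, and the cumulative PnL.
-   A lower panel with stacked buy and sell volume bars.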

In [16]:
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots

test_delivery_hour = pd.Timestamp('2021-06-26 08:00:00+00:00')

price_data = test_data.xs(test_delivery_hour, level='Delivery_Hour')
signals, upper_band, lower_band = dual_thrust(
    price_data,
    n=15,
    k1=0.8,
    k2=1.0,
    delivery_hour=test_delivery_hour,
    trading_window_open=timedelta(hours=1),
    trading_window_close=timedelta(minutes=15)
)

# Pad the visible time range by 15 minutes on each side of the trading window.
graph_start = lower_band.index.min() - timedelta(minutes=15)
graph_end = lower_band.index.max() + timedelta(minutes=15)

# Use backtest_delivery_hour to get trade execution data
pnl, trade_series = backtest_delivery_hour(price_data[['best_bid', 'best_ask']], signals, return_trade_data=True)

# Label each execution by the side actually traded. A record with position 0
# is a close-out, so its side is the opposite of the position held before it:
# closing a long is a sell, closing a short is a buy.
trade_actions = []
held_position = 0
for trade in trade_series:
    if trade['position'] == 0:
        action = 'Sell' if held_position == 1 else 'Buy'
    else:
        action = 'Buy' if trade['position'] == 1 else 'Sell'
    trade_actions.append(action)
    held_position = trade['position']

trade_hover_text = [
    f"{action} at {trade['price']}<br>Position: {trade['position']}<br>PnL: {trade['pnl']:.2f}"
    for action, trade in zip(trade_actions, trade_series)
]

# Two stacked panels sharing the time axis: prices on top, volumes below.
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.03, row_heights=[0.7, 0.3])

# Plot price lines
for col, name, color in [('best_bid', 'Best Bid', 'red'), ('best_ask', 'Best Ask', 'green'), ('mid', 'Mid Price', 'blue')]:
    fig.add_trace(go.Scatter(
        x=price_data.index,
        y=price_data[col],
        mode='lines',
        name=name,
        line=dict(color=color, width=1)
    ), row=1, col=1)

# Add Volume bars
for col, name, color in [('buy_volumes', 'Buy Volume', 'green'), ('sell_volumes', 'Sell Volume', 'red')]:
    fig.add_trace(go.Bar(
        x=price_data.index,
        y=price_data[col],
        name=name,
        marker_color=color
    ), row=2, col=1)

# Plot bands
for band, name, color in [(upper_band, 'Upper Band', 'orange'), (lower_band, 'Lower Band', 'purple')]:
    fig.add_trace(go.Scatter(
        x=band.index,
        y=band,
        mode='lines',
        name=name,
        line=dict(color=color, width=3)
    ), row=1, col=1)

# Plot trade execution points with hover text
fig.add_trace(go.Scatter(
    x=trade_series.index,
    y=[trade['price'] for trade in trade_series],
    mode='markers',
    marker=dict(size=10, color="black"),
    text=trade_hover_text,
    hoverinfo='text',
    name='Trade Execution'
), row=1, col=1)

fig.update_layout(
    title=f'Price Lines Over Time with Dual Thrust Signals (Delivery Hour: {test_delivery_hour})',
    xaxis_title=None,
    yaxis_title='Price',
    barmode='stack',
    xaxis=dict(range=[graph_start, graph_end]),
    xaxis2=dict(range=[graph_start, graph_end], title='Trading Time'),
    # Fit the price axis to the bands with a small margin.
    yaxis=dict(range=[lower_band.min() - 5, upper_band.max() + 5]),
    yaxis2_title='Volume',
    height=800
)

pio.show(fig)