### In this notebook we just optimize in a live trading efficient way the strategy/idea of pairs trading we constructed in notebook 2. We also add a non stationarity flag in order to prevent from trading when conditions on the market are not appropriate for this statistical arbitrage strategy.

In [None]:
import os
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from statsmodels.tsa.stattools import adfuller

import MetaTrader5 as mt5

# initialize 
authorized = mt5.initialize()
print("initialize:", authorized, mt5.last_error())


# 3) quick check
info = mt5.account_info()
#info 


initialize: True (1, 'Success')


In [2]:
SYM1 = "USTEC"#"HD.NYSE" #"NVDA.NAS"#"US500"#"GS.NYSE" # #"NVDA.NAS" #"MAR.NAS"#"AAPL.NAS"#"XBRUSD"#
SYM2 = "US500"#"AAPL.NAS" #"AMD.NAS"#"USTEC"#"MS.NYSE" #"AMD.NAS" #"HLT.NYSE"#"HD.NYSE"#"XTIUSD"##
TF = mt5.TIMEFRAME_H1 # try "H4" or "D1" later
START = datetime(2025, 1, 1) # UTC
END = datetime.now() # UTC


# get data
s1_rates = mt5.copy_rates_range(SYM1,TF,START, END)
s1_rates = pd.DataFrame(s1_rates)
s1_rates["time"] = pd.to_datetime(s1_rates["time"], unit = "s")
print(s1_rates)
s2_rates = mt5.copy_rates_range(SYM2,TF,START, END)
s2_rates = pd.DataFrame(s2_rates)
s2_rates["time"] = pd.to_datetime(s2_rates["time"], unit = "s")



                    time      open      high       low     close  tick_volume  \
0    2024-12-31 23:00:00  20959.67  20989.72  20938.22  20974.10         7488   
1    2025-01-02 01:00:00  21009.44  21067.19  20991.19  21013.57         5127   
2    2025-01-02 02:00:00  21013.57  21014.19  20869.19  20977.94         8891   
3    2025-01-02 03:00:00  20978.07  21058.82  20978.07  21041.82         9340   
4    2025-01-02 04:00:00  21040.82  21079.32  21034.82  21069.07         6326   
...                  ...       ...       ...       ...       ...          ...   
4630 2025-10-14 17:00:00  24395.80  24565.60  24380.20  24511.20        29889   
4631 2025-10-14 18:00:00  24513.70  24660.10  24505.80  24627.30        28938   
4632 2025-10-14 19:00:00  24627.30  24769.50  24584.50  24714.50        27549   
4633 2025-10-14 20:00:00  24714.10  24760.00  24680.80  24722.00        26950   
4634 2025-10-14 21:00:00  24722.10  24740.30  24704.50  24727.20         8783   

      spread  real_volume  

In [3]:
df = s1_rates[["time", "open"]].merge(s2_rates[["time", "open"]], on = "time")

df = df.rename(columns={"open_x": SYM1, "open_y": SYM2})
print(df)

                    time     USTEC   US500
0    2024-12-31 23:00:00  20959.67  5881.5
1    2025-01-02 01:00:00  21009.44  5893.6
2    2025-01-02 02:00:00  21013.57  5887.3
3    2025-01-02 03:00:00  20978.07  5882.9
4    2025-01-02 04:00:00  21040.82  5894.6
...                  ...       ...     ...
4629 2025-10-14 17:00:00  24395.80  6589.3
4630 2025-10-14 18:00:00  24513.70  6620.5
4631 2025-10-14 19:00:00  24627.30  6647.5
4632 2025-10-14 20:00:00  24714.10  6667.3
4633 2025-10-14 21:00:00  24722.10  6675.1

[4634 rows x 3 columns]


In [4]:
# plot the df
import plotly.express as px
fig = px.line(df, x ="time", y = [SYM1, SYM2], title = f"Historical Prices - {SYM1} vs {SYM2} (based on mt5 {TF} timeframe infos)")

fig.show()

In [5]:
# investigate correlation using pandas method (same)
df[[SYM1, SYM2]].corr()

Unnamed: 0,USTEC,US500
USTEC,1.0,0.994466
US500,0.994466,1.0


###  compute rolling OLS (α, β) efficiently (vectorized, no per-bar .fit()), suitable for real-time, and then plots the current regression line over the scatter to see the fit.

In [6]:
# --- params ---
beta_window = 180   # try 120–250 for H1/D1

# pull series
x = df[SYM2].astype(float)
y = df[SYM1].astype(float)

# rolling stats (vectorized; past-only window => real-time friendly)
mx = x.rolling(beta_window, min_periods=beta_window).mean()
my = y.rolling(beta_window, min_periods=beta_window).mean()
varx = (x**2).rolling(beta_window, min_periods=beta_window).mean() - mx**2
covxy = (x*y).rolling(beta_window, min_periods=beta_window).mean() - mx*my

beta_roll  = covxy / varx.replace(0, np.nan)
alpha_roll = my - beta_roll * mx

# attach
df["alpha"]    = alpha_roll
df["beta"]     = beta_roll
df["fitted"]   = alpha_roll + beta_roll * x
df["residual"] = y - df["fitted"]
df['raw_difference'] = df[SYM1] - df[SYM2]

df.tail()

Unnamed: 0,time,USTEC,US500,alpha,beta,fitted,residual,raw_difference
4629,2025-10-14 17:00:00,24395.8,6589.3,-559.869581,3.793704,24437.983877,-42.183877,17806.5
4630,2025-10-14 18:00:00,24513.7,6620.5,-622.540853,3.803037,24555.46486,-41.76486,17893.2
4631,2025-10-14 19:00:00,24627.3,6647.5,-655.401272,3.807935,24657.843881,-30.543881,17979.8
4632,2025-10-14 20:00:00,24714.1,6667.3,-670.929523,3.81026,24733.219742,-19.119742,18046.8
4633,2025-10-14 21:00:00,24722.1,6675.1,-695.440238,3.813906,24762.761156,-40.661156,18047.0


In [7]:
# pick the latest valid α,β to draw the regression line
last_idx = df["beta"].last_valid_index()
if last_idx is not None:
    alpha = float(df.loc[last_idx, "alpha"])
    beta  = float(df.loc[last_idx, "beta"])
else:
    alpha = np.nan
    beta  = np.nan

# scatter + OLS line (using latest α,β)
x_min, x_max = float(x.min()), float(x.max())
line_x = np.array([x_min, x_max], dtype=float)
line_y = alpha + beta * line_x



In [8]:
fig = go.Figure()
fig.add_trace(go.Scattergl(
    x=x, y=y,
    mode="markers", name="data",
    marker=dict(size=6, opacity=0.35)
))
if np.isfinite(alpha) and np.isfinite(beta):
    fig.add_trace(go.Scatter(
        x=line_x, y=line_y,
        mode="lines", name=f"Rolling OLS (last window): y = {alpha:.2f} + {beta:.3f}·x"
    ))

fig.update_layout(
    title=(f"Rolling OLS hedge: {SYM1} ≈ α + β·{SYM2}  "
           f"(window={beta_window}, past-only)"),
    xaxis_title=SYM2, yaxis_title=SYM1,
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
)
fig.show()

In [9]:
px.line(df, x ="time", y = "residual", title = f"Residual we trade (hedged spread) - {SYM1} vs {SYM2} (based on mt5 {TF} timeframe infos)")


In [10]:
# plo the residual(spread)
px.line(df, x ="time", y = ["residual", "raw_difference"], title = f"Residual we trade (hedged spread) vs raw difference - {SYM1} vs {SYM2} (based on mt5 {TF} timeframe infos)")


### Checkpoint! Up to this point we have computed the alpha and beta of the OLS over a rolling window. We constrcuted our col "residual" which represnts the spread(difference) between the two symbols fitted with OLS. 
Next we need to check abt stationarity of the residual(spread) using the adf test.
 

In [11]:
# 2) bands + z on the spread (robust std with guards)
band_window = 180
k = 2.0

roll = df["residual"].rolling(band_window, min_periods=band_window)
sma = roll.mean()
std = roll.std(ddof=1).replace(0, np.nan)

df["sma"] = sma
df["standard_deviation"] = std
df["lower_band"] = sma - k * std
df["upper_band"] = sma + k * std
df["zscore"] = (df["residual"] - sma) / std

df.tail(100)

Unnamed: 0,time,USTEC,US500,alpha,beta,fitted,residual,raw_difference,sma,standard_deviation,lower_band,upper_band,zscore
4534,2025-10-08 14:00:00,24899.0,6727.9,-6553.910490,4.676299,24907.763197,-8.763197,18171.1,2.707496,32.978475,-63.249454,68.664446,-0.347824
4535,2025-10-08 15:00:00,24889.2,6727.1,-6518.143516,4.670956,24903.846201,-14.646201,18162.1,2.561159,32.996810,-63.432461,68.554779,-0.521486
4536,2025-10-08 16:00:00,24864.1,6724.4,-6447.744879,4.660450,24890.981836,-26.881836,18139.7,2.505233,33.038339,-63.571446,68.581912,-0.889484
4537,2025-10-08 17:00:00,24974.7,6727.0,-6458.813647,4.662180,24903.668739,71.031261,18247.7,2.976318,33.407538,-63.838757,69.791394,2.037113
4538,2025-10-08 18:00:00,24994.8,6742.0,-6450.582862,4.660985,24973.776278,21.023722,18252.8,3.158475,33.416102,-63.673730,69.990680,0.534630
...,...,...,...,...,...,...,...,...,...,...,...,...,...
4629,2025-10-14 17:00:00,24395.8,6589.3,-559.869581,3.793704,24437.983877,-42.183877,17806.5,47.876440,57.872874,-67.869307,163.622187,-1.556175
4630,2025-10-14 18:00:00,24513.7,6620.5,-622.540853,3.803037,24555.464860,-41.764860,17893.2,47.416663,58.255333,-69.094003,163.927328,-1.530873
4631,2025-10-14 19:00:00,24627.3,6647.5,-655.401272,3.807935,24657.843881,-30.543881,17979.8,47.011256,58.543437,-70.075618,164.098130,-1.324745
4632,2025-10-14 20:00:00,24714.1,6667.3,-670.929523,3.810260,24733.219742,-19.119742,18046.8,46.680522,58.748725,-70.816927,164.177971,-1.120029


Will will add a **warmup mask**. Why do we need it? Rolling stats (mean/std/bands/z) are unreliable at the start because they have too few observations. Even if you use min_periods=band_window, the very first bar that meets the minimum is still noisy and often causes jumpy z-scores and false “entries”.

The mask:

avoids half-baked values and edge effects at the beginning of the series,

keeps your plots clean (no misleading bands at the left edge),

prevents the backtest from taking trades too early based on unstable stats.

In [12]:
# warmup mask (avoid half-baked values)
warmup = band_window
df.loc[:warmup, ["sma","standard_deviation","lower_band","upper_band","zscore"]] = np.nan

### First adf sanity test on residual

In [13]:
from statsmodels.tsa.stattools import adfuller

#  one-off ADF sanity on the residual (no constant)
sp_full = df["residual"].dropna().values
if len(sp_full) > 50:
    adf_stat, pval, lags, nobs, crit, _ = adfuller(sp_full, regression="n", autolag="AIC")
    print(f"[ADF full residual] stat={adf_stat:.3f}, p={pval:.4f}, lags={lags}, nobs={nobs}")
    print(f"Critical values: {crit}")

[ADF full residual] stat=-7.130, p=0.0000, lags=7, nobs=4447
Critical values: {'1%': np.float64(-2.5662429493154995), '5%': np.float64(-1.941060570072196), '10%': np.float64(-1.6167604112922196)}


### Rolling ADF test

In [14]:

#  rolling ADF to gate trading (regime check)
adf_win = 300  # try 400–600 on H1/D1
pvals = np.full(len(df), np.nan, dtype=float)
sp = df["residual"].values

for i in range(len(df)):
    if i >= adf_win - 1:
        window = sp[i-adf_win+1:i+1]
        # drop NaNs from early warmup
        window = window[np.isfinite(window)]
        if len(window) >= max(30, int(0.6*adf_win)):
            try:
                pvals[i] = adfuller(window, regression="n", autolag="AIC")[1]
            except Exception:
                pvals[i] = np.nan

df["adf_pval"] = pvals
df["tradable"] = df["adf_pval"] < 0.10 
df.tail(100)

Unnamed: 0,time,USTEC,US500,alpha,beta,fitted,residual,raw_difference,sma,standard_deviation,lower_band,upper_band,zscore,adf_pval,tradable
4534,2025-10-08 14:00:00,24899.0,6727.9,-6553.910490,4.676299,24907.763197,-8.763197,18171.1,2.707496,32.978475,-63.249454,68.664446,-0.347824,0.000743,True
4535,2025-10-08 15:00:00,24889.2,6727.1,-6518.143516,4.670956,24903.846201,-14.646201,18162.1,2.561159,32.996810,-63.432461,68.554779,-0.521486,0.000850,True
4536,2025-10-08 16:00:00,24864.1,6724.4,-6447.744879,4.660450,24890.981836,-26.881836,18139.7,2.505233,33.038339,-63.571446,68.581912,-0.889484,0.001140,True
4537,2025-10-08 17:00:00,24974.7,6727.0,-6458.813647,4.662180,24903.668739,71.031261,18247.7,2.976318,33.407538,-63.838757,69.791394,2.037113,0.000610,True
4538,2025-10-08 18:00:00,24994.8,6742.0,-6450.582862,4.660985,24973.776278,21.023722,18252.8,3.158475,33.416102,-63.673730,69.990680,0.534630,0.000829,True
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4629,2025-10-14 17:00:00,24395.8,6589.3,-559.869581,3.793704,24437.983877,-42.183877,17806.5,47.876440,57.872874,-67.869307,163.622187,-1.556175,0.026570,True
4630,2025-10-14 18:00:00,24513.7,6620.5,-622.540853,3.803037,24555.464860,-41.764860,17893.2,47.416663,58.255333,-69.094003,163.927328,-1.530873,0.021090,True
4631,2025-10-14 19:00:00,24627.3,6647.5,-655.401272,3.807935,24657.843881,-30.543881,17979.8,47.011256,58.543437,-70.075618,164.098130,-1.324745,0.014401,True
4632,2025-10-14 20:00:00,24714.1,6667.3,-670.929523,3.810260,24733.219742,-19.119742,18046.8,46.680522,58.748725,-70.816927,164.177971,-1.120029,0.018774,True


In [15]:
# IMPORTANT: for live trading, shift filters by 1 bar to avoid look-ahead
df["z_sig"]        = df["zscore"].shift(1)
df["tradable_sig"] = df["tradable"].shift(1).fillna(False)

print("Rolling ADF added: use 'tradable_sig' to enable/disable entries on the next bar.")

Rolling ADF added: use 'tradable_sig' to enable/disable entries on the next bar.



Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version. Call result.infer_objects(copy=False) instead. To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`



Checkpoint! Hedge (α, β) updates

With the rolling OLS we coded (rolling(..., min_periods=beta_window)), α and β update every new row/bar once the initial warmup is done.

Warmup: the first beta_window - 1 rows don’t have α, β.

After that (from index i = beta_window-1 onward), the window slides by 1 bar each time, so α and β change at every row (they are not held constant for beta_window rows).

You’ll only see the same α and β on adjacent rows if prices barely changed (rare).

If you want piecewise-constant α, β that refresh only every K bars, that’s a different design; our current approach updates per bar.

Stationarity (ADF) checks

We do it in two layers:

Full-sample ADF (once) — sanity check

Run a single ADF on the entire residual series you loaded.

Frequency: one time (when you run the notebook / load data).

Rolling ADF (gate, ongoing) — regime check

Use a rolling window of length adf_win over the residual.

You get NaNs for the first adf_win - 1 rows (not enough data).

After that, the p-value updates every new row/bar as the window slides.

Define tradable = (adf_pval < 0.10) each bar, and use tradable_sig = tradable.shift(1) for execution (no look-ahead).

### Some plots to see what things look like up to this point


### Residual with sma and BB bands

In [16]:
px.line(df, x ="time", y = ["residual", "sma", "upper_band", "lower_band"], title = f"Residual(hedged spread) with bb bands - {SYM1} vs {SYM2} (based on mt5 {TF} timeframe infos)")

### Z-score with entry/exit thresholds

Define the z_in and z_out values which determine the entry and exit points of our trades. For example when the residual(spread) is 2 std (z_in = 2) away from the sma then enter a trade and when the residual is 0.5 std (z_out = 0.5) away from the sma then exit the trade. Its important to try to find later optimal z_in and z_out

In [17]:
Z_IN = 2.2
Z_OUT = 0.2

In [18]:

fig2 = px.line(df, x="time", y="zscore", title=f"Residual z-score · Z_IN={Z_IN}, Z_OUT={Z_OUT}")
fig2.add_hline(y=0, line_dash="dash")
fig2.add_hline(y= Z_IN,  line_dash="dot")
fig2.add_hline(y=-Z_IN,  line_dash="dot")
fig2.add_hline(y= Z_OUT, line_dash="dashdot")
fig2.add_hline(y=-Z_OUT, line_dash="dashdot")
fig2.update_yaxes(title_text="z-score")
fig2.show()

### Rolling adf p-value (stationarity gate)

In [19]:
#  Rolling ADF p-value (stationarity gate) ---
if "adf_pval" in df.columns:
    fig3 = px.line(df, x="time", y="adf_pval", title="Rolling ADF p-value (stationarity gate)")
    fig3.add_hline(y=0.10, line_dash="dash", annotation_text="threshold ", annotation_position="top left")
    fig3.update_yaxes(range=[0, 0.5], title_text="p-value")
    fig3.show()

### Chnage of alpha and beta over period

In [20]:
px.line(df, x="time", y="alpha", title= f"Change of alpha overtime")

In [21]:
px.line(df, x="time", y="beta", title= f"Change of beta overtime ")

### Tiny status print

In [22]:
'''# --- tiny status print ---
last_z = df["zscore"].dropna().iloc[-1] if df["zscore"].notna().any() else np.nan
last_p = df["adf_pval"].dropna().iloc[-1] if "adf_pval" in df and df["adf_pval"].notna().any() else np.nan
tradable_now = (last_p < 0.10) if np.isfinite(last_p) else False
print(f"Latest z={last_z:.2f} | Latest ADF p={last_p:.4f} | Tradable now? {tradable_now}")'''

'# --- tiny status print ---\nlast_z = df["zscore"].dropna().iloc[-1] if df["zscore"].notna().any() else np.nan\nlast_p = df["adf_pval"].dropna().iloc[-1] if "adf_pval" in df and df["adf_pval"].notna().any() else np.nan\ntradable_now = (last_p < 0.10) if np.isfinite(last_p) else False\nprint(f"Latest z={last_z:.2f} | Latest ADF p={last_p:.4f} | Tradable now? {tradable_now}")'

Now we will:

use the shifted z-score (z_sig) and shifted stationarity gate (tradable_sig) so entries happen on the next bar (no look-ahead),

enter when |z_sig| > Z_IN, exit when |z_sig| < Z_OUT,

mark P&L from spread changes with a β-hedged unit position,

log trades and print a few basic metrics (Sharpe, max DD, hit rate).

In [23]:

time   = df["time"].to_numpy()
spread = df["residual"].to_numpy(dtype=float)
zsig   = df["z_sig"].to_numpy(dtype=float)
gate   = df["tradable_sig"].to_numpy()

n   = len(df)
pos = 0                              # +1 long spread, -1 short, 0 flat
equity = np.zeros(n, dtype=float)
pnl    = np.zeros(n, dtype=float)

trades = []
entry_i = entry_spread = entry_pos = np.nan

for i in range(1, n):
    new_pos = pos
    if np.isfinite(zsig[i]) and gate[i]:
        if pos == 0:
            if zsig[i] >= Z_IN:
                new_pos = -1
                trades.append({"time": time[i], "i": i, "action": "ENTER_SHORT",
                               "z": zsig[i], "spread": spread[i]})
                entry_i, entry_spread, entry_pos = i, spread[i], new_pos
            elif zsig[i] <= -Z_IN:
                new_pos = +1
                trades.append({"time": time[i], "i": i, "action": "ENTER_LONG",
                               "z": zsig[i], "spread": spread[i]})
                entry_i, entry_spread, entry_pos = i, spread[i], new_pos
        else:
            if abs(zsig[i]) < Z_OUT:
                trades.append({"time": time[i], "i": i, "action": "EXIT",
                               "z": zsig[i], "spread": spread[i],
                               "pnl_spread": (spread[i] - entry_spread) * entry_pos if np.isfinite(entry_spread) else np.nan,
                               "hold_bars": i - int(entry_i) if np.isfinite(entry_i) else np.nan})
                new_pos = 0
                entry_i = entry_spread = entry_pos = np.nan
    else:
        # if gate closes while in a trade, exit
        if pos != 0:
            trades.append({"time": time[i], "i": i, "action": "EXIT_GATE_OFF",
                           "z": zsig[i], "spread": spread[i],
                           "pnl_spread": (spread[i] - entry_spread) * entry_pos if np.isfinite(entry_spread) else np.nan,
                           "hold_bars": i - int(entry_i) if np.isfinite(entry_i) else np.nan})
            new_pos = 0
            entry_i = entry_spread = entry_pos = np.nan

    # mark-to-market on this bar with the *current* position (after decision)
    pnl[i] = new_pos * (spread[i] - spread[i-1])
    equity[i] = equity[i-1] + pnl[i]
    pos = new_pos

trades_df = pd.DataFrame(trades)

trades_df

Unnamed: 0,time,i,action,z,spread,pnl_spread,hold_bars
0,2025-02-19 02:00:00,769,ENTER_LONG,-2.243391,0.579555,,
1,2025-02-19 13:00:00,780,EXIT_GATE_OFF,-1.940975,21.225539,20.645984,11.0
2,2025-02-19 20:00:00,787,ENTER_LONG,-2.292658,-14.985582,,
3,2025-02-19 21:00:00,788,EXIT_GATE_OFF,-1.974823,-22.196329,-7.210746,1.0
4,2025-02-20 05:00:00,795,ENTER_LONG,-2.608487,-63.442495,,
5,2025-02-20 20:00:00,810,EXIT,-0.027309,24.180563,87.623058,15.0
6,2025-02-25 23:00:00,882,ENTER_LONG,-2.257298,-94.261409,,
7,2025-02-26 22:00:00,904,EXIT,0.059818,-18.083205,76.178204,22.0
8,2025-03-03 02:00:00,953,ENTER_LONG,-3.060279,-231.778082,,
9,2025-03-03 03:00:00,954,EXIT_GATE_OFF,-3.716993,-196.732896,35.045186,1.0


In [24]:

# --- basic metrics ---
bars_per_year = 24 * 252  # H1 approx
ret = pnl[np.isfinite(pnl)]
sharpe = (ret.mean() / ret.std(ddof=1) * np.sqrt(bars_per_year)) if ret.std(ddof=1) > 0 else 0.0

eq = pd.Series(equity)
max_dd = float((eq.cummax() - eq).max()) if not eq.empty else 0.0

exits = trades_df[trades_df["action"].str.startswith("EXIT")]
hit_rate = float((exits["pnl_spread"] > 0).mean()) if len(exits) else np.nan
avg_pnl  = float(exits["pnl_spread"].mean()) if len(exits) else np.nan
med_hold = float(exits["hold_bars"].median()) if len(exits) else np.nan
df["equity_spread_units"] = equity

print(f"Trades (exits): {len(exits)} | Hit rate: {hit_rate:.1%} | Avg PnL (spread units): {avg_pnl:.4f} | Median hold (bars): {med_hold}")
print(f"Sharpe (ann): {sharpe:.2f} | Max DD (spread units): {max_dd:.4f} | Final equity (spread units): {equity[-1]:.4f}")

Trades (exits): 26 | Hit rate: 61.5% | Avg PnL (spread units): 23.5862 | Median hold (bars): 14.0
Sharpe (ann): 0.87 | Max DD (spread units): 0.0000 | Final equity (spread units): nan


In [25]:

# split trades into entries/exits for plotting
entries = trades_df[trades_df["action"].isin(["ENTER_LONG","ENTER_SHORT"])]
exits   = trades_df[trades_df["action"].str.startswith("EXIT")]

fig = go.Figure()

# spread & bands
fig.add_trace(go.Scatter(x=df["time"], y=df["residual"], name="spread (residual)", mode="lines"))
fig.add_trace(go.Scatter(x=df["time"], y=df["sma"], name="sma", mode="lines"))
fig.add_trace(go.Scatter(x=df["time"], y=df["upper_band"], name="upper_band", mode="lines",
                         line=dict(dash="dash")))
fig.add_trace(go.Scatter(x=df["time"], y=df["lower_band"], name="lower_band", mode="lines",
                         line=dict(dash="dash")))

# entry markers
if not entries.empty:
    fig.add_trace(go.Scatter(
        x=entries["time"], y=entries["spread"],
        mode="markers", name="entries",
        marker=dict(symbol="circle", size=8, color="#1f77b4"),
        text=[f"{a} | z={z:.2f}" for a,z in zip(entries["action"], entries["z"])],
        hoverinfo="text+x+y"
    ))

# exit markers (EXIT / EXIT_GATE_OFF)
if not exits.empty:
    # prefer triangle-up for normal exits; diamond for gate-off exits
    is_gate = exits["action"].str.contains("GATE_OFF")
    fig.add_trace(go.Scatter(
        x=exits.loc[~is_gate, "time"], y=exits.loc[~is_gate, "spread"],
        mode="markers", name="exits",
        marker=dict(symbol="triangle-up", size=9, color="#d62728"),
        text=[f"EXIT | z={z:.2f} | pnl={p:.4f}" 
              for z,p in zip(exits.loc[~is_gate,"z"], exits.loc[~is_gate,"pnl_spread"].fillna(0.0))],
        hoverinfo="text+x+y"
    ))
    fig.add_trace(go.Scatter(
        x=exits.loc[is_gate, "time"], y=exits.loc[is_gate, "spread"],
        mode="markers", name="exits (gate off)",
        marker=dict(symbol="diamond", size=9, color="#ff7f0e"),
        text=[f"EXIT_GATE_OFF | z={z:.2f} | pnl={p:.4f}" 
              for z,p in zip(exits.loc[is_gate,"z"], exits.loc[is_gate,"pnl_spread"].fillna(0.0))],
        hoverinfo="text+x+y"
    ))

fig.update_layout(
    title=(f"Residual Spread with Bands & Trade Markers · "
           f"{SYM1} vs {SYM2} · Z_IN={Z_IN}, Z_OUT={Z_OUT}"),
    xaxis_title="time",
    yaxis_title="spread (units)",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
)

fig.show()

In [26]:
# build per-bar PnL only at exit indices; everything else stays 0
pnl_at_exit = np.zeros(len(df), dtype=float)

entry_spread = None
entry_pos = None  # +1 for ENTER_LONG, -1 for ENTER_SHORT

for _, r in trades_df.sort_values("i").iterrows():
    action = str(r["action"])
    i = int(r["i"])
    s = float(r["spread"])

    if action == "ENTER_LONG":
        entry_spread = s; entry_pos = +1
    elif action == "ENTER_SHORT":
        entry_spread = s; entry_pos = -1
    elif action.startswith("EXIT"):  # EXIT or EXIT_GATE_OFF
        if entry_spread is not None and entry_pos is not None:
            pnl = (s - entry_spread) * entry_pos
            pnl_at_exit[i] += pnl
        entry_spread = None; entry_pos = None

# cumulative equity (spread units), constant between exits
equity = np.cumsum(pnl_at_exit)
df["equity_spread_units"] = equity
print(f"Final equity (spread units): {equity[-1]:.4f}")

Final equity (spread units): 613.2408


In [27]:

fig = px.line(
    df, x="time", y="equity_spread_units",
    title="Cumulative P&L (spread units)"
)
fig.update_yaxes(title_text="equity (spread units)")
fig.update_xaxes(title_text="time")
fig.show()