In [None]:
from dataclasses import dataclass
from itertools import cycle

import databento as db
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.linear_model import LinearRegression
from statsmodels.tsa.stattools import coint

from finm37000 import (
    as_ct,
    get_databento_api_key,
    get_official_stats,
    temp_env,
)

px.defaults.color_discrete_sequence = px.colors.qualitative.Set3
color_palette = cycle(px.defaults.color_discrete_sequence)

with temp_env(DATABENTO_API_KEY=get_databento_api_key()):
    client = db.Historical()

In [None]:
date = "2025-10-23"
cme = "GLBX.MDP3"

## Pairs trading

A trade of one contract against another related contract is called a pairs trade.
These may or may not correspond to listed ICS spreads.
In fact, they may not even be on the same exchange (e.g., ICE and CME both
have Crude and Natural Gas futures with different deliverables sharing
a common reality).

The remainder of this notebook walks through a pairs trading analysis emphasizing
how the steps of the quantitative workflow can inform the analysis.

Parts of this were adapted from an example on Databento. In addition to citing
my sources, I point this out because they have other suggestions for how to
improve the analysis of a pairs trade, and I recommend you read it for its
insights. In addition, this type of backtesting approach to trading system
analysis is very common, but it is really only a prototype proof-of-concept
at this stage. Carrying it through to a production system often
takes additional work, which the analysis below tries to highlight.

https://databento.com/docs/examples/algo-trading/pairs-trading

That demo uses ICE's Brent crude future: https://www.ice.com/products/219/brent-crude-futures

That's the most actively traded Brent future, but the CME has corresponding financially settled
end-of-month contracts that we will use. Then we will try to extend the idea that
two different physical crude contracts should track each other to the idea that refined
products should similarly track, and then to the possibility that unrelated energy
products will respond to similar market forces. Thus we will consider:
* `CL` WTI Crude: https://www.cmegroup.com/markets/energy/crude-oil/light-sweet-crude.html
* `BZ` Brent financially settled EOM: https://www.cmegroup.com/markets/energy/crude-oil/brent-crude-oil-last-day.html
* `RB` RBOB Gasoline: https://www.cmegroup.com/markets/energy/refined-products/rbob-gasoline.html
* `NG` Henry Hub Natural Gas: https://www.cmegroup.com/markets/energy/natural-gas/natural-gas.html

In [None]:
@dataclass
class PairsTradeLeg:
    symbol: str
    tick_size: float
    tick_value: float
    fees_per_trade: float

Include fees per trade. Actual fees depend on your access to the market:
https://www.cmegroup.com/company/clearing-fees.html

For example, current member fees for `CL` and `RB` are \\$0.70 and \\$1.50 for non-members, for `BZ` are \\$0.85 and \\$1.35,
and \\$0.80 and \\$1.60 for `NG`.

In [None]:
# Backtest configuration
start = "2024-04-01"
end = "2024-05-15"

cl_leg = PairsTradeLeg(
    symbol="CLM4",
    tick_size=0.01,
    tick_value=10.0,
    fees_per_trade=0.70,
)
bz_leg = PairsTradeLeg(
    symbol="BZM4",
    tick_size=0.01,
    tick_value=10.0,
    fees_per_trade=0.85,
)
bz_next_leg = PairsTradeLeg(
    symbol="BZN4",
    tick_size=0.01,
    tick_value=10.0,
    fees_per_trade=0.85,
)
rb_leg = PairsTradeLeg(
    symbol="RBM4",
    tick_size=0.0001,
    tick_value=4.20,
    fees_per_trade=0.70,
)
ng_leg = PairsTradeLeg(
    symbol="NGM4",
    tick_size=0.001,
    tick_value=10.0,
    fees_per_trade=0.80,
)
pairs_legs = {
    "CL": cl_leg,
    "BZ": bz_leg,
    "BZN": bz_next_leg,
    "RB": rb_leg,
    "NG": ng_leg,
}
one_tick_slippage = {
    "CL": 0.01,
    "BZ": 0.01,
    "BZN": 0.01,
    "RB": 0.0001,
    "NG": 0.001,
}
two_tick_slippage = {key: 2 * value for key, value in one_tick_slippage.items()}

In [None]:
ohlcv_legs = {
    key: client.timeseries.get_range(
        dataset=cme,
        schema="ohlcv-1m",
        symbols=leg.symbol,
        start=start,
        end=end,
    ).to_df()
    for key, leg in pairs_legs.items()
}

### Cointegration

The trading logic is specified
* check for cointegration based on p-values, looking back `lookback` minutes,
* if it "is" cointegrated, then calculate the z-score of the forward-looking residual,
* a negative z-score indicates actual `y` value over-estimated by linear regression, so buy `y`, sell `x`.
* a positvie z-score indicates the opposite, so the opposite position is taken.

A pair of time series $(X_t, Y_t)$ is cointegrated if there is a constant $\beta$ such that
$$
Z_t = Y_t - \beta X_t
$$
is _stationary_. This course will not cover different technical definitions of stationarity,
but if you are unfamiliar with the concept, you can think of weak stationarity as
1. Constant mean $E(Z_t) = \mu$ for all $t$.
2. Constant variance $Var(Z_t) = \sigma^2$ for all $t$.
3. The autocovariance structure is also independent of time $Cov(Z_t, Z_{t+h}) = \gamma(h)$.

The cointegrating relationship is basically a regression with a fancy
test for stationarity on the error terms. That is, we try to remove the ways in which the sequences
move together by regression, then check whether the error terms still contain a detectable
pattern.

### When and how much to trade?

The backtest starts with a simple signal and size configuration:
when the standardized residual is sufficiently negative, that implies that the $y$ leg is under priced, so buy $y$ and sell $x$.
And conversely, when it is sufficiently positive, sell $y$ and buy $x$.

### When to exit the trade?

The simulation also takes a simple threshold exit as well, exiting positions when the standardized residuals are less extreme.

### Slippage assumptions

To add some realism to the simulation, costs per trade are added including
* slippage: how far your execution is from your desired price. In this case, we are signaling from minute-close data, and assuming we can execute two ticks away
* trading fees: the exchange-charged fee to trade,
* commissions: fees charged by your broker.


Let's put these into a function for reuse across the different pairs we want to simulate.

In [None]:
@dataclass
class SignalConfig:
    p_threshold: float = 0.06
    lookback: int = 100


default_signal = SignalConfig


def calculate_cointegration_signal(df_ohlcv_x, df_ohlcv_y, config):
    df_ohlcv_x = df_ohlcv_x["close"].to_frame(name="x")
    df_ohlcv_y = df_ohlcv_y["close"].to_frame(name="y")
    df = df_ohlcv_x.join(df_ohlcv_y, how="outer").ffill().dropna()
    df["cointegrated"] = 0
    df["residual"] = 0.0
    df["zscore"] = 0.0
    is_cointegrated = False
    lr = LinearRegression()

    lookback = config.lookback
    for i in range(lookback, len(df), lookback):
        x = df["x"].iloc[i - lookback : i].to_numpy().reshape(-1, 1)
        y = df["y"].iloc[i - lookback : i].to_numpy().reshape(-1, 1)

        if is_cointegrated:
            # Compute and normalize signal on a forward window
            x_new = df["x"].iloc[i : (i + lookback)].to_numpy().reshape(-1, 1)
            y_new = df["y"].iloc[i : (i + lookback)].to_numpy().reshape(-1, 1)

            spread_back = y - lr.coef_ * x
            spread_forward = y_new - lr.coef_ * x_new
            zscore = (spread_forward - spread_back.mean()) / spread_back.std()

            df.iloc[i : (i + lookback), df.columns.get_loc("cointegrated")] = 1
            df.iloc[i : (i + lookback), df.columns.get_loc("residual")] = spread_forward
            df.iloc[i : (i + lookback), df.columns.get_loc("zscore")] = zscore

        _, p, _ = coint(x, y)
        is_cointegrated = p < config.p_threshold
        lr.fit(x, y)
    return df

In [None]:
@dataclass
class ExecutionConfig:
    slippage_x: float
    slippage_y: float
    entry_threshold: float = 1.5
    exit_threshold: float = 0.5
    commissions_per_side: float = 0.08

In [None]:
def simulate_trading(signal_df, x_leg, y_leg, config):
    # Standardized residual is negative => y is underpriced => sell x, buy y
    # Standardized residual is positive => y is overpriced => buy x, sell y
    df = signal_df.copy()
    df["position_y"] = np.select(
        [
            signal_df["zscore"] < -config.entry_threshold,
            signal_df["zscore"] > config.entry_threshold,
        ],
        [1, -1],
    )
    df["position_x"] = -df["position_y"]
    # Exit positions when z-score crosses +/- exit_threshold
    hold_y_long = np.where(signal_df["zscore"] <= -config.exit_threshold, 1, 0)
    hold_y_short = np.where(signal_df["zscore"] >= config.exit_threshold, 1, 0)

    # Carry forward positions until exit condition is met
    df["position_y"] = df["position_y"].mask(
        (df["position_y"].shift() == -1) & (hold_y_short == 1), -1
    )
    df["position_y"] = df["position_y"].mask(
        (df["position_y"].shift() == 1) & (hold_y_long == 1), 1
    )
    df["position_x"] = -df["position_y"]
    df["pnl_x"] = (
        df["position_x"].shift() * df["x"].diff() / x_leg.tick_size * x_leg.tick_value
    )
    df["pnl_y"] = (
        df["position_y"].shift() * df["y"].diff() / y_leg.tick_size * y_leg.tick_value
    )

    df["volume_x"] = df["position_x"].diff().abs()
    df["volume_y"] = df["position_y"].diff().abs()

    df["transaction_cost"] = (
        df["volume_x"] * x_leg.fees_per_trade
        + df["volume_y"] * y_leg.fees_per_trade
        + (df["volume_x"] + df["volume_y"]) * config.commissions_per_side
        + (df["volume_x"] * config.slippage_x / x_leg.tick_size / 2 * x_leg.tick_value)
        + (df["volume_y"] * config.slippage_y / y_leg.tick_size / 2 * y_leg.tick_value)
    )
    df["gross_pnl"] = df["pnl_x"] + df["pnl_y"]
    df["net_pnl"] = df["gross_pnl"] - df["transaction_cost"]
    return df

In [None]:
pairs = [("CL", "BZ"), ("CL", "BZN"), ("CL", "RB"), ("CL", "NG")]
signal_dfs = {
    pair: calculate_cointegration_signal(
        ohlcv_legs[pair[0]], ohlcv_legs[pair[1]], default_signal
    )
    for pair in pairs
}
simulated_dfs = {
    pair: simulate_trading(
        signal_dfs[pair],
        pairs_legs[pair[0]],
        pairs_legs[pair[1]],
        ExecutionConfig(
            slippage_x=one_tick_slippage[pair[0]], slippage_y=one_tick_slippage[pair[1]]
        ),
    )
    for pair in pairs
}

In [None]:
x = "CL"
y = "BZ"
df = simulated_dfs[(x, y)]
plot_cols = ["gross_pnl", "net_pnl", "transaction_cost"]
fig = px.line(
    df[plot_cols].cumsum(),
    labels={
        "gross_pnl": "Gross PnL",
        "net_pnl": "Net PnL",
        "transaction_cost": "Transaction Costs",
    },
)
fig.show()

Why doesn't anything happen at the end of the period? The Brent contracts trade two months ahead of the physical market, so the
June contract stopped trading at the end of April.

The demo trades June WTI vs. July Brent to maintain activity through the trading window, but that represents a misalignment
of delivery times. Which is not to say that is not a valid way to analyze this relationship: that could provide a way
to continue trading the relationship while `CL` is active.

For context, the volume and open interest looks like this:

In [None]:
symbols = [leg.symbol for leg in pairs_legs.values()]

instrument_defs = client.timeseries.get_range(
    dataset=cme,
    schema="definition",
    symbols=symbols,
    start=start,
)

In [None]:
raw_stats = client.timeseries.get_range(
    dataset=cme,
    schema="statistics",
    symbols=symbols,
    start=start,
    end=end,
)

In [None]:
stats = get_official_stats(raw_stats.to_df(), instrument_defs.to_df())

In [None]:
plot_df = stats.reset_index()
plot_groups = plot_df.groupby("Symbol")
colors = {group: next(color_palette) for group in plot_groups.groups}

x_col = "Trade date"
plot_cols = ["Cleared volume", "Open interest"]
subplot_titles = [f"{family} {col}" for family in symbols for col in plot_cols]

fig = make_subplots(
    rows=len(symbols),
    cols=len(plot_cols),
    shared_xaxes="all",
    subplot_titles=subplot_titles,
)

for i, symbol in enumerate(symbols):
    df = plot_groups.get_group(symbol)
    fig.add_trace(
        go.Scatter(
            x=df[x_col],
            y=df[plot_cols[0]],
            name=symbol,
            line=dict(color=colors[symbol]),
        ),
        row=i + 1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=df[x_col],
            y=df[plot_cols[1]],
            name=symbol,
            line=dict(color=colors[symbol]),
            showlegend=False,
        ),
        row=i + 1,
        col=2,
    )


fig.update_layout(
    height=800,
    width=800,
    title_text="Cleared volume",
)
fig.show()

Let's switch the analysis to July Brent to see the difference:

In [None]:
x = "CL"
y = "BZN"
df = simulated_dfs[(x, y)]
plot_cols = ["gross_pnl", "net_pnl", "transaction_cost"]
fig = px.line(
    df[plot_cols].cumsum(),
    labels={
        "gross_pnl": "Gross PnL",
        "net_pnl": "Net PnL",
        "transaction_cost": "Transaction Costs",
    },
)
fig.show()

Before moving on to the other pairs, you should note how sensitive this is to slippage. Assuming we do one tick worse per execution,
the P&L looks like this:

In [None]:
x = "CL"
y = "BZN"
signal_df = calculate_cointegration_signal(ohlcv_legs[x], ohlcv_legs[y], default_signal)
df = simulate_trading(
    signal_df,
    pairs_legs[x],
    pairs_legs[y],
    ExecutionConfig(slippage_x=two_tick_slippage[x], slippage_y=two_tick_slippage[y]),
)
plot_cols = ["gross_pnl", "net_pnl", "transaction_cost"]
fig = px.line(
    df[plot_cols].cumsum(),
    labels={
        "gross_pnl": "Gross PnL",
        "net_pnl": "Net PnL",
        "transaction_cost": "Transaction Costs",
    },
)
fig.show()

Now consider the RBOB-CL pair. Besides the P&L looking less promising, what else do you notice about the graph?

In [None]:
x = "CL"
y = "RB"
df = simulated_dfs[(x, y)]
plot_cols = ["gross_pnl", "net_pnl", "transaction_cost"]
fig = px.line(
    df[plot_cols].cumsum(),
    labels={
        "gross_pnl": "Gross PnL",
        "net_pnl": "Net PnL",
        "transaction_cost": "Transaction Costs",
    },
)
fig.show()

And finally the Nat Gas - Crude pair:

In [None]:
x = "CL"
y = "NG"
df = simulated_dfs[(x, y)]
plot_cols = ["gross_pnl", "net_pnl", "transaction_cost"]
fig = px.line(
    df[plot_cols].cumsum(),
    labels={
        "gross_pnl": "Gross PnL",
        "net_pnl": "Net PnL",
        "transaction_cost": "Transaction Costs",
    },
)
fig.show()

This does not look nearly as good as the demo. We are not even covering transactions costs.
What can you do when your strategy does not work out?
* Check the alignment of your contracts.
* Examine the trading signals.
* Examine the assumptions of the trading signals.
* Reconsider the whether the tactical parameters are appropriate.

Let's continue by taking a closer look at WTI-Brent pair compared to the WTI-RBOB pair to see where we might improve.

### Check your data assumptions.

How much do these contracts even track each other? We already identified that the expiration, volume, and Brent do not align
with WTI, and we are accounting for that by comparing different months. We could also continue with the same
months but roll the strategy to the next month according to the Brent schedule.

In [None]:
for pair in pairs:
    df = simulated_dfs[pair]
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05)
    fig.update_layout(title=f"{pair[0]} vs. {pair[1]}: Price ")
    fig.add_trace(
        go.Scatter(
            x=as_ct(df.index),
            y=df["x"],
            name=pair[0],
        ),
        row=1,
        col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=df.index,
            y=df["y"],
            name=pair[1],
        ),
        row=2,
        col=1,
    )
    fig.show()

The `CL-NG` pair does not pass a visual check that the prices move together, so eliminate it from further analysis.

On the other hand, `CL-RB` visually appears to move together, so let's try to figure out what's driving the
pairs strategy to make `CL-BZ` look better.

In [None]:
pairs = [("CL", "BZN"), ("CL", "RB")]

## Examine the trading signals.

Recall that the trading signal is two-tiered
1. Are the prices cointegrated?
2. Is the z-score of the residual above a threshold?

How often is cointegration detected?

In [None]:
cols = ["x", "y", "zscore", "cointegrated"]
for pair in pairs:
    df = simulated_dfs[pair]
    fig = make_subplots(
        rows=len(cols), cols=1, shared_xaxes=True, vertical_spacing=0.05
    )
    names = [pair[0], pair[1]] + cols[2:]
    fig.update_layout(
        title=f"{pair[0]} vs. {pair[1]}: Price, z-score, and cointegration."
    )
    for i, col in enumerate(cols):
        fig.add_trace(
            go.Scatter(
                x=as_ct(df.index),
                y=df[col],
                name=names[i],
            ),
            row=i + 1,
            col=1,
        )
    fig.show()

POLL:

What would be your next step to investigate?



## Examine the assumptions of the trading signals.
It appears that `CL-RB` does not clear the cointegration filter as often as `CL-BZ`. What does a cointegrated period look like compared with
one that does fails the cointegration test?

A second issue concerning me about those plots are the size of some of the z-scores. Such large z-scores are often a result of poorly fitting
models, but there is nothing in the strategy that would stop such instances from trading. In fact, those may correspond to profitable
trading opportunities. If that were the case, then maybe we should be looking for situations where the cointegration model is bad!

In [None]:
# How often are the prices cointegrated?

for pair in pairs:
    percent_cointegrated = simulated_dfs[pair]["cointegrated"].mean()
    print(
        f"{pair} is cointegrated: {round(percent_cointegrated * 100, 2)}% of the time"
    )

In [None]:
cointegrated_groups = {}
for pair in pairs:
    df = simulated_dfs[pair]
    df["cointegrated_id"] = (df["cointegrated"] != df["cointegrated"].shift()).cumsum()
    cointegrated_groups[pair] = [
        g for _, g in df.groupby("cointegrated_id") if g["cointegrated"].iloc[0]
    ]

In [None]:
def calc_group_id(df, col):
    mask = df[col].astype(bool)
    group_change = mask != mask.shift(fill_value=False)
    group_id = group_change.cumsum()
    # group_id.loc[~mask] = pd.NA
    return group_id

In [None]:
cointegrated_groups = {}
for pair in pairs:
    simulated_dfs[pair]["cointegrated_id"] = calc_group_id(
        simulated_dfs[pair], "cointegrated"
    )
    df = simulated_dfs[pair]
    cointegrated_groups[pair] = [
        g for _, g in df.groupby("cointegrated_id")
    ]  #  if g["cointegrated"].iloc[0]]

In [None]:
pair = ("CL", "BZN")
true_groups = cointegrated_groups[pair]
full_df = simulated_dfs[pair]
start_group, stop_group = 1, 10
n_groups = stop_group - start_group + 1
cols_to_plot = ["x", "y", "zscore", "cointegrated"]


def slice_with_context(df, start_ts, end_ts, context_rows):
    start_pos = df.index.get_loc(start_ts)
    end_pos = df.index.get_loc(end_ts)
    if isinstance(start_pos, slice):
        start_pos = start_pos.start
    if isinstance(end_pos, slice):
        end_pos = end_pos.stop - 1
    return df.iloc[max(start_pos - context_rows, 0) : end_pos + 1]


fig = make_subplots(
    rows=len(cols_to_plot),
    cols=1,
    shared_xaxes=True,
    vertical_spacing=0.05,
    subplot_titles=cols_to_plot,
)

for gi, g in enumerate(true_groups[start_group - 1 : stop_group]):
    seg = slice_with_context(full_df, g.index[0], g.index[-1], default_signal.lookback)
    visible = gi == 0
    for row, col in enumerate(cols_to_plot, start=1):
        fig.add_trace(
            go.Scatter(
                x=as_ct(seg.index),
                y=seg[col],
                mode="lines+markers",
                name=col,
                visible=visible,
            ),
            row=row,
            col=1,
        )


def make_title(pair, group_index):
    return f"{pair[1]} vs. {pair[0]}: Cointegrated case {group_index}"


buttons = []
for gi in range(n_groups):
    vis = [False] * (n_groups * len(cols_to_plot))
    for j in range(len(cols_to_plot)):
        vis[gi * len(cols_to_plot) + j] = True
    buttons.append(
        dict(
            label=f"Cointegrated case {gi + start_group}",
            method="update",
            args=[
                {"visible": vis},
                {"title": make_title(pair, gi + start_group)},
            ],  # f"{pair} Cointegrated case {gi + start_group}"}]
        )
    )

fig.update_layout(
    title=make_title(pair, start_group),
    height=800,
    updatemenus=[
        {
            "buttons": buttons,
            "direction": "down",
            "x": 1.05,
            "y": 1.15,
            "showactive": True,
        }
    ],
    legend=dict(orientation="h", y=-0.2),
    margin=dict(l=40, r=40, t=60, b=40),
)

fig.show()

In [None]:
pair = ("CL", "RB")
true_groups = cointegrated_groups[pair]
full_df = simulated_dfs[pair]
start_group, stop_group = 1, 10
n_groups = stop_group - start_group + 1
cols_to_plot = ["x", "y", "zscore", "cointegrated"]


def slice_with_context(df, start_ts, end_ts, context_rows):
    start_pos = df.index.get_loc(start_ts)
    end_pos = df.index.get_loc(end_ts)
    if isinstance(start_pos, slice):
        start_pos = start_pos.start
    if isinstance(end_pos, slice):
        end_pos = end_pos.stop - 1
    return df.iloc[max(start_pos - context_rows, 0) : end_pos + 1]


fig = make_subplots(
    rows=len(cols_to_plot),
    cols=1,
    shared_xaxes=True,
    vertical_spacing=0.05,
    subplot_titles=cols_to_plot,
)

# Create one trace per group (hidden by default)
for gi, g in enumerate(true_groups[start_group - 1 : stop_group]):
    seg = slice_with_context(full_df, g.index[0], g.index[-1], default_signal.lookback)
    visible = gi == 0
    for row, col in enumerate(cols_to_plot, start=1):
        fig.add_trace(
            go.Scatter(
                x=as_ct(seg.index),
                y=seg[col],
                mode="lines+markers",
                name=col,
                visible=visible,
            ),
            row=row,
            col=1,
        )


def make_title(pair, group_index):
    return f"{pair[1]} vs. {pair[0]}: Cointegrated case {group_index}"


buttons = []
for gi in range(n_groups):
    vis = [False] * (n_groups * len(cols_to_plot))
    for j in range(len(cols_to_plot)):
        vis[gi * len(cols_to_plot) + j] = True
    buttons.append(
        dict(
            label=f"Cointegrated case {gi + start_group}",
            method="update",
            args=[
                {"visible": vis},
                {"title": make_title(pair, gi + start_group)},
            ],  # f"{pair} Cointegrated case {gi + start_group}"}]
        )
    )

fig.update_layout(
    title=make_title(pair, start_group),
    height=800,
    updatemenus=[
        {
            "buttons": buttons,
            "direction": "down",
            "x": 1.05,
            "y": 1.15,
            "showactive": True,
        }
    ],
    legend=dict(orientation="h", y=-0.2),
    margin=dict(l=40, r=40, t=60, b=40),
)

fig.show()

Some ideas to move forward:
* Examine cointegration p-values (seem to be a bit noisy and random about whether to turn on).
* Avoid reacting to inactive/active transition: threshold minimum variance for the legs.
* Calculate z-scores at all times and compare to next move, partition on cointegration. Try to understand what cointegration adds.
* Reconsider the whether the tactical parameters are appropriate.


In [None]:
def calculate_rolling_regression(df_ohlcv_x, df_ohlcv_y, config):
    df_ohlcv_x = df_ohlcv_x["close"].to_frame(name="x")
    df_ohlcv_y = df_ohlcv_y["close"].to_frame(name="y")
    df = df_ohlcv_x.join(df_ohlcv_y, how="outer").ffill().dropna()
    df["cointegrated_p"] = 0.0
    df["std_x"] = 0.0
    df["std_y"] = 0.0
    df["std_residual"] = 0.0
    df["residual"] = 0.0
    df["zscore"] = 0.0
    lr = LinearRegression()

    lookback = config.lookback
    p = pd.NA
    for i in range(lookback, len(df), lookback):
        x = df["x"].iloc[i - lookback : i].to_numpy().reshape(-1, 1)
        y = df["y"].iloc[i - lookback : i].to_numpy().reshape(-1, 1)

        if not pd.isna(p):
            # Compute and normalize signal on a forward window
            x_new = df["x"].iloc[i : (i + lookback)].to_numpy().reshape(-1, 1)
            y_new = df["y"].iloc[i : (i + lookback)].to_numpy().reshape(-1, 1)

            spread_back = y - lr.coef_ * x
            spread_forward = y_new - lr.coef_ * x_new
            zscore = (spread_forward - spread_back.mean()) / spread_back.std()

            df.iloc[i : (i + lookback), df.columns.get_loc("std_x")] = x.std()
            df.iloc[i : (i + lookback), df.columns.get_loc("std_y")] = y.std()
            df.iloc[i : (i + lookback), df.columns.get_loc("std_residual")] = (
                spread_back.std()
            )
            df.iloc[i : (i + lookback), df.columns.get_loc("cointegrated_p")] = p
            df.iloc[i : (i + lookback), df.columns.get_loc("residual")] = spread_forward
            df.iloc[i : (i + lookback), df.columns.get_loc("zscore")] = zscore

        _, p, _ = coint(x, y)
        lr.fit(x, y)
    return df

In [None]:
params = {
    pair: calculate_rolling_regression(
        ohlcv_legs[pair[0]], ohlcv_legs[pair[1]], default_signal
    )
    for pair in pairs
}

In [None]:
pair = ("CL", "BZN")
px.scatter(params[pair], x="zscore", y="cointegrated_p")

In [None]:
px.scatter(params[pair], x="std_x", y="std_y")

In [None]:
px.scatter(params[pair], x="std_residual", y="zscore")

In [None]:
pair = ("CL", "RB")
px.scatter(params[pair], x="zscore", y="cointegrated_p")

In [None]:
px.scatter(params[pair], x="std_x", y="std_y")

In [None]:
px.scatter(params[pair], x="std_residual", y="zscore")