<h1>Mean-reverting with Jump-diffusion Stocks Stat-Arb Strategy</h1>

<h2>Importing Data</h2>

Data imported is 2 stock tickers from IDX Healthcare sector that highly correlated for June $10^{\text{th}}, 2025$ until June $30^{\text{th}}, 2025$, SAME and SIDO.

In [1]:
import pandas as pd

In [2]:
from datetime import datetime 
import yfinance as yf

In [3]:
import pytz

In [5]:
end_date = datetime(2025,6,30)
start_date = datetime(2025,6,10)
stock_a = 'SAME.JK'
stock_b = 'SIDO.JK'
stock_df = yf.download([stock_a, stock_b], start=start_date, end=end_date, interval="5m", auto_adjust=False)['Close']
stock_df.index = stock_df.index.tz_convert('Asia/Jakarta')
stock_df

[*********************100%***********************]  2 of 2 completed


Ticker,SAME.JK,SIDO.JK
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1
2025-06-10 09:00:00+07:00,,515.0
2025-06-10 09:05:00+07:00,256.0,515.0
2025-06-10 09:10:00+07:00,258.0,515.0
2025-06-10 09:15:00+07:00,258.0,515.0
2025-06-10 09:20:00+07:00,,515.0
...,...,...
2025-06-26 15:40:00+07:00,,486.0
2025-06-26 15:45:00+07:00,254.0,486.0
2025-06-26 16:00:00+07:00,254.0,486.0
2025-06-26 16:05:00+07:00,,486.0


<h2>Cleaning Data</h2>

In [6]:
stock_df = stock_df.ffill().bfill()
stock_df.head()

Ticker,SAME.JK,SIDO.JK
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1
2025-06-10 09:00:00+07:00,256.0,515.0
2025-06-10 09:05:00+07:00,256.0,515.0
2025-06-10 09:10:00+07:00,258.0,515.0
2025-06-10 09:15:00+07:00,258.0,515.0
2025-06-10 09:20:00+07:00,258.0,515.0


In [7]:
corr_value = stock_df[stock_a].corr(stock_df[stock_b])
print(f"Correlation (price level) between {stock_a} and {stock_b}: {corr_value:.4f}")

Correlation (price level) between SAME.JK and SIDO.JK: 0.8777


<h2>Spread Calculation</h2>

According to (Endres and Stübinger, 2017), the spread for two stocks $A$ and $B$ (for initial condition) with prices $S_A(t)$ and $S_B(t)$ defined as:<br>
$X_t = \ln \left ( \frac{S_A(t)}{S_A(0)}\right) - \ln \left ( \frac{S_B(t)}{S_B(0)}\right)$ , $t \geq 0$<br>

In [8]:
import numpy as np

In [9]:
log_a = np.log(stock_df[stock_a])
log_b = np.log(stock_df[stock_b])
rel_a = log_a - log_a.iloc[0]
rel_b = log_b - log_b.iloc[0]
spread = rel_a - rel_b

spread

Datetime
2025-06-10 09:00:00+07:00    0.000000
2025-06-10 09:05:00+07:00    0.000000
2025-06-10 09:10:00+07:00    0.007782
2025-06-10 09:15:00+07:00    0.007782
2025-06-10 09:20:00+07:00    0.007782
                               ...   
2025-06-26 15:40:00+07:00    0.050115
2025-06-26 15:45:00+07:00    0.050115
2025-06-26 16:00:00+07:00    0.050115
2025-06-26 16:05:00+07:00    0.050115
2025-06-26 16:10:00+07:00    0.050115
Length: 847, dtype: float64

In [10]:
import plotly.graph_objs as go

In [11]:
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=spread.index,
    y=spread.values,
    mode='lines',
    name=f'Spread: {stock_a} - {stock_b}',
    line=dict(color='royalblue', width=2)
))

# Tambahkan garis horizontal mean spread
mean_spread = spread.mean()
fig.add_hline(y=mean_spread, line_dash="dash", line_color="green",
              annotation_text="Mean", annotation_position="top right")

# Layout
fig.update_layout(
    title=f"{stock_a} and {stock_b} Spread",
    xaxis_title="Date",
    yaxis_title="Spread",
    template='plotly_white',
    hovermode='x unified'
)

fig.show()

<h2>Parameter Estimation</h2>

The spread dynamics modeling followed Ornstein-Uhlenbeck driven by Lévy noise:<br>
$dX_t = \theta (\mu_t-X_t)dt + d L_t$ <br>
with the solution is:<br>
$X_t = X_0 e^{-\theta t} + \displaystyle \int_{0}^{t} e^{-\theta (t-u)} \mu_u du + \displaystyle \int_{0}^{t} e^{-\theta (t-u)} d L_u$<br>
and Lévy process defined as:<br>
$L_t = \sigma W_t + \displaystyle \sum_{i=1}^{N_t} \xi_i$<br>
where $\xi$ is jump size, $\sigma$ is volatility (constant), $W_t$ is standard Brownian motion, and $N_t$ is Poisson process<br>

In [12]:
from sklearn.covariance import ledoit_wolf

In [58]:
beta = 0.4999 
dt = 1 / (250*12*5)  
def estimate_levy_ou_params(spread_series):
    mu = np.mean(spread_series)
    
    X = spread_series.values
    dX = np.diff(X)
    n = len(dX)
    
    Delta_n = dt  
    nu_n = Delta_n ** beta
    
    is_jump = np.abs(dX) > nu_n
    non_jump_mask = ~is_jump

    X_t = X[:-1] 
    mu_diff = mu - X_t

    numerator = np.sum(mu_diff[non_jump_mask] * dX[non_jump_mask])
    
    denominator = np.sum(mu_diff[non_jump_mask]**2) * dt
    
    theta = numerator / denominator if denominator != 0 else 0
    
    if any(non_jump_mask):
        sigma = np.std(dX[non_jump_mask]) / np.sqrt(dt)
    else:
        sigma = 0
        
    n_jumps = np.sum(is_jump)
    total_time = (len(spread_series) - 1) * dt 
    
    lambda_ = n_jumps / total_time if total_time > 0 else 0
    
    if n_jumps > 0:
        jump_sizes = np.abs(dX[is_jump])
        eta = n_jumps / np.sum(jump_sizes)
    else:
        eta = 0
    
    #Assumptions
    p_u = 0.5  
    p_d = 0.5  
    delta = 0   
    
    return {
        'theta': theta,
        'mu': mu,
        'sigma': sigma,
        'lambda': lambda_,
        'eta': eta,
        'p_u': p_u,
        'p_d': p_d,
        'delta': delta,
    }

In [59]:
pair_name = f"{stock_a}_{stock_b}"
try:
    params = estimate_levy_ou_params(spread)
    print(f"Parameter Estimation for {pair_name} pair:")
    for key, value in params.items():
        if isinstance(value, (int, float, np.float64)):
            print(f"{key}: {value:.5f}")
        else:
            print(f"{key}: {value}")
except Exception as e:
    print(f"Failed producing parameter estimation for {pair_name}: {e}")

Parameter Estimation for SAME.JK_SIDO.JK pair:
theta: 1009.37500
mu: 0.03197
sigma: 0.42879
lambda: 3723.40426
eta: 89.34699
p_u: 0.50000
p_d: 0.50000
delta: 0.00000


In [23]:
from scipy.integrate import quad


In [24]:
def expected_first_passage_time(b, theta, sigma, lambda_, eta, p_u, delta=0):
    try:
        K = eta 

        def integrand_exponent(u):
            try:
                term1 = lambda_ * (1 - p_u) * (eta / (eta + u) * np.exp(-delta * u) - 1)
                term2 = 0.5 * sigma**2 * u**2
                return (term1 + term2) / u
            except ZeroDivisionError:
                return 1e10

        def integrand_main(u):
            try:
                exp_term = np.exp(-integrand_exponent(u))
                base = (1 - u / K)
                if base <= 0: 
                    return 0
                power = (lambda_ / (2 * theta)) - 1
                return (np.exp(u * b) - np.exp(-u * b) * (1 - u / K)) * (base ** power) * exp_term
            except:
                return 0

        integral, _ = quad(integrand_main, 0, K - 1e-5, limit=100)

        return (1 / theta) * integral

    except Exception:
        return (b ** 2) / (sigma ** 2)


In [25]:
def optimize_threshold(params, c=0.004):
    def objective(b):
        b_val = b[0]
        if b_val <= c / 2:
            return 1e10

        try:
            E_tau = expected_first_passage_time(b_val, **params)
            return - (2 * b_val - c) / E_tau
        except:
            return 1e10

    initial_b = 0.01
    bounds = [(c / 2 + 1e-5, 0.5)]
    result = minimize(objective, [initial_b], bounds=bounds, method='L-BFGS-B')

    if result.success:
        b_opt = result.x[0]
        obj_value = -result.fun
        return {'b_opt': b_opt, 'obj_value': obj_value}
    else:
        b_opt = np.sqrt((c * params['sigma'] ** 2) / (2 * params['theta']))
        return {'b_opt': b_opt, 'obj_value': (2 * b_opt - c) / (b_opt ** 2 / params['sigma'] ** 2)}

In [56]:
def visualize_trading_signals(spread_series, mu, b_opt, pair_name):
    upper_bound = mu + b_opt
    lower_bound = mu - b_opt

    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=spread_series.index,
        y=spread_series.values,
        mode='lines',
        name='Spread',
        line=dict(color='#1f77b4', width=1.5)
    ))

    fig.add_hline(y=mu, line_dash="solid", line_color="red",
                  annotation_text="μ", annotation_position="bottom right")
    fig.add_hline(y=upper_bound, line_dash="dot", line_color="green",
                  annotation_text=" ", annotation_position="top right")
    fig.add_hline(y=lower_bound, line_dash="dot", line_color="green",
                  annotation_text=" ", annotation_position="bottom right")

    signal_points = []
    last_signal = None  

    for i in range(1, len(spread_series)):
        prev = spread_series.iloc[i - 1]
        curr = spread_series.iloc[i]
        t = spread_series.index[i]

        if prev < upper_bound and curr >= upper_bound:
            if last_signal != 'upper':  
                signal_points.append((t, curr))
                last_signal = 'upper'  

        elif prev > lower_bound and curr <= lower_bound:
            if last_signal != 'lower':  
                signal_points.append((t, curr))
                last_signal = 'lower'  

    if signal_points:
        times, values = zip(*signal_points)
        fig.add_trace(go.Scatter(
            x=times,
            y=values,
            mode='markers',
            name='Signal',
            marker=dict(color='black', size=8, symbol='circle')
        ))

    fig.update_layout(
        title=f'Trading Signals for Pair: {pair_name}',
        xaxis_title='Date',
        yaxis_title='Spread',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1),
        template='plotly_white',
        hovermode='x unified'
    )

    fig.show()

In [34]:
from scipy.optimize import minimize

In [57]:
model_params = {
    'theta': params['theta'],
    'sigma': params['sigma'],
    'lambda_': params['lambda'],
    'eta': params['eta'],
    'p_u': params['p_u'] if params['p_u'] is not None else 0.5,
    'delta': params['delta'] if params['delta'] is not None else 0
}

mu = params['mu']

c = 0.004 
threshold_result = optimize_threshold(model_params, c=c)

print("\n" + "="*50)
print(f"Optimal Threshold for: {stock_a} vs {stock_b}")
print(f"mu                : {params['mu']:.6f}")
print(f"b* optimal        : {threshold_result['b_opt']:.6f}")
print(f"Lower Threshold   : {params['mu'] - threshold_result['b_opt']:.6f}")
print(f"Upper Threshold   : {params['mu'] + threshold_result['b_opt']:.6f}")
print(f"Expected return   : {threshold_result['obj_value']:.4f} per unit waktu")
print(f"Transaction costs c : {c:.4f}")
print("="*50)

visualize_trading_signals(
    spread_series=spread,
    mu=params['mu'],
    b_opt=threshold_result['b_opt'],
    pair_name=f"{stock_a} vs {stock_b}"
)


Optimal Threshold for: SAME.JK vs SIDO.JK
mu                : 0.031972
b* optimal        : 0.010000
Lower Threshold   : 0.021972
Upper Threshold   : 0.041972
Expected return   : 0.0000 per unit waktu
Transaction costs c : 0.0040


In [48]:
def visualize_stock_signals(stockA_series, stockB_series, spread_series, mu, b_opt, pair_name):
    upper_bound = mu + b_opt
    lower_bound = mu - b_opt
    
    signals = []
    position = None
    last_signal = None 
    
    for i in range(1, len(spread_series)):
        t = spread_series.index[i]
        prev, curr = spread_series.iloc[i-1], spread_series.iloc[i]
        
        if position is None and prev < upper_bound and curr >= upper_bound:
            if last_signal != 'upper': 
                signals.append(('entry_short', t, 'A', 'sell'))
                signals.append(('entry_short', t, 'B', 'buy'))
                position = 'short'
                last_signal = 'upper' 
        
        elif position is None and prev > lower_bound and curr <= lower_bound:
            if last_signal != 'lower': 
                signals.append(('entry_long', t, 'A', 'buy'))
                signals.append(('entry_long', t, 'B', 'sell'))
                position = 'long'
                last_signal = 'lower' 
        
        elif position == 'long' and prev < mu and curr >= mu:
            position = None 
        
        elif position == 'short' and prev > mu and curr <= mu:
            position = None 
    
    signals_df = pd.DataFrame(signals, columns=['signal_type', 'time', 'stock', 'action'])
    
    fig = make_subplots(rows=2, cols=1, 
                        shared_xaxes=True,
                        vertical_spacing=0.1,
                        subplot_titles=(f'Stock Price {stock_a}', 
                                       f'Stock Price {stock_b}'))
    
    fig.add_trace(go.Scatter(
        x=stockA_series.index,
        y=stockA_series.values,
        mode='lines',
        name='Stock A',
        line=dict(color='#1f77b4', width=2)
    ), row=1, col=1)
    
    fig.add_trace(go.Scatter(
        x=stockB_series.index,
        y=stockB_series.values,
        mode='lines',
        name='Stock B',
        line=dict(color='#ff7f0e', width=2)
    ), row=2, col=1)
    
    stockA_signals = signals_df[signals_df['stock'] == 'A']
    for action_type in ['buy', 'sell']:
        action_signals = stockA_signals[stockA_signals['action'] == action_type]
        if not action_signals.empty:
            fig.add_trace(go.Scatter(
                x=action_signals['time'],
                y=stockA_series.loc[action_signals['time']],
                mode='markers',
                name=f'Stock A {action_type}',
                marker=dict(
                    color='green' if action_type == 'buy' else 'red',
                    size=10,
                    symbol='triangle-up' if action_type == 'buy' else 'triangle-down'
                ),
                hovertemplate='%{x}<br>Price: %{y:.2f}<br>Action: ' + action_type,
                showlegend=False
            ), row=1, col=1)
    
    stockB_signals = signals_df[signals_df['stock'] == 'B']
    for action_type in ['buy', 'sell']:
        action_signals = stockB_signals[stockB_signals['action'] == action_type]
        if not action_signals.empty:
            fig.add_trace(go.Scatter(
                x=action_signals['time'],
                y=stockB_series.loc[action_signals['time']],
                mode='markers',
                name=f'Stock B {action_type}',
                marker=dict(
                    color='green' if action_type == 'buy' else 'red',
                    size=10,
                    symbol='triangle-up' if action_type == 'buy' else 'triangle-down'
                ),
                hovertemplate='%{x}<br>Price: %{y:.2f}<br>Action: ' + action_type,
                showlegend=False
            ), row=2, col=1)
    
    fig.update_layout(
        title=f'Pairs Trading Signals: {pair_name}',
        height=800,
        template='plotly_white',
        hovermode='x unified',
        legend=dict(orientation='h', yanchor='bottom', y=1.02, xanchor='right', x=1)
    )
    
    fig.show()

In [44]:
from plotly.subplots import make_subplots

In [49]:
stockA_series = stock_df[stock_a]
stockB_series = stock_df[stock_b]

log_prices = np.log(stock_df)
spread = log_prices[stock_a] - log_prices[stock_a].iloc[0] - (log_prices[stock_b] - log_prices[stock_b].iloc[0])

params_opt = {
    'theta': params['theta'],
    'sigma': params['sigma'],
    'lambda_': params['lambda'],
    'eta': params['eta'],
    'p_u': params['p_u'],
    'delta': params.get('delta', 0)
}

c = 0.004 
result_opt = optimize_threshold(params_opt, c=c)
b_opt = result_opt['b_opt']

mu = params['mu']

visualize_stock_signals(
    stockA_series=stockA_series,
    stockB_series=stockB_series,
    spread_series=spread,
    mu=mu,
    b_opt=b_opt,
    pair_name=f"{stock_a} vs {stock_b}"
)