# Import Libraries

In [1]:
import numpy as np
import pandas as pd

from uo2009.dgp.params import EfficientPriceParams, NoiseParamsMA2
from uo2009.dgp.simulate import simulate_section4_1_1

from uo2009.estimators.test_statistics import tau_from_Z

## Define the experiment grid (Table 1 design)

In [32]:
# Monte Carlo
R = 50   # paper uses 1000 (start with 50 for debugging)
MASTER_SEED = 2026

# Sampling intervals (seconds)
DELTAS = [10.0, 15.0, 30.0]

# c values used by the paper
C_VALUES = [4.0, 2.0, 1.0, 0.5]

# True threshold for MA(2)
TRUE_M = 2

# Critical value (two-sided 5%)
CRIT = 1.96

# Max lag (ticks) to search over
L = 20  # safe; MA(2) should die out by 2

# Paper sample sizes
N_BY_DELTA = {10: 2300, 15: 1550, 30: 770}

# Paper-locked (c, M, K) by sample size N (from page 121)
CMK_BY_N = {
    2300: {4.0: (52, 44), 2.0: (26, 88), 1.0: (13, 176), 0.5: (7, 330)},
    1550: {4.0: (46, 33), 2.0: (23, 67), 1.0: (12, 129), 0.5: (6, 258)},
    770:  {4.0: (36, 21), 2.0: (18, 42), 1.0: (9, 85),  0.5: (5, 154)},
    # Cannot find the 5 sec.
}


In [3]:
N_BY_DELTA = {5: 4680, 10: 2300, 15: 1550, 30: 770}

## Create parameter objects

In [4]:
eff_params = EfficientPriceParams()     # uses defaults in params.py
noise_params = NoiseParamsMA2()         # nsr1, nsr2

## One simulation run + returns

In [5]:
rng = np.random.default_rng(123)

out = simulate_section4_1_1(
    eff_params=eff_params,
    noise_params=noise_params,
    delta_seconds=10.0,
    rng=rng,
)

y = out.y_obs                 # (n_obs, 2)
r = np.diff(y, axis=0)        # (n_obs-1, 2)
r1, r2 = r[:, 0], r[:, 1]

r1[:5], r2[:5], out.meta["n_obs"]

(array([23.42665774, -5.13794054,  0.39145386, -8.98668678, 23.79760794]),
 array([ 19.22884384, -27.79311779,  10.99404689,   4.71745427,
         13.82186576]),
 2341)

## Build Z-sequences for Eq.(9) and Eq. (16)

In [33]:
def returns_from_prices(y_obs: np.ndarray):
    r = np.diff(y_obs, axis=0)
    return r[:, 0], r[:, 1]

def Z_cross_plus(r1: np.ndarray, r2: np.ndarray, ell_steps: int) -> np.ndarray:
    shift = ell_steps + 1
    if len(r1) <= shift:
        return np.array([], dtype=float)
    return r1[shift:] * r2[:-shift]

def Z_cross_minus(r1: np.ndarray, r2: np.ndarray, ell_steps: int) -> np.ndarray:
    shift = ell_steps + 1
    if len(r1) <= shift:
        return np.array([], dtype=float)
    return r1[:-shift] * r2[shift:]

def Z_univariate(r: np.ndarray, ell_steps: int) -> np.ndarray:
    shift = ell_steps + 1
    if len(r) <= shift:
        return np.array([], dtype=float)
    return r[:-shift] * r[shift:]


## Locked $(M,K)$, truncation to $K*M$, and tau wrapper

In [36]:
def get_MK(N_base: int, c: float):
    c = float(c)
    if N_base in CMK_BY_N and c in CMK_BY_N[N_base]:
        M, K = CMK_BY_N[N_base][c]
        return int(M), int(K), True
    # fallback if not available (e.g., N=4680 for 5 sec)
    M = int(np.floor(c * (N_base ** (1/3))))
    M = max(M, 5)
    K = N_base // M
    return int(M), int(K), False

def truncate_to_KM(Z: np.ndarray, K: int, M: int) -> np.ndarray:
    used = K * M
    if len(Z) >= used:
        return Z[:used]
    # if lag is large and Z shorter, use what we have (tau_from_Z will compute its own K)
    return Z

def tau_stat(Z: np.ndarray, M: int, K: int):
    Z = np.asarray(Z, dtype=float)
    if Z.size == 0:
        return 0.0
    Z_use = truncate_to_KM(Z, K=K, M=M)
    if Z_use.size < M * 2:  # need at least K>=2 blocks inside tau_from_Z
        return 0.0
    return tau_from_Z(Z_use, M=M).tau


## Threshold selection for one replication

In [34]:
def pick_threshold_from_tau(tau_by_ell, crit=1.96):
    rejected = [ell for ell, t in tau_by_ell.items() if abs(t) > crit]
    return max(rejected) if rejected else 0

def estimate_thresholds_one_rep(
    r1: np.ndarray,
    r2: np.ndarray,
    *,
    c: float,
    N_base: int,
    L: int,
    crit: float,
    m2_asset: int = 2,  # set 1 if you want univariate based on asset 1
):
    M, K, locked = get_MK(N_base, c)

    tau_plus = {}
    tau_minus = {}
    tau_m2 = {}

    for ell in range(L + 1):
        Zp = Z_cross_plus(r1, r2, ell)
        Zm = Z_cross_minus(r1, r2, ell)

        if m2_asset == 1:
            Zu = Z_univariate(r1, ell)
        else:
            Zu = Z_univariate(r2, ell)

        tau_plus[ell]  = tau_stat(Zp, M=M, K=K)
        tau_minus[ell] = tau_stat(Zm, M=M, K=K)
        tau_m2[ell]    = tau_stat(Zu, M=M, K=K)

    m_plus  = pick_threshold_from_tau(tau_plus,  crit=crit)
    m_minus = pick_threshold_from_tau(tau_minus, crit=crit)
    m2      = pick_threshold_from_tau(tau_m2,    crit=crit)

    return m_plus, m_minus, m2, {"M": M, "K": K, "locked": locked}


## Monte Carlo

In [37]:
rows = []
master_rng = np.random.default_rng(MASTER_SEED)

for delta in DELTAS:
    N_base = N_BY_DELTA[int(delta)]
    for c in C_VALUES:
        count_plus = 0
        count_minus = 0
        count_m2 = 0

        # record M,K used (should match paper for 10/15/30 sec)
        M, K, locked = get_MK(N_base, c)

        for rep in range(R):
            rng = np.random.default_rng(int(master_rng.integers(0, 2**31 - 1)))

            out = simulate_section4_1_1(
                eff_params=eff_params,
                noise_params=noise_params,
                delta_seconds=float(delta),
                rng=rng,
            )

            r1, r2 = returns_from_prices(out.y_obs)

            # truncate returns to paper N
            r1 = r1[:N_base]
            r2 = r2[:N_base]

            m_plus, m_minus, m2, _info = estimate_thresholds_one_rep(
                r1, r2, c=c, N_base=N_base, L=L, crit=CRIT, m2_asset=2
            )

            count_plus  += (m_plus  < TRUE_M)
            count_minus += (m_minus < TRUE_M)
            count_m2    += (m2      < TRUE_M)

        rows.append({
            "delta_sec": delta,
            "N": N_base,
            "c": c,
            "M": M,
            "K": K,
            "locked_MK": locked,
            "P(m_plus < 2)":  count_plus / R,
            "P(m_minus < 2)": count_minus / R,
            "P(m2 < 2)":      count_m2 / R,
        })

df = pd.DataFrame(rows)
df


Unnamed: 0,delta_sec,N,c,M,K,locked_MK,P(m_plus < 2),P(m_minus < 2),P(m2 < 2)
0,10.0,2300,4.0,52,44,True,0.38,0.26,0.16
1,10.0,2300,2.0,26,88,True,0.64,0.24,0.16
2,10.0,2300,1.0,13,176,True,0.58,0.22,0.14
3,10.0,2300,0.5,7,330,True,0.58,0.28,0.16
4,15.0,1550,4.0,46,33,True,0.48,0.4,0.16
5,15.0,1550,2.0,23,67,True,0.42,0.24,0.18
6,15.0,1550,1.0,12,129,True,0.52,0.3,0.2
7,15.0,1550,0.5,6,258,True,0.6,0.36,0.36
8,30.0,770,4.0,36,21,True,0.56,0.28,0.32
9,30.0,770,2.0,18,42,True,0.5,0.36,0.32


## Format as Table 4.1

In [39]:
pd.options.display.float_format = "{:.3f}".format

panel_plus  = df.pivot(index="c", columns="delta_sec", values="P(m_plus < 2)")
panel_minus = df.pivot(index="c", columns="delta_sec", values="P(m_minus < 2)")
panel_m2    = df.pivot(index="c", columns="delta_sec", values="P(m2 < 2)")

panel_plus, panel_minus, panel_m2

(delta_sec  10.000  15.000  30.000
 c                                
 0.500       0.580   0.600   0.460
 1.000       0.580   0.520   0.460
 2.000       0.640   0.420   0.500
 4.000       0.380   0.480   0.560,
 delta_sec  10.000  15.000  30.000
 c                                
 0.500       0.280   0.360   0.340
 1.000       0.220   0.300   0.480
 2.000       0.240   0.240   0.360
 4.000       0.260   0.400   0.280,
 delta_sec  10.000  15.000  30.000
 c                                
 0.500       0.160   0.360   0.320
 1.000       0.140   0.200   0.380
 2.000       0.160   0.180   0.320
 4.000       0.160   0.160   0.320)

## Debug

In [38]:
check = df[df["delta_sec"].isin([10.0, 15.0, 30.0])][["delta_sec", "N", "c", "M", "K", "locked_MK"]]
check.sort_values(["delta_sec", "c"])

Unnamed: 0,delta_sec,N,c,M,K,locked_MK
3,10.0,2300,0.5,7,330,True
2,10.0,2300,1.0,13,176,True
1,10.0,2300,2.0,26,88,True
0,10.0,2300,4.0,52,44,True
7,15.0,1550,0.5,6,258,True
6,15.0,1550,1.0,12,129,True
5,15.0,1550,2.0,23,67,True
4,15.0,1550,4.0,46,33,True
11,30.0,770,0.5,5,154,True
10,30.0,770,1.0,9,85,True
