This notebook applies the directional-change (DC) framework to multi-asset price data using two optimization functions:  
- `event_density_score`, which measures how closely the observed DC event density matches a target density $d_{\text{target}}$, and  
- `up_down_asymmetry`, which quantifies the strength of upward and downward trend continuation via normalized overshoot scores.

The analysis proceeds in three stages:  
1. **Event-density evaluation**: For a selected asset, volatility $\sigma = \operatorname{std}(\ln(P_t / P_{t-1}))$ is computed, and the event-density score is evaluated over a grid of threshold multipliers $k \in [8, 52]$, with $\theta = k \cdot \sigma$.  
2. **Global squash calibration**: Using all assets, empirical overshoot magnitudes  
   $$
   r_i = \frac{(P_{\text{ext},i} - P_{\text{ext},i-1}) / P_{\text{ext},i-1}}{\theta}
   $$  
   are collected, and a universal squash parameter $s^*$ is estimated via hierarchical bootstrap for the mapping $\mu(r) = 1 - e^{-r/s}$.  
3. **Asymmetry visualization**: For the selected asset, the asymmetry scores $\mu_{\text{up}}$ and $\mu_{\text{down}}$ are computed across the same $k$-grid using the calibrated $s^*$, and visualized to assess trend strength as a function of threshold scale.

The output is a **calibrated squash parameter** $s^*$ and **diagnostic plots** showing how event density and asymmetry vary with $k$.

In [None]:
# Core data loading
from core.data import load_all_klines

# Scientific computing
import numpy as np

# Visualization
import plotly.graph_objects as go
import matplotlib.pyplot as plt

# Your DC framework
from core.dc import (
    compute_directional_change_events,
    attach_OSV_EXT_to_runs,
)
from core.opt import event_density_score, up_down_asymmetry

# === CONFIGURATION ===
SELECTED_COIN = "BTCUSDT"  # Change this to analyze different coins

# Load 4+ years of hourly price data for all available symbols
df = load_all_klines(
    root="data/data/spot/monthly/klines/",
    interval="1h",
    range_folder="2017-01-01_2025-10-08",
    min_years=4,
)

print(f"Loaded data for {df.index.get_level_values('Symbol').nunique()} assets")
print(
    f"Date range: {df.index.get_level_values('Open Time').min()} to {df.index.get_level_values('Open Time').max()}"
)

# Get prices for selected coin
prices = df.loc[SELECTED_COIN].Close.dropna().to_numpy()
print(f"Analyzing {SELECTED_COIN} with {len(prices)} price points")
print("===========================================")

## Event Density
This code visualizes how well the **observed directional-change (DC) event density** matches a **target density** $d_{\text{target}} = 0.002$ (≈1 event per 500 samples) as a function of a **universal scaling factor** $k$.

For a given asset price series $P_t$, it first computes the **volatility** as the standard deviation of log returns:
$$
\sigma = \operatorname{std}\big(\ln(P_t / P_{t-1})\big),
$$
which is dimensionless and reflects the asset’s typical % move.

The DC threshold is then set **adaptively** as:
$$
\theta = k \cdot \sigma,
$$
so that $k$ controls how many “typical moves” are required to trigger a directional change. The loop tests $k \in [8, 52]$, corresponding to thresholds from $8\sigma$ to $52\sigma$.

For each $k$, the DC pipeline detects events, and the **event density score** is computed as:
$$
s(d) = \exp\!\Big(-\beta \, \big|\ln(d / d_{\text{target}})\big|^\alpha\Big), \quad \text{where} \quad d = \frac{N}{T},
$$
with $N$ = number of runs, $T$ = total samples, $\alpha = 2$, $\beta = 1.5$.  
This score is **dimensionless**, peaks at 1 when $d = d_{\text{target}}$, and symmetrically penalizes under- or over-detection.

In [None]:
# --- Compute volatility (log returns std) ---
log_returns = np.diff(np.log(prices))
sigma = np.std(log_returns)
print(f"{SELECTED_COIN} volatility (σ): {sigma:.4f} (~{sigma * 100:.2f}%)")

# --- Define k range (universal scaling factor) ---
k_values_density = np.linspace(8, 52, 88)  # Test thresholds from 8σ to 52σ

# Target density (1 event per ~500 samples)
d_target = 0.002

# --- Initialize storage ---
score_vals, counts, thetas_used = [], [], []

# --- Loop over k values ---
for k in k_values_density:
    theta = k * sigma  # Adaptive threshold

    # Run DC pipeline
    events, runs = compute_directional_change_events(prices, theta)
    runs = attach_OSV_EXT_to_runs(runs, theta)

    # Compute scores
    penalty = event_density_score(prices, events, d_target=d_target, alpha=2, beta=1.5)
    score_vals.append(penalty)
    counts.append(len(runs))
    thetas_used.append(theta)

# Convert to arrays
score_vals = np.array(score_vals)
counts = np.array(counts)
thetas_used = np.array(thetas_used)

# --- Compute derived quantities ---
T = len(prices)
densities = counts / T
ratio = densities / d_target
log_ratio = np.log10(ratio)

# Sort for smooth curve
sorted_idx = np.argsort(log_ratio)
log_ratio_sorted = log_ratio[sorted_idx]
penalty_sorted = score_vals[sorted_idx]

# --- Create interactive plot ---
fig = go.Figure()

# Primary trace: Penalty vs k
fig.add_trace(
    go.Scatter(
        x=k_values_density,
        y=score_vals,
        name="Penalty (vs k)",
        mode="lines+markers",
        marker=dict(color="blue"),
        line=dict(color="blue"),
        hovertemplate="k=%{x:.2f}<br>Penalty=%{y:.4f}<br>θ=%{text:.3f}%",
        text=np.round(thetas_used * 100, 3),
    )
)

# Secondary trace: Penalty vs log10(d/d_target)
fig.add_trace(
    go.Scatter(
        x=log_ratio_sorted,
        y=penalty_sorted,
        name="Penalty (vs log₁₀(d/dₜ))",
        mode="lines+markers",
        marker=dict(color="green", symbol="circle"),
        line=dict(color="green", dash="dot"),
        hovertemplate="log₁₀(d/dₜ)=%{x:.3f}<br>Penalty=%{y:.4f}",
        xaxis="x2",
    )
)

# Right y-axis: Event count
fig.add_trace(
    go.Scatter(
        x=k_values_density,
        y=counts,
        name="Event count",
        mode="lines+markers",
        marker=dict(color="red", symbol="square"),
        line=dict(color="red", dash="dash"),
        yaxis="y2",
        hovertemplate="k=%{x:.2f}<br>Runs=%{y}",
    )
)

# Layout
fig.update_layout(
    title=f"Event-count penalty vs universal scaling factor k ({SELECTED_COIN})",
    xaxis=dict(title="Universal scaling factor k", showgrid=True),
    xaxis2=dict(
        title="log₁₀(observed density / target density)",
        showgrid=False,
        overlaying="x",
        side="top",
        anchor="y",
    ),
    yaxis=dict(
        title=dict(text="Event-count penalty", font=dict(color="blue")),
        tickfont=dict(color="blue"),
        range=[0, 1],
    ),
    yaxis2=dict(
        title=dict(text="Number of runs", font=dict(color="red")),
        tickfont=dict(color="red"),
        overlaying="y",
        side="right",
    ),
    legend=dict(x=0.02, y=0.98, bgcolor="rgba(0,0,0,0)"),
    template="plotly_white",
)

fig.show()

The plot shows:
- **Blue curve**: penalty score vs. $k$ (primary x-axis)
- **Green curve**: same score vs. $\log_{10}(d / d_{\text{target}})$ (top x-axis), illustrating the symmetric log-ratio penalty
- **Red curve**: number of detected runs (right y-axis)

## Squash Parameter Calibration (Asymmetry Score)

This code calibrates a universal **squash parameter** $ s $ for the normalized overshoot mapping  
$$
\mu(r) = 1 - e^{-r / s},
$$  
using a hierarchical bootstrap across multiple assets to ensure cross-asset robustness.

For each asset, it first computes the **volatility** as the standard deviation of log returns:  
$$
\sigma = \operatorname{std}\big(\ln(P_t / P_{t-1})\big),
$$  
which is dimensionless and characterizes the asset’s typical relative move.

Across a grid of threshold multipliers $ k \in [8, 52] $, it sets the directional-change threshold as $ \theta = k \cdot \sigma $, runs the DC detection pipeline, and collects the absolute values of the **dimensionless overshoots**:  
$$
r_i = \frac{(P_{\text{ext},i} - P_{\text{ext},i-1}) / P_{\text{ext},i-1}}{\theta} = \texttt{OSV\_EXT}_i.
$$

Only assets with at least 50 valid overshoot samples are retained. For each such asset $ c $, the median overshoot $ \tilde{r}_c = \operatorname{median}(|r_i|) $ is computed. The baseline squash estimate is the median of these medians across all assets.

To quantify uncertainty, a bootstrap procedure is performed: in each of $ m = 50 $ iterations, a random sample of 70% of assets is drawn with replacement, and the median of their medians is recorded, yielding a distribution $ \{s^{(j)}\}_{j=1}^{m} $.

The final **global squash parameter** is set to the median of this bootstrap distribution:  
$$
s^* = \operatorname{median}\big(\{s^{(j)}\}_{j=1}^{m}\big),
$$  
with a 90% confidence interval derived from the 5th and 95th percentiles. This $ s^* $ ensures the asymmetry score $ \mu(r) $ saturates at a rate consistent with empirical overshoot behavior across the asset universe.

In [None]:
# --- CONFIGURATION ---
sample_frac = 0.7
m_bootstrap = 50
k_values_bootstrap = np.linspace(8, 52, 88)  # Test thresholds from 8σ to 52σ
all_coins = df.index.get_level_values("Symbol").unique().to_list()
rng = np.random.default_rng(42)

# --- Precompute volatility per coin ---
volatilities = {}
valid_coins = []

for coin in all_coins:
    coin_prices = df.loc[coin].Close.dropna().to_numpy()
    if len(coin_prices) < 10:
        continue
    log_ret = np.diff(np.log(coin_prices))
    sigma_coin = np.std(log_ret)
    if sigma_coin > 0:
        volatilities[coin] = sigma_coin
        valid_coins.append(coin)

if not valid_coins:
    raise ValueError("No valid coins with positive volatility")

print(f"Valid coins with positive volatility: {len(valid_coins)}")


# --- Helper function to collect |OSV_EXT| values ---
def collect_osv_abs(
    prices: np.ndarray, sigma: float, k_values: np.ndarray
) -> np.ndarray:
    ratios = []
    for k in k_values:
        theta = k * sigma
        events, runs = compute_directional_change_events(prices, theta)
        runs = attach_OSV_EXT_to_runs(runs, theta)
        osv_vals = [r["OSV_EXT"] for r in runs if r.get("OSV_EXT") is not None]
        osv_vals = np.array(osv_vals, dtype=float)
        osv_vals = osv_vals[~np.isnan(osv_vals)]
        if osv_vals.size == 0:
            continue
        ratios.extend(np.abs(osv_vals))
    return np.array(ratios)


# --- Collect |OSV_EXT| per coin ---
ratios_per_coin = {}
for coin in valid_coins:
    coin_prices = df.loc[coin].Close.dropna().to_numpy()
    ratios = collect_osv_abs(coin_prices, volatilities[coin], k_values_bootstrap)
    if ratios.size >= 50:  # Require minimum samples
        ratios_per_coin[coin] = ratios

available_coins = list(ratios_per_coin.keys())
n_total = len(available_coins)
if n_total == 0:
    raise ValueError("No coins with sufficient overshoot data")

print(f"Coins with sufficient overshoot data: {n_total}")

# --- Baseline: median of coin medians ---
coin_medians = {c: np.median(r) for c, r in ratios_per_coin.items()}
baseline = np.median(list(coin_medians.values()))
print(f"\nBaseline median of |OSV_EXT|: {baseline:.2f} from {n_total} coins")

# --- Bootstrap hierarchical medians ---
boot_squash = []
for i in range(m_bootstrap):
    n_sample = max(1, int(sample_frac * n_total))
    sample = rng.choice(available_coins, size=n_sample, replace=True)
    sample_medians = [np.median(ratios_per_coin[c]) for c in sample]
    boot_squash.append(np.median(sample_medians))
boot_squash = np.array(boot_squash)

# --- Bootstrap diagnostics ---
print("\n--- Bootstrap stability ---")
mean_s, std_s = boot_squash.mean(), boot_squash.std()
cv = std_s / mean_s
print(f"Mean squash  : {mean_s:.2f}")
print(f"Std deviation: {std_s:.2f}")
print(f"Coeff. of variation: {cv:.3f}")

# Plot 1: Histogram of bootstrap squash estimates
plt.figure(figsize=(8, 5))
plt.hist(boot_squash, bins=12, color="cornflowerblue", edgecolor="k", alpha=0.8)
plt.axvline(mean_s, color="red", linestyle="--", label=f"Mean = {mean_s:.2f}")
plt.title("Bootstrap Distribution of Squash Estimates")
plt.xlabel("Estimated Squash Parameter")
plt.ylabel("Frequency")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.show()

# --- μ(r) stability curves ---
combined = np.concatenate(list(ratios_per_coin.values()))
test_r = np.linspace(np.percentile(combined, 5), np.percentile(combined, 95), 50)

# Plot 2: μ(r) curves for all bootstrap samples
plt.figure(figsize=(10, 6))
for s in boot_squash:
    plt.plot(test_r, 1 - np.exp(-test_r / s), color="gray", alpha=0.3, lw=0.8)

mu_curves = np.array([1 - np.exp(-test_r / s) for s in boot_squash])
mu_mean = mu_curves.mean(axis=0)
mu_std = mu_curves.std(axis=0)

plt.plot(test_r, mu_mean, "k", lw=2.5, label="Mean μ Curve")
plt.fill_between(
    test_r,
    mu_mean - mu_std,
    mu_mean + mu_std,
    color="lightgray",
    alpha=0.6,
    label="±1σ Band",
)
plt.title("Stability of μ(r) = 1 - exp(-r/s) Across Bootstrap Samples")
plt.xlabel("|OSV_EXT| (Overshoot in Units of Theta)")
plt.ylabel("Normalized Overshoot Score μ")
plt.grid(True, linestyle="--", alpha=0.5)
plt.legend()
plt.tight_layout()
plt.show()

# --- Final global squash estimate ---
global_squash = np.median(boot_squash)
ci_low, ci_high = np.percentile(boot_squash, [5, 95])
print(f"\n✅ Recommended global squash: {global_squash:.2f}")
print(f"   90% bootstrap CI: [{ci_low:.2f}, {ci_high:.2f}]")

## Asymmetry Score
This code evaluates how **strongly upward and downward price trends overshoot** the directional-change (DC) threshold across a range of **volatility-scaled thresholds** $ \theta = k \cdot \sigma $, and identifies the optimal $ k $ that maximizes **balanced trend strength**.

For each candidate $ k \in [8, 52] $, it:
1. Sets the DC threshold as $ \theta = k \cdot \sigma $, where $ \sigma $ is the asset’s log-return volatility.
2. Detects DC events and computes the **dimensionless overshoot** for each run:
   $$
   r_i = \frac{(P_{\text{ext},i} - P_{\text{ext},i-1}) / P_{\text{ext},i-1}}{\theta},
   $$
   which measures how many **threshold-sized moves** occurred between consecutive extrema.
3. Computes **normalized asymmetry scores**:
   $$
   \mu_{\text{up}} = 1 - e^{-\bar{r}_{\text{up}} / s}, \quad
   \mu_{\text{down}} = 1 - e^{-\bar{r}_{\text{down}} / s},
   $$
   where $ \bar{r}_{\text{up}}, \bar{r}_{\text{down}} $ are the mean overshoots for upward and downward runs, and $ s = \texttt{SQUASH\_VALUE} $ controls saturation (higher $ s $ → slower approach to 1).

The **asymmetry score** is the average $ (\mu_{\text{up}} + \mu_{\text{down}})/2 \in [0,1] $, peaking when **both directions exhibit strong, sustained moves** beyond the DC trigger.

In [None]:
# Use the calibrated squash value from bootstrap
SQUASH_VALUE = global_squash

# Reuse volatility computed earlier
sigma_coin = sigma  # From Cell 3

# --- Define k range for asymmetry analysis ---
k_values_asym = np.linspace(8, 52, 88)

# --- Run optimization loop ---
asym_scores = []
run_counts = []
mu_up_vals = []
mu_down_vals = []

for k in k_values_asym:
    theta = k * sigma_coin

    # Run DC pipeline
    events, runs = compute_directional_change_events(prices, theta)
    runs = attach_OSV_EXT_to_runs(runs, theta)

    # Compute asymmetry
    mu_up, mu_down = up_down_asymmetry(runs, theta, squash=SQUASH_VALUE)
    score = (mu_up + mu_down) / 2.0

    asym_scores.append(score)
    run_counts.append(len(runs))
    mu_up_vals.append(mu_up)
    mu_down_vals.append(mu_down)

# --- Create interactive plot ---
fig = go.Figure()

# Primary trace: asymmetry score
fig.add_trace(
    go.Scatter(
        x=k_values_asym,
        y=asym_scores,
        name="Up–Down Asymmetry Score (avg)",
        mode="lines+markers",
        marker=dict(color="dodgerblue"),
        line=dict(color="dodgerblue"),
        hovertemplate="k = %{x:.1f}σ<br>Asymmetry Score = %{y:.4f}<extra></extra>",
    )
)

# Upward component
fig.add_trace(
    go.Scatter(
        x=k_values_asym,
        y=mu_up_vals,
        name="μ↑ (Upward)",
        mode="lines",
        line=dict(color="green", dash="dot"),
        hovertemplate="k = %{x:.1f}σ<br>μ↑ = %{y:.4f}<extra></extra>",
    )
)

# Downward component
fig.add_trace(
    go.Scatter(
        x=k_values_asym,
        y=mu_down_vals,
        name="μ↓ (Downward)",
        mode="lines",
        line=dict(color="purple", dash="dot"),
        hovertemplate="k = %{x:.1f}σ<br>μ↓ = %{y:.4f}<extra></extra>",
    )
)

# Secondary trace: number of runs
fig.add_trace(
    go.Scatter(
        x=k_values_asym,
        y=run_counts,
        name="Number of Events",
        mode="lines+markers",
        marker=dict(color="red", symbol="square"),
        line=dict(color="red", dash="dash"),
        yaxis="y2",
        hovertemplate="k = %{x:.1f}σ<br>Events = %{y}<extra></extra>",
    )
)

# Layout
fig.update_layout(
    title=f"{SELECTED_COIN}: Normalized Up–Down Asymmetry Score vs. Volatility-Scaled Threshold (k)<br><sub>Squash = {SQUASH_VALUE:.2f}</sub>",
    xaxis=dict(
        title="Threshold Multiplier k (θ = k · σ)",
        tickmode="linear",
        dtick=5,
    ),
    yaxis=dict(
        title=dict(text="Asymmetry Score [0–1]", font=dict(color="dodgerblue")),
        tickfont=dict(color="dodgerblue"),
        range=[0, 1],
    ),
    yaxis2=dict(
        title=dict(text="Number of Runs", font=dict(color="red")),
        tickfont=dict(color="red"),
        overlaying="y",
        side="right",
    ),
    legend=dict(x=0.02, y=0.98, bgcolor="rgba(0,0,0,0)"),
    template="plotly_white",
    width=900,
    height=500,
)

fig.show()

# Find optimal k for selected coin based on asymmetry score
optimal_idx = np.argmax(asym_scores)
optimal_k = k_values_asym[optimal_idx]
optimal_score = asym_scores[optimal_idx]
optimal_runs = run_counts[optimal_idx]

The plot shows:
- **Blue curve**: overall asymmetry score vs. $ k $
- **Green/purple curves**: individual $ \mu_{\text{up}} $ and $ \mu_{\text{down}} $ components
- **Red curve**: number of detected runs (right axis)

The optimal $ k $ (marked by peak score) balances **sufficient event count** with **maximal trend continuation strength**, providing a robust, scale-invariant parameter for cross-asset DC analysis.

In [None]:
print("=== FINAL PARAMETER RECOMMENDATIONS ===")
print(f"Global squash parameter: {global_squash:.2f}")
print("")
print(f"For {SELECTED_COIN}:")
print(f"  Optimal k: {optimal_k:.1f}")
print(
    f"  Optimal theta: {optimal_k * sigma_coin:.4f} ({optimal_k * sigma_coin * 100:.2f}%)"
)
print(f"  Expected runs: {optimal_runs}")
print(f"  Asymmetry score: {optimal_score:.4f}")
print("")
print("For other assets, use the same global squash parameter")
print("and compute theta = optimal_k * asset_volatility")