# PRISM Lorenz System Validation

**Purpose**: Validate PRISM's chaos detection against the Lorenz system, whose chaotic behavior is analytically known and serves as ground truth.

## Lorenz System Equations

$$\frac{dx}{dt} = \sigma(y - x)$$
$$\frac{dy}{dt} = x(\rho - z) - y$$
$$\frac{dz}{dt} = xy - \beta z$$

**Parameters**: $\sigma = 10$, $\rho = 28$, $\beta = 8/3$

## Expected Characteristics

| Property | Expected | Detection Method |
|----------|----------|------------------|
| Deterministic | Yes | RQA Determinism > 0.9 |
| Chaotic | Yes | Positive largest Lyapunov exponent, bounded attractor |
| Two lobes | Yes | x > 0 (right) vs x < 0 (left) |
| Regime transitions | ~50-60 per 100 time units | Lobe crossings |

In [None]:
import numpy as np
import polars as pl
from scipy.integrate import solve_ivp
import matplotlib.pyplot as plt
from datetime import date, timedelta

# PRISM imports
import sys
sys.path.insert(0, '..')
from prism.db.parquet_store import get_parquet_path
from prism.db.polars_io import write_parquet_atomic

## 1. Generate Lorenz Trajectory

In [None]:
def lorenz_system(t, state, sigma=10, rho=28, beta=8/3):
    """Evaluate the right-hand side of the Lorenz equations.

    Args:
        t: Current time. Unused by the equations themselves; it is part of
            the signature so the function can be called as ``fun(t, state)``.
        state: Sequence of three values ``(x, y, z)``.
        sigma: Coefficient of the ``(y - x)`` term in dx/dt.
        rho: Offset applied to ``z`` in dy/dt.
        beta: Coefficient of ``z`` in dz/dt.

    Returns:
        A list ``[dx/dt, dy/dt, dz/dt]``.
    """
    x, y, z = state
    dx_dt = sigma * (y - x)
    dy_dt = x * (rho - z) - y
    dz_dt = x * y - beta * z
    return [dx_dt, dy_dt, dz_dt]

# Integration parameters
t_span = (0, 100)
dt = 0.01  # sampling interval of the stored trajectory
t_eval = np.arange(t_span[0], t_span[1], dt)
initial_state = [1.0, 1.0, 1.0]

# Solve
# NOTE(review): rtol/atol are left at the solve_ivp defaults so the generated
# trajectory is unchanged; consider tightening them if a more accurate
# reference trajectory is wanted.
sol = solve_ivp(
    lorenz_system,
    t_span,
    initial_state,
    t_eval=t_eval,
    method='RK45'
)

# solve_ivp reports failure through the result object instead of raising.
# Stop here so a failed or partial integration is never used as ground truth.
if not sol.success:
    raise RuntimeError(f"Lorenz integration failed: {sol.message}")

print(f"Generated {len(sol.t):,} points")
# sol.y holds one row per state variable, in the order x, y, z.
for axis_name, series in zip(('x', 'y', 'z'), sol.y):
    print(f"{axis_name} range: [{series.min():.2f}, {series.max():.2f}]")

## 2. Ground Truth: Lobe Transitions

In [None]:
# Label every sample by attractor wing using the sign of x:
# 1 = right wing (x > 0), 0 = left wing (otherwise).
lobe = np.where(sol.y[0] > 0, 1, 0)

# A transition is any sample-to-sample change of the label (|diff| == 1).
transitions = np.abs(np.diff(lobe))
n_transitions = np.sum(transitions)

# Dwell time = spacing between consecutive transitions, converted from
# sample counts to time units via dt.
transition_indices = np.flatnonzero(transitions == 1)
dwell_times = np.diff(transition_indices) * dt

print(f"Lobe transitions: {n_transitions}")
print(f"Mean dwell time: {np.mean(dwell_times):.3f} time units")
print(f"Std dwell time: {np.std(dwell_times):.3f} time units")
print(f"Min/Max dwell: {np.min(dwell_times):.3f} / {np.max(dwell_times):.3f}")

In [None]:
# Three-panel overview: 3D attractor, x-z projection coloured by lobe,
# and the first 2000 samples of x(t).
fig = plt.figure(figsize=(14, 5))

# Panel 1: 3D attractor
ax1 = fig.add_subplot(1, 3, 1, projection='3d')
ax1.plot(sol.y[0], sol.y[1], sol.y[2], lw=0.3, alpha=0.7)
ax1.set_xlabel('x')
ax1.set_ylabel('y')
ax1.set_zlabel('z')
ax1.set_title('Lorenz Attractor')

# Panel 2: x-z projection, one colour per side of x = 0
ax2 = fig.add_subplot(1, 3, 2)
colors = np.where(sol.y[0] < 0, 'blue', 'red').tolist()
ax2.scatter(sol.y[0], sol.y[2], c=colors, s=0.1, alpha=0.5)
ax2.axvline(x=0, color='black', linestyle='--', alpha=0.5)
ax2.set_xlabel('x')
ax2.set_ylabel('z')
ax2.set_title('x-z Projection (blue=left, red=right)')

# Panel 3: x(t) with the lobe boundary x = 0 drawn as a dashed line
ax3 = fig.add_subplot(1, 3, 3)
t_plot = sol.t[:2000]
x_plot = sol.y[0][:2000]
ax3.plot(t_plot, x_plot, lw=0.5)
ax3.axhline(y=0, color='black', linestyle='--', alpha=0.5)
ax3.set_xlabel('Time')
ax3.set_ylabel('x')
ax3.set_title('x(t) with lobe boundary')

plt.tight_layout()
plt.show()

## 3. Convert to PRISM Observations

In [None]:
# Each timestep = 1 day
base_date = date(2020, 1, 1)
n_points = len(sol.t)

# Build the long-format observations column-wise instead of appending one
# dict per row. Row order matches the per-timestep layout:
# lorenz_x, lorenz_y, lorenz_z, lorenz_lobe for day 0, then day 1, ...
signal_ids = ['lorenz_x', 'lorenz_y', 'lorenz_z', 'lorenz_lobe']
obs_dates = [base_date + timedelta(days=i) for i in range(n_points)]

# column_stack yields shape (n_points, 4); the default row-major ravel then
# interleaves the four signals per timestep. lobe is cast to float so the
# 'value' column has a single floating-point dtype.
values = np.column_stack(
    (sol.y[0], sol.y[1], sol.y[2], lobe.astype(float))
).ravel()

obs_df = pl.DataFrame({
    'signal_id': signal_ids * n_points,
    'obs_date': [d for d in obs_dates for _ in signal_ids],
    'value': values,
})
print(f"Created {len(obs_df):,} observations")
print(f"Date range: {obs_df['obs_date'].min()} to {obs_df['obs_date'].max()}")
obs_df.head(8)

In [None]:
# Write the Lorenz observations to the path returned by get_parquet_path.
default_obs_path = get_parquet_path('raw', 'observations')

if not default_obs_path.exists():
    # No file yet: the Lorenz observations become the whole file.
    write_parquet_atomic(obs_df, default_obs_path)
    print(f"Created {default_obs_path}")
else:
    # Drop any previously stored lorenz_* rows before appending, so
    # re-running this cell does not duplicate them.
    is_lorenz = pl.col('signal_id').str.starts_with('lorenz_')
    existing = pl.read_parquet(default_obs_path).filter(~is_lorenz)
    combined = pl.concat([existing, obs_df])
    write_parquet_atomic(combined, default_obs_path)
    print(f"Appended to {default_obs_path}")

## 4. Run PRISM Signal Vector

In [None]:
from prism.entry_points.signal_vector import run_sliding_vectors

# Run the sliding-vector entry point on the four Lorenz signals.
lorenz_signals = ['lorenz_x', 'lorenz_y', 'lorenz_z', 'lorenz_lobe']
result = run_sliding_vectors(signals=lorenz_signals, verbose=True)
print(f"\nProcessed: {result}")

## 5. Analyze PRISM Results

In [None]:
# Read the signal-vector parquet and keep only the lorenz_* signals.
vec = (
    pl.read_parquet(get_parquet_path('vector', 'signal'))
    .filter(pl.col('signal_id').str.starts_with('lorenz_'))
)

print(f"Total rows: {len(vec):,}")
for label, column in (('Signals', 'signal_id'), ('Engines', 'engine')):
    print(f"{label}: {vec[column].unique().to_list()}")

In [None]:
# Key metrics for lorenz_x
x_metrics = vec.filter(pl.col('signal_id') == 'lorenz_x')

# (engine, metric_name) pairs to summarise
key_metrics = [
    ('hurst', 'hurst_exponent'),
    ('entropy', 'sample_entropy'),
    ('entropy', 'permutation_entropy'),
    ('rqa', 'determinism'),
    ('rqa', 'entropy'),
    ('rqa', 'laminarity'),
    ('lyapunov', 'lyapunov_exponent'),
]

print("lorenz_x Key Metrics (mean across windows):")
print("=" * 50)
for engine, metric in key_metrics:
    vals = x_metrics.filter(
        (pl.col('engine') == engine) &
        (pl.col('metric_name') == metric)
    )['metric_value']
    if len(vals) == 0:
        # Metric not produced for this signal; nothing to report.
        continue
    # mean()/std() can come back as None (e.g. too few non-null values for a
    # sample standard deviation). Formatting None with a float spec raises
    # TypeError, so print N/A in the same column width instead.
    mean_val = vals.mean()
    std_val = vals.std()
    mean_str = f"{mean_val:>10.4f}" if mean_val is not None else f"{'N/A':>10}"
    std_str = f"{std_val:>8.4f}" if std_val is not None else f"{'N/A':>8}"
    print(f"{metric:<25} {mean_str} +/- {std_str}")

## 6. Validation Summary

In [None]:
def get_metric_mean(df, engine, metric):
    """Return the mean ``metric_value`` for one engine/metric pair.

    Args:
        df: Frame with ``engine``, ``metric_name`` and ``metric_value`` columns.
        engine: Value to match in the ``engine`` column.
        metric: Value to match in the ``metric_name`` column.

    Returns:
        The mean of the matching ``metric_value`` entries, or ``None`` when
        no row matches.
    """
    selector = (pl.col('engine') == engine) & (pl.col('metric_name') == metric)
    matching = df.filter(selector)['metric_value']
    if len(matching) == 0:
        return None
    return matching.mean()

# Validation checks: (label, observed mean, predicate).
# Predicates test `is not None` rather than truthiness, so a legitimate value
# of exactly 0.0 (e.g. sample entropy 0.0, which is < 0.5) is evaluated
# instead of being treated as missing.
checks = [
    (
        'RQA Determinism > 0.9',
        get_metric_mean(x_metrics, 'rqa', 'determinism'),
        lambda x: x is not None and x > 0.9,
    ),
    (
        'RQA Laminarity > 0.8',
        get_metric_mean(x_metrics, 'rqa', 'laminarity'),
        lambda x: x is not None and x > 0.8,
    ),
    (
        'RQA Entropy > 1.0',
        get_metric_mean(x_metrics, 'rqa', 'entropy'),
        lambda x: x is not None and x > 1.0,
    ),
    (
        'Sample Entropy < 0.5 (deterministic)',
        get_metric_mean(x_metrics, 'entropy', 'sample_entropy'),
        lambda x: x is not None and x < 0.5,
    ),
    (
        'Hurst > 0.5 (persistent)',
        get_metric_mean(x_metrics, 'hurst', 'hurst_exponent'),
        lambda x: x is not None and x > 0.5,
    ),
]

print("PRISM Lorenz Validation Results")
print("=" * 60)
print(f"{'Check':<40} {'Value':>10} {'Pass':>6}")
print("-" * 60)
for name, value, check_fn in checks:
    passed = check_fn(value)
    status = 'PASS' if passed else 'FAIL'
    # Only a missing metric (None) is shown as N/A; 0.0 is a real value.
    val_str = f"{value:.4f}" if value is not None else "N/A"
    print(f"{name:<40} {val_str:>10} {status:>6}")

print("\nGround Truth Comparison:")
print(f"  Lobe transitions: {n_transitions}")
print(f"  Mean dwell time: {dwell_times.mean():.3f} time units")