# Cost Derivation Analysis

Deriving expected cost profiles from existing coherence model metrics.

**Reference:** `notes/cost-types.md`

## Cost Proxy Mapping

| Cost Type | Proxy Metric | Interpretation |
|-----------|--------------|----------------|
| **Shock Cost** | `max_deviation` | How violently does the system absorb stress? |
| **Repair Cost** | `recovery_time` | How long does the system remain impaired? |
| **Maintenance Cost** | `baseline` variance | How much does the system pay to stay adaptable? |
| **Cost Efficiency** | Amplification ratios | How efficiently does the system convert perturbation into damage? |

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 11

EXPORTS_DIR = Path('../exports')
NOTES_DIR = Path('../notes')

In [None]:
# Import parsing functions from main notebook
def parse_behaviorspace_spreadsheet(filepath):
    with open(filepath, 'r') as f:
        lines = f.readlines()
    
    metadata = {
        'version': lines[0].strip().split(',')[0].strip('"'),
        'model': lines[1].strip().strip('"'),
        'experiment': lines[2].strip().strip('"'),
        'timestamp': lines[3].strip().strip('"')
    }
    
    data_start = None
    for i, line in enumerate(lines):
        if '[all run data]' in line:
            data_start = i
            break
    
    if data_start is None:
        raise ValueError("Could not find [all run data] marker")
    
    params = {}
    for i in range(7, data_start):
        parts = lines[i].strip().split(',')
        if parts and parts[0] and not parts[0].startswith('['):
            param_name = parts[0].strip('"')
            values = [p.strip('"') for p in parts[1:] if p.strip('"')]
            if not param_name.startswith('['):
                params[param_name] = values
    
    header_line = lines[data_start].strip().split(',')
    col_names = [h.strip('"') for h in header_line]
    
    df = pd.read_csv(filepath, skiprows=data_start+1, header=None)
    df.columns = col_names[:len(df.columns)]
    
    if df.columns[0] == '' or df.columns[0] == '[all run data]':
        df = df.iloc[:, 1:]
    
    return metadata, params, df


def reshape_runs(df):
    cols = list(df.columns)
    cols_per_run = 5  # v2 metrics
    
    runs = []
    i = 0
    run_id = 1
    
    while i < len(cols) - (cols_per_run - 1):
        if 'step' in str(cols[i]).lower():
            run_data = df.iloc[:, i:i+cols_per_run].copy()
            run_data.columns = ['step', 'recovery_time', 'baseline', 'max_deviation', 'heading_variance']
            run_data['run_id'] = run_id
            runs.append(run_data)
            run_id += 1
            i += cols_per_run
        else:
            i += 1
    
    if runs:
        return pd.concat(runs, ignore_index=True)
    return df


def build_conditions(params):
    first_param = list(params.values())[0]
    n_runs = len(first_param)
    conditions = {}
    
    for i in range(n_runs):
        cond = {'run_id': i + 1}
        
        if 'entrainment-mode?' in params:
            cond['entrainment'] = params['entrainment-mode?'][i]
            cond['mode'] = 'Entrainment' if cond['entrainment'] == 'true' else 'Coherence'
        
        if 'perturbation-strength' in params:
            cond['strength'] = int(params['perturbation-strength'][i])
        
        if 'coupling-strength' in params:
            cond['coupling'] = float(params['coupling-strength'][i])
        
        if 'identity-pull-weight' in params:
            cond['identity_pull'] = float(params['identity-pull-weight'][i])
        
        conditions[i + 1] = cond
    
    return conditions

## 1. Load E003 Data (Mode Contrast)

In [None]:
# Load E003
e003_file = EXPORTS_DIR / 'coherence_model_simple_E003-spreadsheet.csv'
meta, params, df_raw = parse_behaviorspace_spreadsheet(e003_file)
df_long = reshape_runs(df_raw)
conditions = build_conditions(params)

print(f"Experiment: {meta['experiment']}")
print(f"Runs: {len(conditions)}")
print("\nConditions:")
for run_id, cond in conditions.items():
    print(f"  Run {run_id}: {cond['mode']}, strength={cond['strength']}")

In [None]:
# Extract final metrics for each run
def extract_final_metrics(df_long, conditions):
    results = []
    for run_id, cond in conditions.items():
        run_data = df_long[df_long['run_id'] == run_id]
        final = run_data.iloc[-1]
        
        results.append({
            'run_id': run_id,
            'mode': cond.get('mode', 'Unknown'),
            'strength': cond.get('strength', 0),
            'recovery_time': final['recovery_time'],
            'baseline': final['baseline'],
            'max_deviation': final['max_deviation'],
            'mean_hv': run_data['heading_variance'].mean(),
            'std_hv': run_data['heading_variance'].std(),
        })
    
    return pd.DataFrame(results)

summary = extract_final_metrics(df_long, conditions)
print(summary.to_string(index=False))

## 2. Cost Proxy Derivation

### 2.1 Shock Cost (Peak Deviation)
> How violently does the system absorb stress?

Higher max_deviation = higher instantaneous shock cost

In [None]:
def compute_shock_cost(summary):
    """Shock cost = max_deviation (direct proxy)"""
    summary = summary.copy()
    summary['shock_cost'] = summary['max_deviation']
    return summary

summary = compute_shock_cost(summary)

# Compare shock cost by mode
coh = summary[summary['mode'] == 'Coherence'].sort_values('strength')
ent = summary[summary['mode'] == 'Entrainment'].sort_values('strength')

print("Shock Cost (max_deviation):")
print(f"{'Strength':<10} {'Coherence':<12} {'Entrainment':<12} {'Ratio (E/C)':<12}")
print("-" * 46)
for (_, c), (_, e) in zip(coh.iterrows(), ent.iterrows()):
    ratio = e['shock_cost'] / c['shock_cost'] if c['shock_cost'] > 0 else float('inf')
    print(f"{c['strength']:<10} {c['shock_cost']:<12.2f} {e['shock_cost']:<12.2f} {ratio:<12.2f}")

### 2.2 Repair Cost (Recovery Time)
> How long does the system remain impaired?

In [None]:
def compute_repair_cost(summary):
    """Repair cost = recovery_time (direct proxy)
    
    Note: -1 means Did Not Recover (DNR) - effectively infinite cost
    For analysis, we cap at experiment duration or use a sentinel value.
    """
    summary = summary.copy()
    # Treat 0 as minimal repair cost (immediate recovery)
    # Treat -1 as DNR (set to max observed or experiment duration)
    max_observed = summary[summary['recovery_time'] >= 0]['recovery_time'].max()
    summary['repair_cost'] = summary['recovery_time'].apply(
        lambda x: max_observed * 2 if x < 0 else x  # DNR = 2x max observed
    )
    return summary

summary = compute_repair_cost(summary)

print("Repair Cost (recovery_time):")
print(f"{'Strength':<10} {'Coherence':<12} {'Entrainment':<12} {'Ratio (E/C)':<12}")
print("-" * 46)
for (_, c), (_, e) in zip(coh.iterrows(), ent.iterrows()):
    if c['repair_cost'] == 0:
        ratio_str = "both 0" if e['repair_cost'] == 0 else "inf"
    else:
        ratio_str = f"{e['repair_cost'] / c['repair_cost']:.1f}x"
    print(f"{c['strength']:<10} {c['repair_cost']:<12.0f} {e['repair_cost']:<12.0f} {ratio_str:<12}")

### 2.3 Maintenance Cost (Baseline Variance)
> How much does the system pay to stay adaptable?

Higher baseline variance = ongoing regulatory effort (the "cost of staying ready")

In [None]:
def compute_maintenance_cost(summary):
    """Maintenance cost = baseline variance
    
    Coherence pays continuous maintenance cost (higher baseline)
    Entrainment pays lower maintenance but higher shock/repair costs
    """
    summary = summary.copy()
    summary['maintenance_cost'] = summary['baseline']
    return summary

summary = compute_maintenance_cost(summary)

print("Maintenance Cost (baseline variance):")
print(f"{'Strength':<10} {'Coherence':<12} {'Entrainment':<12} {'Ratio (C/E)':<12}")
print("-" * 46)
for (_, c), (_, e) in zip(coh.iterrows(), ent.iterrows()):
    ratio = c['maintenance_cost'] / e['maintenance_cost'] if e['maintenance_cost'] > 0 else float('inf')
    print(f"{c['strength']:<10} {c['maintenance_cost']:<12.1f} {e['maintenance_cost']:<12.1f} {ratio:<12.1f}x")

### 2.4 Total Cost Profile

Combining costs requires normalization and weighting. Key insight:
- **Coherence**: Steady maintenance cost, low shock/repair costs
- **Entrainment**: Low maintenance, but shock/repair costs scale superlinearly with stress

In [None]:
def normalize_costs(summary, perturbation_frequency=1.0):
    """Normalize costs to [0,1] range for comparison.
    
    perturbation_frequency: expected perturbations per time unit
        - Higher frequency increases weight of shock/repair costs
        - Lower frequency favors low-maintenance strategies
    """
    summary = summary.copy()
    
    # Normalize each cost type to [0, 1]
    for cost_type in ['shock_cost', 'repair_cost', 'maintenance_cost']:
        col = summary[cost_type]
        summary[f'{cost_type}_norm'] = (col - col.min()) / (col.max() - col.min() + 1e-10)
    
    # Composite cost:
    # - Maintenance is continuous (weight = 1)
    # - Shock/repair are episodic (weight = perturbation_frequency)
    summary['episodic_cost_norm'] = (
        summary['shock_cost_norm'] + summary['repair_cost_norm']
    ) / 2
    
    summary['total_cost'] = (
        summary['maintenance_cost_norm'] + 
        perturbation_frequency * summary['episodic_cost_norm']
    )
    
    return summary

# Compare at different perturbation frequencies
for freq in [0.5, 1.0, 2.0]:
    summary_norm = normalize_costs(summary, perturbation_frequency=freq)
    
    print(f"\nTotal Cost (perturbation frequency = {freq}):")
    print(f"{'Strength':<10} {'Coherence':<12} {'Entrainment':<12} {'Lower Cost':<12}")
    print("-" * 46)
    
    coh_n = summary_norm[summary_norm['mode'] == 'Coherence'].sort_values('strength')
    ent_n = summary_norm[summary_norm['mode'] == 'Entrainment'].sort_values('strength')
    
    for (_, c), (_, e) in zip(coh_n.iterrows(), ent_n.iterrows()):
        winner = 'Coherence' if c['total_cost'] < e['total_cost'] else 'Entrainment'
        print(f"{c['strength']:<10} {c['total_cost']:<12.3f} {e['total_cost']:<12.3f} {winner:<12}")

## 3. Cost Profile Visualization

In [None]:
def plot_cost_profiles(summary):
    """Visualize cost profiles by regime."""
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    strengths = sorted(summary['strength'].unique())
    x = np.arange(len(strengths))
    width = 0.35
    
    coh = summary[summary['mode'] == 'Coherence'].sort_values('strength')
    ent = summary[summary['mode'] == 'Entrainment'].sort_values('strength')
    
    # 1. Shock Cost
    ax = axes[0, 0]
    ax.bar(x - width/2, coh['shock_cost'], width, label='Coherence', color='#3498db', alpha=0.8)
    ax.bar(x + width/2, ent['shock_cost'], width, label='Entrainment', color='#e74c3c', alpha=0.8)
    ax.set_ylabel('Shock Cost (max deviation)')
    ax.set_title('Shock Cost\n(instantaneous perturbation load)', fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(strengths)
    ax.set_xlabel('Perturbation Strength')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    
    # 2. Repair Cost
    ax = axes[0, 1]
    ax.bar(x - width/2, coh['repair_cost'], width, label='Coherence', color='#3498db', alpha=0.8)
    ax.bar(x + width/2, ent['repair_cost'], width, label='Entrainment', color='#e74c3c', alpha=0.8)
    ax.set_ylabel('Repair Cost (recovery time)')
    ax.set_title('Repair Cost\n(time to restore viability)', fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(strengths)
    ax.set_xlabel('Perturbation Strength')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    
    # 3. Maintenance Cost
    ax = axes[1, 0]
    ax.bar(x - width/2, coh['maintenance_cost'], width, label='Coherence', color='#3498db', alpha=0.8)
    ax.bar(x + width/2, ent['maintenance_cost'], width, label='Entrainment', color='#e74c3c', alpha=0.8)
    ax.set_ylabel('Maintenance Cost (baseline variance)')
    ax.set_title('Maintenance Cost\n(ongoing regulatory effort)', fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(strengths)
    ax.set_xlabel('Perturbation Strength')
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')
    
    # 4. Cost Tradeoff Summary
    ax = axes[1, 1]
    
    # Calculate ratios
    shock_ratio = ent['shock_cost'].values / (coh['shock_cost'].values + 1e-10)
    repair_ratio = ent['repair_cost'].values / (coh['repair_cost'].values + 1e-10)
    # For repair, handle 0/0 case
    repair_ratio = np.where(
        (coh['repair_cost'].values == 0) & (ent['repair_cost'].values == 0),
        1.0,
        np.where(coh['repair_cost'].values == 0, 10.0, repair_ratio)
    )
    maint_ratio = coh['maintenance_cost'].values / (ent['maintenance_cost'].values + 1e-10)
    
    ax.plot(strengths, shock_ratio, 'o-', label='Shock (Ent/Coh)', color='#e74c3c', linewidth=2)
    ax.plot(strengths, repair_ratio, 's-', label='Repair (Ent/Coh)', color='#9b59b6', linewidth=2)
    ax.plot(strengths, maint_ratio, '^-', label='Maintenance (Coh/Ent)', color='#3498db', linewidth=2)
    ax.axhline(y=1, color='gray', linestyle='--', alpha=0.5, label='Parity')
    
    ax.set_ylabel('Cost Ratio')
    ax.set_title('Cost Tradeoff Ratios\n(>1 means first regime pays more)', fontweight='bold')
    ax.set_xlabel('Perturbation Strength')
    ax.legend(loc='upper left')
    ax.grid(True, alpha=0.3)
    ax.set_yscale('log')
    ax.set_ylim(0.5, 20)
    
    plt.suptitle('E003: Cost Profiles by Regime', fontsize=14, fontweight='bold', y=1.02)
    plt.tight_layout()
    return fig, axes

fig, axes = plot_cost_profiles(summary)
plt.savefig(EXPORTS_DIR / 'cost_profiles_E003.png', dpi=150, bbox_inches='tight')
plt.show()

## 4. Cost Distribution Over Time

Key insight from `cost-types.md`:
> Coherence: steady, bounded cost over time  
> Entrainment: minimal cost during calm, large spikes during perturbation and recovery

In [None]:
def plot_temporal_cost_distribution(df_long, conditions, strength_filter=80):
    """Plot variance (proxy for instantaneous cost) over time."""
    fig, ax = plt.subplots(figsize=(14, 5))
    
    for run_id, cond in conditions.items():
        if cond['strength'] != strength_filter:
            continue
        
        run_data = df_long[df_long['run_id'] == run_id]
        
        color = '#3498db' if cond['mode'] == 'Coherence' else '#e74c3c'
        linestyle = '-' if cond['mode'] == 'Coherence' else '--'
        
        ax.plot(run_data['step'], run_data['heading_variance'], 
                color=color, linestyle=linestyle, linewidth=1.5,
                label=f"{cond['mode']}")
    
    # Mark perturbation window (tick 300-330 based on experiment design)
    ax.axvspan(300, 330, alpha=0.2, color='orange', label='Perturbation')
    
    ax.set_xlabel('Time (ticks)')
    ax.set_ylabel('Heading Variance (instantaneous cost proxy)')
    ax.set_title(f'Temporal Cost Distribution (Perturbation Strength = {strength_filter})', fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    return fig, ax

fig, ax = plot_temporal_cost_distribution(df_long, conditions, strength_filter=80)
plt.savefig(EXPORTS_DIR / 'temporal_cost_E003_str80.png', dpi=150, bbox_inches='tight')
plt.show()

In [None]:
def compute_temporal_cost_stats(df_long, conditions):
    """Compute temporal cost distribution statistics.
    
    For each run, calculate:
    - Pre-perturbation variance (baseline cost)
    - During-perturbation variance (acute cost)
    - Post-perturbation variance until recovery (repair cost)
    """
    # Perturbation window: 300-330 (from experiment design)
    PERTURB_START = 300
    PERTURB_END = 330
    
    results = []
    for run_id, cond in conditions.items():
        run_data = df_long[df_long['run_id'] == run_id]
        
        pre = run_data[run_data['step'] < PERTURB_START]['heading_variance']
        during = run_data[(run_data['step'] >= PERTURB_START) & (run_data['step'] <= PERTURB_END)]['heading_variance']
        post = run_data[run_data['step'] > PERTURB_END]['heading_variance']
        
        results.append({
            'run_id': run_id,
            'mode': cond['mode'],
            'strength': cond['strength'],
            'pre_mean': pre.mean(),
            'pre_std': pre.std(),
            'during_mean': during.mean(),
            'during_max': during.max(),
            'post_mean': post.mean(),
            'post_std': post.std(),
            'acute_spike': during.max() - pre.mean(),  # How much cost spiked
            'recovery_burden': (post.mean() - pre.mean()) * len(post),  # Integrated excess cost
        })
    
    return pd.DataFrame(results)

temporal_stats = compute_temporal_cost_stats(df_long, conditions)
print(temporal_stats[['mode', 'strength', 'pre_mean', 'acute_spike', 'recovery_burden']].to_string(index=False))

## 5. Crossover Analysis

At what perturbation frequency does coherence become cost-advantageous?

In [None]:
def find_crossover_frequency(summary, strength):
    """Find the perturbation frequency at which coherence becomes preferable.
    
    At low frequency: entrainment wins (lower maintenance cost dominates)
    At high frequency: coherence wins (lower shock/repair costs dominate)
    """
    coh = summary[(summary['mode'] == 'Coherence') & (summary['strength'] == strength)].iloc[0]
    ent = summary[(summary['mode'] == 'Entrainment') & (summary['strength'] == strength)].iloc[0]
    
    # Maintenance cost: coherence pays more (continuous)
    maint_diff = coh['maintenance_cost'] - ent['maintenance_cost']
    
    # Episodic cost: entrainment pays more (per perturbation)
    episodic_coh = coh['shock_cost'] + coh['repair_cost']
    episodic_ent = ent['shock_cost'] + ent['repair_cost']
    episodic_diff = episodic_ent - episodic_coh
    
    if episodic_diff <= 0:
        return float('inf')  # Entrainment always wins
    
    # Crossover: maint_diff = freq * episodic_diff
    # freq = maint_diff / episodic_diff
    crossover_freq = maint_diff / episodic_diff if episodic_diff > 0 else 0
    
    return crossover_freq

print("Crossover Frequency Analysis:")
print("(Perturbations per maintenance-cost-unit where coherence becomes preferable)")
print()
print(f"{'Strength':<12} {'Crossover Freq':<15} {'Interpretation':<30}")
print("-" * 57)
for strength in sorted(summary['strength'].unique()):
    freq = find_crossover_frequency(summary, strength)
    if freq == float('inf'):
        interp = "Entrainment always preferable"
    elif freq <= 0:
        interp = "Coherence always preferable"
    elif freq < 1:
        interp = "Coherence preferable (low freq threshold)"
    else:
        interp = f"Coherence preferable above {freq:.2f}/unit"
    print(f"{strength:<12} {freq:<15.3f} {interp:<30}")

## 6. Summary: Cost Profiles by Regime

### Key Findings

In [None]:
def print_cost_summary(summary):
    """Print formatted cost summary."""
    print("=" * 70)
    print("COST DERIVATION SUMMARY (E003)")
    print("=" * 70)
    
    coh = summary[summary['mode'] == 'Coherence'].sort_values('strength')
    ent = summary[summary['mode'] == 'Entrainment'].sort_values('strength')
    
    print("\n### COHERENCE REGIME")
    print("Cost profile: Continuous, bounded")
    print(f"  Maintenance cost (baseline): {coh['maintenance_cost'].mean():.1f} (range: {coh['maintenance_cost'].min():.1f}-{coh['maintenance_cost'].max():.1f})")
    print(f"  Shock cost scaling: Linear ({coh['shock_cost'].min():.1f} -> {coh['shock_cost'].max():.1f})")
    print(f"  Repair cost scaling: Minimal ({coh['repair_cost'].min():.0f} -> {coh['repair_cost'].max():.0f} ticks)")
    
    print("\n### ENTRAINMENT REGIME")
    print("Cost profile: Episodic, unbounded")
    print(f"  Maintenance cost (baseline): {ent['maintenance_cost'].mean():.1f} (range: {ent['maintenance_cost'].min():.1f}-{ent['maintenance_cost'].max():.1f})")
    print(f"  Shock cost scaling: Superlinear ({ent['shock_cost'].min():.1f} -> {ent['shock_cost'].max():.1f})")
    print(f"  Repair cost scaling: Superlinear ({ent['repair_cost'].min():.0f} -> {ent['repair_cost'].max():.0f} ticks)")
    
    print("\n### CRITICAL THRESHOLD")
    # Find where entrainment's episodic costs exceed coherence's maintenance advantage
    threshold_idx = None
    for i, (strength, c_shock, c_repair, e_shock, e_repair) in enumerate(zip(
        coh['strength'].values, coh['shock_cost'].values, coh['repair_cost'].values,
        ent['shock_cost'].values, ent['repair_cost'].values
    )):
        shock_ratio = e_shock / c_shock if c_shock > 0 else float('inf')
        if shock_ratio > 2:  # Entrainment shock cost > 2x coherence
            threshold_idx = i
            print(f"  Threshold between strength {coh['strength'].values[max(0,i-1)]} and {strength}")
            print(f"  At strength {strength}: Entrainment shock = {shock_ratio:.1f}x Coherence")
            break
    
    if threshold_idx is None:
        print("  No clear threshold detected in tested range")
    
    print("\n### INTERPRETATION")
    print("  Coherence: 'Pay as you go' - steady regulatory cost, bounded response")
    print("  Entrainment: 'Pay when stressed' - low baseline, catastrophic under high stress")
    print("\n  Optimal regime depends on:")
    print("    1. Environmental volatility (perturbation frequency)")
    print("    2. Perturbation magnitude distribution")
    print("    3. Available capacity buffer")
    
    print("\n" + "=" * 70)

print_cost_summary(summary)

## 7. Export Results

In [None]:
# Export summary with cost columns
summary.to_csv(EXPORTS_DIR / 'cost_derivation_E003.csv', index=False)
temporal_stats.to_csv(EXPORTS_DIR / 'temporal_cost_stats_E003.csv', index=False)

print("Exported:")
print(f"  - {EXPORTS_DIR / 'cost_derivation_E003.csv'}")
print(f"  - {EXPORTS_DIR / 'temporal_cost_stats_E003.csv'}")
print(f"  - {EXPORTS_DIR / 'cost_profiles_E003.png'}")
print(f"  - {EXPORTS_DIR / 'temporal_cost_E003_str80.png'}")