In [None]:
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.linear_model import LinearRegression
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from scipy.interpolate import UnivariateSpline
from datetime import datetime, timedelta

print("Libraries imported successfully")

## 1. Load Synthetic Calibration Data

In [None]:
# Load synthetic OD timeseries dataset
with open('resources/datasets/synthetic-od-timeseries-50-experiments.json', 'r') as f:
    dataset = json.load(f)

print(f"Dataset: {dataset['metadata']['dataset_id']}")
print(f"Total experiments: {dataset['metadata']['data_composition']['calibration_experiments'] + dataset['metadata']['data_composition']['validation_experiments']}")
print(f"\nStrains: {list(dataset['dataset_statistics']['strain_distribution'].keys())}")
print(f"Media types: {list(dataset['dataset_statistics']['media_distribution'].keys())}")

## 2. Extract and Prepare Calibration Dataset

In [None]:
# Extract first experiment (WT + LB) for demonstration
exp = dataset['experiment_samples'][0]

print(f"Experiment ID: {exp['experiment_id']}")
print(f"Strain: {exp['strain_id']}")
print(f"Medium: {exp['medium_type']}")
print(f"Temperature: {exp['temperature_celsius']}°C")
print(f"Sampling interval: {exp['sampling_interval_minutes']} minutes")

# Convert time-series to DataFrame
data = []
for point in exp['time_series']:
    data.append({
        'elapsed_hours': point['elapsed_hours'],
        'od600': point['od600_measured'],
        'dcw_g_per_l': point['dcw_g_per_l'],
        'cell_count_million': point.get('cell_count_million', np.nan),
        'viability_percent': point.get('viability_percent', np.nan)
    })

df = pd.DataFrame(data)
print(f"\nDataFrame shape: {df.shape}")
print(df.head(10))

## 3. Exploratory Data Analysis

In [None]:
# Summary statistics
print("Summary Statistics:")
print(f"OD600 range: {df['od600'].min():.3f} - {df['od600'].max():.3f}")
print(f"DCW range: {df['dcw_g_per_l'].min():.3f} - {df['dcw_g_per_l'].max():.3f} g/L")
print(f"\nOD600 - DCW Correlation: {df['od600'].corr(df['dcw_g_per_l']):.4f}")

# Plot raw time-series
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

axes[0, 0].plot(df['elapsed_hours'], df['od600'], 'o-', label='OD600', color='blue')
axes[0, 0].set_xlabel('Elapsed Time (hours)')
axes[0, 0].set_ylabel('OD600')
axes[0, 0].set_title('Optical Density Time Course')
axes[0, 0].grid(True, alpha=0.3)
axes[0, 0].legend()

axes[0, 1].plot(df['elapsed_hours'], df['dcw_g_per_l'], 's-', label='DCW', color='green')
axes[0, 1].set_xlabel('Elapsed Time (hours)')
axes[0, 1].set_ylabel('Dry Cell Weight (g/L)')
axes[0, 1].set_title('Biomass Time Course')
axes[0, 1].grid(True, alpha=0.3)
axes[0, 1].legend()

axes[1, 0].scatter(df['od600'], df['dcw_g_per_l'], alpha=0.6, s=80, color='purple')
axes[1, 0].set_xlabel('OD600')
axes[1, 0].set_ylabel('DCW (g/L)')
axes[1, 0].set_title('OD600 vs Biomass Relationship')
axes[1, 0].grid(True, alpha=0.3)

axes[1, 1].plot(df['elapsed_hours'], df['viability_percent'], '^-', label='Viability', color='orange')
axes[1, 1].set_xlabel('Elapsed Time (hours)')
axes[1, 1].set_ylabel('Viability (%)')
axes[1, 1].set_title('Cell Viability Time Course')
axes[1, 1].set_ylim([90, 100])
axes[1, 1].grid(True, alpha=0.3)
axes[1, 1].legend()

plt.tight_layout()
plt.savefig('resources/notebooks/od-biomass-eda.png', dpi=100, bbox_inches='tight')
plt.show()

print("EDA plots generated")

## 4. Growth Phase Classification

In [None]:
# Calculate growth rate (dOD/dt)
df['dod_dt'] = df['od600'].diff() / df['elapsed_hours'].diff()
df['d2od_dt2'] = df['dod_dt'].diff() / df['elapsed_hours'].diff()

# Classify growth phases
def classify_phase(row):
    dod_dt = row['dod_dt']
    if pd.isna(dod_dt) or dod_dt < 0.02:
        return 'Lag'
    elif dod_dt > 0.05:
        return 'Exponential'
    else:
        return 'Transition'

df['phase'] = df.apply(classify_phase, axis=1)

# Summary by phase
phase_summary = df.groupby('phase').agg({
    'od600': ['min', 'max', 'mean'],
    'dod_dt': 'mean',
    'dcw_g_per_l': 'mean'
})

print("Growth Phase Summary:")
print(phase_summary)

# Plot growth rate
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(df['elapsed_hours'], df['dod_dt'], 'o-', color='red', label='dOD/dt')
ax.axhline(y=0.05, color='green', linestyle='--', label='Exponential threshold')
ax.axhline(y=0.02, color='orange', linestyle='--', label='Lag threshold')
ax.set_xlabel('Elapsed Time (hours)')
ax.set_ylabel('Growth Rate (dOD/dt per hour)')
ax.set_title('Growth Rate Profile')
ax.legend()
ax.grid(True, alpha=0.3)
plt.savefig('resources/notebooks/growth-rate-profile.png', dpi=100, bbox_inches='tight')
plt.show()

print("\nPhase classification complete")

## 5. Linear Calibration Model (OLS Regression)

In [None]:
# Fit linear model: DCW = a + b * OD600
X = df[['od600']].values
y = df['dcw_g_per_l'].values

# Remove NaN values
mask = ~(np.isnan(X.flatten()) | np.isnan(y))
X_clean = X[mask]
y_clean = y[mask]

# Fit OLS
model_linear = LinearRegression()
model_linear.fit(X_clean, y_clean)

# Model parameters
intercept = model_linear.intercept_
slope = model_linear.coef_[0]
r2 = model_linear.score(X_clean, y_clean)

# Calculate residuals
y_pred_linear = model_linear.predict(X_clean)
residuals = y_clean - y_pred_linear
rmse = np.sqrt(np.mean(residuals**2))
mae = np.mean(np.abs(residuals))

# Calculate standard error
n = len(X_clean)
se_residuals = np.sqrt(np.sum(residuals**2) / (n - 2))

print(f"Linear Model: DCW = {intercept:.4f} + {slope:.4f} × OD600")
print(f"R² = {r2:.4f}")
print(f"RMSE = {rmse:.4f} g/L")
print(f"MAE = {mae:.4f} g/L")
print(f"Residual Std Error = {se_residuals:.4f}")

# Acceptance criteria check
print(f"\nAcceptance Criteria:")
print(f"  R² ≥ 0.90: {r2:.4f} {'✓ PASS' if r2 >= 0.90 else '✗ FAIL'}")
print(f"  RMSE < 0.15 g/L: {rmse:.4f} {'✓ PASS' if rmse < 0.15 else '✗ FAIL'}")

## 6. Surrogate Model (Gaussian Process Regression)

In [None]:
# Prepare features: OD600, growth rate (dOD/dt), elapsed time
df_gp = df.dropna(subset=['od600', 'dcw_g_per_l', 'dod_dt'])

X_gp = df_gp[['od600', 'dod_dt', 'elapsed_hours']].values
y_gp = df_gp['dcw_g_per_l'].values

# Normalize features
X_mean = X_gp.mean(axis=0)
X_std = X_gp.std(axis=0)
X_gp_norm = (X_gp - X_mean) / X_std

# Fit Gaussian Process
kernel = C(1.0, (1e-3, 1e3)) * RBF(1.0, (1e-2, 1e2))
model_gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=10, alpha=0.01)
model_gp.fit(X_gp_norm, y_gp)

# Predictions with uncertainty
y_pred_gp, y_std_gp = model_gp.predict(X_gp_norm, return_std=True)

# Model performance
r2_gp = model_gp.score(X_gp_norm, y_gp)
residuals_gp = y_gp - y_pred_gp
rmse_gp = np.sqrt(np.mean(residuals_gp**2))

print(f"Gaussian Process Model:")
print(f"R² = {r2_gp:.4f}")
print(f"RMSE = {rmse_gp:.4f} g/L")
print(f"Mean prediction uncertainty (std): {y_std_gp.mean():.4f} g/L")
print(f"Max prediction uncertainty: {y_std_gp.max():.4f} g/L")

print(f"\nAcceptance Criteria:")
print(f"  R² ≥ 0.92: {r2_gp:.4f} {'✓ PASS' if r2_gp >= 0.92 else '✗ FAIL'}")
print(f"  RMSE < 0.12 g/L: {rmse_gp:.4f} {'✓ PASS' if rmse_gp < 0.12 else '✗ FAIL'}")

## 7. Uncertainty Quantification

In [None]:
# Calculate 95% prediction intervals for linear model
df_pred = df.dropna(subset=['od600', 'dcw_g_per_l']).copy()
X_pred = df_pred[['od600']].values

y_pred_linear = model_linear.predict(X_pred)

# Standard error of prediction
from scipy.stats import t
n = len(X_clean)
dof = n - 2
t_crit = t.ppf(0.975, dof)  # 95% CI

# Leverage (hat values)
X_with_const = np.column_stack([np.ones(len(X_pred)), X_pred])
X_clean_with_const = np.column_stack([np.ones(len(X_clean)), X_clean])
hat = X_with_const @ np.linalg.inv(X_clean_with_const.T @ X_clean_with_const) @ X_with_const.T
leverage = np.diag(hat)

# Prediction interval
se_pred = se_residuals * np.sqrt(1 + leverage)
ci_lower = y_pred_linear - t_crit * se_pred
ci_upper = y_pred_linear + t_crit * se_pred

df_pred['pred'] = y_pred_linear
df_pred['ci_lower'] = ci_lower
df_pred['ci_upper'] = ci_upper

# Check coverage
coverage = np.mean((df_pred['dcw_g_per_l'] >= df_pred['ci_lower']) & 
                    (df_pred['dcw_g_per_l'] <= df_pred['ci_upper']))

print(f"95% Prediction Interval Coverage: {coverage*100:.1f}%")
print(f"Expected coverage: 95%")
print(f"Status: {'✓ PASS' if coverage >= 0.90 else '✗ FAIL'}")

## 8. Model Comparison & Visualization

In [None]:
# Create comprehensive comparison plot
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# Plot 1: OD600 vs Biomass with both models
od600_range = np.linspace(df_pred['od600'].min(), df_pred['od600'].max(), 100).reshape(-1, 1)
dcw_linear_range = model_linear.predict(od600_range)

axes[0, 0].scatter(df_pred['od600'], df_pred['dcw_g_per_l'], alpha=0.6, s=80, label='Measured', color='blue')
axes[0, 0].plot(od600_range, dcw_linear_range, 'r-', linewidth=2, label='Linear Model')
axes[0, 0].fill_between(od600_range.flatten(), 
                         model_linear.predict(od600_range) - t_crit * se_residuals,
                         model_linear.predict(od600_range) + t_crit * se_residuals,
                         alpha=0.2, color='red', label='95% PI')
axes[0, 0].set_xlabel('OD600')
axes[0, 0].set_ylabel('Biomass (g/L)')
axes[0, 0].set_title('Linear Calibration Model')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Plot 2: Residuals
axes[0, 1].scatter(y_pred_linear, residuals, alpha=0.6, s=80, color='green')
axes[0, 1].axhline(y=0, color='black', linestyle='--')
axes[0, 1].axhline(y=2*se_residuals, color='red', linestyle=':', label='±2SE')
axes[0, 1].axhline(y=-2*se_residuals, color='red', linestyle=':')
axes[0, 1].set_xlabel('Predicted Biomass (g/L)')
axes[0, 1].set_ylabel('Residuals (g/L)')
axes[0, 1].set_title('Residual Plot')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)

# Plot 3: Q-Q plot
stats.probplot(residuals, dist="norm", plot=axes[1, 0])
axes[1, 0].set_title('Q-Q Plot (Normality Check)')
axes[1, 0].grid(True, alpha=0.3)

# Plot 4: Model comparison metrics
models = ['Linear', 'Gaussian\nProcess']
r2_scores = [r2, r2_gp]
rmse_scores = [rmse, rmse_gp]

ax4a = axes[1, 1]
x_pos = np.arange(len(models))
width = 0.35

bars1 = ax4a.bar(x_pos - width/2, r2_scores, width, label='R²', color='steelblue')
ax4a.set_ylabel('R² Score', color='steelblue')
ax4a.set_ylim([0.85, 1.0])
ax4a.tick_params(axis='y', labelcolor='steelblue')

ax4b = ax4a.twinx()
bars2 = ax4b.bar(x_pos + width/2, rmse_scores, width, label='RMSE', color='coral')
ax4b.set_ylabel('RMSE (g/L)', color='coral')
ax4b.tick_params(axis='y', labelcolor='coral')

ax4a.set_xticks(x_pos)
ax4a.set_xticklabels(models)
ax4a.set_title('Model Comparison')
ax4a.axhline(y=0.90, color='green', linestyle='--', alpha=0.5, label='R² threshold')

plt.tight_layout()
plt.savefig('resources/notebooks/model-comparison.png', dpi=100, bbox_inches='tight')
plt.show()

print("Model comparison plots generated")

## 9. Anomaly Detection

In [None]:
# Detect anomalies using multiple methods
anomalies = []

# Method 1: Rate-of-change filter
max_growth_rate = df['dod_dt'].quantile(0.95)
min_growth_rate = df['dod_dt'].quantile(0.05)

for idx, row in df.iterrows():
    if pd.notna(row['dod_dt']):
        if row['dod_dt'] > max_growth_rate * 1.5 or row['dod_dt'] < min_growth_rate * 0.5:
            anomalies.append({
                'index': idx,
                'time': row['elapsed_hours'],
                'method': 'Rate-of-change',
                'value': row['dod_dt']
            })

# Method 2: Residual-based detection (using linear model)
if len(anomalies) == 0:
    residual_threshold = 3 * se_residuals
    for idx, row in df_pred.iterrows():
        if abs(row['dcw_g_per_l'] - row['pred']) > residual_threshold:
            anomalies.append({
                'index': idx,
                'time': row['elapsed_hours'],
                'method': 'Residual',
                'value': row['dcw_g_per_l'] - row['pred']
            })

print(f"Anomalies detected: {len(anomalies)}")
if anomalies:
    for anom in anomalies:
        print(f"  - {anom['method']} at t={anom['time']:.1f}h, value={anom['value']:.4f}")
else:
    print("  No anomalies detected ✓")

## 10. Summary & Acceptance Decision

In [None]:
# Final report
print("="*60)
print("OD→BIOMASS CALIBRATION REPORT")
print("="*60)

print(f"\nEXPERIMENT DETAILS:")
print(f"  Strain: {exp['strain_id']}")
print(f"  Medium: {exp['medium_type']}")
print(f"  Temperature: {exp['temperature_celsius']}°C")
print(f"  Duration: {df['elapsed_hours'].max():.1f} hours")
print(f"  Samples: {len(df)} time points")

print(f"\nLINEAR MODEL RESULTS:")
print(f"  Equation: DCW = {intercept:.4f} + {slope:.4f} × OD600")
print(f"  R² Score: {r2:.4f}")
print(f"  RMSE: {rmse:.4f} g/L")
print(f"  MAE: {mae:.4f} g/L")

print(f"\nGAUSSIAN PROCESS RESULTS:")
print(f"  R² Score: {r2_gp:.4f}")
print(f"  RMSE: {rmse_gp:.4f} g/L")
print(f"  Mean Uncertainty: ±{y_std_gp.mean():.4f} g/L")

print(f"\nACCEPTANCE CRITERIA:")
criteria = [
    ('Linear R² ≥ 0.90', r2 >= 0.90),
    ('Linear RMSE < 0.15 g/L', rmse < 0.15),
    ('GP R² ≥ 0.92', r2_gp >= 0.92),
    ('Prediction Interval Coverage ≥ 90%', coverage >= 0.90),
    ('No critical anomalies', len(anomalies) == 0)
]

passed = 0
for criterion, result in criteria:
    status = '✓ PASS' if result else '✗ FAIL'
    print(f"  {criterion}: {status}")
    if result:
        passed += 1

print(f"\nOVERALL DECISION: {passed}/{len(criteria)} criteria met")
if passed == len(criteria):
    print("STATUS: ✓ APPROVED - Model ready for deployment")
else:
    print("STATUS: ✗ HOLD - Address failures before deployment")

print("="*60)