In [0]:
# Observation Deck (Databricks)
# Reads from Delta/UC schema: datamodel_db

from datetime import datetime, timedelta, timezone

try:
    spark
except NameError as e:
    raise RuntimeError('This notebook must be run on a Spark cluster (Databricks) with an active `spark` session.') from e

LOOKBACK_DAYS = 14
DRIFT_ALERT_THRESHOLD = 0.20
MAX_HEATMAP_EPISODES = 50

since_ts = datetime.now(timezone.utc) - timedelta(days=LOOKBACK_DAYS)

spark.sql('USE datamodel_db')

episodes_df = spark.table('datamodel_db.episode').where(f"created_at >= TIMESTAMP '{since_ts.isoformat()}'")
steps_df = spark.table('datamodel_db.episode_steps').where(f"created_at >= TIMESTAMP '{since_ts.isoformat()}'")
eval_df = spark.table('datamodel_db.episode_evaluation').where(f"evaluated_at >= TIMESTAMP '{since_ts.isoformat()}'")

episodes_df.createOrReplaceTempView('v_episodes')
steps_df.createOrReplaceTempView('v_steps')
eval_df.createOrReplaceTempView('v_eval')

print('âœ“ Views ready:', 'v_episodes', 'v_steps', 'v_eval')


In [0]:
# Outcomes over time (daily)
outcomes = spark.sql('''
WITH base AS (
  SELECT date_trunc('DAY', created_at) AS day, status
  FROM v_episodes
)
SELECT
  day,
  SUM(CASE WHEN status = 'successful' THEN 1 ELSE 0 END) AS successful,
  SUM(CASE WHEN status = 'degraded' THEN 1 ELSE 0 END) AS degraded,
  SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) AS failed,
  COUNT(*) AS total,
  ROUND(SUM(CASE WHEN status = 'successful' THEN 1 ELSE 0 END) / NULLIF(COUNT(*),0), 4) AS success_rate
FROM base
GROUP BY day
ORDER BY day
''')
display(outcomes)

In [0]:
# Token/cost efficiency
eff = spark.sql('''
SELECT
  status,
  COUNT(*) AS episodes,
  ROUND(AVG(total_tokens), 2) AS avg_total_tokens,
  ROUND(AVG(input_tokens), 2) AS avg_input_tokens,
  ROUND(AVG(output_tokens), 2) AS avg_output_tokens,
  ROUND(AVG(duration_ms), 1) AS avg_duration_ms,
  ROUND(AVG(cost_usd), 6) AS avg_cost_usd,
  ROUND(SUM(cost_usd), 6) AS total_cost_usd
FROM v_episodes
GROUP BY status
ORDER BY episodes DESC
''')
display(eff)

cost_per_success = spark.sql('''
SELECT
  ROUND(SUM(cost_usd) / NULLIF(SUM(CASE WHEN status='successful' THEN 1 ELSE 0 END),0), 6) AS cost_per_successful_episode_usd,
  ROUND(SUM(total_tokens) / NULLIF(SUM(CASE WHEN status='successful' THEN 1 ELSE 0 END),0), 2) AS tokens_per_successful_episode
FROM v_episodes
''')
display(cost_per_success)

In [0]:
# Drift vs golden templates
# - drift_score is sourced from datamodel_db.episode_evaluation
# - episodes marked is_golden=TRUE are treated as the baseline set

drift = spark.sql(f'''
WITH joined AS (
  SELECT
    e.created_at,
    e.episode_id,
    e.golden_template_id,
    e.is_golden,
    ev.drift_score
  FROM v_episodes e
  LEFT JOIN v_eval ev
    ON e.episode_id = ev.episode_id AND e.episode_version = ev.episode_version
)
SELECT
  date_trunc('DAY', created_at) AS day,
  golden_template_id,
  ROUND(AVG(CASE WHEN is_golden THEN drift_score END), 4) AS avg_golden_drift,
  ROUND(AVG(CASE WHEN NOT is_golden THEN drift_score END), 4) AS avg_non_golden_drift,
  SUM(CASE WHEN NOT is_golden AND drift_score IS NOT NULL AND drift_score > {DRIFT_ALERT_THRESHOLD} THEN 1 ELSE 0 END) AS drift_alerts,
  SUM(CASE WHEN NOT is_golden THEN 1 ELSE 0 END) AS non_golden_episodes
FROM joined
GROUP BY day, golden_template_id
ORDER BY day, golden_template_id
''')
display(drift)

In [0]:
# Step heatmap inputs (use Databricks pivot visualization as heatmap)
# Rows: step_name (or step_index)
# Columns: episode_id (recent episodes)
# Values: total_tokens (or latency_ms / score)

recent_episode_ids = [
    r.episode_id
    for r in spark.sql(
        f"""
        SELECT episode_id
        FROM v_episodes
        ORDER BY created_at DESC
        LIMIT {MAX_HEATMAP_EPISODES}
        """
    ).collect()
]

if not recent_episode_ids:
    print('No episodes in lookback window.')
else:
    ids = ",".join([f"'{x}'" for x in recent_episode_ids])
    heatmap_long = spark.sql(
        f"""
        SELECT
          COALESCE(step_name, CONCAT('step_', CAST(step_index AS STRING))) AS step_key,
          episode_id,
          total_tokens,
          latency_ms,
          score
        FROM v_steps
        WHERE episode_id IN ({ids})
        """
    )
    display(heatmap_long)


In [0]:
# Failure clustering by step / failure_type / invariant
failures = spark.sql('''
WITH failed_eps AS (
  SELECT episode_id, episode_version, status
  FROM v_episodes
  WHERE status IN ('degraded', 'failed')
)
SELECT
  s.step_index,
  s.step_name,
  s.failure_type,
  s.invariant_violated,
  COUNT(*) AS occurrences
FROM v_steps s
JOIN failed_eps e
  ON s.episode_id = e.episode_id AND s.episode_version = e.episode_version
WHERE s.failure_type IS NOT NULL OR s.invariant_violated IS NOT NULL
GROUP BY s.step_index, s.step_name, s.failure_type, s.invariant_violated
ORDER BY occurrences DESC
''')
display(failures)

In [0]:
# Golden deltas (median non-golden vs median golden)
golden_deltas = spark.sql('''
WITH joined AS (
  SELECT
    e.golden_template_id,
    e.is_golden,
    COALESCE(s.step_name, CONCAT('step_', CAST(s.step_index AS STRING))) AS step_key,
    s.total_tokens,
    s.latency_ms,
    s.score
  FROM v_episodes e
  JOIN v_steps s
    ON e.episode_id = s.episode_id AND e.episode_version = s.episode_version
  WHERE e.golden_template_id IS NOT NULL
),
aggs AS (
  SELECT
    golden_template_id,
    step_key,
    percentile_approx(CASE WHEN is_golden THEN total_tokens END, 0.5) AS p50_tokens_golden,
    percentile_approx(CASE WHEN NOT is_golden THEN total_tokens END, 0.5) AS p50_tokens_current,
    percentile_approx(CASE WHEN is_golden THEN latency_ms END, 0.5) AS p50_latency_golden,
    percentile_approx(CASE WHEN NOT is_golden THEN latency_ms END, 0.5) AS p50_latency_current,
    percentile_approx(CASE WHEN is_golden THEN score END, 0.5) AS p50_score_golden,
    percentile_approx(CASE WHEN NOT is_golden THEN score END, 0.5) AS p50_score_current
  FROM joined
  GROUP BY golden_template_id, step_key
)
SELECT
  golden_template_id,
  step_key,
  p50_tokens_golden,
  p50_tokens_current,
  (p50_tokens_current - p50_tokens_golden) AS delta_tokens,
  p50_latency_golden,
  p50_latency_current,
  (p50_latency_current - p50_latency_golden) AS delta_latency_ms,
  p50_score_golden,
  p50_score_current,
  (p50_score_current - p50_score_golden) AS delta_score
FROM aggs
ORDER BY golden_template_id, step_key
''')
display(golden_deltas)

In [0]:
# Single-episode detail (edit EPISODE_ID to view)
# This is a timeline/replay view: episode header -> steps -> evaluation(s)

EPISODE_ID = None  # e.g. 'ep_demo_001'

if not EPISODE_ID:
    print('Set EPISODE_ID to view a single episode.')
else:
    header = spark.sql(f"SELECT * FROM datamodel_db.episode WHERE episode_id = '{EPISODE_ID}' ORDER BY episode_version DESC")
    steps = spark.sql(f"SELECT * FROM datamodel_db.episode_steps WHERE episode_id = '{EPISODE_ID}' ORDER BY episode_version DESC, step_index ASC")
    evals = spark.sql(f"SELECT * FROM datamodel_db.episode_evaluation WHERE episode_id = '{EPISODE_ID}' ORDER BY evaluated_at DESC")

    display(header)
    display(steps)
    display(evals)

    verdict = spark.sql(f"SELECT status, total_tokens, cost_usd, duration_ms FROM datamodel_db.episode WHERE episode_id = '{EPISODE_ID}' ORDER BY episode_version DESC LIMIT 1").collect()
    if verdict:
        v = verdict[0]
        print(f"VERDICT: status={v['status']} | total_tokens={v['total_tokens']} | cost_usd={v['cost_usd']} | duration_ms={v['duration_ms']}")