# Phase 3 QoS Analysis Scratchpad

This notebook runs the Phase 3 latency metrics query (scoped to the rolling 7-day window to control cost) and derives:

- Latency and slot usage statistics per consumer classification.
- Slot utilization percentiles relative to reservation capacity.
- Concurrency vs. the current reservation slot pool.

> **Guardrail:** the full query across all historical windows scans ~35 GB. The notebook limits execution to `rolling_07d` by default; adjust the helper parameters if you need broader windows and confirm expected scan cost with `--dry_run` first.


**Classification update:** Jobs whose project ID starts with `monitor` are now labeled `MONITOR_USERS` ahead of hub-service heuristics. Refresh the cached JSON via the export helpers below to pull the updated shares into this notebook.


**Data sources:** SQL outputs are pre-generated and stored alongside this notebook. Specifically:
- `rolling_07d_latency.json` comes from running `new_audit_sql/phase3_qos_latency_metrics.sql` with `DECLARE window_ids = ['rolling_07d']` and saving the CLI JSON output.
- `rolling_07d_slot_usage_10min.json` comes from `new_audit_sql/phase3_qos_slot_usage_10min.sql` with the same window filter.
- `reservations_us.json` is a snapshot of `bq ls --reservation --project_id=bq-narvar-admin --location=US --format=prettyjson`.
These cached files let the notebook iterate quickly without reissuing the expensive BigQuery scans. Update them whenever you rerun the underlying SQL for a different window.


In [1]:
import json
import os
import pathlib
import re
import subprocess
import shutil
from textwrap import dedent

import pandas as pd

ROOT = pathlib.Path('..').resolve().parent
SQL_DIR = ROOT / 'analysis_peak_2025_gpt_codex' / 'new_audit_sql'

# Find bq command location (use full path as fallback)
BQ_CMD = shutil.which("bq") or "/Users/cezarmihaila/google-cloud-sdk/y/google-cloud-sdk/bin/bq"
GCLOUD_SDK_BIN = os.path.dirname(BQ_CMD)

# Create environment with gcloud SDK bin in PATH
BQ_ENV = os.environ.copy()
if GCLOUD_SDK_BIN not in BQ_ENV.get('PATH', ''):
    BQ_ENV['PATH'] = f"{GCLOUD_SDK_BIN}:{BQ_ENV.get('PATH', '')}"

print(f"Using SQL directory: {SQL_DIR}")
print(f"Using bq command: {BQ_CMD}")
print(f"gcloud SDK bin directory: {GCLOUD_SDK_BIN}")


Using SQL directory: /Users/cezarmihaila/workspace/do_it_query_optimization_queries/bigquery-optimization-queries/narvar/analysis_peak_2025_gpt_codex/new_audit_sql
Using bq command: /Users/cezarmihaila/google-cloud-sdk/y/google-cloud-sdk/bin/bq
gcloud SDK bin directory: /Users/cezarmihaila/google-cloud-sdk/y/google-cloud-sdk/bin


In [2]:
def run_bq_query(sql: str) -> pd.DataFrame:
    """Execute a SQL string with the bq CLI and return a DataFrame."""
    completed = subprocess.run(
        [BQ_CMD, "query", "--use_legacy_sql=false", "--format=prettyjson", "--max_rows=1000000"],
        input=sql.encode("utf-8"),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=BQ_ENV,
        check=False,
    )
    if completed.returncode != 0:
        raise RuntimeError(f"bq query failed: {completed.stderr.decode('utf-8')}")                                                                              
    text = completed.stdout.decode("utf-8").strip()
    data = json.loads(text)
    return pd.json_normalize(data)


def load_sql(name: str) -> str:
    """Read a SQL file from the Phase 3 directory."""
    sql_path = SQL_DIR / name
    sql_text = sql_path.read_text()
    return sql_text


### Regenerate cached JSON (optional)
Use the helpers below to dry-run each Phase 3 SQL asset and, when needed, refresh the cached JSON files consumed later in this notebook. Always confirm scan cost via dry run before executing full pulls.


In [3]:
BYTES_PER_GB = 1024 ** 3
WARNING_THRESHOLD_BYTES = 10 * BYTES_PER_GB


def apply_window_override(sql_text: str, window_ids: list[str] | None) -> str:
    """Replace the DECLARE window_ids block with the requested IDs."""
    if not window_ids:
        return sql_text

    pattern = r"DECLARE\s+window_ids\s+ARRAY<STRING>\s+DEFAULT\s+\[(.*?)\];"
    replacement = "DECLARE window_ids ARRAY<STRING> DEFAULT [\n  " + ",\n  ".join(
        f"'{window_id}'" for window_id in window_ids
    ) + "\n];"

    updated_sql, replacements = re.subn(pattern, replacement, sql_text, flags=re.S)
    if replacements == 0:
        raise ValueError("Could not locate DECLARE window_ids block in SQL text.")
    return updated_sql


def dry_run_sql(sql_text: str) -> int:
    """Return the estimated bytes processed for a BigQuery dry run."""
    # Debug: save SQL to temp file for inspection
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.sql', delete=False) as f:
        f.write(sql_text)
        temp_sql_path = f.name
    
    completed = subprocess.run(
        [
            BQ_CMD,
            "query",
            "--use_legacy_sql=false",
            "--dry_run",
            "--format=prettyjson",
        ],
        input=sql_text.encode("utf-8"),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=BQ_ENV,
        check=False,
    )
    if completed.returncode != 0:
        stderr_msg = completed.stderr.decode('utf-8')
        stdout_msg = completed.stdout.decode('utf-8')
        error_details = f"Return code: {completed.returncode}\n"
        error_details += f"SQL saved to: {temp_sql_path}\n"
        error_details += f"SQL length: {len(sql_text)} bytes\n"
        error_details += f"First 500 chars of SQL:\n{sql_text[:500]}\n...\n"
        error_details += f"STDERR: {stderr_msg}\n"
        error_details += f"STDOUT: {stdout_msg}"
        raise RuntimeError(f"Dry run failed:\n{error_details}")
    
    # Clean up temp file on success
    import os
    try:
        os.unlink(temp_sql_path)
    except:
        pass

    payload = json.loads(completed.stdout.decode("utf-8"))
    stats = payload.get("statistics", {}).get("query", {})
    total_bytes = int(stats.get("totalBytesProcessed", 0))
    return total_bytes


def run_bq_query_raw(sql_text: str) -> str:
    """Execute a BigQuery SQL string and return the raw prettyjson output."""
    completed = subprocess.run(
        [
            BQ_CMD,
            "query",
            "--use_legacy_sql=false",
            "--format=prettyjson",
            "--max_rows=1000000",
        ],
        input=sql_text.encode("utf-8"),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=BQ_ENV,
        check=False,
    )
    if completed.returncode != 0:
        raise RuntimeError(f"bq query failed: {completed.stderr.decode('utf-8')}")                                                                              
    return completed.stdout.decode("utf-8")


def export_query(sql_filename: str, window_ids: list[str], output_path: pathlib.Path, *, execute: bool = False,
                 warn_threshold_bytes: int = WARNING_THRESHOLD_BYTES) -> tuple[str, int]:
    """Dry-run (and optionally execute) a SQL file, returning the rendered SQL and estimated bytes."""
    sql_text = apply_window_override(load_sql(sql_filename), window_ids)
    estimated_bytes = dry_run_sql(sql_text)
    estimated_gb = estimated_bytes / BYTES_PER_GB
    print(
        f"Dry run estimate for {sql_filename} with window_ids={window_ids}: "
        f"{estimated_gb:.2f} GB scanned."
    )
    if estimated_bytes > warn_threshold_bytes:
        print("WARNING: estimated scan exceeds 10 GB threshold.")

    if not execute:
        print("Skipping execution; set execute=True to refresh cached JSON.")
        return sql_text, estimated_bytes

    raw_json = run_bq_query_raw(sql_text)
    output_path = pathlib.Path(output_path)
    output_path.write_text(raw_json)
    print(f"Wrote {output_path} ({len(raw_json)} bytes).")
    return sql_text, estimated_bytes



In [4]:
# Test if bq command works from Python subprocess
test_sql = "SELECT 1 AS test"
test_result = subprocess.run(
    [BQ_CMD, "query", "--use_legacy_sql=false", "--dry_run", "--format=prettyjson"],
    input=test_sql.encode("utf-8"),
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    env=BQ_ENV,
    check=False
)

print(f"Return code: {test_result.returncode}")
print(f"STDOUT length: {len(test_result.stdout)}")
print(f"STDERR length: {len(test_result.stderr)}")

if test_result.returncode == 0:
    print("✓ Basic bq query test passed")
    result_json = json.loads(test_result.stdout.decode('utf-8'))
    print(f"Total bytes processed: {result_json.get('statistics', {}).get('totalBytesProcessed', 'N/A')}")
else:
    print("✗ Basic bq query test FAILED")
    print(f"STDERR: {test_result.stderr.decode('utf-8')}")
    print(f"STDOUT: {test_result.stdout.decode('utf-8')}")


Return code: 0
STDOUT length: 1472
STDERR length: 0
✓ Basic bq query test passed
Total bytes processed: 0


In [5]:
latency_output_path = pathlib.Path('rolling_07d_latency.json').resolve()
latency_sql_text, latency_estimated_bytes = export_query(
    'phase3_qos_latency_metrics.sql',
    ['rolling_07d'],
    latency_output_path,
    execute=True,
)

slot_output_path = pathlib.Path('rolling_07d_slot_usage_10min.json').resolve()
slot_sql_text, slot_estimated_bytes = export_query(
    'phase3_qos_slot_usage_10min.sql',
    ['rolling_07d'],
    slot_output_path,
    execute=True,
)



Dry run estimate for phase3_qos_latency_metrics.sql with window_ids=['rolling_07d']: 37.76 GB scanned.
Wrote /Users/cezarmihaila/workspace/do_it_query_optimization_queries/bigquery-optimization-queries/narvar/analysis_peak_2025_gpt_codex/notebooks/rolling_07d_latency.json (10848 bytes).
Dry run estimate for phase3_qos_slot_usage_10min.sql with window_ids=['rolling_07d']: 37.76 GB scanned.
Wrote /Users/cezarmihaila/workspace/do_it_query_optimization_queries/bigquery-optimization-queries/narvar/analysis_peak_2025_gpt_codex/notebooks/rolling_07d_slot_usage_10min.json (1769728 bytes).


In [6]:
if 'latency_sql_text' not in globals():
    latency_sql_text = apply_window_override(
        load_sql('phase3_qos_latency_metrics.sql'),
        ['rolling_07d'],
    )

print("Latency SQL (truncated):\n", latency_sql_text[:500], "...", sep="")


Latency SQL (truncated):
-- Phase 3 QoS latency metrics per consumer classification and analysis window.
-- Computes queue time, run time, and total duration quantiles plus slot usage.

DECLARE window_ids ARRAY<STRING> DEFAULT [
  'rolling_07d'
];

WITH qos_windows AS (
  SELECT *
  FROM (
    SELECT 'peak_fy22' AS window_id,
           TIMESTAMP('2021-11-01') AS start_ts,
           TIMESTAMP('2022-01-15') AS end_ts UNION ALL
    SELECT 'baseline_fy22', TIMESTAMP('2021-08-01'), TIMESTAMP('2021-10-31') UNION ALL
    SEL...


In [7]:
if 'slot_sql_text' not in globals():
    slot_sql_text = apply_window_override(
        load_sql('phase3_qos_slot_usage_10min.sql'),
        ['rolling_07d'],
    )

print("10-minute slot usage SQL (truncated):\n", slot_sql_text[:500], "...", sep="")



10-minute slot usage SQL (truncated):
-- Phase 3 QoS: 10-minute slot usage aggregates per consumer classification and analysis window.
-- Adjust the DECLARE statements to control which windows are processed.

DECLARE window_ids ARRAY<STRING> DEFAULT [
  'rolling_07d'
];
DECLARE interval_minutes INT64 DEFAULT 10;
DECLARE interval_seconds INT64 DEFAULT interval_minutes * 60;

WITH qos_windows AS (
  SELECT *
  FROM (
    SELECT 'peak_fy22' AS window_id,
           TIMESTAMP('2021-11-01') AS start_ts,
           TIMESTAMP('2022-01-15')...


In [8]:
latency_results_path = pathlib.Path('rolling_07d_latency.json').resolve()
with open(latency_results_path) as f:
    latency_raw = json.load(f)
latency_df = pd.json_normalize(latency_raw[0])
latency_df


Unnamed: 0,avg_active_slots,avg_queue_seconds,avg_run_seconds,avg_total_seconds,classification_type,event_name,job_count,jobs_over_60s,max_total_seconds,p50_queue_seconds,...,p90_queue_seconds,p90_run_seconds,p90_total_seconds,p99_queue_seconds,p99_run_seconds,p99_total_seconds,pct_jobs_over_60s,total_run_seconds,total_slot_ms,window_id
0,1.827862977212193,0.0,4.779349363507779,4.9024045261669045,AUTOMATION,extract_job_completed,707,0,20,0,...,0,6,6,0,9,9,0.0,3379,6176349,rolling_07d
1,7.897518706793857,0.0009433538475360498,4.914768728568649,5.115530898581976,AUTOMATION,load_job_completed,133566,355,629,0,...,0,12,12,0,28,28,0.0026578620307563,656446,5184294565,rolling_07d
2,138.40640302296902,0.0034926634739475,0.4596412166584097,0.4714205153611384,AUTOMATION,query_job_completed,4087425,4319,8489,0,...,0,0,0,0,2,2,0.0010566554738007,1878749,260030891273,rolling_07d
3,2.803122887842268,0.0005000625078134766,34.055631953994265,34.243780472559074,HUB_SERVICE,load_job_completed,15998,235,481,0,...,0,47,47,0,57,57,0.0146893361670208,544822,1527203018,rolling_07d
4,225.29804293396015,0.0939096463988781,7.775318040669137,7.970076052365941,HUB_SERVICE,query_job_completed,259558,4531,9458,0,...,0,3,3,0,48,49,0.0174565992957258,2018146,454684344155,rolling_07d
5,0.1705,0.0,4.0,4.0,INTERNAL_USER,extract_job_completed,1,0,4,0,...,0,4,4,0,4,4,0.0,4,682,rolling_07d
6,0.7985741017964072,0.0,3.58176943699732,3.7560321715817695,INTERNAL_USER,load_job_completed,373,1,185,0,...,0,6,6,0,10,10,0.002680965147453,1336,1066895,rolling_07d
7,391.427650028936,0.0025972457115245,21.917733752114035,22.06046146412177,INTERNAL_USER,query_job_completed,16556,767,32476,0,...,0,7,7,0,87,87,0.0463276153660304,362870,142037351366,rolling_07d
8,0.2295964912280702,0.0,1.7999999999999998,1.953684210526316,MONITOR_USERS,load_job_completed,475,0,37,0,...,0,3,3,0,7,7,0.0,855,196305,rolling_07d
9,45.292190845275,0.5247358834244081,45.64794171220395,46.51052823315115,MONITOR_USERS,query_job_completed,41175,1320,12245,0,...,0,3,3,1,120,125,0.0320582877959927,1879554,85129118472,rolling_07d


### Load latency metrics
Read the rolling-7d latency query output (`rolling_07d_latency.json`) into a DataFrame and coerce numeric columns. This file is the raw response from `phase3_qos_latency_metrics.sql` filtered to the `rolling_07d` window.


In [9]:
for col in ['jobs_over_60s', 'pct_jobs_over_60s', 'max_total_seconds']:
    if col not in latency_df.columns:
        latency_df[col] = pd.NA

numeric_cols = [
    'job_count',
    'total_slot_ms',
    'total_run_seconds',
    'jobs_over_60s',
    'pct_jobs_over_60s',
    'avg_active_slots',
    'avg_queue_seconds',
    'avg_run_seconds',
    'avg_total_seconds',
    'p50_queue_seconds', 'p90_queue_seconds', 'p99_queue_seconds',
    'p50_run_seconds', 'p90_run_seconds', 'p99_run_seconds',
    'p50_total_seconds', 'p90_total_seconds', 'p99_total_seconds',
    'max_total_seconds'
]
for col in numeric_cols:
    latency_df[col] = pd.to_numeric(latency_df[col], errors='coerce')
latency_df.head()


Unnamed: 0,avg_active_slots,avg_queue_seconds,avg_run_seconds,avg_total_seconds,classification_type,event_name,job_count,jobs_over_60s,max_total_seconds,p50_queue_seconds,...,p90_queue_seconds,p90_run_seconds,p90_total_seconds,p99_queue_seconds,p99_run_seconds,p99_total_seconds,pct_jobs_over_60s,total_run_seconds,total_slot_ms,window_id
0,1.827863,0.0,4.779349,4.902405,AUTOMATION,extract_job_completed,707,0,20,0,...,0,6,6,0,9,9,0.0,3379,6176349,rolling_07d
1,7.897519,0.000943,4.914769,5.115531,AUTOMATION,load_job_completed,133566,355,629,0,...,0,12,12,0,28,28,0.002658,656446,5184294565,rolling_07d
2,138.406403,0.003493,0.459641,0.471421,AUTOMATION,query_job_completed,4087425,4319,8489,0,...,0,0,0,0,2,2,0.001057,1878749,260030891273,rolling_07d
3,2.803123,0.0005,34.055632,34.24378,HUB_SERVICE,load_job_completed,15998,235,481,0,...,0,47,47,0,57,57,0.014689,544822,1527203018,rolling_07d
4,225.298043,0.09391,7.775318,7.970076,HUB_SERVICE,query_job_completed,259558,4531,9458,0,...,0,3,3,0,48,49,0.017457,2018146,454684344155,rolling_07d


In [10]:
slot_percentiles = (
    latency_df.groupby(['classification_type', 'event_name'])['total_slot_ms']
    .quantile([0.5, 0.9, 0.99])
    .unstack(level=-1)
    .rename(columns={0.5: 'p50', 0.9: 'p90', 0.99: 'p99'})
)
# These slot-ms percentiles highlight which event types consume the most resource at median/p90/p99, guiding tuning efforts.
slot_percentiles


Unnamed: 0_level_0,Unnamed: 1_level_0,p50,p90,p99
classification_type,event_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AUTOMATION,extract_job_completed,6176349.0,6176349.0,6176349.0
AUTOMATION,load_job_completed,5184295000.0,5184295000.0,5184295000.0
AUTOMATION,query_job_completed,260030900000.0,260030900000.0,260030900000.0
HUB_SERVICE,load_job_completed,1527203000.0,1527203000.0,1527203000.0
HUB_SERVICE,query_job_completed,454684300000.0,454684300000.0,454684300000.0
INTERNAL_USER,extract_job_completed,682.0,682.0,682.0
INTERNAL_USER,load_job_completed,1066895.0,1066895.0,1066895.0
INTERNAL_USER,query_job_completed,142037400000.0,142037400000.0,142037400000.0
MONITOR_USERS,load_job_completed,196305.0,196305.0,196305.0
MONITOR_USERS,query_job_completed,85129120000.0,85129120000.0,85129120000.0


In [11]:
over_60_stats = (
    latency_df.groupby(['classification_type', 'event_name'])
    .agg(
        job_count=('job_count', 'sum'),
        jobs_over_60s=('jobs_over_60s', 'sum')
    )
    .assign(pct_over_60s=lambda df: df['jobs_over_60s'] / df['job_count'])
)
over_60_stats['pct_over_60s'] = over_60_stats['pct_over_60s'].map(lambda x: f"{x * 100:.2f}%" if pd.notna(x) else None)
over_60_stats


Unnamed: 0_level_0,Unnamed: 1_level_0,job_count,jobs_over_60s,pct_over_60s
classification_type,event_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AUTOMATION,extract_job_completed,707,0,0.00%
AUTOMATION,load_job_completed,133566,355,0.27%
AUTOMATION,query_job_completed,4087425,4319,0.11%
HUB_SERVICE,load_job_completed,15998,235,1.47%
HUB_SERVICE,query_job_completed,259558,4531,1.75%
INTERNAL_USER,extract_job_completed,1,0,0.00%
INTERNAL_USER,load_job_completed,373,1,0.27%
INTERNAL_USER,query_job_completed,16556,767,4.63%
MONITOR_USERS,load_job_completed,475,0,0.00%
MONITOR_USERS,query_job_completed,41175,1320,3.21%


In [12]:
reservations_path = pathlib.Path('reservations_us.json').resolve()
with open(reservations_path) as f:
    reservations_raw = json.load(f)
reservations_df = pd.json_normalize(reservations_raw)
reservations_df


Unnamed: 0,creationTime,edition,ignoreIdleSlots,name,updateTime,autoscale.maxSlots,slotCapacity,autoscale.currentSlots
0,2024-06-28T17:24:59.668513Z,STANDARD,True,projects/bq-narvar-admin/locations/US/reservat...,2024-06-28T17:24:59.668513Z,300,,
1,2022-04-29T21:13:02.192290Z,ENTERPRISE,,projects/bq-narvar-admin/locations/US/reservat...,2025-10-31T13:19:01.830464Z,700,1000.0,50.0


In [13]:
# Summary by classification_type: combine job-level metrics (counts, slot usage, response-time quantiles)
# and compute weighted averages. For each class we:
#  • sum job_count, slot_ms, run_seconds, and >60s breaches
#  • weight avg_total_seconds by job_count to get an overall average response time
#  • pull median/p90/p99 (and max when available) from the query-level quantiles
#  • format the over-60s share as a percentage
latency_summary = latency_df.copy()
latency_summary['total_seconds_weighted'] = latency_summary['avg_total_seconds'] * latency_summary['job_count']
agg_map = {
    'job_count': ('job_count', 'sum'),
    'total_slot_ms': ('total_slot_ms', 'sum'),
    'total_run_seconds': ('total_run_seconds', 'sum'),
    'total_seconds': ('total_seconds_weighted', 'sum'),
    'jobs_over_60s': ('jobs_over_60s', 'sum'),
    'avg_active_slots': ('avg_active_slots', 'mean'),
    'median_response_seconds': ('p50_total_seconds', 'mean'),
    'p90_response_seconds': ('p90_total_seconds', 'mean'),
    'p99_response_seconds': ('p99_total_seconds', 'mean')
}
if 'max_total_seconds' in latency_summary.columns:
    agg_map['max_response_seconds'] = ('max_total_seconds', 'max')
latency_summary = (
    latency_summary.groupby('classification_type')
    .agg(**agg_map)
    .assign(
        avg_response_seconds=lambda df: df['total_seconds'] / df['job_count'],
        pct_over_60s=lambda df: df['jobs_over_60s'] / df['job_count']
    )
    .drop(columns=['total_seconds'])
    .reset_index()
)
response_cols = ['avg_response_seconds', 'median_response_seconds', 'p90_response_seconds', 'p99_response_seconds']
if 'max_response_seconds' in latency_summary.columns:
    response_cols.append('max_response_seconds')
latency_summary[response_cols] = latency_summary[response_cols].round(2)
latency_summary['pct_over_60s'] = latency_summary['pct_over_60s'].map(lambda x: f"{x * 100:.2f}%" if pd.notna(x) else None)
format_cols = ['total_slot_ms', 'total_run_seconds', 'jobs_over_60s', 'avg_active_slots', 'avg_response_seconds',
               'median_response_seconds', 'p90_response_seconds', 'p99_response_seconds', 'max_response_seconds']
for col in format_cols:
    if col in latency_summary.columns:
        latency_summary[col] = latency_summary[col].map(lambda x: f"{x:,.2f}" if pd.notna(x) else None)
latency_summary


Unnamed: 0,classification_type,job_count,total_slot_ms,total_run_seconds,jobs_over_60s,avg_active_slots,median_response_seconds,p90_response_seconds,p99_response_seconds,max_response_seconds,avg_response_seconds,pct_over_60s
0,AUTOMATION,4221698,265221362187,2538574,4674,49.377262,2.33,6.0,13.0,8489,0.62,0.11%
1,HUB_SERVICE,275556,456211547173,2562968,4766,114.050583,16.0,25.0,53.0,9458,9.5,1.73%
2,INTERNAL_USER,16930,142038418943,364210,768,130.798908,2.0,5.67,33.67,32476,21.66,4.54%
3,MONITOR_USERS,41650,85129314777,1880409,1320,22.760894,1.0,3.0,66.0,12245,46.0,3.17%
4,UNKNOWN,3930,33281621726,128774,119,126.120072,1.75,12.75,61.5,12823,32.89,3.03%


### SLA breaches (>60s) — rolling 7‑day overview
Focus on external-facing classes (`HUB_SERVICE`, `MONITOR_USERS`) by counting jobs whose total duration exceeded the 60-second SLA threshold.


In [14]:
external_classes = ['HUB_SERVICE', 'MONITOR_USERS']
external_sla = (
    latency_df[latency_df['classification_type'].isin(external_classes)]
    .groupby('classification_type')
    .agg(
        job_count=('job_count', 'sum'),
        jobs_over_60s=('jobs_over_60s', 'sum')
    )
    .assign(pct_over_60s=lambda df: df['jobs_over_60s'] / df['job_count'])
    .reset_index()
)
if not external_sla.empty:
    total_row = external_sla.agg({
        'job_count': 'sum',
        'jobs_over_60s': 'sum'
    }).to_dict()
    total_row['classification_type'] = 'TOTAL_EXTERNAL'
    total_row['pct_over_60s'] = (
        total_row['jobs_over_60s'] / total_row['job_count'] if total_row['job_count'] else float('nan')
    )
    external_sla = pd.concat([external_sla, pd.DataFrame([total_row])], ignore_index=True)
    for col in ['job_count', 'jobs_over_60s']:
        external_sla[col] = external_sla[col].map(lambda x: f"{x:,.0f}" if pd.notna(x) else None)
external_sla


Unnamed: 0,classification_type,job_count,jobs_over_60s,pct_over_60s
0,HUB_SERVICE,275556,4766,0.017296
1,MONITOR_USERS,41650,1320,0.031693
2,TOTAL_EXTERNAL,317206,6086,0.019186


### Reservation metadata
Load `reservations_us.json` (captured via `bq ls --reservation`) to pull committed slots and autoscale headroom for `projects/bq-narvar-admin/locations/US/reservations/default`. These values anchor the capacity comparisons.


In [15]:
default_row = reservations_df.loc[reservations_df['name'].str.endswith('/reservations/default')]
if default_row.empty:
    raise ValueError('Could not find default reservation row in reservations_df')

slot_capacity = pd.to_numeric(default_row['slotCapacity'], errors='coerce').fillna(0).iloc[0]
autoscale_current = 0.0
if 'autoscale.currentSlots' in default_row.columns:
    autoscale_current = pd.to_numeric(default_row['autoscale.currentSlots'], errors='coerce').fillna(0).iloc[0]

print(f"Default reservation committed slots: {slot_capacity}")
print(f"Current autoscale slots: {autoscale_current}")

latency_summary['avg_active_slots_pct_capacity'] = latency_summary['avg_active_slots'] / slot_capacity * 100
latency_summary


Default reservation committed slots: 1000
Current autoscale slots: 50


Unnamed: 0,classification_type,job_count,total_slot_ms,total_run_seconds,jobs_over_60s,avg_active_slots,median_response_seconds,p90_response_seconds,p99_response_seconds,max_response_seconds,avg_response_seconds,pct_over_60s,avg_active_slots_pct_capacity
0,AUTOMATION,4221698,265221362187,2538574,4674,49.377262,2.33,6.0,13.0,8489,0.62,0.11%,4.937726
1,HUB_SERVICE,275556,456211547173,2562968,4766,114.050583,16.0,25.0,53.0,9458,9.5,1.73%,11.405058
2,INTERNAL_USER,16930,142038418943,364210,768,130.798908,2.0,5.67,33.67,32476,21.66,4.54%,13.079891
3,MONITOR_USERS,41650,85129314777,1880409,1320,22.760894,1.0,3.0,66.0,12245,46.0,3.17%,2.276089
4,UNKNOWN,3930,33281621726,128774,119,126.120072,1.75,12.75,61.5,12823,32.89,3.03%,12.612007


### 10-minute slot usage snapshot
Load `rolling_07d_slot_usage_10min.json`, the 10-minute aggregation produced by `phase3_qos_slot_usage_10min.sql`. This provides per-class slot usage/queue totals used for spike detection.


### Detect spikes via MAD baseline
1. Aggregate total slot-ms per 10-minute bucket.
2. Compute a robust baseline (median + 3 × 1.4826 × MAD).
3. Flag buckets above the threshold and group consecutive buckets into spike events (10-minute cadence).

We detect spikes by building a robust baseline with the Median Absolute Deviation (MAD):
* For each 10-minute bucket we sum total_slot_ms across classifications.
* We take the median of those totals.
* Compute the absolute deviation of every bucket from that median and take the median of those deviations.
* Scale that MAD by 1.4826 (so it tracks standard deviation for a normal distribution).
* Set the baseline threshold to median + 3 × scaled MAD. Any bucket whose slot total exceeds that threshold is flagged as a spike.


In [16]:
total_avg_active_slots = latency_summary['avg_active_slots'].sum()
print(f"Aggregate avg active slots across all types: {total_avg_active_slots:.2f}")
print(f"Share of reservation: {total_avg_active_slots / slot_capacity * 100:.2f}%")


Aggregate avg active slots across all types: 443.11
Share of reservation: 44.31%


In [17]:
# Load 10-minute slot usage JSON (rolling 7 days) and coerce breach metrics to numeric, ensuring timestamps are parsed.
slot_usage_path = pathlib.Path('rolling_07d_slot_usage_10min.json').resolve()
with open(slot_usage_path) as f:
    slot_usage_raw = json.load(f)
slot_df = pd.json_normalize(slot_usage_raw[0])
print(f"Loaded slot usage rows: {slot_df.shape[0]}")
for col in ['breach_job_count', 'breach_job_pct', 'p95_queue_seconds', 'p95_run_seconds', 'p50_total_seconds', 'p90_total_seconds', 'p99_total_seconds', 'max_total_seconds']:
    if col not in slot_df.columns:
        slot_df[col] = pd.NA
slot_numeric_cols = ['job_count', 'total_slot_ms', 'sum_queue_seconds', 'sum_run_seconds', 'sum_total_seconds', 'breach_job_count', 'breach_job_pct', 'p95_queue_seconds', 'p95_run_seconds', 'p50_total_seconds', 'p90_total_seconds', 'p99_total_seconds', 'max_total_seconds']
for col in slot_numeric_cols:
    slot_df[col] = pd.to_numeric(slot_df[col], errors='coerce')
slot_df['bucket_ts'] = pd.to_datetime(slot_df['bucket_ts'])
slot_df.head()


Loaded slot usage rows: 4350


Unnamed: 0,breach_job_count,breach_job_pct,bucket_ts,classification_type,job_count,p95_queue_seconds,p95_run_seconds,sum_queue_seconds,sum_run_seconds,sum_total_seconds,total_slot_ms,window_id
0,1,1.0,2025-10-28 17:20:00,INTERNAL_USER,1,0,5247,0,5247,5247,8780044584,rolling_07d
1,1,1.0,2025-10-28 18:20:00,AUTOMATION,1,0,310,0,310,311,111872935,rolling_07d
2,1,1.0,2025-10-28 18:20:00,HUB_SERVICE,1,0,979,0,979,979,400465062,rolling_07d
3,1,0.000637,2025-10-28 18:30:00,AUTOMATION,1569,0,2,2,899,946,84270374,rolling_07d
4,1,0.002577,2025-10-28 18:30:00,HUB_SERVICE,388,0,18,1,1368,1405,335507874,rolling_07d


### Threshold-based alerts (INFO/WARNING/CRITICAL)
Mirror the DAG logic by checking each 10-minute bucket against the pending/run thresholds used for live VictorOps alerts.


In [None]:
alert_rules = [
    {'severity': 'CRITICAL', 'max_jobs': 60, 'p95_pending': 50 * 60, 'p95_running': 50 * 60},
    {'severity': 'WARNING', 'max_jobs': 30, 'p95_pending': 20 * 60, 'p95_running': 20 * 60},
    {'severity': 'INFO', 'max_jobs': 20, 'p95_pending': 6 * 60, 'p95_running': 6 * 60},
]

bucket_alerts = (
    slot_df.groupby('bucket_ts')
    .agg(
        total_jobs=('job_count', 'sum'),
        max_p95_queue=('p95_queue_seconds', 'max'),
        max_p95_run=('p95_run_seconds', 'max')
    )
    .reset_index()
)

if 'bucket_totals' not in globals():
    bucket_totals_from_latency = (
        slot_df.groupby('bucket_ts')['total_slot_ms']
        .sum()
        .reset_index()
        .assign(is_spike=False)
    )
else:
    bucket_totals_from_latency = bucket_totals


def evaluate_alert(row):
    for rule in alert_rules:
        reasons = []
        if pd.notna(row['total_jobs']) and row['total_jobs'] >= rule['max_jobs']:
            reasons.append(
                f"total_jobs {row['total_jobs']:.0f} ≥ {rule['severity']} threshold {rule['max_jobs']}"
            )
        if pd.notna(row['max_p95_queue']) and row['max_p95_queue'] >= rule['p95_pending']:
            reasons.append(
                f"p95 queue {row['max_p95_queue']:.1f}s ≥ {rule['severity']} threshold {rule['p95_pending']}s"
            )
        if pd.notna(row['max_p95_run']) and row['max_p95_run'] >= rule['p95_running']:
            reasons.append(
                f"p95 run {row['max_p95_run']:.1f}s ≥ {rule['severity']} threshold {rule['p95_running']}s"
            )
        if reasons:
            return pd.Series({
                'severity': rule['severity'],
                'conditions': reasons,
            })
    return pd.Series({'severity': None, 'conditions': None})

alert_flags = bucket_alerts.join(bucket_alerts.apply(evaluate_alert, axis=1))
alert_flags = alert_flags[alert_flags['severity'].notna()].copy()
if not alert_flags.empty:
    alert_flags['conditions'] = alert_flags['conditions'].apply(lambda c: '\n'.join(c) if isinstance(c, list) else c)
alert_flags.head()



Unnamed: 0,bucket_ts,total_jobs,max_p95_queue,max_p95_run,severity,conditions
0,2025-10-28 17:20:00,1,0,5247,CRITICAL,p95 run 5247.0s ≥ CRITICAL threshold 3000s
1,2025-10-28 18:20:00,2,0,979,INFO,p95 run 979.0s ≥ INFO threshold 360s
2,2025-10-28 18:30:00,2031,0,1707,CRITICAL,total_jobs 2031 ≥ CRITICAL threshold 60
3,2025-10-28 18:40:00,3753,0,88,CRITICAL,total_jobs 3753 ≥ CRITICAL threshold 60
4,2025-10-28 18:50:00,3564,0,16,CRITICAL,total_jobs 3564 ≥ CRITICAL threshold 60


In [24]:
if not alert_flags.empty:
    alerts_vs_mad = alert_flags.merge(
        bucket_totals_from_latency[['bucket_ts', 'is_spike']],
        on='bucket_ts',
        how='left'
    ).rename(columns={'is_spike': 'mad_spike'})
    alerts_vs_mad.head()
else:
    print('No threshold-based alerts triggered in the current window.')



In [26]:
bucket_totals = (
    slot_df.groupby('bucket_ts')['total_slot_ms']
    .sum()
    .reset_index()
    .sort_values('bucket_ts')
)
median_slots = bucket_totals['total_slot_ms'].median()
mad_slots = (bucket_totals['total_slot_ms'] - median_slots).abs().median()
mad_scaled = 1.4826 * mad_slots
threshold = median_slots + 3 * mad_scaled
bucket_totals['is_spike'] = bucket_totals['total_slot_ms'] > threshold
threshold, bucket_totals.head()


(np.float64(2164670215.5456),
             bucket_ts  total_slot_ms  is_spike
 0 2025-10-28 17:20:00     8780044584      True
 1 2025-10-28 18:20:00      512337997     False
 2 2025-10-28 18:30:00     1441737875     False
 3 2025-10-28 18:40:00     1103230046     False
 4 2025-10-28 18:50:00      101092691     False)

In [27]:
interval = pd.Timedelta(minutes=10)
spike_groups = []
current_group = 0
prev_ts = None
for ts, is_spike in zip(bucket_totals['bucket_ts'], bucket_totals['is_spike']):
    if not is_spike:
        spike_groups.append(pd.NA)
        prev_ts = None
        continue
    if prev_ts is None or ts - prev_ts > interval:
        current_group += 1
    spike_groups.append(current_group)
    prev_ts = ts
bucket_totals['spike_id'] = spike_groups
bucket_totals.head(10)


Unnamed: 0,bucket_ts,total_slot_ms,is_spike,spike_id
0,2025-10-28 17:20:00,8780044584,True,1.0
1,2025-10-28 18:20:00,512337997,False,
2,2025-10-28 18:30:00,1441737875,False,
3,2025-10-28 18:40:00,1103230046,False,
4,2025-10-28 18:50:00,101092691,False,
5,2025-10-28 19:00:00,1011359629,False,
6,2025-10-28 19:10:00,620234815,False,
7,2025-10-28 19:20:00,834316900,False,
8,2025-10-28 19:30:00,724184153,False,
9,2025-10-28 19:40:00,1495815696,False,


### SLA breaches during detected spikes
Aggregate the same >60s SLA check using only the spike buckets identified above to see how external workloads behave under load.


In [28]:
if 'slot_spike' in locals() and not slot_spike.empty:
    external_spike = (
        slot_spike[slot_spike['classification_type'].isin(external_classes)]
        .groupby('classification_type')
        .agg(
            job_count=('job_count', 'sum'),
            breach_job_count=('breach_job_count', 'sum')
        )
        .assign(breach_job_pct=lambda df: df['breach_job_count'] / df['job_count'])
        .reset_index()
    )
    if not external_spike.empty:
        total_row = external_spike.agg({
            'job_count': 'sum',
            'breach_job_count': 'sum'
        }).to_dict()
        total_row['classification_type'] = 'TOTAL_EXTERNAL'
        total_row['breach_job_pct'] = (
            total_row['breach_job_count'] / total_row['job_count'] if total_row['job_count'] else float('nan')
        )
        external_spike = pd.concat([external_spike, pd.DataFrame([total_row])], ignore_index=True)
    external_spike
else:
    print("No spike buckets available to evaluate SLA breaches.")



No spike buckets available to evaluate SLA breaches.


In [29]:
spike_buckets = bucket_totals.dropna(subset=['spike_id']).copy()
slot_spike = slot_df.merge(spike_buckets[['bucket_ts', 'spike_id']], on='bucket_ts', how='inner')
slot_spike.head()


Unnamed: 0,breach_job_count,breach_job_pct,bucket_ts,classification_type,job_count,p95_queue_seconds,p95_run_seconds,sum_queue_seconds,sum_run_seconds,sum_total_seconds,total_slot_ms,window_id,spike_id
0,1,1.0,2025-10-28 17:20:00,INTERNAL_USER,1,0,5247,0,5247,5247,8780044584,rolling_07d,1
1,10,0.001385,2025-10-29 00:30:00,AUTOMATION,7221,0,0,43,2690,2797,411658217,rolling_07d,2
2,0,0.0,2025-10-29 00:30:00,HUB_SERVICE,225,0,21,0,562,591,4025860,rolling_07d,2
3,1,0.04,2025-10-29 00:30:00,MONITOR_USERS,25,0,4,1,86,95,19055233,rolling_07d,2
4,1,0.5,2025-10-29 00:30:00,UNKNOWN,2,0,4239,0,4239,4239,1993172612,rolling_07d,2


In [50]:
spike_events = (
    slot_spike.groupby('spike_id')
    .agg(
        start_ts=('bucket_ts', 'min'),
        end_ts=('bucket_ts', 'max'),
        duration_minutes=('bucket_ts', lambda s: (len(s) * 10)),
        total_slot_ms=('total_slot_ms', 'sum'),
        max_slot_ms=('total_slot_ms', 'max'),
        avg_queue_seconds=('sum_queue_seconds', lambda s: s.sum() / max(len(s), 1)),
        classifications=('classification_type', lambda s: s.nunique())
    )
    .reset_index()
)
external_classes = ['HUB_SERVICE', 'MONITOR_USERS']
external_subset = slot_spike[slot_spike['classification_type'].isin(external_classes)]
agg_map = {
    'ext_job_count': ('job_count', 'sum'),
    'ext_breach_jobs': ('breach_job_count', 'sum'),
    'ext_total_seconds': ('sum_total_seconds', 'sum')
}
if 'p50_total_seconds' in external_subset.columns:
    agg_map['ext_median_proxy'] = ('p50_total_seconds', 'median')
if 'p90_total_seconds' in external_subset.columns:
    agg_map['ext_p90_seconds'] = ('p90_total_seconds', 'max')
if 'p99_total_seconds' in external_subset.columns:
    agg_map['ext_p99_seconds'] = ('p99_total_seconds', 'max')
if 'max_total_seconds' in external_subset.columns:
    agg_map['ext_max_seconds'] = ('max_total_seconds', 'max')
external_metrics = external_subset.groupby('spike_id').agg(**agg_map)
if not external_metrics.empty:
    external_metrics['ext_avg_seconds'] = external_metrics.apply(
        lambda row: row['ext_total_seconds'] / row['ext_job_count'] if row['ext_job_count'] else float('nan'),
        axis=1
    )
    if 'ext_median_proxy' in external_metrics.columns:
        external_metrics.rename(columns={'ext_median_proxy': 'ext_median_seconds'}, inplace=True)
    external_metrics['ext_pct_over_60'] = external_metrics.apply(
        lambda row: row['ext_breach_jobs'] / row['ext_job_count'] if row['ext_job_count'] else float('nan'),
        axis=1
    )
    spike_events = spike_events.merge(external_metrics.drop(columns=['ext_total_seconds']), on='spike_id', how='left')
    spike_events['ext_pct_over_60'] = spike_events['ext_pct_over_60'].map(
        lambda x: f"{x * 100:.2f}%" if pd.notna(x) else None
    )
if 'sum_total_seconds' in slot_spike.columns:
    totals_agg = {
        'total_jobs': ('job_count', 'sum'),
        'total_seconds': ('sum_total_seconds', 'sum')
    }
    if 'p50_total_seconds' in slot_spike.columns:
        totals_agg['median_total_seconds'] = ('p50_total_seconds', 'median')

    spike_totals = slot_spike.groupby('spike_id').agg(**totals_agg).reset_index()
    if 'median_total_seconds' not in spike_totals.columns:
        spike_totals['median_total_seconds'] = pd.NA
    spike_totals['avg_total_seconds'] = spike_totals['total_seconds'] / spike_totals['total_jobs']
    spike_totals = spike_totals.drop(columns=['total_seconds'])
    spike_events = spike_events.merge(spike_totals, on='spike_id', how='left')
spike_events['end_ts'] = spike_events['end_ts'] + interval
spike_events_display = spike_events.copy()
format_cols = [
    'total_slot_ms', 'max_slot_ms', 'slot_hours', 'ext_job_count', 'ext_breach_jobs',
    'ext_p90_seconds', 'ext_p99_seconds', 'ext_max_seconds', 'ext_avg_seconds', 'ext_median_seconds',
    'total_jobs', 'avg_total_seconds', 'median_total_seconds'
]
for col in format_cols:
    if col in spike_events_display.columns:
        if col in ['ext_job_count', 'ext_breach_jobs', 'total_jobs']:
            spike_events_display[col] = spike_events_display[col].map(lambda x: f"{x:,.0f}" if pd.notna(x) else None)
        else:
            spike_events_display[col] = spike_events_display[col].map(lambda x: f"{x:,.2f}" if pd.notna(x) else None)
spike_events_display


Unnamed: 0,spike_id,start_ts,end_ts,duration_minutes,total_slot_ms,max_slot_ms,avg_queue_seconds,classifications,ext_job_count,ext_breach_jobs,ext_avg_seconds,ext_pct_over_60,total_jobs,median_total_seconds,avg_total_seconds
0,1,2025-10-28 17:20:00,2025-10-28 17:30:00,10,8780044584.0,8780044584.0,0.0,1,,,,,1,,5247.0
1,2,2025-10-29 00:30:00,2025-10-29 00:40:00,40,2427911922.0,1993172612.0,11.0,4,250.0,1.0,2.74,0.40%,7473,,1.03
2,3,2025-10-29 03:40:00,2025-10-29 03:50:00,50,2263825472.0,2014783139.0,0.0,5,318.0,1.0,4.91,0.31%,10820,,2.44
3,4,2025-10-29 05:40:00,2025-10-29 05:50:00,50,12155017889.0,10837134996.0,0.2,5,362.0,2.0,4.83,0.55%,10852,,1.17
4,5,2025-10-29 08:30:00,2025-10-29 08:40:00,50,5216528522.0,4228311792.0,0.0,5,332.0,1.0,7.22,0.30%,13619,,0.83
5,6,2025-10-29 09:00:00,2025-10-29 09:10:00,50,7261181559.0,5652154208.0,0.2,5,344.0,6.0,6.08,1.74%,13549,,1.78
6,7,2025-10-29 12:00:00,2025-10-29 12:10:00,50,8636866826.0,7706802748.0,3.6,5,389.0,7.0,19.05,1.80%,14977,,1.14
7,8,2025-10-29 14:30:00,2025-10-29 14:40:00,50,6240154197.0,5653602317.0,0.2,5,548.0,4.0,3.27,0.73%,17737,,0.53
8,9,2025-10-29 16:40:00,2025-10-29 16:50:00,40,4649510058.0,3408567790.0,0.0,4,649.0,5.0,5.76,0.77%,19442,,0.55
9,10,2025-10-30 00:30:00,2025-10-30 00:40:00,50,3431959963.0,2105432332.0,0.0,5,261.0,3.0,7.72,1.15%,24049,,0.46


In [51]:
classification_totals = (
    slot_spike.groupby('classification_type')['total_slot_ms']
    .sum()
    .reset_index(name='total_slot_ms')
)
classification_totals['slot_share'] = (
    classification_totals['total_slot_ms'] /
    classification_totals['total_slot_ms'].sum()
)
# slot_share expresses the share of total spike slot-ms attributed to each classification across all spikes.
classification_totals['slot_share'] = classification_totals['slot_share'].map(
    lambda x: f"{x * 100:.2f}%" if pd.notna(x) else None
)
classification_totals['total_slot_ms_fmt'] = classification_totals['total_slot_ms'].map(
    lambda x: f"{x:,.2f}" if pd.notna(x) else None
)
classification_totals.sort_values('total_slot_ms', ascending=False).reset_index(drop=True)


Unnamed: 0,classification_type,total_slot_ms,slot_share,total_slot_ms_fmt
0,INTERNAL_USER,134289641570,69.62%,134289641570.0
1,HUB_SERVICE,17695255881,9.17%,17695255881.0
2,UNKNOWN,17291577915,8.96%,17291577915.0
3,AUTOMATION,13927247856,7.22%,13927247856.0
4,MONITOR_USERS,9677841916,5.02%,9677841916.0


In [52]:
total_slot_ms_all = slot_df['total_slot_ms'].sum()
spike_events['slot_hours'] = spike_events['total_slot_ms'] / (1000 * 60 * 60)
spike_summary = {
    'spike_count': len(spike_events),
    'spike_days': spike_events['start_ts'].dt.floor('D').nunique(),
    'slot_hours_in_spikes': spike_events['slot_hours'].sum(),
    'share_slot_ms_spikes': spike_events['total_slot_ms'].sum() / total_slot_ms_all if total_slot_ms_all else 0,
    'median_duration_minutes': spike_events['duration_minutes'].median() if len(spike_events) > 0 else 0
}
spike_summary


{'spike_count': 31,
 'spike_days': 8,
 'slot_hours_in_spikes': np.float64(53578.21253833333),
 'share_slot_ms_spikes': np.float64(0.19643852018490376),
 'median_duration_minutes': np.float64(50.0)}

In [43]:
spike_events.to_csv('../rolling_07d_spike_events.csv', index=False)
classification_totals.to_csv('../rolling_07d_spike_mix.csv', index=False)
spike_summary


{'spike_count': 31,
 'spike_days': 8,
 'slot_hours_in_spikes': np.float64(53578.21253833333),
 'share_slot_ms_spikes': np.float64(0.19643852018490376),
 'median_duration_minutes': np.float64(50.0)}

### Rolling-7d Spike Snapshot
- **Spikes detected:** see summary cell below for counts and duration stats.
- **Outputs:**
  - `rolling_07d_spike_events.csv` – spike intervals with slot totals, max load, queue averages.
  - `rolling_07d_spike_mix.csv` – classification share per spike event.
- Re-run the upstream SQL with alternative `window_ids` (e.g., `peak_fy22`, `baseline_fy22`, etc.) and drop the resulting JSON/CSVs in this folder to iterate across historical windows.



In [53]:
print("Spike summary:")
for k, v in spike_summary.items():
    if isinstance(v, float):
        print(f"  {k}: {v:,.2f}")
    else:
        print(f"  {k}: {v}")

print("\nTop classifications by spike slot share:")
print(
    classification_totals.sort_values('total_slot_ms', ascending=False)
    .loc[:, ['classification_type', 'total_slot_ms_fmt', 'slot_share']]
    .head(10)
)


Spike summary:
  spike_count: 31
  spike_days: 8
  slot_hours_in_spikes: 53,578.21
  share_slot_ms_spikes: 0.20
  median_duration_minutes: 50.00

Top classifications by spike slot share:
  classification_type   total_slot_ms_fmt slot_share
2       INTERNAL_USER  134,289,641,570.00     69.62%
1         HUB_SERVICE   17,695,255,881.00      9.17%
4             UNKNOWN   17,291,577,915.00      8.96%
0          AUTOMATION   13,927,247,856.00      7.22%
3       MONITOR_USERS    9,677,841,916.00      5.02%


## Notes
- All metrics above are scoped to the rolling 7-day window (easy to expand by editing the `DECLARE window_ids` clause in `phase3_qos_latency_metrics.sql`).
- Slot percentiles are expressed in slot-hours per job; compare to reservation capacity to identify outliers quickly.
- `avg_active_slots` approximates the sustained slot concurrency (slot-ms / runtime). Use it alongside reservation totals (`slot_capacity`, `autoscaleCurrentSlots`) to spot headroom or gaps.
- Next steps: repeat for peak windows, build time-series charts for queue seconds, and integrate reservation assignment events to flag bursts that exceeded the 1000-slot baseline.


In [47]:
slot_percentiles_hours = slot_percentiles / (1000 * 60 * 60)
# Converting to slot-hours makes it easy to compare percentile loads against reservation capacity.
slot_percentiles_hours


Unnamed: 0_level_0,Unnamed: 1_level_0,p50,p90,p99
classification_type,event_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AUTOMATION,extract_job_completed,1.715653,1.715653,1.715653
AUTOMATION,load_job_completed,1440.081824,1440.081824,1440.081824
AUTOMATION,query_job_completed,72230.803131,72230.803131,72230.803131
HUB_SERVICE,load_job_completed,424.223061,424.223061,424.223061
HUB_SERVICE,query_job_completed,126301.20671,126301.20671,126301.20671
INTERNAL_USER,extract_job_completed,0.000189,0.000189,0.000189
INTERNAL_USER,load_job_completed,0.29636,0.29636,0.29636
INTERNAL_USER,query_job_completed,39454.819824,39454.819824,39454.819824
MONITOR_USERS,load_job_completed,0.054529,0.054529,0.054529
MONITOR_USERS,query_job_completed,23646.977353,23646.977353,23646.977353
