# Forecast Setup (Timezone)

Timezone-parameterized pipeline. Fetches CT metadata, filters stations by timezone bucket,
pulls VP and VOVI data, downloads S3 artifacts, and produces joined.csv + visual.json.

Inputs (injected variables):
- `tz_bucket` — Timezone bucket: Eastern, Central, Mountain, Pacific, etc. (default: Eastern)
- `biz` — Business line: AMZL, AMXL, RSR (default: AMZL)
- `aws_profile` — AWS profile for S3 (default: None)

Data pulls:
1. CT metadata from Control Tower API
2. VP pipeline (unpublished) filtered by timezone bucket
3. VOVI forecasts (US + CA)
4. Pipeline artifacts from S3
5. Intraday PBA data from S3

Transforms:
- VP pivot (long to wide, util columns, grid keys)
- CT site list + VP + VOVI outer join with available_inputs flag
- PBA query (separate output file)

In [None]:
# name: setup | type: python
# Resolve the run parameters, derive the target date, create the context
# directory, and publish the parameters to SQL cells via the one-row `_vars` table.
from pathlib import Path
from datetime import datetime, timedelta
import pytz

# Accept injected variables or use defaults
if 'tz_bucket' not in dir():
    tz_bucket = 'Eastern'
if 'biz' not in dir():
    biz = 'AMZL'
if 'aws_profile' not in dir():
    aws_profile = None

# Auto-calculate target date (tomorrow Pacific)
pacific = pytz.timezone('US/Pacific')
now_pacific = datetime.now(pacific)
tomorrow = now_pacific + timedelta(days=1)
target_date = tomorrow.strftime('%Y-%m-%d')

# Generate context ID and output directory
ctx_id = generate_ctx_id(f'forecast_{tz_bucket.lower()}')
ctx_dir = contexts_dir / ctx_id
ctx_dir.mkdir(parents=True, exist_ok=True)

# Create _vars table for SQL cells.
# The values are injected from outside this cell, so double any single quote
# before embedding them in a SQL string literal; otherwise a value containing
# "'" would terminate the literal early and break (or alter) the statement.
tz_bucket_sql = str(tz_bucket).replace("'", "''")
biz_sql = str(biz).replace("'", "''")
conn.execute(f"CREATE OR REPLACE TABLE _vars AS SELECT '{tz_bucket_sql}' as tz_bucket, '{biz_sql}' as biz")

print(f'Timezone: {tz_bucket}')
print(f'Business: {biz}')
print(f'Target date: {target_date}')
print(f'Context ID: {ctx_id}')
print(f'Output: {ctx_dir}')

# Cell output: the resolved parameters, with the directory as a plain string.
result = {'tz_bucket': tz_bucket, 'biz': biz, 'target_date': target_date, 'ctx_id': ctx_id, 'ctx_dir': str(ctx_dir)}

In [None]:
# name: ct_fetch | type: python
# Fetch CT metadata through the injected helper and load its output file
# into the `ct_metadata` table.
import json as _json

r = _json.loads(fetch_ct_metadata(ctx_id))

# Stop the pipeline as soon as the helper reports a failure.
if not r['success']:
    raise RuntimeError(f'CT metadata fetch failed: {r.get("error", "unknown")}')

conn.execute(f"CREATE OR REPLACE TABLE ct_metadata AS SELECT * FROM '{r['output_file']}'")

# Count distinct stations that are in region NA with status 'A' (reported as "active").
active_na_sql = "SELECT COUNT(DISTINCT lm_node) FROM ct_metadata WHERE master_region = 'NA' AND status = 'A'"
ct_count = conn.execute(active_na_sql).fetchone()[0]
print(f'CT metadata: {r["row_count"]} total rows, {ct_count} active NA stations')

result = r

In [None]:
-- name: tz_bucket_map | type: sql
-- Builds tz_bucket_map: one row per NA timezone found in ct_metadata, with an
-- hour difference (utc_hour_diff) and the bucket label of the reference zone
-- that has the same hour difference on the plan date ('other' when none does).
CREATE OR REPLACE TABLE tz_bucket_map AS
-- Plan date = tomorrow's calendar date in America/Los_Angeles.
WITH tomorrow_la AS (
    SELECT CAST((current_timestamp AT TIME ZONE 'America/Los_Angeles') AS DATE) + 1 AS plan_date
),
-- 12:00:00 on the plan date as a timestamp without time zone.
utc_noon AS (
    SELECT CAST((plan_date || ' 12:00:00')::VARCHAR AS TIMESTAMP WITHOUT TIME ZONE) AS utc_ts FROM tomorrow_la
),
-- Every distinct timezone used by NA rows in ct_metadata.
timezones AS (
    SELECT DISTINCT timezone FROM ct_metadata WHERE master_region = 'NA'
),
-- Hour difference between the UTC rendering and the zone rendering of utc_ts.
-- NOTE(review): presumably the zone's UTC offset in whole hours on the plan
-- date (so DST is taken into account) -- confirm against DuckDB TIMEZONE() semantics.
tz_offsets AS (
    SELECT timezone, datediff('HOUR', TIMEZONE('UTC', utc_ts AT TIME ZONE 'UTC'), TIMEZONE(timezone, utc_ts AT TIME ZONE 'UTC')) as utc_hour_diff
    FROM utc_noon CROSS JOIN timezones
),
-- One reference zone per bucket label. Labels are lower-case.
-- utc_noon is only used here as a row source for the constant SELECTs.
reference_buckets AS (
    SELECT 'America/Los_Angeles' AS ref_timezone, 'pacific' AS bucket FROM utc_noon UNION ALL
    SELECT 'America/Denver', 'mountain' FROM utc_noon UNION ALL
    SELECT 'America/Chicago', 'central' FROM utc_noon UNION ALL
    SELECT 'America/New_York', 'eastern' FROM utc_noon UNION ALL
    SELECT 'America/Halifax', 'halifax' FROM utc_noon UNION ALL
    SELECT 'America/Anchorage', 'alaska' FROM utc_noon UNION ALL
    SELECT 'Pacific/Honolulu', 'hawaii' FROM utc_noon
),
-- Same hour-difference expression as tz_offsets, applied to the reference zones.
reference_offsets AS (
    SELECT rb.ref_timezone, rb.bucket,
        datediff('HOUR', TIMEZONE('UTC', utc_ts AT TIME ZONE 'UTC'), TIMEZONE(rb.ref_timezone, utc_ts AT TIME ZONE 'UTC')) as ref_utc_offset
    FROM utc_noon CROSS JOIN reference_buckets rb
)
-- Buckets are assigned by equal hour difference, not by zone name, so any
-- zone sharing a reference zone's value on the plan date lands in that bucket.
SELECT tz.timezone, tz.utc_hour_diff, COALESCE(ro.bucket, 'other') AS bucket
FROM tz_offsets tz LEFT JOIN reference_offsets ro ON tz.utc_hour_diff = ro.ref_utc_offset

In [None]:
-- name: load_site_list | type: sql
-- Builds site_list: distinct stations in region NA with status 'A' whose
-- timezone belongs to the bucket requested in _vars.
CREATE OR REPLACE TABLE site_list AS
WITH bucket_timezones AS (
    -- Timezones mapped to the requested bucket; the requested name is
    -- lower-cased before comparing with tz_bucket_map.bucket.
    SELECT m.timezone
    FROM tz_bucket_map m
    WHERE m.bucket = LOWER((SELECT tz_bucket FROM _vars))
)
SELECT DISTINCT
    ct.lm_node AS station,
    'Single' AS cycle,
    (SELECT biz FROM _vars) AS business_org
FROM ct_metadata ct
WHERE ct.master_region = 'NA'
  AND ct.status = 'A'
  AND ct.timezone IN (SELECT bt.timezone FROM bucket_timezones bt)

In [None]:
-- name: vp_urls | type: sql
-- Builds vp_urls: one artifact URL per station in site_list, for the plan
-- start date (tomorrow in America/Los_Angeles) and the business line in _vars.
CREATE OR REPLACE TABLE vp_urls AS
WITH plan_start AS (
    SELECT CAST((current_timestamp AT TIME ZONE 'America/Los_Angeles') AS DATE) + 1 AS d
)
SELECT
    'https://na.prod.control-tower.last-mile.amazon.dev/api/rap-dal/artifacts/NA/volume_planner/intraweek/pipeline_volume_plan/1?scenario=POR&status=UNPUBLISHED&space=station%3D'
        || sl.station
        || '&time=plan_start_date%3D'
        || ps.d
        || '&business='
        || (SELECT biz FROM _vars)
    AS url
FROM site_list sl
CROSS JOIN plan_start ps

In [None]:
# name: vp_fetch | type: python
# Hand every URL in vp_urls to the injected VP fetch helper and load the
# CSV it reports into the `vp_raw` table.
import json as _json

# The helper receives the URLs as one comma-separated string.
url_rows = conn.execute('SELECT url FROM vp_urls').fetchall()
urls = ','.join(row[0] for row in url_rows)
station_count = conn.execute('SELECT COUNT(*) FROM vp_urls').fetchone()[0]
print(f'Fetching VP data for {station_count} {tz_bucket} stations...')

r = _json.loads(fetch_vp_pipeline(ctx_id, urls, 'unpublished', 4))

# Stop the pipeline when the helper reports a failure.
if not r['success']:
    raise RuntimeError(f'VP fetch failed: {r.get("error", "unknown")}')

conn.execute(f"CREATE OR REPLACE TABLE vp_raw AS SELECT * FROM '{r['csv_file']}'")
print(f'VP fetch: {r["fetched"]} ok, {r["failed"]} failed, {r["rows_transformed"]} rows')

result = r

In [None]:
# name: fetch_vovi | type: python
# Fetch VOVI forecasts for US and CA
#
# For each country the approved-forecast list is downloaded with curl (using
# the Midway cookie file), saved as CSV under <ctx_dir>/vovi, and registered
# in DuckDB as vovi_<country>. Successful countries are then combined into
# the `vovi` view. A failure for one country is recorded and does not stop
# the others.
import shutil
import subprocess
import json as _json

cookie_path = str(Path.home() / '.midway' / 'cookie')
vovi_base = 'https://prod.vovi.last-mile.amazon.dev/api/forecast/list_approved'

# Prefer the `curl.exe` name this cell has always used, but fall back to plain
# `curl` on hosts where `curl.exe` is not on PATH. If neither resolves, keep
# the original name so the failure is reported per country below.
curl_bin = shutil.which('curl.exe') or shutil.which('curl') or 'curl.exe'

countries = ['US', 'CA']
business_type = biz.lower()
shipping_type = 'premium'

vovi_ctx_dir = ctx_dir / 'vovi'
vovi_ctx_dir.mkdir(parents=True, exist_ok=True)

vovi_results = []

for country in countries:
    url = f'{vovi_base}?country={country}&cptDateKey={target_date}&shippingType={shipping_type}&businessType={business_type}'
    print(f'Fetching VOVI: {country} / {business_type} / {shipping_type} / {target_date}...')

    try:
        curl_result = subprocess.run(
            [curl_bin, '--location-trusted', '-b', cookie_path, '-s', url],
            capture_output=True, text=True
        )

        if curl_result.returncode != 0:
            print(f'  {country}: curl failed - {curl_result.stderr[:100]}')
            vovi_results.append({'country': country, 'success': False, 'error': curl_result.stderr[:200]})
            continue

        # A non-JSON body (e.g. an HTML error page) raises here and is
        # recorded as a failure by the handler below.
        data = _json.loads(curl_result.stdout)
        df = pd.DataFrame(data)

        # Save to context directory
        csv_file = vovi_ctx_dir / f'vovi_{country.lower()}_{business_type}_{shipping_type}.csv'
        df.to_csv(csv_file, index=False)

        # Register in DuckDB
        table_name = f'vovi_{country.lower()}'
        conn.register(table_name, df)

        print(f'  {country}: {len(df):,} rows, {len(df.columns)} cols -> {table_name}')
        vovi_results.append({'country': country, 'success': True, 'rows': len(df), 'table': table_name, 'path': str(csv_file)})

    except Exception as e:
        # Best effort per country: record the error and carry on.
        print(f'  {country}: failed - {e}')
        vovi_results.append({'country': country, 'success': False, 'error': str(e)})

# Create combined vovi table using UNION ALL BY NAME (handles differing column counts)
try:
    tables_to_union = [r['table'] for r in vovi_results if r.get('success')]
    if tables_to_union:
        union_sql = ' UNION ALL BY NAME '.join([f"SELECT * FROM {t}" for t in tables_to_union])
        conn.execute(f'CREATE OR REPLACE VIEW vovi AS {union_sql}')
        vovi_total = conn.execute('SELECT COUNT(*) FROM vovi').fetchone()[0]
        print(f'\nCombined vovi view: {vovi_total:,} rows')
    else:
        # Previously silent: say so, because later cells query `vovi`.
        print('\nNo VOVI data fetched; combined vovi view not created')
except Exception as e:
    print(f'Could not create combined view: {e}')

result = vovi_results

In [None]:
# name: download_artifacts | type: python
# Download latest pipeline artifacts from S3
#
# Lists the CSVs under the target-date prefix, keeps the newest file per
# artifact type (by the timestamp embedded in the file name), downloads each
# one, and registers it in DuckDB as a table named after the artifact type.
import re

artifact_bucket = 'lma-glue-pipeline'
sort_code = 'DS-A'
artifacts_dir = ctx_dir / 'pipeline_artifacts'
artifacts_dir.mkdir(parents=True, exist_ok=True)

session = make_boto3_session(profile_name=aws_profile)
s3 = session.client('s3')

s3_prefix = f'pipeline_output/internal_sort_code={sort_code}/target_forecast_date={target_date}/'
print(f'Scanning: s3://{artifact_bucket}/{s3_prefix}')

# <base>_<YYYYMMDD_HHMMSS><suffix>.csv -> artifact type is base + suffix.
artifact_name_re = re.compile(r'^(.+?)_(\d{8}_\d{6})(.*)\.csv$')

# Group by artifact type, keep latest timestamp
paginator = s3.get_paginator('list_objects_v2')
files_by_type = {}

for page in paginator.paginate(Bucket=artifact_bucket, Prefix=s3_prefix):
    for obj in page.get('Contents', []):
        key = obj['Key']
        filename = key.split('/')[-1]
        match = artifact_name_re.match(filename)
        if match:
            base_name = match.group(1)
            timestamp = match.group(2)
            suffix = match.group(3)
            artifact_type = base_name + suffix
            # Timestamps are fixed-width digit strings, so string comparison
            # orders them chronologically.
            if artifact_type not in files_by_type or timestamp > files_by_type[artifact_type]['timestamp']:
                files_by_type[artifact_type] = {
                    'key': key, 'filename': filename, 'timestamp': timestamp,
                    'size': obj['Size'], 'artifact_type': artifact_type
                }

print(f'Found {len(files_by_type)} artifact types')

# Download and register each
artifact_results = []
for atype, info in sorted(files_by_type.items()):
    dest_file = artifacts_dir / info['filename']
    size_mb = info['size'] / 1024 / 1024

    # The table name comes from an S3 file name, so it may not be a bare SQL
    # identifier (hyphen, leading digit, reserved word). Quote it, doubling any
    # embedded double quote. Plain names still resolve unquoted in later cells.
    table_ident = '"' + atype.replace('"', '""') + '"'
    # Double single quotes so a path containing "'" stays inside the literal.
    path_literal = str(dest_file).replace("'", "''")

    try:
        print(f'  {info["filename"]} ({size_mb:.1f} MB)...', flush=True)
        s3.download_file(artifact_bucket, info['key'], str(dest_file))

        # Register in DuckDB
        conn.execute(f"CREATE OR REPLACE TABLE {table_ident} AS SELECT * FROM read_csv_auto('{path_literal}')")
        row_count = conn.execute(f'SELECT COUNT(*) FROM {table_ident}').fetchone()[0]

        artifact_results.append({'artifact': atype, 'rows': row_count, 'size_mb': round(size_mb, 1), 'path': str(dest_file)})
    except Exception as e:
        # Best effort per artifact: record the error and continue with the rest.
        print(f'    FAILED: {e}')
        artifact_results.append({'artifact': atype, 'error': str(e)})

# Only successful entries carry a 'rows' key.
registered = sum(1 for a in artifact_results if "rows" in a)
print(f'\nDownloaded and registered {registered}/{len(files_by_type)} artifacts')

result = artifact_results

In [None]:
# name: download_pba | type: python
# Download latest intraday PBA parquet from S3 (last-mile-staging bucket)
#
# Scans the target-date prefix, picks the highest partition_run_time value,
# downloads the object recorded for it, and loads it as the `intraday_pba` table.
import re as _re

pba_bucket = 'last-mile-staging'
sort_code = 'DS-A'
pba_dir = ctx_dir / 'intraday_pba'
pba_dir.mkdir(parents=True, exist_ok=True)

pba_session = make_boto3_session(profile_name='last-mile-staging')
pba_s3 = pba_session.client('s3')

pba_prefix = f'intraday-pba/partition_internal_sort_code={sort_code}/partition_target_date={target_date}/'
print(f'Scanning: s3://{pba_bucket}/{pba_prefix}')

# Map run_time -> object info. Each run_time keeps the object listed last.
# NOTE(review): this assumes one data file per run_time partition -- confirm.
paginator = pba_s3.get_paginator('list_objects_v2')
run_times = {}

for page in paginator.paginate(Bucket=pba_bucket, Prefix=pba_prefix):
    for obj in page.get('Contents', []):
        key = obj['Key']
        match = _re.search(r'partition_run_time=(\d+)/', key)
        if match is None:
            continue
        run_times[int(match.group(1))] = {'key': key, 'size': obj['Size']}

if run_times:
    # Highest numeric run_time is treated as the latest run.
    latest_run_time = max(run_times.keys())
    latest = run_times[latest_run_time]
    filename = latest['key'].split('/')[-1]
    size_mb = latest['size'] / 1024 / 1024
    dest_file = pba_dir / filename

    print(f'Latest run_time: {latest_run_time}')
    print(f'Downloading: {filename} ({size_mb:.1f} MB)...', flush=True)

    pba_s3.download_file(pba_bucket, latest['key'], str(dest_file))

    # Register in DuckDB
    conn.execute(f"CREATE OR REPLACE TABLE intraday_pba AS SELECT * FROM read_parquet('{dest_file}')")
    pba_count = conn.execute('SELECT COUNT(*) FROM intraday_pba').fetchone()[0]
    described = conn.execute('DESCRIBE intraday_pba').fetchall()
    pba_cols = [col[0] for col in described]

    print(f'Intraday PBA: {pba_count:,} rows, {len(pba_cols)} columns')
    print(f'Saved to: {dest_file}')

    result = {
        'success': True,
        'run_time': latest_run_time,
        'rows': pba_count,
        'columns': pba_cols,
        'size_mb': round(size_mb, 1),
        'path': str(dest_file)
    }
else:
    # Nothing under the prefix: report it in the result instead of raising.
    print(f'No intraday PBA data found for {target_date}')
    result = {'success': False, 'error': f'No data for {target_date}'}

In [None]:
-- name: vp_pivot | type: sql
-- Builds vp: vp_raw pivoted from long form (one row per metric_name) to wide
-- form (one column per metric), plus HH:MM keys, grid keys and two
-- utilisation ratios.
SET TimeZone = 'UTC';

CREATE OR REPLACE TABLE vp AS
WITH pivot_all AS (
    -- One output row per (node, plan_start_date, ofd_dates, demand_types, cpts).
    -- MAX(CASE ...) picks the metric_value for the named metric within the group.
    SELECT
        node,
        plan_start_date,
        ofd_dates,
        demand_types,
        CAST(cpts AS TIMESTAMP) AS cpts,
        MAX(CASE WHEN metric_name = 'total_volume_available' THEN metric_value END) AS total_volume_available,
        MAX(CASE WHEN metric_name = 'automated_uncapped_slam_forecast' THEN metric_value END) AS automated_uncapped_slam_forecast,
        MAX(CASE WHEN metric_name = 'current_slam' THEN metric_value END) AS current_slam,
        MAX(CASE WHEN metric_name = 'weekly_uncapped_slam_forecast' THEN metric_value END) AS weekly_uncapped_slam_forecast,
        MAX(CASE WHEN metric_name = 'post_cutoff_adjustment' THEN metric_value END) AS post_cutoff_adjustment,
        MAX(CASE WHEN metric_name = 'total_backlog' THEN metric_value END) AS total_backlog,
        MAX(CASE WHEN metric_name = 'automated_confidence' THEN metric_value END) AS automated_confidence,
        MAX(CASE WHEN metric_name = 'uncapped_slam_forecast' THEN metric_value END) AS uncapped_slam_forecast,
        MAX(CASE WHEN metric_name = 'atrops_soft_cap' THEN metric_value END) AS atrops_soft_cap,
        MAX(CASE WHEN metric_name = 'confidence_anomaly' THEN metric_value END) AS confidence_anomaly,
        MAX(CASE WHEN metric_name = 'net_volume_adjustments' THEN metric_value END) AS net_volume_adjustments,
        MAX(CASE WHEN metric_name = 'adjusted_uncapped_slam_forecast' THEN metric_value END) AS adjusted_uncapped_slam_forecast,
        MAX(CASE WHEN metric_name = 'cap_target_buffer' THEN metric_value END) AS cap_target_buffer,
        MAX(CASE WHEN metric_name = 'earlies_expected' THEN metric_value END) AS earlies_expected,
        MAX(CASE WHEN metric_name = 'returns' THEN metric_value END) AS returns,
        MAX(CASE WHEN metric_name = 'sideline_in' THEN metric_value END) AS sideline_in,
        MAX(CASE WHEN metric_name = 'vovi_uncapped_slam_forecast' THEN metric_value END) AS vovi_uncapped_slam_forecast,
        MAX(CASE WHEN metric_name = 'in_station_backlog' THEN metric_value END) AS in_station_backlog,
        MAX(CASE WHEN metric_name = 'mnr_expected' THEN metric_value END) AS mnr_expected,
        MAX(CASE WHEN metric_name = 'mnr_received' THEN metric_value END) AS mnr_received,
        MAX(CASE WHEN metric_name = 'current_schedule' THEN metric_value END) AS current_schedule,
        MAX(CASE WHEN metric_name = 'vovi_adjustment' THEN metric_value END) AS vovi_adjustment,
        MAX(CASE WHEN metric_name = 'forecast_type' THEN metric_value END) AS forecast_type,
        MAX(CASE WHEN metric_name = 'earlies_received' THEN metric_value END) AS earlies_received,
        MAX(CASE WHEN metric_name = 'latest_deployed_cap' THEN metric_value END) AS latest_deployed_cap,
        MAX(CASE WHEN metric_name = 'atrops_hard_cap' THEN metric_value END) AS atrops_hard_cap,
        MAX(CASE WHEN metric_name = 'capped_slam_forecast' THEN metric_value END) AS capped_slam_forecast
    FROM vp_raw
    -- Keep only rows whose plan start date and OFD date are the same calendar date.
    WHERE plan_start_date::DATE = ofd_dates::DATE
    GROUP BY node, plan_start_date, ofd_dates, demand_types, cpts
)
SELECT
    *,
    -- NOTE(review): the session TimeZone is set to UTC above, so cpts_local and
    -- cpts_utc (and the two grid keys) look like they format the same instant --
    -- confirm that cpts_local is meant to be station-local time.
    strftime(cpts, '%H:%M') AS cpts_local,
    strftime(cpts AT TIME ZONE 'UTC', '%H:%M') AS cpts_utc,
    -- Grid keys: <ofd_dates>#<node>#<HH:MM>.
    ofd_dates || '#' || node || '#' || strftime(cpts, '%H:%M') AS grid_key_local,
    ofd_dates || '#' || node || '#' || strftime(cpts AT TIME ZONE 'UTC', '%H:%M') AS grid_key_utc,
    -- Utilisation ratios share one denominator: the larger of latest_deployed_cap
    -- and atrops_soft_cap, with NULL treated as 0. The ratio is NULL when that
    -- denominator is not positive; a missing numerator counts as 0.
    CASE WHEN GREATEST(COALESCE(latest_deployed_cap::FLOAT, 0), COALESCE(atrops_soft_cap::FLOAT, 0)) > 0
         THEN ROUND(COALESCE(automated_uncapped_slam_forecast::FLOAT, 0) / GREATEST(COALESCE(latest_deployed_cap::FLOAT, 0), COALESCE(atrops_soft_cap::FLOAT, 0)), 4)
         ELSE NULL END AS auto_forecast_util,
    CASE WHEN GREATEST(COALESCE(latest_deployed_cap::FLOAT, 0), COALESCE(atrops_soft_cap::FLOAT, 0)) > 0
         THEN ROUND(COALESCE(current_schedule::FLOAT, 0) / GREATEST(COALESCE(latest_deployed_cap::FLOAT, 0), COALESCE(atrops_soft_cap::FLOAT, 0)), 4)
         ELSE NULL END AS util
FROM pivot_all
ORDER BY node, cpts

In [None]:
-- name: joined | type: sql
-- Builds joined: the site list outer-joined with the VP pivot, enriched with
-- VOVI columns, day-classifier columns and flatline flags, plus a
-- comma-separated `flags` column.
-- NOTE(review): `cumulative`, `day_classifier` and `flatline_execute_flags`
-- are not created by any SQL cell shown here -- presumably they are tables
-- registered from downloaded pipeline artifacts; confirm they exist before
-- this cell runs.
SET TimeZone = 'UTC';

CREATE OR REPLACE TABLE joined AS
-- VOVI rows that carry a match_key, with station_cpt converted to a timestamp.
-- NOTE(review): to_timestamp() implies station_cpt is epoch seconds -- confirm.
WITH vovi_prepared AS (
    SELECT
        station,
        CAST(timezone('UTC', to_timestamp(station_cpt)) AS TIMESTAMP) AS cpt_utc,
        match_key
    FROM vovi
    WHERE match_key IS NOT NULL
),
-- Map station+cpt_utc -> cpt_time_local from cumulative
cpt_map AS (
    SELECT DISTINCT
        station,
        strftime(cpt_utc::TIMESTAMP, '%H:%M') AS cpts_utc,
        cpt_time_local
    FROM cumulative
),
base AS (
    SELECT
        COALESCE(sl.station, vp.node) AS station,
        sl.cycle,
        sl.business_org,
        -- Which sources contributed the row: both, VP only, or site list only.
        CASE
            WHEN sl.station IS NOT NULL AND vp.node IS NOT NULL THEN 'vp_list'
            WHEN vp.node IS NOT NULL THEN 'vp'
            ELSE 'list'
        END AS available_inputs,
        vp.plan_start_date,
        vp.ofd_dates,
        vp.demand_types,
        vp.cpts,
        vp.cpts_local,
        vp.cpts_utc,
        vp.grid_key_local,
        vp.grid_key_utc,
        vp.forecast_type,
        vp.automated_confidence,
        vp.auto_forecast_util,
        vp.util,
        vp.vovi_uncapped_slam_forecast,
        vp.uncapped_slam_forecast,
        vp.adjusted_uncapped_slam_forecast,
        vp.capped_slam_forecast,
        vp.atrops_soft_cap,
        vp.atrops_hard_cap,
        vp.latest_deployed_cap,
        vp.cap_target_buffer,
        vp.current_slam,
        vp.current_schedule,
        vp.total_volume_available,
        vp.total_backlog,
        vp.in_station_backlog,
        vp.post_cutoff_adjustment,
        vp.net_volume_adjustments,
        vp.vovi_adjustment,
        vp.confidence_anomaly,
        vp.automated_uncapped_slam_forecast,
        vp.weekly_uncapped_slam_forecast,
        vp.earlies_expected,
        vp.earlies_received,
        vp.returns,
        vp.sideline_in,
        vp.mnr_expected,
        vp.mnr_received,
        -- VOVI columns, prefixed with vovi_ to keep them apart from the VP columns.
        v.modified_user AS vovi_modified_user,
        v.proposed_cap AS vovi_proposed_cap,
        v.post_cutoff_adjustment AS vovi_post_cutoff_adjustment,
        v.adjusted_forecast AS vovi_adjusted_forecast,
        v.forecast_source AS vovi_forecast_source,
        v.original_forecast AS vovi_original_forecast,
        v.forecast_status AS vovi_forecast_status,
        v.forecast_adjustment AS vovi_forecast_adjustment,
        v.current_slammed AS vovi_current_slammed,
        v.current_scheduled AS vovi_current_scheduled,
        v.soft_cap AS vovi_soft_cap,
        v.hard_cap AS vovi_hard_cap,
        -- Day classifier columns (from VOVI match date)
        dc.bucket_lower,
        dc.bucket_upper,
        dc.peak_to_eod_drop_pct,
        dc.constrained_after_target,
        dc.sched_at_max_drop,
        dc.max_drop_4hr,
        dc.had_desched_notify,
        dc.had_desched_execute,
        -- Flatline flags (from target date)
        fl.flatline_execute,
        fl.flatline_notify,
        NOW()::TIMESTAMP AS execution_ts
    FROM site_list sl
    FULL OUTER JOIN vp ON sl.station = vp.node
    -- NOTE(review): vovi is joined twice on the same station + HH:MM key:
    -- `v` for the value columns and `vp2` for match_key. If vovi ever holds
    -- more than one row per station and CPT time, the two joins multiply rows.
    -- Confirm uniqueness, or take match_key from `v` directly.
    LEFT JOIN vovi v
        ON COALESCE(sl.station, vp.node) = v.station
        AND vp.cpts_utc = strftime(CAST(timezone('UTC', to_timestamp(v.station_cpt)) AS TIMESTAMP), '%H:%M')
    LEFT JOIN vovi_prepared vp2
        ON COALESCE(sl.station, vp.node) = vp2.station
        AND vp.cpts_utc = strftime(vp2.cpt_utc, '%H:%M')
    -- Translate the UTC HH:MM key into cumulative's cpt_time_local, which the
    -- day_classifier and flatline joins below are keyed on.
    LEFT JOIN cpt_map m
        ON COALESCE(sl.station, vp.node) = m.station
        AND vp.cpts_utc = m.cpts_utc
    -- Day classifier rows are taken for the VOVI match date (match_key cast to DATE).
    LEFT JOIN day_classifier dc
        ON COALESCE(sl.station, vp.node) = dc.station
        AND m.cpt_time_local = dc.cpt_time_local
        AND vp2.match_key IS NOT NULL
        AND dc.ofd_date = vp2.match_key::DATE
    LEFT JOIN flatline_execute_flags fl
        ON COALESCE(sl.station, vp.node) = fl.station
        AND m.cpt_time_local = fl.cpt_time_local
)
SELECT
    *,
    -- Derived flags (comma-separated)
    -- 'risk-under' when bucket_upper <= 10, 'flatline' when flatline_execute = 1;
    -- flags that do not apply are dropped before joining with ','.
    ARRAY_TO_STRING(LIST_VALUE(
        CASE WHEN CAST(bucket_upper AS FLOAT) <= 10 THEN 'risk-under' END,
        CASE WHEN CAST(flatline_execute AS INT) = 1 THEN 'flatline' END
    ).list_filter(x -> x IS NOT NULL), ',') AS flags
FROM base
ORDER BY station, cpts

In [None]:
# name: summary | type: python
# Report how the rows of `joined` split across available_inputs values,
# then the overall station and row totals.
breakdown_sql = """
    SELECT available_inputs, COUNT(DISTINCT station) AS stations, COUNT(*) AS rows
    FROM joined
    GROUP BY available_inputs
    ORDER BY available_inputs
"""
summary = conn.execute(breakdown_sql).fetchdf()
print(summary.to_string(index=False))


def _scalar(sql):
    """Run a query that returns one row and return its first column."""
    return conn.execute(sql).fetchone()[0]


total = _scalar("SELECT COUNT(*) FROM joined")
distinct = _scalar("SELECT COUNT(DISTINCT station) FROM joined")
print(f'\nTotal: {distinct} stations, {total} rows')

result = dict(
    status='success',
    total_rows=total,
    distinct_stations=distinct,
    breakdown=summary.to_dict('records'),
)

In [None]:
# name: save_joined | type: python
# Write the `joined` table to joined.csv in the context directory
# (comma-delimited, with a header row) and report how many rows it holds.
output_file = ctx_dir / 'joined.csv'

copy_sql = f"COPY joined TO '{output_file}' (HEADER, DELIMITER ',')"
conn.execute(copy_sql)

(row_count,) = conn.execute("SELECT COUNT(*) FROM joined").fetchone()
print(f'Saved joined table: {row_count} rows -> {output_file}')

result = dict(output_file=str(output_file), rows=row_count)

In [None]:
-- name: pba_visual | type: sql
-- Builds pba_visual: per-horizon PBA rows for every station/CPT pair present
-- in `joined`, in two flavours distinguished by pba_type:
--   'target' - rows from `cumulative` (with percentile columns)
--   'match'  - rows from `intraday_pba` for the VOVI match date (percentiles NULL)
SET TimeZone = 'UTC';

CREATE OR REPLACE TABLE pba_visual AS
-- All VOVI columns plus station_cpt converted to a timestamp.
-- NOTE(review): to_timestamp() implies station_cpt is epoch seconds -- confirm.
WITH vovi_prepared AS (
    SELECT
        *,
        CAST(timezone('UTC', to_timestamp(station_cpt)) AS TIMESTAMP) AS cpt_utc
    FROM vovi
),
-- Station + UTC HH:MM pairs that actually appear in joined.
vp_keys AS (
    SELECT DISTINCT station, cpts_utc
    FROM joined
    WHERE cpts_utc IS NOT NULL
),
-- Latest target_forecast_date in cumulative; used as the date part of
-- grid_key in BOTH branches, so match rows share the target rows' key.
-- NOTE(review): presumably meant to line up with joined.grid_key_utc --
-- confirm the date formats agree.
vp_target_date AS (
    SELECT MAX(target_forecast_date::DATE) AS target_date FROM cumulative
),

-- Target date: cumulative data (has fan chart percentiles)
target_rows AS (
    SELECT
        'target' AS pba_type,
        (SELECT target_date FROM vp_target_date) || '#' || C.station || '#' || strftime(C.cpt_utc::TIMESTAMP, '%H:%M') AS grid_key,
        C.target_forecast_date::VARCHAR AS pba_ofd_date,
        C.dhm_horizon AS pba_dhm_horizon,
        C.bi_hourly_trunc_local::VARCHAR AS pba_bi_hourly_local,
        C.horizon_rank_local::FLOAT AS pba_horizon_rank,
        C.scheduled::FLOAT AS pba_scheduled,
        COALESCE(C.soft_cap::FLOAT, C.target_horizon_soft_cap::FLOAT, 0) AS pba_soft_cap,
        -- hard_cap and cap_utilization come from intraday_pba and stay NULL
        -- when the LEFT JOIN below finds no matching row.
        I.hard_cap::FLOAT AS pba_hard_cap,
        COALESCE(C.slammed::FLOAT, 0) AS pba_slammed,
        I.cap_utilization::FLOAT AS pba_cap_utilization,
        COALESCE(C.cumulative_median::FLOAT, 0) AS pba_cumulative_median,
        COALESCE(C.cumulative_median_adj::FLOAT, 0) AS pba_cumulative_median_adj,
        COALESCE(C.cumulative_p10::FLOAT, 0) AS pba_p10,
        COALESCE(C.cumulative_p30::FLOAT, 0) AS pba_p30,
        COALESCE(C.cumulative_p50::FLOAT, 0) AS pba_p50,
        COALESCE(C.cumulative_p70::FLOAT, 0) AS pba_p70,
        COALESCE(C.cumulative_p90::FLOAT, 0) AS pba_p90
    FROM cumulative C
    INNER JOIN vp_keys K
        ON C.station = K.station
        AND strftime(C.cpt_utc::TIMESTAMP, '%H:%M') = K.cpts_utc
    LEFT JOIN intraday_pba I
        ON C.station = I.station
        AND C.cpt_utc::TIMESTAMP = I.cpt_utc
        AND C.target_forecast_date::DATE = I.ofd_date
        AND C.horizon_rank_local::FLOAT = I.horizon_rank::FLOAT
    -- Keep horizons later than day -2, plus day -2 from hour 12 onwards.
    WHERE C.horizon_day > -2 OR (C.horizon_day = -2 AND C.horizon_hour >= 12)
),

-- Match date: intraday_pba via VOVI match_key (no fan chart data)
match_rows AS (
    SELECT
        'match' AS pba_type,
        (SELECT target_date FROM vp_target_date) || '#' || I.station || '#' || strftime(I.cpt_utc, '%H:%M') AS grid_key,
        I.ofd_date::VARCHAR AS pba_ofd_date,
        I.dhm_horizon AS pba_dhm_horizon,
        I.bi_hourly_trunc_local::VARCHAR AS pba_bi_hourly_local,
        I.horizon_rank::FLOAT AS pba_horizon_rank,
        COALESCE(I.scheduled::FLOAT, 0) AS pba_scheduled,
        COALESCE(I.soft_cap::FLOAT, 0) AS pba_soft_cap,
        COALESCE(I.hard_cap::FLOAT, 0) AS pba_hard_cap,
        COALESCE(I.slammed::FLOAT, 0) AS pba_slammed,
        COALESCE(I.cap_utilization::FLOAT, 0) AS pba_cap_utilization,
        -- Typed NULLs keep the column list aligned with target_rows for the UNION ALL.
        NULL::FLOAT AS pba_cumulative_median,
        NULL::FLOAT AS pba_cumulative_median_adj,
        NULL::FLOAT AS pba_p10,
        NULL::FLOAT AS pba_p30,
        NULL::FLOAT AS pba_p50,
        NULL::FLOAT AS pba_p70,
        NULL::FLOAT AS pba_p90
    FROM vp_keys K
    INNER JOIN vovi_prepared V
        ON K.station = V.station
        AND K.cpts_utc = strftime(V.cpt_utc, '%H:%M')
    -- Matched on time of day only; the date comes from the VOVI match_key.
    INNER JOIN intraday_pba I
        ON V.station = I.station
        AND strftime(V.cpt_utc, '%H:%M') = strftime(I.cpt_utc, '%H:%M')
        AND V.match_key IS NOT NULL
        AND I.ofd_date = V.match_key::DATE
    -- Same horizon window as target_rows.
    WHERE I.horizon_day > -2 OR (I.horizon_day = -2 AND I.horizon_hour >= 12)
)

SELECT * FROM target_rows
UNION ALL
SELECT * FROM match_rows
ORDER BY grid_key, pba_type, pba_horizon_rank

In [None]:
# name: save_pba_visual | type: python
# Export PBA visual data to JSON for frontend chart consumption
#
# Writes every row of `pba_visual` to visual.json in the context directory as
# a list of objects, then reports row counts per pba_type.
import json as _json
import math as _math
import numbers as _numbers


def _json_safe(value):
    """Return *value* in a form that serialises to valid JSON.

    Real, non-integer numbers are converted to a built-in float, and NaN or
    +/-infinity become None. The json module would otherwise write those as
    NaN / Infinity, which are not valid JSON. Converting to float also keeps
    NumPy float scalars that are not `float` subclasses from falling through
    to `default=str` and being written as strings. Every other value is
    returned unchanged.
    """
    if isinstance(value, _numbers.Real) and not isinstance(value, _numbers.Integral):
        number = float(value)
        return number if _math.isfinite(number) else None
    return value


pba_df = conn.execute("SELECT * FROM pba_visual").fetchdf()
output_file = ctx_dir / 'visual.json'

# Convert to list of dicts, replacing non-finite numbers with None for valid JSON
records = [
    {key: _json_safe(value) for key, value in rec.items()}
    for rec in pba_df.to_dict('records')
]

# Explicit encoding so the file does not depend on the platform default.
with open(output_file, 'w', encoding='utf-8') as f:
    # default=str stringifies anything json cannot encode natively.
    _json.dump(records, f, default=str)

row_count = len(records)
distinct_keys = pba_df['grid_key'].nunique()
target_count = len(pba_df[pba_df['pba_type'] == 'target'])
match_count = len(pba_df[pba_df['pba_type'] == 'match'])
print(f'Saved PBA visual data: {row_count} rows ({target_count} target, {match_count} match), {distinct_keys} grid_keys -> {output_file}')

result = {'output_file': str(output_file), 'rows': row_count, 'target': target_count, 'match': match_count, 'distinct_grid_keys': distinct_keys}