# Manifest-Orchestrated Ingestion Pipeline
This notebook prototypes a staged workflow for capturing raw realtime feeds into GCS, tracking them via a BigQuery manifest, and batching curated upserts without relying on Redis or the Storage Write API.

## Working Plan
We will iterate through four stages, validating each before productionising:
1. **Manifest schema + helpers** – design the BigQuery table layout and author idempotent insert/update utilities.
2. **Raw fetch simulation** – exercise current ingestion logic to write sample manifest rows while landing files in GCS.
3. **Batch assembly strategies** – evaluate parallel downloads versus server-side `gcloud storage objects compose` for bundling raw files.
4. **Batch load + upsert** – reuse the existing BigQuery batch uploader to load a staging table and run partition-scoped MERGEs, finishing by updating manifest statuses.

In [22]:
"""Initialise shared clients and common configuration for the staged experiments."""
from __future__ import annotations
import datetime as dt
import json
import os
from dataclasses import dataclass
from typing import Iterable

import pandas as pd
from google.cloud import bigquery, storage

PROJECT_ID = os.environ.get("PROJECT_ID", "regal-dynamo-470908-v9")
BQ_DATASET = os.environ.get("BQ_MANIFEST_DATASET", "rt_manifest_dev")
MANIFEST_TABLE = f"{PROJECT_ID}.{BQ_DATASET}.ingestion_manifest"
RAW_BUCKET = os.environ.get("RAW_BUCKET", "auckland-data-dev")
RAW_PREFIX = os.environ.get("RAW_PREFIX", "raw/vehicle_positions/")

bq_client = bigquery.Client(project=PROJECT_ID)
gcs_client = storage.Client(project=PROJECT_ID)

print("Project:", PROJECT_ID)
print("Manifest table:", MANIFEST_TABLE)
print("Raw bucket prefix:", f"gs://{RAW_BUCKET}/{RAW_PREFIX}")

Project: regal-dynamo-470908-v9
Manifest table: regal-dynamo-470908-v9.rt_manifest_dev.ingestion_manifest
Raw bucket prefix: gs://auckland-data-dev/raw/vehicle_positions/


## Stage 1 · Manifest schema and helpers
Goals for this section:
- Declare the manifest table schema (columns, partitioning, clustering).
- Create utility functions to upsert manifest rows idempotently using standard BigQuery inserts.
- Smoke-test by writing and reading back a sample entry.

In [23]:
MANIFEST_SCHEMA = [
    bigquery.SchemaField("dataset", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("object_uri", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("capture_ts", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("partition_key", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("status_updated_ts", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("checksum", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("payload_bytes", "INT64", mode="NULLABLE"),
    bigquery.SchemaField("attributes", "JSON", mode="NULLABLE"),
]

def ensure_manifest_table() -> None:
    """Create the manifest dataset/table if they do not yet exist."""
    dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET}")
    dataset_ref.location = "australia-southeast1"
    try:
        bq_client.get_dataset(dataset_ref)
    except Exception:
        bq_client.create_dataset(dataset_ref, exists_ok=True)

    table = bigquery.Table(MANIFEST_TABLE, schema=MANIFEST_SCHEMA)
    table.time_partitioning = bigquery.TimePartitioning(field="capture_ts", type_=bigquery.TimePartitioningType.HOUR)
    table.clustering_fields = ["dataset", "partition_key"]
    bq_client.create_table(table, exists_ok=True)

ensure_manifest_table()
print("Manifest table is ready.")

Manifest table is ready.


In [24]:
def upsert_manifest_rows(rows: Iterable[dict]) -> None:
    """Insert or update manifest rows using DML MERGE on the primary key (dataset, object_uri)."""
    rows = list(rows)
    if not rows:
        return

    table = bigquery.TableReference.from_string(MANIFEST_TABLE)
    temp_table_id = f"{MANIFEST_TABLE}_staging"
    staging_table = bigquery.Table(temp_table_id, schema=MANIFEST_SCHEMA)
    staging_table.time_partitioning = bigquery.TimePartitioning(field="capture_ts", type_=bigquery.TimePartitioningType.HOUR)
    staging_table.clustering_fields = ["dataset", "partition_key"]
    bq_client.create_table(staging_table, exists_ok=True)

    job_config = bigquery.LoadJobConfig(
        write_disposition="WRITE_TRUNCATE",
        schema=MANIFEST_SCHEMA,
    )
    job = bq_client.load_table_from_json(
        rows,
        temp_table_id,
        job_config=job_config,
    )
    job.result()

    merge_sql = f"""\n    MERGE `{MANIFEST_TABLE}` AS target\n    USING `{temp_table_id}` AS source\n      ON target.dataset = source.dataset\n     AND target.object_uri = source.object_uri\n    WHEN MATCHED THEN\n      UPDATE SET\n        status = source.status,\n        status_updated_ts = source.status_updated_ts,\n        checksum = source.checksum,\n        payload_bytes = source.payload_bytes,\n        attributes = TO_JSON(source.attributes)\n    WHEN NOT MATCHED THEN\n      INSERT (`dataset`, `object_uri`, `capture_ts`, `partition_key`, `status`, `status_updated_ts`, `checksum`, `payload_bytes`, `attributes`)\n      VALUES (source.dataset, source.object_uri, source.capture_ts, CAST(source.partition_key AS STRING), source.status, source.status_updated_ts, source.checksum, source.payload_bytes, TO_JSON(source.attributes))\n    """
    bq_client.query(merge_sql).result()
    bq_client.delete_table(temp_table_id, not_found_ok=True)

In [25]:
sample_row = {
    "dataset": "vehicle-positions",
    "object_uri": "gs://sample-bucket/raw/vehicle_positions/2025-10-21T22:00:05Z.json",
    "capture_ts": dt.datetime.utcnow().isoformat(),
    "partition_key": "2025-10-21T22:00",
    "status": "new",
    "status_updated_ts": dt.datetime.utcnow().isoformat(),
    "checksum": "deadbeef",
    "payload_bytes": 12345,
    "attributes": {"notes": "Stage1 smoke test"},
}

upsert_manifest_rows([sample_row])
display(bq_client.query(
    f"SELECT dataset, object_uri, status, status_updated_ts FROM `{MANIFEST_TABLE}` ORDER BY status_updated_ts DESC LIMIT 5"
).to_dataframe())

E0000 00:00:1761075598.490796 6836922 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


Unnamed: 0,dataset,object_uri,status,status_updated_ts
0,vehicle-positions,gs://sample-bucket/raw/vehicle_positions/2025-...,new,2025-10-21 19:39:48.560749+00:00
1,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,2025-10-21 19:12:37.183684+00:00
2,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,2025-10-21 19:12:36.888786+00:00
3,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,2025-10-21 19:12:36.498692+00:00
4,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,2025-10-21 19:12:36.191643+00:00


## Next · Stage 2 setup
With the manifest scaffolding ready, the next step will simulate raw ingestion:
- Reuse existing fetch/transform code to land a handful of vehicle-position snapshots in GCS.
- Record their URIs via `upsert_manifest_rows`, validating idempotency and timing metrics.
We will add these cells once you’re ready to move into Stage 2.

## Stage 2 · Simulate raw ingestion
Objectives:
- Pull a handful of recent vehicle-position snapshots (or synthetic payloads) into the raw bucket.
- Record each object via `upsert_manifest_rows`, capturing partition keys and sizes.
- Demonstrate idempotency by running the manifest write twice without creating duplicates.

In [26]:
from pathlib import Path
import random
import time

RAW_SAMPLE_DIR = Path("../queries/scheduled")  # reuse existing SQL directory for mock payloads
SIMULATED_DATASET = "vehicle-positions"

def generate_mock_payload(seed: int) -> bytes:
    random.seed(seed)
    payload = json.dumps({
        "vehicle_id": f"veh-{seed:05d}",
        "trip_id": f"trip-{seed:05d}",
        "timestamp": dt.datetime.utcnow().isoformat(),
        "lat": -36.8485 + random.random() * 0.01,
        "lng": 174.7633 + random.random() * 0.01,
    })
    return f"{payload}\n".encode("utf-8")

def write_sample_objects(count: int = 5) -> list[dict]:
    bucket = gcs_client.bucket(RAW_BUCKET)
    written = []
    for idx in range(count):
        payload = generate_mock_payload(idx)
        capture_ts = dt.datetime.utcnow()
        partition_key = capture_ts.strftime("%Y-%m-%dT%H:%M")
        object_name = f"{RAW_PREFIX}{capture_ts:%Y/%m/%d/%H/}{capture_ts:%M%S}-{idx:02d}.json"
        blob = bucket.blob(object_name)
        blob.metadata = {"dataset": SIMULATED_DATASET}
        blob.upload_from_string(payload, content_type="application/json")
        written.append({
            "dataset": SIMULATED_DATASET,
            "object_uri": f"gs://{RAW_BUCKET}/{object_name}",
            "capture_ts": capture_ts.isoformat(),
            "partition_key": partition_key,
            "status": "new",
            "status_updated_ts": capture_ts.isoformat(),
            "checksum": blob.crc32c,
            "payload_bytes": len(payload),
            "attributes": {"content_type": blob.content_type},
        })
        time.sleep(0.2)  # ensure unique timestamps
    return written

In [27]:
sample_manifest_rows = write_sample_objects(count=5)
print("Uploaded objects:")
for row in sample_manifest_rows:
    print(" •", row["object_uri"], row["payload_bytes"], "bytes")

upsert_manifest_rows(sample_manifest_rows)
result_df = bq_client.query(
    """
    SELECT dataset, object_uri, status, payload_bytes, status_updated_ts
    FROM `{table}`
    WHERE dataset = @dataset
    ORDER BY status_updated_ts DESC
    LIMIT 10
    """.format(table=MANIFEST_TABLE),
    job_config=bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("dataset", "STRING", SIMULATED_DATASET),
    ]),
).to_dataframe()
display(result_df)

Uploaded objects:
 • gs://auckland-data-dev/raw/vehicle_positions/2025/10/21/19/4000-00.json 150 bytes
 • gs://auckland-data-dev/raw/vehicle_positions/2025/10/21/19/4001-01.json 150 bytes
 • gs://auckland-data-dev/raw/vehicle_positions/2025/10/21/19/4001-02.json 150 bytes
 • gs://auckland-data-dev/raw/vehicle_positions/2025/10/21/19/4002-03.json 151 bytes
 • gs://auckland-data-dev/raw/vehicle_positions/2025/10/21/19/4002-04.json 149 bytes


E0000 00:00:1761075614.728443 6836922 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


Unnamed: 0,dataset,object_uri,status,payload_bytes,status_updated_ts
0,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,149,2025-10-21 19:40:02.528893+00:00
1,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,151,2025-10-21 19:40:02.231125+00:00
2,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,150,2025-10-21 19:40:01.912734+00:00
3,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,150,2025-10-21 19:40:01.602847+00:00
4,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,150,2025-10-21 19:40:00.443062+00:00
5,vehicle-positions,gs://sample-bucket/raw/vehicle_positions/2025-...,new,12345,2025-10-21 19:39:48.560749+00:00
6,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,148,2025-10-21 19:12:37.183684+00:00
7,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,150,2025-10-21 19:12:36.888786+00:00
8,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,149,2025-10-21 19:12:36.498692+00:00
9,vehicle-positions,gs://auckland-data-dev/raw/vehicle_positions/2...,new,149,2025-10-21 19:12:36.191643+00:00


## Stage 3 · Batch assembly strategies
Stage 3 explores how to bundle manifest-listed raw objects into larger batches for downstream loading. We will benchmark two approaches:
- **Parallel downloads** – fetch objects concurrently to a local working directory for client-side concatenation.
- **Server-side compose** – let GCS assemble a composite object, reducing egress but subject to the 32-component limit.
The helpers below surface the candidate manifest rows, implement both strategies, and capture timing/size metrics so we can compare trade-offs before choosing a production implementation.

In [28]:
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Sequence

BATCH_PREFIX = os.environ.get("BATCH_PREFIX", f"{RAW_PREFIX}batches/")

def list_manifest_candidates(dataset: str, *, status: str = "new", limit: int = 16) -> list[dict]:
    """Fetch manifest rows for the given dataset, ordered by most recent capture."""
    limit = max(1, min(int(limit), 32))  # compose hard-limit is 32 components
    query = f"""
    SELECT dataset, object_uri, payload_bytes, capture_ts, partition_key
    FROM `{MANIFEST_TABLE}`
    WHERE dataset = @dataset
      AND status = @status
    ORDER BY capture_ts DESC
    LIMIT {limit}
    """
    job_config = bigquery.QueryJobConfig(query_parameters=[
        bigquery.ScalarQueryParameter("dataset", "STRING", dataset),
        bigquery.ScalarQueryParameter("status", "STRING", status),
    ])
    df = bq_client.query(query, job_config=job_config).to_dataframe()
    return df.to_dict("records")

def split_gs_uri(uri: str) -> tuple[str, str]:
    if not uri.startswith("gs://"):
        raise ValueError(f"Invalid GCS URI: {uri}")
    bucket, _, blob = uri[5:].partition("/")
    if not bucket or not blob:
        raise ValueError(f"Invalid GCS URI: {uri}")
    return bucket, blob

def benchmark_parallel_download(rows: Sequence[dict], *, max_workers: int = 4) -> dict:
    """Download objects concurrently to a temp directory and report timing stats."""
    if not rows:
        raise ValueError("No manifest rows supplied for parallel download.")
    bucket = gcs_client.bucket(RAW_BUCKET)
    start = time.perf_counter()
    tmpdir = tempfile.TemporaryDirectory()
    target_dir = Path(tmpdir.name)
    downloaded: list[Path] = []

    def _download(row: dict) -> Path:
        bucket_name, blob_name = split_gs_uri(row["object_uri"])
        if bucket_name != RAW_BUCKET:
            raise ValueError(f"Unexpected bucket {bucket_name}, expected {RAW_BUCKET}.")
        dest_path = target_dir / Path(blob_name).name
        blob = bucket.blob(blob_name)
        blob.download_to_filename(dest_path)
        return dest_path

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(_download, row): row for row in rows}
        for future in as_completed(futures):
            downloaded.append(future.result())

    duration = time.perf_counter() - start
    total_bytes = sum(path.stat().st_size for path in downloaded)
    tmpdir.cleanup()
    return {
        "strategy": "parallel_download",
        "object_count": len(rows),
        "total_bytes": total_bytes,
        "duration_sec": round(duration, 3),
    }

def benchmark_server_compose(rows: Sequence[dict], *, prefix: str | None = None) -> dict:
    """Compose objects server-side into a single blob and report timing stats."""
    if not rows:
        raise ValueError("No manifest rows supplied for server-side compose.")
    if len(rows) > 32:
        raise ValueError("GCS compose supports at most 32 components in a single call.")
    bucket = gcs_client.bucket(RAW_BUCKET)
    prefix = prefix or BATCH_PREFIX
    if not prefix.endswith("/"):
        prefix = f"{prefix}/"
    compose_name = f"{prefix}{dt.datetime.utcnow():%Y/%m/%d/%H%M%S%f}-batch.json"
    destination_blob = bucket.blob(compose_name)
    source_blobs = [bucket.blob(split_gs_uri(row["object_uri"])[1]) for row in rows]
    start = time.perf_counter()
    destination_blob.compose(source_blobs)
    duration = time.perf_counter() - start
    destination_blob.reload()
    total_bytes = destination_blob.size or sum((row.get("payload_bytes") or 0) for row in rows)
    destination_blob.delete()
    return {
        "strategy": "server_compose",
        "object_count": len(rows),
        "total_bytes": total_bytes,
        "duration_sec": round(duration, 3),
        "destination_uri": f"gs://{RAW_BUCKET}/{compose_name}",
    }

candidate_rows = list_manifest_candidates(SIMULATED_DATASET, limit=8)
if not candidate_rows:
    candidate_rows = write_sample_objects(count=8)
    upsert_manifest_rows(candidate_rows)
    candidate_rows = list_manifest_candidates(SIMULATED_DATASET, limit=8)

print(f"Benchmarking with {len(candidate_rows)} manifest rows (status='new').")
benchmarks = [
    benchmark_parallel_download(candidate_rows, max_workers=4),
    benchmark_server_compose(candidate_rows),
]
display(pd.DataFrame(benchmarks))

E0000 00:00:1761075618.110292 6836922 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


Benchmarking with 8 manifest rows (status='new').


Unnamed: 0,strategy,object_count,total_bytes,duration_sec,destination_uri
0,parallel_download,8,1197,0.392,
1,server_compose,8,1197,0.414,gs://auckland-data-dev/raw/vehicle_positions/b...


## Stage 4 · Batch load and manifest updates
Stage 4 wires everything together: compose a batch artifact, load it into a BigQuery staging table, run an idempotent MERGE into the curated table, and finally flip the corresponding manifest rows to `processed`. This mirrors the production flow we expect once the manifest-driven orchestrator takes over.

In [None]:
CURATED_DATASET = os.environ.get("CURATED_DATASET", BQ_DATASET)
CURATED_TABLE = os.environ.get("CURATED_TABLE", f"{CURATED_DATASET}.vehicle_positions_curated")
CURATED_TABLE_FQN = f"{PROJECT_ID}.{CURATED_TABLE}"
STAGING_TABLE_FQN = f"{CURATED_TABLE_FQN}_staging"

CURATED_SCHEMA = [
    bigquery.SchemaField("vehicle_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("trip_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("event_ts", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("lat", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("lng", "FLOAT", mode="NULLABLE"),
]

MERGE_PRIMARY_FIELDS = ["vehicle_id", "event_ts"]


def ensure_curated_tables() -> None:
    """Create curated and staging tables used for batch loads."""
    dataset_ref = bigquery.Dataset(f"{PROJECT_ID}.{CURATED_DATASET}")
    dataset_ref.location = "australia-southeast1"
    bq_client.create_dataset(dataset_ref, exists_ok=True)

    curated_table = bigquery.Table(CURATED_TABLE_FQN, schema=CURATED_SCHEMA)
    curated_table.time_partitioning = bigquery.TimePartitioning(field="event_ts", type_=bigquery.TimePartitioningType.HOUR)
    curated_table.clustering_fields = ["vehicle_id"]
    bq_client.create_table(curated_table, exists_ok=True)

    staging_table = bigquery.Table(STAGING_TABLE_FQN, schema=CURATED_SCHEMA)
    bq_client.create_table(staging_table, exists_ok=True)


def compose_batch(rows: Sequence[dict], *, prefix: str | None = None) -> storage.Blob:
    """Compose manifest-listed objects into a single newline-delimited JSON blob."""
    if not rows:
        raise ValueError("No manifest rows supplied for batch compose.")
    if len(rows) > 32:
        raise ValueError("GCS compose supports at most 32 components per request.")
    bucket = gcs_client.bucket(RAW_BUCKET)
    prefix = prefix or BATCH_PREFIX
    if not prefix.endswith("/"):
        prefix = f"{prefix}/"
    compose_name = f"{prefix}{dt.datetime.utcnow():%Y/%m/%d/%H%M%S%f}-vehicle-positions.json"
    destination_blob = bucket.blob(compose_name)
    source_blobs = [bucket.blob(split_gs_uri(row["object_uri"])[1]) for row in rows]
    destination_blob.compose(source_blobs)
    destination_blob.reload()
    return destination_blob


def load_batch_to_staging(payload_rows: Sequence[dict]) -> int:
    """Load transformed rows into the staging table and return inserted rows."""
    if not payload_rows:
        return 0
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=CURATED_SCHEMA,
        write_disposition="WRITE_TRUNCATE",
    )
    job = bq_client.load_table_from_json(payload_rows, STAGING_TABLE_FQN, job_config=job_config)
    job.result()
    staging_table = bq_client.get_table(STAGING_TABLE_FQN)
    return staging_table.num_rows or 0


def merge_staging_into_curated() -> None:
    """Run an idempotent MERGE from the staging table into the curated table."""
    merge_sql = f"""
    MERGE `{CURATED_TABLE_FQN}` AS curated
    USING `{STAGING_TABLE_FQN}` AS staging
      ON {" AND ".join([f"curated.{field} = staging.{field}" for field in MERGE_PRIMARY_FIELDS])}
    WHEN MATCHED THEN
      UPDATE SET
        trip_id = staging.trip_id,
        lat = staging.lat,
        lng = staging.lng
    WHEN NOT MATCHED THEN
      INSERT (`vehicle_id`, `trip_id`, `event_ts`, `lat`, `lng`)
      VALUES (staging.vehicle_id, staging.trip_id, staging.event_ts, staging.lat, staging.lng)
    """
    bq_client.query(merge_sql).result()


def transform_batch_blob(batch_blob: storage.Blob) -> list[dict]:
    """Convert batch payload into the curated schema for loading."""
    payload = batch_blob.download_as_text()
    decoder = json.JSONDecoder()
    idx = 0
    length = len(payload)
    transformed: list[dict] = []
    while idx < length:
        while idx < length and payload[idx].isspace():
            idx += 1
        if idx >= length:
            break
        record, next_idx = decoder.raw_decode(payload, idx)
        idx = next_idx
        transformed.append({
            "vehicle_id": record["vehicle_id"],
            "trip_id": record["trip_id"],
            "event_ts": record["timestamp"],
            "lat": record.get("lat"),
            "lng": record.get("lng"),
        })
    return transformed


def mark_manifest_processed(rows: Sequence[dict], *, batch_uri: str) -> None:
    """Update manifest rows to reflect that their payloads were loaded."""
    updates = []
    now_iso = dt.datetime.utcnow().isoformat()
    for row in rows:
        updates.append({
            "dataset": row["dataset"],
            "object_uri": row["object_uri"],
            "capture_ts": row["capture_ts"],
            "partition_key": row["partition_key"],
            "status": "processed",
            "status_updated_ts": now_iso,
            "checksum": row.get("checksum"),
            "payload_bytes": row.get("payload_bytes"),
            "attributes": {"batch_uri": batch_uri},
        })
    upsert_manifest_rows(updates)


In [30]:
ensure_curated_tables()

candidate_rows = list_manifest_candidates(SIMULATED_DATASET, status="new", limit=8)
if not candidate_rows:
    candidate_rows = write_sample_objects(count=8)
    upsert_manifest_rows(candidate_rows)
    candidate_rows = list_manifest_candidates(SIMULATED_DATASET, status="new", limit=8)

print(f"Stage 4 using {len(candidate_rows)} manifest rows.")
batch_blob = compose_batch(candidate_rows)
print("Composed batch URI:", f"gs://{batch_blob.bucket.name}/{batch_blob.name}")

payload_rows = transform_batch_blob(batch_blob)
inserted = load_batch_to_staging(payload_rows)
print(f"Loaded {inserted} rows into staging table {STAGING_TABLE_FQN}.")

merge_staging_into_curated()
print(f"MERGE complete for curated table {CURATED_TABLE_FQN}.")

mark_manifest_processed(candidate_rows, batch_uri=f"gs://{batch_blob.bucket.name}/{batch_blob.name}")
print("Manifest rows marked as processed.")

curated_preview = bq_client.query(
    f"""
    SELECT vehicle_id, trip_id, event_ts, lat, lng
    FROM `{CURATED_TABLE_FQN}`
    ORDER BY event_ts DESC
    LIMIT 10
    """
).to_dataframe()

display(curated_preview)

E0000 00:00:1761075623.155728 6836922 alts_credentials.cc:93] ALTS creds ignored. Not running on GCP and untrusted ALTS is not enabled.


Stage 4 using 8 manifest rows.
Composed batch URI: gs://auckland-data-dev/raw/vehicle_positions/batches/2025/10/21/194024889710-vehicle-positions.json
Composed batch URI: gs://auckland-data-dev/raw/vehicle_positions/batches/2025/10/21/194024889710-vehicle-positions.json


JSONDecodeError: Extra data: line 1 column 149 (char 148)