# 02 | Bronze Ingestion

## Stage Contract
- Download CSVs from FiveThirtyEight with retries.
- Standardize column names to snake_case.
- Add Bronze metadata columns.
- Write deterministic Parquet outputs to `lakehouse/bronze/<dataset>/data.parquet`.

In [None]:
# Parameters
# Notebook-level inputs consumed by the ingestion cell further down.
# NOTE(review): the "# Parameters" header suggests these values are overridden
# by a notebook runner at execution time — confirm before restructuring.
source = "fivethirtyeight"  # embedded in batch_id and passed to add_bronze_metadata
dataset = "recent_grads,bechdel_movies"  # handed to parse_dataset_argument; presumably comma-separated — confirm
run_date = "2026-02-22"  # dashes are stripped to form the batch_id prefix
force_refresh = False  # compared as str(...).lower() against {'1', 'true', 'yes'}, so a bool or a string works

In [None]:
import sys
from pathlib import Path

# Put the current working directory at the front of the import search path
# (only if it is not already listed) so imports are resolved relative to it.
ROOT_DIR = Path.cwd()
_root_entry = str(ROOT_DIR)
if _root_entry not in sys.path:
    sys.path.insert(0, _root_entry)

In [None]:
import time

import pandas as pd

from src.common.datasets import DATASETS, parse_dataset_argument
from src.common.io import update_pipeline_metrics, update_stage_metrics
from src.common.paths import BRONZE_DIR, RAW_DATA_DIR
from src.common.pipeline import (
    add_bronze_metadata,
    download_csv_with_retries,
    standardize_column_names,
    write_parquet,
)

# Normalise the notebook parameters: the refresh flag may arrive as a bool or
# a string, and the batch id is the dash-free run date joined to the source.
force_refresh_flag = str(force_refresh).lower() in {"1", "true", "yes"}
selected_datasets = parse_dataset_argument(dataset)
compact_run_date = run_date.replace("-", "")
batch_id = f"{compact_run_date}_{source}"

# Start the stage timer, then set up the running totals and per-dataset summary.
stage_start = time.perf_counter()
rows_ingested = 0
rows_bronze = 0
summary_rows = []

for dataset_name in selected_datasets:
    config = DATASETS[dataset_name]

    # Where the raw CSV is stored and where its Bronze Parquet copy is written.
    raw_path = RAW_DATA_DIR / config["raw_filename"]
    bronze_path = BRONZE_DIR / dataset_name / "data.parquet"

    # Fetch the CSV to raw_path (up to three attempts), then load it.
    download_csv_with_retries(
        config["url"],
        raw_path,
        force_refresh=force_refresh_flag,
        max_attempts=3,
    )
    raw_df = pd.read_csv(raw_path)
    rows_ingested += len(raw_df)

    # Standardize the column names, attach the Bronze metadata, and persist.
    standardized_df = standardize_column_names(raw_df)
    bronze_df = add_bronze_metadata(
        standardized_df,
        source=source,
        dataset=dataset_name,
        file_path=raw_path,
        batch_id=batch_id,
    )
    write_parquet(bronze_df, bronze_path)
    rows_bronze += len(bronze_df)

    # One summary record per dataset for the table displayed at the end.
    dataset_summary = {
        "dataset": dataset_name,
        "rows_raw": len(raw_df),
        "rows_bronze": len(bronze_df),
        "raw_path": str(raw_path),
        "bronze_path": str(bronze_path),
    }
    summary_rows.append(dataset_summary)

runtime_seconds = round(time.perf_counter() - stage_start, 2)

# The row totals are reported to both the stage-level and pipeline-level metrics.
row_totals = {"rows_ingested": rows_ingested, "rows_bronze": rows_bronze}
bronze_stage_metrics = {
    "runtime_seconds": runtime_seconds,
    **row_totals,
    "datasets_processed": selected_datasets,
    "batch_id": batch_id,
}
update_stage_metrics("bronze", bronze_stage_metrics)
update_pipeline_metrics(row_totals)

# Final expression of the cell: rendered as the per-dataset summary table.
pd.DataFrame(summary_rows)