# BLS PR Raw Sync (Mirror Ingestion) + Run Metadata (JSON)

This notebook performs a **raw-zone mirror sync** of the BLS *PR time series* files from the public BLS directory into the lakehouse:

- **Source:** `https://download.bls.gov/pub/time.series/pr/`
- **Target:** `/Volumes/rearc_quest/lakehouse/raw_bls`

## What it does
1. **Discovers upstream files** by parsing the BLS directory listing and selecting only `pr.*` files.
2. **Downloads each file** using an HTTP session configured with **retry/backoff** for transient failures (429 / 5xx).
3. **Writes new files** to the target directory.
4. **Updates existing files** only when content changes, using **strict full-file SHA-256 hashing** (not timestamp-based).
5. **Deletes local files** that no longer exist upstream when `ENABLE_DELETE=True` (mirror behavior).
   - Uses `PROTECTED_FILES` as guardrails (never delete these).

## Observability / Audit trail
Every run writes metadata as JSON:
- **Per-run file (append-only):** `.../_meta/runs/<run_id>.json`
- **Latest pointer:** `.../_meta/latest.json`

The metadata includes:
- run timestamps, duration, status
- counts (uploaded/updated/skipped/deleted)
- any per-file errors encountered

## Notes
- It is a **raw ingestion** pattern: preserve upstream file boundaries and overwrite only when content differs.
- Using a stable content hash makes reruns **idempotent** and avoids unnecessary rewrites.



In [0]:
import re
import requests
import hashlib
import json
import datetime as dt
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ----------------------------
# Config
# ----------------------------
DATA_SOURCE = "https://download.bls.gov/pub/time.series/pr/"
TARGET_DIR  = "/Volumes/rearc_quest/lakehouse/raw_bls"

# Mirror-delete behavior:
# - True  : delete local files not present upstream (mirror sync)
# - False : non-destructive (keep local files even if upstream removes them)
ENABLE_DELETE = True

# Guardrails: never delete these even if ENABLE_DELETE=True
PROTECTED_FILES = {"population.json"}

USER_AGENT = "rearc-quest-contact: rohit.pradhan2995@gmail.com"

# Run metadata paths (JSON-only history)
META_DIR     = f"{TARGET_DIR}/_meta"
RUNS_DIR     = f"{META_DIR}/runs"
LATEST_PATH  = f"{META_DIR}/latest.json"

dbutils.fs.mkdirs(TARGET_DIR)
dbutils.fs.mkdirs(META_DIR)
dbutils.fs.mkdirs(RUNS_DIR)

# ----------------------------
# Robust HTTP session (retry/backoff for 429/5xx)
# ----------------------------
retry_strategy = Retry(
    total=5,
    backoff_factor=1.5,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET"],
    raise_on_status=False
)

adapter = HTTPAdapter(max_retries=retry_strategy)

session = requests.Session()
session.headers.update({"User-Agent": USER_AGENT, "Accept": "*/*"})
session.mount("https://", adapter)
session.mount("http://", adapter)

# ----------------------------
# Helpers
# ----------------------------
def sha256_bytes(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()

def read_local_bytes(volume_path: str) -> bytes:
    """
    Full-file read for strict correctness hashing.
    Tries direct Volume path, then /dbfs fallback.
    """
    candidates = [volume_path]
    if not volume_path.startswith("/dbfs"):
        candidates.append("/dbfs" + volume_path)

    last_err = None
    for p in candidates:
        try:
            with open(p, "rb") as f:
                return f.read()
        except Exception as e:
            last_err = e
    raise last_err

def list_local_files(target_dir: str):
    try:
        return sorted([f.name for f in dbutils.fs.ls(target_dir) if f.isFile()])
    except Exception:
        return []

def list_remote_files():
    """
    Discover upstream files by parsing the BLS directory listing.
    Uses multiple patterns to handle slightly different listing formats.
    """
    r = session.get(DATA_SOURCE, timeout=60, allow_redirects=True)
    r.raise_for_status()
    text = r.text

    patterns = [
        r'href="(pr\.[^"]+)"',
        r"href='(pr\.[^']+)'",
        r'href=(pr\.[^\s>]+)',
        r'>(pr\.[^<\s]+)<',
    ]
    files = set()
    for p in patterns:
        files.update(re.findall(p, text))

    # Defensive: keep scope tight to `pr.*`
    return sorted([f for f in files if f and f.startswith("pr.")])



In [0]:

# ----------------------------
# Create run_id + run metadata
# ----------------------------
start_ts = dt.datetime.utcnow()

run_utc = start_ts.replace(microsecond=0).isoformat() + "Z"
run_id_seed = f"{run_utc}|{DATA_SOURCE}|{TARGET_DIR}"
run_id = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + "_" + hashlib.sha256(run_id_seed.encode("utf-8")).hexdigest()[:8]

RUN_META_PATH = f"{RUNS_DIR}/{run_id}.json"

run_info = {
    "run_id": run_id,
    "run_utc": run_utc,
    "source": DATA_SOURCE,
    "target_dir": TARGET_DIR,

    "delete_enabled": ENABLE_DELETE,
    "protected_files": sorted(list(PROTECTED_FILES)),

    "remote_file_count": None,
    "local_file_count_start": None,

    "uploaded": 0,
    "updated": 0,
    "skipped": 0,
    "deleted": 0,

    "status": None,           # "success" | "partial_success" | "failed"
    "errors": [],

    "started_utc": run_utc,
    "ended_utc": None,
    "duration_seconds": None
}

try:
    # 1) Snapshot local state
    local_files = set(list_local_files(TARGET_DIR))
    run_info["local_file_count_start"] = len(local_files)
    delete_candidates = set(local_files)

    # 2) Snapshot remote state
    remote_files = list_remote_files()
    run_info["remote_file_count"] = len(remote_files)
    print("Remote files:", len(remote_files))

    # 3) Download & sync (strict full-file hash compare)
    for file_name in remote_files:
        if not file_name or file_name.strip() == "" or file_name == "[To Parent Directory]":
            continue

        url = DATA_SOURCE + file_name
        dst_path = f"{TARGET_DIR}/{file_name}"

        try:
            resp = session.get(url, timeout=120)
            resp.raise_for_status()

            remote_content = resp.content
            remote_hash = sha256_bytes(remote_content)

            if file_name not in local_files:
                # New file
                dbutils.fs.put(dst_path, remote_content.decode("utf-8", errors="replace"), overwrite=True)
                run_info["uploaded"] += 1
            else:
                # Existing file: strict correctness full-file hash
                existing_bytes = read_local_bytes(dst_path)
                existing_hash = sha256_bytes(existing_bytes)

                if existing_hash != remote_hash:
                    dbutils.fs.put(dst_path, remote_content.decode("utf-8", errors="replace"), overwrite=True)
                    run_info["updated"] += 1
                else:
                    run_info["skipped"] += 1

            # If it exists upstream, it's not a delete candidate
            delete_candidates.discard(file_name)

        except Exception as e:
            run_info["errors"].append({"file": file_name, "url": url, "error": str(e)})

    print(f"Uploaded={run_info['uploaded']}, Updated={run_info['updated']}, Skipped={run_info['skipped']}")

    # 4) Delete removed files (mirror behavior)
    if ENABLE_DELETE:
        deleted = 0
        for file_name in list(delete_candidates):
            if file_name in PROTECTED_FILES:
                continue
            try:
                dbutils.fs.rm(f"{TARGET_DIR}/{file_name}", recurse=False)
                deleted += 1
            except Exception as e:
                run_info["errors"].append({"file": file_name, "url": None, "error": f"delete_failed: {str(e)}"})
        run_info["deleted"] = deleted
        print("Deleted files:", deleted)

    # 5) Status
    run_info["status"] = "partial_success" if run_info["errors"] else "success"
    if run_info["errors"]:
        print(f"Errors: {len(run_info['errors'])} (see per-run JSON in _meta/runs/)")

except Exception as e:
    run_info["status"] = "failed"
    run_info["errors"].append({"file": None, "url": DATA_SOURCE, "error": f"fatal: {str(e)}"})
    raise

finally:
    end_ts = dt.datetime.utcnow()
    run_info["ended_utc"] = end_ts.replace(microsecond=0).isoformat() + "Z"
    run_info["duration_seconds"] = int((end_ts - start_ts).total_seconds())

    # Always write per-run metadata + latest pointer
    dbutils.fs.put(RUN_META_PATH, json.dumps(run_info, indent=2), overwrite=False)
    dbutils.fs.put(LATEST_PATH, json.dumps(run_info, indent=2), overwrite=True)

    print("Wrote run metadata:", RUN_META_PATH)
    print("Updated latest pointer:", LATEST_PATH)
    print("Done. Target dir:", TARGET_DIR)

  start_ts = dt.datetime.utcnow()
  run_id = dt.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ") + "_" + hashlib.sha256(run_id_seed.encode("utf-8")).hexdigest()[:8]


Remote files: 12
Uploaded=0, Updated=0, Skipped=12
Deleted files: 0


  end_ts = dt.datetime.utcnow()


Wrote 543 bytes.
Wrote 543 bytes.
Wrote run metadata: /Volumes/rearc_quest/lakehouse/raw_bls/_meta/runs/20260119T010028Z_9d23c934.json
Updated latest pointer: /Volumes/rearc_quest/lakehouse/raw_bls/_meta/latest.json
Done. Target dir: /Volumes/rearc_quest/lakehouse/raw_bls
