# 01 — Data Collection

**Project:** Fentanyl Overdose Death (ODD) Analysis  
**Author:** Mitchell Yorkey  
**Last Updated:** 2026-02-19  

## Purpose
This notebook pulls provisional drug overdose death counts from the CDC's Vital Statistics Rapid Release (VSRR) system. It uses the CDC's Socrata API as the primary source and falls back to a local CSV if the API is unavailable. A timestamped snapshot is saved to `data/raw/` for reproducibility.

## Data Source
- **Dataset:** VSRR Provisional Drug Overdose Death Counts  
- **Publisher:** CDC / National Center for Health Statistics (NCHS)  
- **URL:** https://data.cdc.gov/National-Center-for-Health-Statistics/VSRR-Provisional-Drug-Overdose-Death-Counts/xkb8-kh2a  
- **API Endpoint:** `https://data.cdc.gov/resource/xkb8-kh2a.json`  
- **Update Frequency:** Monthly (provisional, subject to revision)  
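
For orientation, the URL of a single page request against this endpoint can be sketched with the standard library alone. The `$limit`, `$offset`, and `$order` parameters are the ones this notebook's pull function sends; the helper name `build_page_url` is hypothetical and exists only for this illustration.

```python
from urllib.parse import urlencode, urlsplit, parse_qs

API_ENDPOINT = "https://data.cdc.gov/resource/xkb8-kh2a.json"

def build_page_url(offset, page_size=50000):
    """Return the URL for one page of results (hypothetical helper, illustration only)."""
    params = {"$limit": page_size, "$offset": offset, "$order": ":id"}
    return f"{API_ENDPOINT}?{urlencode(params)}"

# Second page of a 50,000-row pull.
url = build_page_url(offset=50000)

# Decode the query string again to confirm what the server would receive.
query = parse_qs(urlsplit(url).query)
print(query)
```

In practice `requests` builds this URL itself from the `params` dictionary, so the helper is useful mainly for debugging a request in a browser.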

---
## 1. Imports and Configuration

In [1]:
import requests
import pandas as pd
import numpy as np
import os
from datetime import datetime
from pathlib import Path

# ── Paths ──────────────────────────────────────────────────────────────────
ROOT = Path().resolve().parent  # assumes notebooks/ folder
RAW_DIR = ROOT / "data" / "raw"
PROCESSED_DIR = ROOT / "data" / "processed"

RAW_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# ── API Config ─────────────────────────────────────────────────────────────
DATASET_ID = "xkb8-kh2a"
API_BASE = f"https://data.cdc.gov/resource/{DATASET_ID}.json"
PAGE_SIZE = 50000
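# Read the Socrata app token from the environment instead of hardcoding it.
# SOCRATA_APP_TOKEN is an assumed variable name; requests still work without
# a token, but at lower rate limits.
APP_TOKEN = os.environ.get("SOCRATA_APP_TOKEN")

# ── Fallback CSV path ──────────────────────────────────────────────────────
FALLBACK_CSV = RAW_DIR / "vsrr_overdose_fallback.csv"

print(f"Root:      {ROOT}")
print(f"Raw dir:   {RAW_DIR}")
print(f"Processed: {PROCESSED_DIR}")
print(f"App token: {'set ✓' if APP_TOKEN else 'not set'}")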

Root:      C:\Users\Mitch\Documents
Raw dir:   C:\Users\Mitch\Documents\data\raw
Processed: C:\Users\Mitch\Documents\data\processed
App token: set ✓


---
## 2. API Pull Function (with Pagination)

The CDC uses the **Socrata Open Data API (SODA)**. Results are paginated: we request 50,000 rows at a time, ordered by `:id` so pages stay stable, and stop when a page comes back empty or shorter than the page size. If any request fails, the partial pull is discarded and we fall back to a local CSV.
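
The stopping rule can be checked offline by swapping the HTTP call for a stand-in. The sketch below is a simplified illustration, not the notebook's function: `fetch_all_pages` and `fake_fetch` are hypothetical names, and the "server" is just a Python list.

```python
def fetch_all_pages(fetch_page, page_size):
    """Collect rows page by page until a page is empty or short.

    fetch_page(offset, limit) is a stand-in for the HTTP request.
    """
    rows, offset = [], 0
    while True:
        batch = fetch_page(offset, page_size)
        if not batch:                 # nothing left (dataset size was an exact multiple)
            break
        rows.extend(batch)
        if len(batch) < page_size:    # short page means this was the last one
            break
        offset += page_size
    return rows

# Fake "server" holding 7 rows, paged 3 at a time: pages of 3, 3, and 1.
fake_rows = list(range(7))
calls = []

def fake_fetch(offset, limit):
    calls.append(offset)
    return fake_rows[offset:offset + limit]

result = fetch_all_pages(fake_fetch, page_size=3)
print(result, calls)
```

The short-page check saves one request in the common case; the empty-page check covers a dataset whose size is an exact multiple of the page size.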

In [None]:
def fetch_cdc_vsrr_api(app_token=None, page_size=50000):
    """
    Pull all records from the CDC VSRR dataset via the Socrata API.
    Handles pagination automatically.

    Parameters
    ----------
    app_token : str or None
        Socrata app token for higher rate limits.
    page_size : int
        Number of records per API request.

    Returns
    -------
    pd.DataFrame or None
        Full dataset, or None if the pull failed.
    """
    headers = {"X-App-Token": app_token} if app_token else {}
    all_records = []
    offset = 0

    print("Fetching from CDC Socrata API...")

    while True:
        params = {
            "$limit": page_size,
            "$offset": offset,
            "$order": ":id"
        }

        try:
            response = requests.get(API_BASE, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            batch = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # A malformed JSON body raises ValueError (or a subclass), depending on the requests version
            print(f"  ✗ API request failed at offset {offset}: {e}")
            return None

        if not batch:
            break

        all_records.extend(batch)
        print(f"  Fetched {len(all_records):,} records so far...")

        if len(batch) < page_size:
            break

        offset += page_size

    if all_records:
        print(f"  ✓ API pull complete. Total records: {len(all_records):,}")
        return pd.DataFrame(all_records)
    else:
        print("  ✗ No records returned from API.")
        return None

---
## 3. Fallback: Load from Local CSV

In [None]:
def load_fallback_csv(path):
    """
    Load the manually downloaded CDC VSRR CSV as a fallback.

    How to get this file manually:
    1. Go to https://data.cdc.gov/National-Center-for-Health-Statistics/
       VSRR-Provisional-Drug-Overdose-Death-Counts/xkb8-kh2a
    2. Click Export → CSV
    3. Save to data/raw/vsrr_overdose_fallback.csv
    """
    if Path(path).exists():
        print(f"Loading fallback CSV from: {path}")
        df = pd.read_csv(path, dtype=str)
        print(f"  ✓ Loaded {len(df):,} rows from CSV fallback.")
        return df
    else:
        print(f"  ✗ No fallback CSV found at {path}")
        print("    → Download manually from the CDC website and save to data/raw/vsrr_overdose_fallback.csv")
        return None

---
## 4. Main Ingestion Logic — API with CSV Fallback

In [None]:
# ── Try API first ──────────────────────────────────────────────────────────
df_raw = fetch_cdc_vsrr_api(app_token=APP_TOKEN, page_size=PAGE_SIZE)

# ── Fall back to CSV if API failed ─────────────────────────────────────────
if df_raw is None:
    print("\nFalling back to local CSV...")
    df_raw = load_fallback_csv(FALLBACK_CSV)

if df_raw is None:
    raise RuntimeError(
        "Both API and CSV fallback failed. "
        "Check your internet connection or download the CSV manually."
    )

print(f"\nDataset shape: {df_raw.shape}")
print(f"Columns: {df_raw.columns.tolist()}")
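
The try-then-fall-back logic in this cell generalizes to an ordered list of loaders. The sketch below assumes the convention used by the functions above (a loader returns `None` on failure) and also records which source succeeded, which is useful when reporting provenance later. `load_with_fallback` and the stand-in loaders are hypothetical.

```python
def load_with_fallback(loaders):
    """Try each (name, loader) pair in order; return (data, name) for the first success.

    A loader signals failure by returning None.
    """
    for name, loader in loaders:
        data = loader()
        if data is not None:
            return data, name
    tried = ", ".join(name for name, _ in loaders)
    raise RuntimeError(f"All data sources failed: {tried}")

# Stand-in loaders: the "API" fails, the "CSV" succeeds.
data, source = load_with_fallback([
    ("api", lambda: None),
    ("csv_fallback", lambda: [{"year": "2023"}]),
])
print(source)
```

With the real functions, the list would hold something like `("api", lambda: fetch_cdc_vsrr_api(app_token=APP_TOKEN))` followed by `("csv_fallback", lambda: load_fallback_csv(FALLBACK_CSV))`.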

---
## 5. Save Raw Snapshot with Timestamp

We save an exact snapshot of what we pulled. This is critical for reproducibility:
CDC provisional data is revised retroactively, so a dated snapshot records exactly
which version of the data an analysis used. The filename carries only the date, so a second pull on the same day overwrites the first.
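
One optional way to strengthen the snapshot is to record a checksum next to it, so a later reader can confirm the file has not changed. This is a sketch, not part of the notebook: `sha256_of_file` is a hypothetical helper, and the demo runs on a throwaway file rather than a real snapshot.

```python
import hashlib
import tempfile
from pathlib import Path

def sha256_of_file(path, chunk_size=1 << 20):
    """Return the hex SHA-256 digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Demonstrate on a temporary file with placeholder contents.
payload = b"state,year,data_value\nUS,2023,1\n"
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "vsrr_overdose_raw_demo.csv"
    demo.write_bytes(payload)
    checksum = sha256_of_file(demo)
    same_again = sha256_of_file(demo)

print(checksum)
```

Two pulls that yield different checksums indicate the data changed between them, which for this dataset usually means a revision.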

In [None]:
timestamp = datetime.now().strftime("%Y%m%d")
snapshot_path = RAW_DIR / f"vsrr_overdose_raw_{timestamp}.csv"

df_raw.to_csv(snapshot_path, index=False)
print(f"✓ Raw snapshot saved to: {snapshot_path}")
print(f"  Rows: {len(df_raw):,} | Columns: {len(df_raw.columns)}")

---
## 6. Initial Inspection

Before cleaning anything, just look at what we have.

In [None]:
df_raw.head(10)

In [None]:
df_raw.dtypes

In [None]:
# What states/jurisdictions are included?
print("Unique states/jurisdictions:")
state_col_guess = next((c for c in df_raw.columns if 'state' in c.lower()), None)
print(sorted(df_raw[state_col_guess].dropna().unique()) if state_col_guess else "Could not detect state column")

In [None]:
# What indicator types exist?
indicator_col_guess = next((c for c in df_raw.columns if 'indicator' in c.lower()), None)
print("Unique indicators (cause-of-death categories):")
if indicator_col_guess:
    for ind in sorted(df_raw[indicator_col_guess].dropna().unique()):
        print(f"  {ind}")

In [None]:
# Date range and null check
print("Year range:", df_raw['year'].unique() if 'year' in df_raw.columns else 'see columns above')
print("\nNull counts per column:")
print(df_raw.isnull().sum())

---
## 7. Light Cleaning & Save to Processed

Minimal cleaning here — type casting and column standardization only. 
Deeper analytical filtering happens in `02_eda_national_trend.ipynb`.
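
To see what the column standardization does, the same chain of string operations can be run on a few sample headers. The headers below are illustrative, written in the style of a CSV export rather than read from the real file.

```python
import pandas as pd

# Illustrative headers only; not taken from the actual dataset.
raw_columns = pd.Index([" State Name ", "Data Value", "Percent Complete (%)"])

clean = (
    raw_columns
    .str.strip()                                   # drop surrounding whitespace
    .str.lower()
    .str.replace(" ", "_", regex=False)            # spaces become underscores
    .str.replace(r"[^a-z0-9_]", "", regex=True)    # drop everything else
)
print(clean.tolist())
```

The third header comes out as `percent_complete_` with a trailing underscore, because the space before the parenthesis is converted before the punctuation is removed. That is harmless here but worth knowing when matching columns by name.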

In [None]:
df = df_raw.copy()

# ── Standardize column names to snake_case ─────────────────────────────────
df.columns = (
    df.columns
    .str.strip()
    .str.lower()
    .str.replace(" ", "_")
    .str.replace(r"[^a-z0-9_]", "", regex=True)
)

print("Cleaned column names:")
print(df.columns.tolist())

In [None]:
# ── Cast types ─────────────────────────────────────────────────────────────
# 'data_value' contains suppressed values marked as 'Suppressed' or 'Missing'
# for small counts — coerce these to NaN.

count_col = next((c for c in df.columns if 'data_value' in c or 'death' in c), None)
print(f"Death count column identified as: '{count_col}'")

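if count_col:
    # Drop thousands separators (in case the CSV export includes them), then coerce
    # non-numeric markers such as 'Suppressed' to NaN.
    df[count_col] = pd.to_numeric(
        df[count_col].astype(str).str.replace(",", "", regex=False),
        errors='coerce'
    )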

if 'year' in df.columns:
    df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')

if 'month' in df.columns:
    df['month'] = df['month'].astype(str).str.strip()

# Create a proper date column for time series plotting
if 'year' in df.columns and 'month' in df.columns:
    df['date'] = pd.to_datetime(
        df['year'].astype(str) + '-' + df['month'].str[:3],
        format='%Y-%b',
        errors='coerce'
    )
    print(f"Date range: {df['date'].min()} to {df['date'].max()}")

df.dtypes
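
The effect of `errors='coerce'` in the cell above is easiest to see on a toy frame. The values below are made up for illustration and the column names are assumed to match the cleaned dataset; the sketch also assumes an English locale for `%b` month parsing.

```python
import pandas as pd

# Toy rows with made-up values: one suppressed count and one unparseable month.
toy = pd.DataFrame({
    "year": ["2023", "2023", "2024"],
    "month": ["April", "May", "not a month"],
    "data_value": ["1200", "Suppressed", "950"],
})

# Non-numeric markers become NaN instead of raising.
toy["data_value"] = pd.to_numeric(toy["data_value"], errors="coerce")

# "2023" + "-" + "Apr" parses to the first day of that month; bad input becomes NaT.
toy["date"] = pd.to_datetime(
    toy["year"] + "-" + toy["month"].str[:3],
    format="%Y-%b",
    errors="coerce",
)
print(toy)
```

Because coercion is silent, it is worth comparing the null counts before and after casting to see how many values were suppressed or malformed.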

In [None]:
# ── Save processed file ────────────────────────────────────────────────────
processed_path = PROCESSED_DIR / f"vsrr_overdose_processed_{timestamp}.csv"
df.to_csv(processed_path, index=False)
print(f"✓ Processed data saved to: {processed_path}")

# Also save a 'latest' version so downstream notebooks don't need a hardcoded date
latest_path = PROCESSED_DIR / "vsrr_overdose_latest.csv"
df.to_csv(latest_path, index=False)
print(f"✓ Also saved as: {latest_path} (overwritten on each pull)")

---
## 8. Quick Sanity Check — National Synthetic Opioid Deaths

Filter to synthetic opioid deaths at the national level and confirm 
we can see the rise-and-fall shape described in Vangelov et al. (2026).

> The article reports fentanyl ODDs peaking at **76,000 in 2023** and 
> dropping by over a third by end of 2024. We should see that shape here.
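
The "peak, then a drop of over a third" shape can also be checked numerically rather than by eye. The sketch below shows the mechanics on a toy series with made-up values; on the real data, the same three lines would be applied to the filtered national series indexed by date.

```python
import pandas as pd

# Toy series with made-up values, used only to show the mechanics.
s = pd.Series(
    [10.0, 20.0, 30.0, 24.0, 18.0],
    index=pd.to_datetime(
        ["2022-01-01", "2022-07-01", "2023-01-01", "2023-07-01", "2024-01-01"]
    ),
)

peak_date = s.idxmax()                            # date of the highest value
peak_value = s.max()
decline_from_peak = 1 - s.iloc[-1] / peak_value   # fractional drop, peak to latest

print(peak_date.date(), peak_value, round(decline_from_peak, 2))
```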

In [None]:
import matplotlib.pyplot as plt

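# More than one column may contain 'state' (e.g. an abbreviation and a full name);
# pick the one whose values actually include "United States".
state_col = next(
    (c for c in df.columns
     if 'state' in c and df[c].astype(str).str.contains("United States").any()),
    None
)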
indicator_col = next((c for c in df.columns if 'indicator' in c), None)

print(f"State column:     '{state_col}'")
print(f"Indicator column: '{indicator_col}'")

mask = (
    (df[state_col].str.contains("United States", na=False)) &
    (df[indicator_col].str.contains("Synthetic opioids", na=False))
)
df_national_synth = df[mask].copy()
df_national_synth = df_national_synth.sort_values('date').dropna(subset=['date', count_col])

print(f"\nRows matching filter: {len(df_national_synth)}")
df_national_synth[['date', count_col]].tail(12)

In [None]:
fig, ax = plt.subplots(figsize=(12, 5))

ax.plot(
    df_national_synth['date'],
    df_national_synth[count_col],
    color='#c0392b',
    linewidth=2,
    label='Synthetic Opioid Deaths (provisional)'
)

ax.axvline(
    pd.Timestamp('2023-05-01'),
    color='gray', linestyle='--', alpha=0.7,
    label='May 2023 peak (Vangelov et al.)'
)

ax.set_title('US Provisional Synthetic Opioid Overdose Deaths Over Time', fontsize=14)
ax.set_xlabel('Date')
ax.set_ylabel('Death Count (provisional)')
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()

fig_path = ROOT / "outputs" / "figures" / "01_sanity_check_national_trend.png"
fig_path.parent.mkdir(parents=True, exist_ok=True)
plt.savefig(fig_path, dpi=150)
print(f"✓ Figure saved to: {fig_path}")
plt.show()

---
## 9. Session Summary

In [None]:
print("=" * 55)
print("DATA COLLECTION SUMMARY")
print("=" * 55)
print(f"Run date:          {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("Source:            CDC VSRR (xkb8-kh2a), via API or local CSV fallback")
print(f"Total rows pulled: {len(df):,}")
print(f"Date range:        {df['date'].min().date()} to {df['date'].max().date()}")
print(f"Raw snapshot:      {snapshot_path.name}")
print(f"Processed file:    {processed_path.name}")
print("")
print("Next step → 02_eda_national_trend.ipynb")
print("=" * 55)