# Form 5500 Data Downloader

Downloads the following files from the DOL EBSA public database for **2009–2023**:

| Label | File | What it contains |
|---|---|---|
| `main_5500` | F_5500 | Plan identity, type, participants, plan year |
| `schedule_H` | F_SCH_H | Assets, liabilities, income, expenses, equity allocation |
| `schedule_R` | F_SCH_R | Retirement plan info, contribution rates |
| `schedule_R1` | F_SCH_R_PART1 | **Employer roster** — EINs of every contributing employer |

Files are saved to your **Google Drive** at `MyDrive/form5500/raw/{year}/`

**Run cells in order: 1 → 2 → 3 → 4 → 5 → 6 → 7 → 8** (Cell 9 is optional and only needed for the 2004–2008 early years.)

---
**Estimated download size:** ~15–25 GB total across all years  
**Estimated time:** 45–90 minutes depending on your connection  
**Note:** Colab Pro recommended to avoid session timeouts on the full download.
For a quick test, run Cell 5 first — it downloads only 2019 (`years=[2019]`) before you start the full download in Cell 6.

In [None]:
# ── Cell 1: Mount Google Drive ───────────────────────────────────────────────
# Your files will be saved here and persist after the session ends.
# Intended for a Google Colab runtime (the `google.colab` module is imported below).

from google.colab import drive
# The mount point must match the '/content/drive/...' prefix of BASE_DIR in Cell 2.
drive.mount('/content/drive')
print('Drive mounted.')

In [None]:
# ── Cell 2: Imports and Configuration ───────────────────────────────────────

import os
import time
import zipfile
import csv
import requests
import pandas as pd
from pathlib import Path

# ── Where to save everything ──────────────────────────────────────────────
# Root folder on the mounted Drive; each year gets its own sub-folder below it.
BASE_DIR = Path('/content/drive/MyDrive/form5500/raw')
BASE_DIR.mkdir(parents=True, exist_ok=True)
print(f'Base directory: {BASE_DIR}')

# ── Years to download ─────────────────────────────────────────────────────
# DOL switched to the current EFAST2 format in 2009.
# 2004-2008 have different URLs and file structures — handled separately in Cell 9.
YEARS = list(range(2009, 2024))  # 2009 through 2023

# ── File types ────────────────────────────────────────────────────────────
# (label_for_your_reference, DOL_filename_stem)
# The label appears in log lines and in the manifest; the stem is used to build
# the download URL, the zip filename and the extraction folder name.
# NOTE(review): confirm against the DOL file layout that F_SCH_R_PART1 really is
# the contributing-employer table — on the form itself Schedule R Part I is
# "Distributions". TODO verify before relying on it as the employer roster.
FILE_TYPES = [
    ('main_5500',   'F_5500'),          # Main form: plan identity, type, participants
    ('schedule_H',  'F_SCH_H'),         # Assets, liabilities, equity allocation
    ('schedule_R',  'F_SCH_R'),         # Contribution rates, plan type
    ('schedule_R1', 'F_SCH_R_PART1'),   # EMPLOYER ROSTER — the linchpin file
]

# ── DOL URL base ──────────────────────────────────────────────────────────
# All files follow this pattern (the space in "FOIA Files" is written as %20 below):
# https://askebsa.dol.gov/FOIA Files/{year}/Latest/{stem}_{year}_Latest.zip
DOL_BASE = 'https://askebsa.dol.gov/FOIA%20Files'

# ── Retry / politeness settings ───────────────────────────────────────────
MAX_RETRIES    = 3     # total attempts per file (the first try counts as one)
RETRY_DELAY    = 15   # seconds to wait between attempts
PAUSE_BETWEEN  = 3    # seconds to pause between files (politeness toward the DOL server)

# Echo the plan so the scale of the job is visible before anything is downloaded.
print(f'Will download {len(YEARS)} years × {len(FILE_TYPES)} file types = {len(YEARS)*len(FILE_TYPES)} zip files')
print(f'Years: {YEARS[0]}–{YEARS[-1]}')
print(f'File types: {[f[0] for f in FILE_TYPES]}')

In [None]:
# ── Cell 3: Helper Functions ─────────────────────────────────────────────────

def build_url(year, file_stem):
    """Return the DOL zip URL for one (year, file_stem) pair.

    The result has the shape
    ``{DOL_BASE}/{year}/Latest/{file_stem}_{year}_Latest.zip``.
    """
    url_parts = (
        f'{DOL_BASE}',
        f'{year}',
        'Latest',
        f'{file_stem}_{year}_Latest.zip',
    )
    return '/'.join(url_parts)


def download_file(url, dest_path, label):
    """
    Download url to dest_path with retry logic.

    Skips if the file already exists and is > 1KB (avoids re-downloading).
    The body is streamed into a sibling ``<name>.part`` file and only renamed
    to dest_path after the whole response has been written, so an interrupted
    transfer is never mistaken for a finished download by the skip check on a
    later run.

    Args:
        url: Address to fetch.
        dest_path: Final location of the file (str or Path); missing parent
            directories are created.
        label: Short human-readable name used in the progress messages.

    Returns:
        True on success (or when the file was already present), False on a
        404 or once MAX_RETRIES attempts have failed.
    """
    dest_path = Path(dest_path)
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    # Skip if already downloaded and non-trivial size
    if dest_path.exists() and dest_path.stat().st_size > 1024:
        size_mb = dest_path.stat().st_size / (1024 * 1024)
        print(f'  [SKIP]  {label} already exists ({size_mb:.1f} MB)')
        return True

    # Incomplete data lives here until the transfer has finished.
    part_path = dest_path.with_name(dest_path.name + '.part')

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f'  [GET {attempt}/{MAX_RETRIES}] {label}')
            # The context manager releases the connection on every exit path.
            with requests.get(url, timeout=180, stream=True) as response:
                if response.status_code == 404:
                    # A missing file will not appear on retry; stop immediately.
                    print(f'  [404]   {label} — file not found at DOL server')
                    print(f'          URL: {url}')
                    return False

                response.raise_for_status()  # raises on 4xx/5xx

                # Stream to disk in 1 MB chunks
                bytes_written = 0
                with open(part_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
                        bytes_written += len(chunk)

            # Publish the file only once it is complete.
            part_path.replace(dest_path)

            size_mb = bytes_written / (1024 * 1024)
            print(f'  [OK]    {label} — {size_mb:.1f} MB')
            return True

        except requests.exceptions.Timeout:
            print(f'  [TIMEOUT] {label} attempt {attempt} timed out')
        except requests.exceptions.RequestException as e:
            print(f'  [ERR]   {label} attempt {attempt}: {e}')
        finally:
            # After a successful rename this is a no-op; after a failure it
            # removes the partial data so nothing stale is left behind.
            part_path.unlink(missing_ok=True)

        if attempt < MAX_RETRIES:
            print(f'          Retrying in {RETRY_DELAY}s...')
            time.sleep(RETRY_DELAY)

    print(f'  [FAIL]  {label} — giving up after {MAX_RETRIES} attempts')
    return False


def unzip_file(zip_path, extract_dir):
    """
    Extract every member of zip_path into extract_dir (created if missing).

    When extract_dir already holds at least one ``*.csv`` the archive is left
    untouched. A corrupt archive is deleted so that a later run downloads it
    again.

    Returns True when the CSVs are in place (extracted now or previously),
    False when the archive is not a valid zip.
    """
    archive = Path(zip_path)
    target = Path(extract_dir)
    target.mkdir(parents=True, exist_ok=True)

    def _csv_files():
        # CSVs sitting directly inside the target folder (non-recursive).
        return list(target.glob('*.csv'))

    # Earlier extraction detected: nothing to do.
    already_there = _csv_files()
    if len(already_there) > 0:
        print(f'  [SKIP]  Already unzipped ({len(already_there)} CSV files in {target.name}/)')
        return True

    try:
        with zipfile.ZipFile(archive, 'r') as bundle:
            bundle.extractall(target)
    except zipfile.BadZipFile:
        print(f'  [ERR]   Bad zip — {archive.name} is corrupt (deleting for re-download)')
        archive.unlink(missing_ok=True)
        return False
    else:
        extracted = _csv_files()
        print(f'  [UNZIP] Extracted {len(extracted)} file(s) to {target.name}/')
        return True


def count_rows(csv_dir):
    """Count data rows in the first CSV found in csv_dir.

    "First" means the alphabetically first ``*.csv`` directly inside csv_dir,
    so the answer is reproducible when a folder holds several CSVs. Rows are
    counted as physical lines minus one header line, so a quoted field that
    contains a line break counts as more than one row.

    Returns 0 if there is no CSV or the CSV is empty (never a negative number).
    """
    csvs = sorted(Path(csv_dir).glob('*.csv'))
    if not csvs:
        return 0
    # latin-1 maps every byte to a character, so decoding cannot fail.
    with open(csvs[0], 'r', encoding='latin-1', errors='replace') as f:
        line_count = sum(1 for _ in f)
    # An empty file has no header to subtract; clamp instead of returning -1.
    return max(line_count - 1, 0)


# Visible confirmation that this cell ran and the helpers are now defined.
print('Helper functions defined.')

In [None]:
# ── Cell 4: Main Download Function ───────────────────────────────────────────

def run_downloads(years=YEARS, file_types=FILE_TYPES, unzip=True, validate=True):
    """
    Download, unzip, and validate all Form 5500 files.

    For every (year, file type) pair the zip is fetched into BASE_DIR/{year}/,
    optionally extracted into BASE_DIR/{year}/{stem}/, and optionally the
    first extracted CSV is row-counted.

    Args:
        years: Plan years to fetch (any iterable; may be empty).
        file_types: Iterable of (label, DOL_filename_stem) pairs.
        unzip: Extract each zip after downloading it.
        validate: Count rows after extraction; fewer than 100 rows is flagged
            as a warning. Has no effect when unzip is False.

    Returns:
        A list of result dicts with the keys year, file_type, stem, url,
        row_count and status; also writes download_manifest.csv to BASE_DIR
        (overwriting any earlier manifest). status is one of OK,
        WARN_LOW_ROWS, UNZIPPED, DOWNLOADED, UNZIP_FAIL, DOWNLOAD_FAIL.
    """
    # Materialise once so generators and empty inputs are handled uniformly.
    years = list(years)
    file_types = list(file_types)
    year_span = f'{years[0]}–{years[-1]}' if years else '(none)'

    print('=' * 65)
    print('Form 5500 Downloader')
    print(f'Years: {year_span}  |  Files per year: {len(file_types)}')
    print(f'Save to: {BASE_DIR}')
    print('=' * 65)

    # Fixed column order so the manifest can be written even with no results.
    manifest_fields = ['year', 'file_type', 'stem', 'url', 'row_count', 'status']
    results = []
    skipped = 0  # zips that were already on disk before this run

    for year in years:
        year_dir = BASE_DIR / str(year)
        year_dir.mkdir(parents=True, exist_ok=True)

        print(f'\n{"-"*55}')
        print(f'  YEAR {year}')
        print(f'{"-"*55}')

        for label, stem in file_types:
            url      = build_url(year, stem)
            zip_path = year_dir / f'{stem}_{year}_Latest.zip'
            csv_dir  = year_dir / stem

            # Same criterion download_file uses to decide a zip is already present.
            already_had = zip_path.exists() and zip_path.stat().st_size > 1024

            # Step 1: Download
            dl_ok = download_file(url, zip_path, f'{year} / {label}')
            if dl_ok and already_had:
                skipped += 1

            row_count = 0
            if not dl_ok:
                status = 'DOWNLOAD_FAIL'
            elif not unzip:
                status = 'DOWNLOADED'
            # Step 2: Unzip
            elif not unzip_file(zip_path, csv_dir):
                status = 'UNZIP_FAIL'
            elif not validate:
                status = 'UNZIPPED'
            else:
                # Step 3: Count rows
                row_count = count_rows(csv_dir)
                if row_count >= 100:
                    status = 'OK'
                    print(f'  [VAL]   {row_count:,} rows — OK')
                else:
                    status = 'WARN_LOW_ROWS'
                    print(f'  [WARN]  Only {row_count} rows — check file')

            results.append({
                'year':      year,
                'file_type': label,
                'stem':      stem,
                'url':       url,
                'row_count': row_count,
                'status':    status,
            })

            # The pause is a courtesy to the server; it is pointless when the
            # zip was already on disk and no request was made.
            if not already_had:
                time.sleep(PAUSE_BETWEEN)

    # Write manifest
    manifest_path = BASE_DIR / 'download_manifest.csv'
    with open(manifest_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=manifest_fields)
        writer.writeheader()
        writer.writerows(results)

    # Summary (a skipped file is still validated, so it is also counted in
    # whichever status bucket it ends up in)
    ok   = sum(1 for r in results if r['status'] == 'OK')
    fail = sum(1 for r in results if 'FAIL' in r['status'])
    warn = sum(1 for r in results if 'WARN' in r['status'])

    print(f'\n{"="*65}')
    print(f'COMPLETE — manifest saved to {manifest_path}')
    print(f'  ✓  OK:       {ok}')
    print(f'  ↷  Skipped:  {skipped}  (already existed)')
    print(f'  ⚠  Warnings: {warn}  (low row count — inspect)')
    print(f'  ✗  Failed:   {fail}  (see manifest for URLs)')
    print(f'{"="*65}')

    return results


# Visible confirmation that this cell ran; nothing has been downloaded yet.
print('run_downloads() defined.')

In [None]:
# ── Cell 5: TEST FIRST — Download one year only ───────────────────────────────
# Run this before the full download to confirm everything works.
# 2019 is a good test year: post-GFC, pre-COVID, clean data.
# Note: run_downloads() rewrites download_manifest.csv on every call, so after
# this cell the manifest lists the 2019 files only.

test_results = run_downloads(
    years=[2019],
    file_types=FILE_TYPES
)

# If all 4 files show OK, proceed to Cell 6 for the full download.

In [None]:
# ── Cell 6: FULL DOWNLOAD — All years 2009-2023 ───────────────────────────────
# Only run after Cell 5 confirms the test year works.
# This will take 45-90 minutes. Already-downloaded files are skipped.
# Safe to re-run if interrupted — it picks up where it left off.
# `results` holds one dict per (year, file type); the same rows are written to
# download_manifest.csv in BASE_DIR, which Cell 7 reads back.

results = run_downloads(
    years=YEARS,
    file_types=FILE_TYPES
)

In [None]:
# ── Cell 7: Review Results ────────────────────────────────────────────────────

# Load the manifest written by the most recent run_downloads() call.
manifest = pd.read_csv(BASE_DIR / 'download_manifest.csv')

print(
    'Row counts by year and file type:',
    '(These are record counts in the CSV — gives a sense of scale)',
    '',
    sep='\n',
)

# One row per year, one column per file type, cells hold the row counts.
pivot = pd.pivot_table(
    manifest,
    values='row_count',
    index='year',
    columns='file_type',
    aggfunc='sum',
)
print(pivot.to_string())

# Every status other than OK / SKIP is listed for manual review.
acceptable = manifest['status'].isin(['OK', 'SKIP'])
problems = manifest[~acceptable]
if problems.empty:
    print('\n✓ All files OK.')
else:
    print(f'\n⚠  {len(problems)} files need attention:')
    print(problems[['year', 'file_type', 'status', 'url']].to_string())

In [None]:
# ── Cell 8: Sanity Check — Peek at Files ────────────────────────────────────
# Open a sample year of each file type and print shape + column names.
# This confirms the CSVs are readable and have the expected structure.

SAMPLE_YEAR = 2019

# Key columns to confirm present, per file-type label. Defined once, outside
# the loop, because it does not change between iterations.
key_cols = {
    'main_5500':   ['ACK_ID', 'PLAN_NAME', 'SPONS_DFE_EIN', 'PLAN_NUM'],
    'schedule_H':  ['ACK_ID', 'SCH_H_TOT_ASSETS_BOY_AMT', 'SCH_H_TOT_ASSETS_EOY_AMT'],
    'schedule_R':  ['ACK_ID', 'SCH_R_TOT_PARTCP_BOY_CNT'],
    'schedule_R1': ['ACK_ID', 'SCH_R_CONTRIBING_EMPL_EIN'],
}

for label, stem in FILE_TYPES:
    csv_dir = BASE_DIR / str(SAMPLE_YEAR) / stem
    # Sorted so the same file is inspected on every run if a folder holds several CSVs.
    csvs = sorted(csv_dir.glob('*.csv'))
    if not csvs:
        print(f'\n{stem}: NO CSV FOUND in {csv_dir}')
        continue

    # The header plus three rows is enough to inspect the structure.
    try:
        df = pd.read_csv(csvs[0], nrows=3, encoding='latin-1', low_memory=False)
    except pd.errors.EmptyDataError:
        # An empty file should not stop the remaining file types being checked.
        print(f'\n{stem}: {csvs[0].name} is EMPTY')
        continue

    print(f'\n{"-"*55}')
    print(f'{label} ({stem}) — {csvs[0].name}')
    print(f'  Columns: {len(df.columns)}')
    print(f'  First 5 columns: {list(df.columns[:5])}')
    if label in key_cols:
        found    = [c for c in key_cols[label] if c in df.columns]
        missing  = [c for c in key_cols[label] if c not in df.columns]
        print(f'  Key columns present: {found}')
        if missing:
            print(f'  ⚠ Missing expected columns: {missing}')
            print(f'    (column names may differ by year — check layout file)')

print(f'\n{"-"*55}')
print('Sanity check complete.')
print(f'Files are at: {BASE_DIR}')

In [None]:
# ── Cell 9: OPTIONAL — 2004-2008 Early Years ─────────────────────────────────
# The 2004-2008 data uses a different URL structure and older file format.
# These years are useful for pre-period analysis but have worse data quality.
# Download only if you need the full 2004-2023 panel.

EARLY_YEARS = list(range(2004, 2009))  # 2004-2008

# Early years URL: https://www.dol.gov/sites/dolgov/files/ebsa/researchers/
#                  analysis/form-5500/{year}-form-5500-datasets.zip
# These are single zip files per year containing all schedules together.
# NOTE(review): nothing in this notebook exercises this URL pattern by default
# (the download loop further down is commented out) — confirm it still resolves
# before relying on it.

EARLY_DOL_BASE = 'https://www.dol.gov/sites/dolgov/files/ebsa/researchers/analysis/form-5500'

def download_early_year(year):
    """Download and extract the combined zip for one 2004-2008 year.

    The archive is saved as BASE_DIR/{year}/form5500_{year}_all.zip and
    extracted straight into BASE_DIR/{year}/.

    Returns True only if the download and the extraction both succeeded,
    False if either step failed.
    """
    url       = f'{EARLY_DOL_BASE}/{year}-form-5500-datasets.zip'
    year_dir  = BASE_DIR / str(year)
    year_dir.mkdir(parents=True, exist_ok=True)
    zip_path  = year_dir / f'form5500_{year}_all.zip'

    print(f'\nYear {year}')
    if not download_file(url, zip_path, f'{year} early-format'):
        return False
    # unzip_file reports (and deletes) a corrupt archive; pass that failure on
    # to the caller instead of reporting success.
    return unzip_file(zip_path, year_dir)

# Uncomment to run (downloads one combined zip per year in EARLY_YEARS,
# pausing PAUSE_BETWEEN seconds between years):
# for y in EARLY_YEARS:
#     download_early_year(y)
#     time.sleep(PAUSE_BETWEEN)

# Reminder that this cell only defines the function; no download has started.
print('Early-years download function defined (not yet run).')
print('Uncomment the loop above when ready.')