# Phase 2: Data Preprocessing & Standardization

Goal: Build reusable cleaning functions for names, addresses, and identifiers
that will feed into blocking and matching.


In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
medicare_path = "../data/MUP_PHY_R25_P05_V20_D23_Prov_Svc.csv"
open_payments_path = "../data/OP_DTL_GNRL_PGYR2023_P01232026_01102026.csv"
pecos_path = "../data/Medicare_FFS_Public_Provider_Enrollment_Q3_2025.csv"

In [3]:
pecos_df = pd.read_csv(pecos_path, low_memory=False, encoding='latin')
print(f"  PECOS: {len(pecos_df):,} rows, {len(pecos_df.columns)} columns")
print(pecos_df.columns)

  PECOS: 2,936,748 rows, 11 columns
Index(['NPI', 'MULTIPLE_NPI_FLAG', 'PECOS_ASCT_CNTL_ID', 'ENRLMT_ID',
       'PROVIDER_TYPE_CD', 'PROVIDER_TYPE_DESC', 'STATE_CD', 'FIRST_NAME',
       'MDL_NAME', 'LAST_NAME', 'ORG_NAME'],
      dtype='object')


In [4]:
medicare_df = pd.read_csv(medicare_path, low_memory=False)
print(f"  Medicare: {len(medicare_df):,} rows, {len(medicare_df.columns)} columns")
print(medicare_df.columns)

  Medicare: 9,660,647 rows, 28 columns
Index(['Rndrng_NPI', 'Rndrng_Prvdr_Last_Org_Name', 'Rndrng_Prvdr_First_Name',
       'Rndrng_Prvdr_MI', 'Rndrng_Prvdr_Crdntls', 'Rndrng_Prvdr_Ent_Cd',
       'Rndrng_Prvdr_St1', 'Rndrng_Prvdr_St2', 'Rndrng_Prvdr_City',
       'Rndrng_Prvdr_State_Abrvtn', 'Rndrng_Prvdr_State_FIPS',
       'Rndrng_Prvdr_Zip5', 'Rndrng_Prvdr_RUCA', 'Rndrng_Prvdr_RUCA_Desc',
       'Rndrng_Prvdr_Cntry', 'Rndrng_Prvdr_Type',
       'Rndrng_Prvdr_Mdcr_Prtcptg_Ind', 'HCPCS_Cd', 'HCPCS_Desc',
       'HCPCS_Drug_Ind', 'Place_Of_Srvc', 'Tot_Benes', 'Tot_Srvcs',
       'Tot_Bene_Day_Srvcs', 'Avg_Sbmtd_Chrg', 'Avg_Mdcr_Alowd_Amt',
       'Avg_Mdcr_Pymt_Amt', 'Avg_Mdcr_Stdzd_Amt'],
      dtype='object')


In [5]:
use_cols = [
    "Covered_Recipient_Profile_ID",  # Physician Profile ID
    "Covered_Recipient_First_Name",
    "Covered_Recipient_Last_Name",
    "Recipient_Primary_Business_Street_Address_Line1",
    "Recipient_City",
    "Recipient_State",
    "Recipient_Zip_Code",
    "Covered_Recipient_NPI",  # may be missing
    "Total_Amount_of_Payment_USDollars",
    "Date_of_Payment",
    "Submitting_Applicable_Manufacturer_or_Applicable_GPO_Name",
    "Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name",
    "Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State",
    "Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country",
    "Program_Year"
]


In [6]:
chunk_list = []
for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
    # Optional: filter, clean, or sample chunk here
    chunk_list.append(chunk)

# Concatenate after all chunks are collected
open_payments_df = pd.concat(chunk_list, ignore_index=True)
print(f"Loaded dataset: {len(open_payments_df):,} rows, {len(open_payments_df.columns)} columns")




  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, chunksize=200_000):
  for chunk in pd.read_csv(open_payments_path, usecols=use_cols, 

Loaded dataset: 14,700,786 rows, 15 columns


In [7]:
print("Phase 2 setup complete.")
print(f"PECOS shape:         {pecos_df.shape}")
print(f"Medicare shape:      {medicare_df.shape}")
print(f"Open Payments shape: {open_payments_df.shape}")

Phase 2 setup complete.
PECOS shape:         (2936748, 11)
Medicare shape:      (9660647, 28)
Open Payments shape: (14700786, 15)


## 2.1 Importing the Preprocessing Module

This section connects the notebook to a reusable Python module (`preprocessing.py`) that implements the core cleaning and validation functions used throughout Phase 2.

- **Module location**
  - The module is stored in a dedicated `lib/` folder (separate from notebooks) to enforce a clean project structure and support reuse across phases.

In [8]:

import os, sys, json, pandas as pd

OUTPUT_DIR = "../artifacts/phase2_preprocessing"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# pecos_df, medicare_df, open_payments_df already loaded from Phase 1,
# or load them here from raw files.


In [9]:
# 2.1 IMPORT PREPROCESSING MODULE
LIB_DIR = "../lib"   # adjust if needed
if LIB_DIR not in sys.path:
    sys.path.insert(0, LIB_DIR)

import importlib, preprocessing
importlib.reload(preprocessing)

from preprocessing import (
    clean_name, soundex_code, metaphone_code,
    clean_street, clean_city, clean_state,
    normalize_zip5, is_valid_npi,
)

print("Spot check 1053656744:", is_valid_npi(1053656744))


Spot check 1053656744: True


In [10]:
OUTPUT_DIR = "../artifacts/phase2_preprocessing"

### 2.2 PECOS – Enrollment & Identity Standardization

In this section we transform the PECOS enrollment file into a cleaner, feature-enriched provider registry suitable for linkage and temporal analysis.

- **Enrollment ID decoding**
  - Parse `ENRLMT_ID` into its logical components:
    - `ENRLMT_ENTITY`: first character (`I` = Individual, `O` = Organization).
    - `ENRLMT_DATE`: enrollment date derived from the `YYYYMMDD` segment.
    - `ENRLMT_YEAR`: enrollment calendar year for easier temporal filtering.
    - `ENRLMT_SEQ`: trailing sequence number, preserved as an identifier.
  - Confirm that all derived dates are valid and inspect the full date range (2002–2025), reinforcing that PECOS is a cumulative registry.

- **Name standardization**
  - Clean `FIRST_NAME`, `MDL_NAME`, `LAST_NAME`, and `ORG_NAME`:
    - Convert to uppercase, strip whitespace, collapse multiple spaces, and remove trailing punctuation.
  - This reduces variation caused by formatting differences and prepares names for phonetic encoding and fuzzy matching.

- **Phonetic features for individuals**
  - For records where `ENRLMT_ENTITY = 'I'`, compute:
    - `FIRST_NAME_SOUNDEX`, `LAST_NAME_SOUNDEX`.
    - `FIRST_NAME_METAPHONE`, `LAST_NAME_METAPHONE`.
  - These fields will later support robust matching in dirty-data scenarios (Scenario 2) when exact text matches are unreliable.

- **NPI validation**
  - Apply the NPI checksum (Luhn with the `80840` prefix) to each `NPI`.
  - After correcting the parity logic, 100% of PECOS NPIs pass validation, and `NPI_VALID` is set to `True` for all records.

- **Name presence sanity check**
  - Verify that every PECOS record has at least one of `FIRST_NAME`, `LAST_NAME`, or `ORG_NAME` populated.
  - Confirm that there are zero records with all three name fields missing, so no PECOS rows need to be dropped on this basis.

In [11]:
# 2.2 PECOS PREPROCESSING

print("2.2 PECOS PREPROCESSING")
print("-" * 60)

pecos = pecos_df.copy()

# 1) ENRLMT_ID → entity, date, sequence (already mostly done in EDA, but ensure here)
pecos['ENRLMT_ENTITY'] = pecos['ENRLMT_ID'].str[0]
pecos['ENRLMT_DATE'] = pd.to_datetime(pecos['ENRLMT_ID'].str[1:9], format='%Y%m%d', errors='coerce')
pecos['ENRLMT_YEAR'] = pecos['ENRLMT_DATE'].dt.year
pecos['ENRLMT_SEQ'] = pecos['ENRLMT_ID'].str[9:]

print("Enrollment date range:", pecos['ENRLMT_DATE'].min(), "→", pecos['ENRLMT_DATE'].max())
print("Invalid ENRLMT_DATE rows:", pecos['ENRLMT_DATE'].isna().sum())

# 2) Name standardization
for col in ['FIRST_NAME', 'MDL_NAME', 'LAST_NAME', 'ORG_NAME']:
    if col in pecos.columns:
        pecos[col] = pecos[col].apply(clean_name)

# 3) Phonetic columns for individuals (Scenario 2 support)
is_individual = pecos['ENRLMT_ENTITY'] == 'I'
pecos.loc[is_individual, 'FIRST_NAME_SOUNDEX'] = pecos.loc[is_individual, 'FIRST_NAME'].apply(soundex_code)
pecos.loc[is_individual, 'LAST_NAME_SOUNDEX'] = pecos.loc[is_individual, 'LAST_NAME'].apply(soundex_code)
pecos.loc[is_individual, 'FIRST_NAME_METAPHONE'] = pecos.loc[is_individual, 'FIRST_NAME'].apply(metaphone_code)
pecos.loc[is_individual, 'LAST_NAME_METAPHONE'] = pecos.loc[is_individual, 'LAST_NAME'].apply(metaphone_code)

# 4) NPI validation
pecos['NPI_VALID'] = pecos['NPI'].apply(is_valid_npi)
invalid_npi_count = (~pecos['NPI_VALID']).sum()
print("Invalid NPIs in PECOS:", invalid_npi_count)

# 5) Basic sanity: records with no name fields at all
no_name_mask = pecos['FIRST_NAME'].isna() & pecos['LAST_NAME'].isna() & pecos['ORG_NAME'].isna()
no_name_count = no_name_mask.sum()
print("Records with no FIRST/LAST/ORG name:", no_name_count)

# (Save deferred — adding Fix 2/3/6 columns first)


2.2 PECOS PREPROCESSING
------------------------------------------------------------
Enrollment date range: 2002-08-01 00:00:00 → 2025-09-13 00:00:00
Invalid ENRLMT_DATE rows: 0
Invalid NPIs in PECOS: 0
Records with no FIRST/LAST/ORG name: 0


#### Fix 2: Flag 575 Dual-Entity NPIs

From EDA Part 5, 575 NPIs appear as both Individual (`I`) and Organization (`O`) in PECOS — likely data entry errors. We flag these for cautious handling in linkage rather than dropping them.

In [12]:
# Fix 2: Flag NPIs that appear as both Individual AND Organization
entity_per_npi = pecos.groupby('NPI')['ENRLMT_ENTITY'].nunique()
dual_npis = entity_per_npi[entity_per_npi > 1].index
pecos['DUAL_ENTITY_FLAG'] = pecos['NPI'].isin(dual_npis)
print(f"Dual-entity NPIs flagged: {pecos['DUAL_ENTITY_FLAG'].sum():,} rows ({len(dual_npis)} unique NPIs)")


Dual-entity NPIs flagged: 1,198 rows (575 unique NPIs)


#### Fix 3: Temporal Alignment Flag

Medicare and Open Payments data are both program year 2023. This flag enables Phase 3 to filter PECOS for temporal alignment without re-deriving dates from `ENRLMT_ID`.

In [13]:
# Fix 3: Temporal flag — enrolled by end of 2023 (aligns with Medicare/OP program year)
pecos['ENROLLED_BY_2023'] = pecos['ENRLMT_YEAR'] <= 2023
print(f"Enrolled by 2023: {pecos['ENROLLED_BY_2023'].sum():,} / {len(pecos):,} ({pecos['ENROLLED_BY_2023'].mean()*100:.1f}%)")


Enrolled by 2023: 2,405,538 / 2,936,748 (81.9%)


#### Fix 6: Multi-NPI Organization Dedup Prep

From EDA Part 5, 14,210 `PECOS_ASCT_CNTL_ID` values link to multiple NPIs — these are parent organizations with multiple billing locations. We identify them and assign a canonical `PARENT_NPI` (lowest NPI per group) to support an optional organization matching pipeline in Phase 3.

In [14]:
# Fix 6: Multi-NPI org dedup — identify parent organizations via PECOS_ASCT_CNTL_ID
org_mask = pecos['ENRLMT_ENTITY'] == 'O'
org_npi_counts = pecos.loc[org_mask].groupby('PECOS_ASCT_CNTL_ID')['NPI'].nunique()
multi_npi_orgs = org_npi_counts[org_npi_counts > 1]

pecos['MULTI_NPI_ORG'] = False
pecos.loc[
    org_mask & pecos['PECOS_ASCT_CNTL_ID'].isin(multi_npi_orgs.index),
    'MULTI_NPI_ORG'
] = True

# For each multi-NPI org group, assign the lowest NPI as canonical PARENT_NPI
canonical = pecos.loc[org_mask].groupby('PECOS_ASCT_CNTL_ID')['NPI'].min().rename('PARENT_NPI')
pecos = pecos.merge(canonical, on='PECOS_ASCT_CNTL_ID', how='left')

print(f"Multi-NPI organizations: {len(multi_npi_orgs):,} parent orgs covering {pecos['MULTI_NPI_ORG'].sum():,} rows")
print(f"PARENT_NPI assigned for org-level dedup")


Multi-NPI organizations: 14,210 parent orgs covering 114,062 rows
PARENT_NPI assigned for org-level dedup


In [15]:
# Save cleaned PECOS (with Fix 2/3/6 columns included)
pecos_output_path = os.path.join(OUTPUT_DIR, "pecos_clean.parquet")
pecos.to_parquet(pecos_output_path, index=False)
print("Saved PECOS cleaned to:", pecos_output_path)
print("Final PECOS shape:", pecos.shape)
print(f"New columns: DUAL_ENTITY_FLAG, ENROLLED_BY_2023, MULTI_NPI_ORG, PARENT_NPI")


Saved PECOS cleaned to: ../artifacts/phase2_preprocessing\pecos_clean.parquet
Final PECOS shape: (2936748, 24)
New columns: DUAL_ENTITY_FLAG, ENROLLED_BY_2023, MULTI_NPI_ORG, PARENT_NPI


- **Result**
  - Save the cleaned, enriched PECOS dataset to `pecos_clean.parquet`.
  - This file now provides:
    - Validated NPIs.
    - Explicit entity type (individual vs organization).
    - Enrollment dates and years.
    - Standardized names plus phonetic variants.
  - It serves as the primary enrollment backbone for later linkage and temporal analyses.

In [16]:
# Free raw DataFrames — we've already copied what we need into `pecos`
# medicare_df and open_payments_df will be re-copied in their own sections
import gc
del pecos_df
gc.collect()
print(f"Freed pecos_df. pecos working copy: {pecos.shape}")
print(f"Memory: {pecos.memory_usage(deep=True).sum() / 1e9:.2f} GB")


Freed pecos_df. pecos working copy: (2936748, 24)
Memory: 2.54 GB


### 2.3 Medicare – Provider-Level Aggregation

The raw Medicare file is at the **service level**, where each row corresponds to a single NPI combined with a specific HCPCS code (and other service attributes). This means the same provider (same NPI) appears on many rows. In this step, we transform it into a **provider-level** dataset with one row per NPI.

**Steps performed in this cell:**

- **Name & credential cleaning**
  - Standardize rendering provider names to a consistent format (uppercase, trimmed, punctuation removed).
  - Normalize credentials (e.g., `M.D.` → `MD`) to reduce variation.

- **Address standardization** (Note: `Rndrng_Prvdr_St2` dropped — 76.1% null per EDA decision matrix)
  - Clean and normalize street, city, and state fields (uppercase, standardized abbreviations) to support downstream comparison and blocking.
  - ZIP codes are already in clean 5‑digit format in this dataset.

- **NPI validation**
  - Apply the NPI checksum (Luhn) algorithm to flag any invalid NPIs.
  - This ensures we only carry forward structurally valid identifiers into the linkage pipeline.

- **Phonetic features for names**
  - For individual providers, compute Soundex and Metaphone codes for first and last names.
  - These features will be used later for robust fuzzy matching (Scenario 2).

- **Provider-level deduplication (key step)**
  - Group the data by `Rndrng_NPI` and aggregate service-level fields:
    - `total_services` = sum of `Tot_Srvcs` across all rows for that NPI.
    - `total_beneficiaries` = sum of `Tot_Benes`.
    - `total_submitted_charges` = sum of (`Avg_Sbmtd_Chrg` × `Tot_Srvcs`) — weighted total, not sum of averages.
    - `total_medicare_payment` = sum of (`Avg_Mdcr_Pymt_Amt` × `Tot_Srvcs`) — weighted total, not sum of averages.
    - `unique_hcpcs_count` = number of distinct `HCPCS_Cd` per NPI.
    - `service_row_count` = number of service rows for that NPI.
  - For provider attributes that are constant per NPI (name, address, specialty, etc.), keep the first occurrence as the canonical provider record.

In [17]:
# 2.3 MEDICARE PREPROCESSING

print("2.3 MEDICARE PREPROCESSING")
print("-" * 60)

medicare = medicare_df.copy()

# 1) Name standardization
medicare['Rndrng_Prvdr_First_Name'] = medicare['Rndrng_Prvdr_First_Name'].apply(clean_name)
medicare['Rndrng_Prvdr_Last_Org_Name'] = medicare['Rndrng_Prvdr_Last_Org_Name'].apply(clean_name)
medicare['Rndrng_Prvdr_MI'] = medicare['Rndrng_Prvdr_MI'].apply(clean_name)

# 2) Credentials cleanup (M.D. → MD, D.O. → DO, etc.)
def clean_credential(s):
    if s is None or pd.isna(s):
        return None
    s = str(s).strip().upper()
    s = s.replace(".", "").replace(",", "").strip()
    return s or None

medicare['Rndrng_Prvdr_Crdntls'] = medicare['Rndrng_Prvdr_Crdntls'].apply(clean_credential)

# 3) Address standardization
medicare['Rndrng_Prvdr_St1'] = medicare['Rndrng_Prvdr_St1'].apply(clean_street)
medicare['Rndrng_Prvdr_City'] = medicare['Rndrng_Prvdr_City'].apply(clean_city)
medicare['Rndrng_Prvdr_State_Abrvtn'] = medicare['Rndrng_Prvdr_State_Abrvtn'].apply(clean_state)
# ZIP already 5-digit in Medicare ✓

# Drop Rndrng_Prvdr_St2 — 76.1% null, too sparse for matching (per EDA Section 9 decision matrix)
medicare.drop(columns=['Rndrng_Prvdr_St2'], inplace=True)

# Drop redundant payment columns (r > 0.90 with Avg_Mdcr_Pymt_Amt per EDA Section 6)
medicare.drop(columns=['Avg_Mdcr_Alowd_Amt', 'Avg_Mdcr_Stdzd_Amt'], inplace=True)
print("Dropped: Rndrng_Prvdr_St2 (76.1% null), Avg_Mdcr_Alowd_Amt, Avg_Mdcr_Stdzd_Amt (redundant)")

# 4) NPI validation
medicare['NPI_VALID'] = medicare['Rndrng_NPI'].apply(is_valid_npi)
print(f"Invalid NPIs: {(~medicare['NPI_VALID']).sum()}")

# 5) Phonetic columns (individual records only)
is_indiv = medicare['Rndrng_Prvdr_Ent_Cd'] == 'I'
medicare.loc[is_indiv, 'FIRST_NAME_SOUNDEX'] = medicare.loc[is_indiv, 'Rndrng_Prvdr_First_Name'].apply(soundex_code)
medicare.loc[is_indiv, 'LAST_NAME_SOUNDEX'] = medicare.loc[is_indiv, 'Rndrng_Prvdr_Last_Org_Name'].apply(soundex_code)
medicare.loc[is_indiv, 'FIRST_NAME_METAPHONE'] = medicare.loc[is_indiv, 'Rndrng_Prvdr_First_Name'].apply(metaphone_code)
medicare.loc[is_indiv, 'LAST_NAME_METAPHONE'] = medicare.loc[is_indiv, 'Rndrng_Prvdr_Last_Org_Name'].apply(metaphone_code)

print(f"Cleaned Medicare shape (before dedup): {medicare.shape}")


2.3 MEDICARE PREPROCESSING
------------------------------------------------------------
Dropped: Rndrng_Prvdr_St2 (76.1% null), Avg_Mdcr_Alowd_Amt, Avg_Mdcr_Stdzd_Amt (redundant)
Invalid NPIs: 0
Cleaned Medicare shape (before dedup): (9660647, 30)


In [18]:
dataframes = {
    name: obj for name, obj in globals().items()
    if isinstance(obj, pd.DataFrame)
}

if dataframes:
    print("DataFrames currently in memory:\n")
    for name, df in dataframes.items():
        print(f"Name: {name}")
        print(f"Shape: {df.shape}")
        print("-" * 40)
else:
    print("No pandas DataFrames currently in memory.")

DataFrames currently in memory:

Name: medicare_df
Shape: (9660647, 28)
----------------------------------------
Name: chunk
Shape: (100786, 15)
----------------------------------------
Name: open_payments_df
Shape: (14700786, 15)
----------------------------------------
Name: pecos
Shape: (2936748, 24)
----------------------------------------
Name: medicare
Shape: (9660647, 30)
----------------------------------------


In [19]:
# 6) Provider-level dedup: collapse multiple HCPCS rows per NPI → 1 row

provider_cols = [
    'Rndrng_NPI', 'Rndrng_Prvdr_Last_Org_Name', 'Rndrng_Prvdr_First_Name',
    'Rndrng_Prvdr_MI', 'Rndrng_Prvdr_Crdntls', 'Rndrng_Prvdr_Ent_Cd',
    'Rndrng_Prvdr_St1', 'Rndrng_Prvdr_City',
    'Rndrng_Prvdr_State_Abrvtn', 'Rndrng_Prvdr_State_FIPS', 'Rndrng_Prvdr_Zip5',
    'Rndrng_Prvdr_RUCA', 'Rndrng_Prvdr_RUCA_Desc', 'Rndrng_Prvdr_Cntry',
    'Rndrng_Prvdr_Type', 'Rndrng_Prvdr_Mdcr_Prtcptg_Ind',
    'NPI_VALID', 'FIRST_NAME_SOUNDEX', 'LAST_NAME_SOUNDEX',
    'FIRST_NAME_METAPHONE', 'LAST_NAME_METAPHONE'
]

# Compute weighted totals (Fix 1: avg × volume, not sum of averages)
medicare['_total_submitted_charges'] = medicare['Avg_Sbmtd_Chrg'] * medicare['Tot_Srvcs']
medicare['_total_medicare_payment']  = medicare['Avg_Mdcr_Pymt_Amt'] * medicare['Tot_Srvcs']

# Aggregate service-level stats per NPI
service_agg = medicare.groupby('Rndrng_NPI').agg(
    total_services=('Tot_Srvcs', 'sum'),
    total_beneficiaries=('Tot_Benes', 'sum'),
    total_submitted_charges=('_total_submitted_charges', 'sum'),
    total_medicare_payment=('_total_medicare_payment', 'sum'),
    unique_hcpcs_count=('HCPCS_Cd', 'nunique'),
    service_row_count=('HCPCS_Cd', 'count'),
).reset_index()

# Get one row per NPI for provider-level fields
provider_info = medicare[provider_cols].drop_duplicates(subset='Rndrng_NPI', keep='first')

# Merge
medicare_dedup = provider_info.merge(service_agg, on='Rndrng_NPI', how='left')

print(f"Medicare BEFORE dedup: {len(medicare):,} rows")
print(f"Medicare AFTER dedup:  {len(medicare_dedup):,} rows")
print(f"Reduction: {(1 - len(medicare_dedup)/len(medicare))*100:.1f}%")

# Save
medicare_output = os.path.join(OUTPUT_DIR, "medicare_clean.parquet")
medicare_dedup.to_parquet(medicare_output, index=False)
print(f"Saved to: {medicare_output}")


Medicare BEFORE dedup: 9,660,647 rows
Medicare AFTER dedup:  1,175,281 rows
Reduction: 87.8%
Saved to: ../artifacts/phase2_preprocessing\medicare_clean.parquet


- **Result**
  - Reduce the dataset from **9,660,647 service rows** to **1,175,281 provider rows**, an **87.8% reduction** in row count.
  - Each remaining row represents a single provider (one NPI) with:
    - Cleaned identity and address fields.
    - Phonetic name encodings.
    - Aggregated utilization and payment statistics.

- **Why this matters**
  - Later phases (blocking, matching, and multi-source linkage) operate at the **provider level**, not at the HCPCS-line level.
  - This aggregation drastically reduces computation while preserving the information needed for linkage and analysis.


In [20]:
# Free raw DataFrames — we've already copied what we need into `pecos`
# medicare_df and open_payments_df will be re-copied in their own sections
import gc
del medicare_df
gc.collect()
print(f"Freed medicare_df. medicare working copy: {medicare.shape}")
print(f"Memory: {pecos.memory_usage(deep=True).sum() / 1e9:.2f} GB")


Freed medicare_df. medicare working copy: (9660647, 32)
Memory: 2.54 GB


### 2.4 Open Payments Preprocessing

In [21]:
# 2.4 OPEN PAYMENTS PREPROCESSING

print("2.4 OPEN PAYMENTS PREPROCESSING")
print("-" * 60)

op = open_payments_df.copy()

# 1) Flag rows with no name AND no NPI (unlinkable records)
no_identity = (
    op['Covered_Recipient_First_Name'].isna() &
    op['Covered_Recipient_Last_Name'].isna() &
    op['Covered_Recipient_NPI'].isna()
)
print(f"Rows with no name AND no NPI: {no_identity.sum():,} — flagging as unlinkable")
op['LINKABLE'] = ~no_identity

# 2) NPI: float → int (handle NaN safely)
op['Covered_Recipient_NPI'] = pd.to_numeric(op['Covered_Recipient_NPI'], errors='coerce')
op.loc[op['Covered_Recipient_NPI'].notna(), 'Covered_Recipient_NPI'] = (
    op.loc[op['Covered_Recipient_NPI'].notna(), 'Covered_Recipient_NPI'].astype(int)
)

# 3) Name standardization
op['Covered_Recipient_First_Name'] = op['Covered_Recipient_First_Name'].apply(clean_name)
op['Covered_Recipient_Last_Name']  = op['Covered_Recipient_Last_Name'].apply(clean_name)

# 4) Address standardization
op['Recipient_Primary_Business_Street_Address_Line1'] = (
    op['Recipient_Primary_Business_Street_Address_Line1'].apply(clean_street)
)
op['Recipient_City']  = op['Recipient_City'].apply(clean_city)
op['Recipient_State'] = op['Recipient_State'].apply(clean_state)

# 5) ZIP → 5-digit
op['Recipient_Zip5'] = op['Recipient_Zip_Code'].apply(normalize_zip5)
print("ZIP+4 → ZIP5 conversion done.")
print("Sample ZIP5:", op['Recipient_Zip5'].dropna().head(5).tolist())

# 6) NPI validation (only where NPI exists)
op['NPI_VALID'] = op['Covered_Recipient_NPI'].apply(
    lambda x: is_valid_npi(int(x)) if pd.notna(x) else None
)
invalid_npi = op.loc[op['NPI_VALID'] == False]
print(f"Invalid NPIs: {len(invalid_npi):,}")

# 7) Phonetic columns (for all rows with a name)
has_name = op['Covered_Recipient_First_Name'].notna()
op.loc[has_name, 'FIRST_NAME_SOUNDEX']   = op.loc[has_name, 'Covered_Recipient_First_Name'].apply(soundex_code)
op.loc[has_name, 'LAST_NAME_SOUNDEX']    = op.loc[has_name, 'Covered_Recipient_Last_Name'].apply(soundex_code)
op.loc[has_name, 'FIRST_NAME_METAPHONE'] = op.loc[has_name, 'Covered_Recipient_First_Name'].apply(metaphone_code)
op.loc[has_name, 'LAST_NAME_METAPHONE']  = op.loc[has_name, 'Covered_Recipient_Last_Name'].apply(metaphone_code)

# 8) Assign linkage tier
has_npi = op['NPI_VALID'] == True

has_real_name_and_state = (
    op['Covered_Recipient_First_Name'].notna() &
    (op['Covered_Recipient_First_Name'] != 'NAN') &
    op['Covered_Recipient_Last_Name'].notna() &
    (op['Covered_Recipient_Last_Name'] != 'NAN') &
    op['Recipient_State'].notna() &
    (op['Recipient_State'] != 'NAN')
)

op['linkage_tier'] = 'unmatchable'
op.loc[has_npi, 'linkage_tier'] = 'tier1_npi'
op.loc[~has_npi & has_real_name_and_state, 'linkage_tier'] = 'tier2_fuzzy'

print("\nLinkage tier distribution (row-level, before dedup):")
print(op['linkage_tier'].value_counts().to_string())

print(f"\nCleaned Open Payments shape (before dedup): {op.shape}")


2.4 OPEN PAYMENTS PREPROCESSING
------------------------------------------------------------
Rows with no name AND no NPI: 31,400 — flagging as unlinkable
ZIP+4 → ZIP5 conversion done.
Sample ZIP5: ['55369', '46219', '98101', '89014', '43551']
Invalid NPIs: 4

Linkage tier distribution (row-level, before dedup):
linkage_tier
tier1_npi      14656549
unmatchable       31424
tier2_fuzzy       12813

Cleaned Open Payments shape (before dedup): (14700786, 23)


In [22]:
# Inspect the 4 invalid NPIs
invalid_ops = op[op['NPI_VALID'] == False]
print("4 Invalid NPIs in Open Payments:")
print(invalid_ops[['Covered_Recipient_NPI', 'Covered_Recipient_First_Name', 
                    'Covered_Recipient_Last_Name', 'Recipient_State']].drop_duplicates().to_string())


4 Invalid NPIs in Open Payments:
          Covered_Recipient_NPI Covered_Recipient_First_Name Covered_Recipient_Last_Name Recipient_State
5805590            1.202321e+09                         REZA                 FARDSHISHEH              VA
13471051           1.356763e+09                  CHRISTOPHER                      AQUINO              FL
14472428           1.374625e+09                       TRACEY                      TOBACK              NY
14611723           1.851791e+09                          MEL                      IRVINE              FL


In [23]:
# Open Payments: Provider-level dedup + save
import gc


# Aggregate to provider level (one row per NPI/Profile)
op_provider_cols = [
    'Covered_Recipient_NPI', 'Covered_Recipient_Profile_ID',
    'Covered_Recipient_First_Name', 'Covered_Recipient_Last_Name',
    'Recipient_Primary_Business_Street_Address_Line1',
    'Recipient_City', 'Recipient_State', 'Recipient_Zip5',
    'NPI_VALID', 'LINKABLE', 'linkage_tier',
    'FIRST_NAME_SOUNDEX', 'LAST_NAME_SOUNDEX',
    'FIRST_NAME_METAPHONE', 'LAST_NAME_METAPHONE'
]


# Group by NPI (or Profile ID for NPI-missing rows)
op['Date_of_Payment'] = pd.to_datetime(op['Date_of_Payment'], format='%m/%d/%Y', errors='coerce')


# Use NPI as primary key; Profile_ID as fallback; row index as last resort
npi_str = op.loc[op['Covered_Recipient_NPI'].notna(), 'Covered_Recipient_NPI'].astype(int).astype(str)
pid_valid = op['Covered_Recipient_Profile_ID'].notna()

op['_group_key'] = 'ROW_' + op.index.astype(str)                                     # last resort: unique per row
op.loc[pid_valid, '_group_key'] = 'PID_' + op.loc[pid_valid, 'Covered_Recipient_Profile_ID'].astype(str)
op.loc[op['Covered_Recipient_NPI'].notna(), '_group_key'] = 'NPI_' + npi_str


payment_agg = op.groupby('_group_key').agg(
    total_payment_amount=('Total_Amount_of_Payment_USDollars', 'sum'),
    payment_count=('Total_Amount_of_Payment_USDollars', 'count'),
    avg_payment=('Total_Amount_of_Payment_USDollars', 'mean'),
    max_payment=('Total_Amount_of_Payment_USDollars', 'max'),
    unique_manufacturers=('Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name', 'nunique'),
    min_payment_date=('Date_of_Payment', 'min'),
    max_payment_date=('Date_of_Payment', 'max'),
).reset_index()


provider_info = op[op_provider_cols + ['_group_key']].drop_duplicates(
    subset='_group_key', keep='first'
)


op_dedup = provider_info.merge(payment_agg, on='_group_key', how='left')
op_dedup.drop(columns=['_group_key'], inplace=True)


print(f"Open Payments BEFORE dedup: {len(op):,} rows")
print(f"Open Payments AFTER dedup:  {len(op_dedup):,} rows")
print(f"Reduction: {(1 - len(op_dedup)/len(op))*100:.1f}%")
print(f"Linkage tier distribution:")
print(op_dedup['linkage_tier'].value_counts().to_string())


# Save
op_output = os.path.join(OUTPUT_DIR, "open_payments_clean.parquet")
op_dedup.to_parquet(op_output, index=False)
print(f"\nSaved to: {op_output}")


# Free memory
del op, op_dedup, payment_agg, provider_info
gc.collect()


Open Payments BEFORE dedup: 14,700,786 rows
Open Payments AFTER dedup:  969,703 rows
Reduction: 93.4%
Linkage tier distribution:
linkage_tier
tier1_npi      933615
unmatchable     31405
tier2_fuzzy      4683

Saved to: ../artifacts/phase2_preprocessing\open_payments_clean.parquet


0

In [24]:
INPUT_DIR = "../artifacts/phase2_preprocessing/"

In [25]:
import os

# Check what raw files exist
for f in sorted(os.listdir(INPUT_DIR)):
    size_mb = os.path.getsize(os.path.join(INPUT_DIR, f)) / 1e6
    print(f"  {f:60s} {size_mb:>10.1f} MB")

print("\n--- Artifacts so far ---")
OUTPUT_DIR = '../artifacts/phase2_preprocessing'
if os.path.exists(OUTPUT_DIR):
    for f in sorted(os.listdir(OUTPUT_DIR)):
        size_mb = os.path.getsize(os.path.join(OUTPUT_DIR, f)) / 1e6
        print(f"  {f:60s} {size_mb:>10.1f} MB")


  medicare_clean.parquet                                             78.0 MB
  open_payments_clean.parquet                                        61.2 MB
  pecos_clean.parquet                                               117.2 MB
  preprocessing_report.json                                           0.0 MB

--- Artifacts so far ---
  medicare_clean.parquet                                             78.0 MB
  open_payments_clean.parquet                                        61.2 MB
  pecos_clean.parquet                                               117.2 MB
  preprocessing_report.json                                           0.0 MB


In [26]:
op_check = pd.read_parquet(os.path.join(OUTPUT_DIR, 'open_payments_clean.parquet'))

t2 = op_check[op_check['linkage_tier'] == 'tier2_fuzzy']
print(f"Tier2 total: {len(t2):,}")
print(f"  Name = 'NAN': {(t2['Covered_Recipient_First_Name'] == 'NAN').sum():,}")
print(f"  Real names:   {(t2['Covered_Recipient_First_Name'] != 'NAN').sum():,}")

print(f"\nUnmatchable count: {(op_check['linkage_tier'] == 'unmatchable').sum():,}")


Tier2 total: 4,683
  Name = 'NAN': 0
  Real names:   4,683

Unmatchable count: 31,405


In [27]:
# Check if names are real or empty strings
t2 = op_check[op_check['linkage_tier'] == 'tier2_fuzzy']
print("Sample first names:")
print(t2['Covered_Recipient_First_Name'].value_counts().head(10))
print(f"\nEmpty string count: {(t2['Covered_Recipient_First_Name'] == '').sum():,}")
print(f"Whitespace-only count: {(t2['Covered_Recipient_First_Name'].str.strip() == '').sum():,}")
print(f"\nTotal tier2 rows: {len(t2):,}")


Sample first names:
Covered_Recipient_First_Name
JENNIFER    61
MICHAEL     51
DAVID       50
JOHN        49
ROBERT      38
JESSICA     38
AMY         38
MARY        35
RICHARD     33
KAREN       33
Name: count, dtype: int64

Empty string count: 0
Whitespace-only count: 0

Total tier2 rows: 4,683


## 2.5 Post-Preprocessing Validation Report

This section loads all three cleaned parquet files from disk and generates a validation report. It catches schema drift between phases and serves as a checkpoint before Phase 3 blocking/matching.

The report includes shape, dtypes, null counts, cardinality of key columns, and file sizes — saved as `preprocessing_report.json` for programmatic access.

In [28]:
# 2.5 POST-PREPROCESSING VALIDATION REPORT

import os, json
from datetime import datetime

OUTPUT_DIR = "../artifacts/phase2_preprocessing"

print("=" * 70)
print("2.5 POST-PREPROCESSING VALIDATION REPORT")
print("=" * 70)

# Load all three parquet files
pecos_v = pd.read_parquet(os.path.join(OUTPUT_DIR, "pecos_clean.parquet"))
medicare_v = pd.read_parquet(os.path.join(OUTPUT_DIR, "medicare_clean.parquet"))
op_v = pd.read_parquet(os.path.join(OUTPUT_DIR, "open_payments_clean.parquet"))

report = {
    "generated_at": datetime.now().isoformat(),
    "datasets": {}
}

# Key columns for cardinality checks
key_cols = {
    "pecos": ["NPI", "ENRLMT_ENTITY", "STATE_CD", "DUAL_ENTITY_FLAG", "ENROLLED_BY_2023", "MULTI_NPI_ORG"],
    "medicare": ["Rndrng_NPI", "Rndrng_Prvdr_Ent_Cd", "Rndrng_Prvdr_State_Abrvtn", "NPI_VALID"],
    "open_payments": ["Covered_Recipient_NPI", "linkage_tier", "Recipient_State", "NPI_VALID"]
}

datasets = {
    "pecos": pecos_v,
    "medicare": medicare_v,
    "open_payments": op_v
}

for name, df in datasets.items():
    filepath = os.path.join(OUTPUT_DIR, f"{name.replace('open_payments', 'open_payments')}_clean.parquet")
    file_size_mb = round(os.path.getsize(filepath) / 1e6, 2)
    
    nulls = {col: int(df[col].isna().sum()) for col in df.columns}
    dtypes = {col: str(df[col].dtype) for col in df.columns}
    cardinality = {}
    for col in key_cols.get(name, []):
        if col in df.columns:
            cardinality[col] = int(df[col].nunique())
    
    report["datasets"][name] = {
        "rows": len(df),
        "cols": len(df.columns),
        "columns": list(df.columns),
        "dtypes": dtypes,
        "nulls": nulls,
        "cardinality": cardinality,
        "file_size_mb": file_size_mb
    }
    
    print(f"\n--- {name.upper()} ---")
    print(f"  Shape: {df.shape}")
    print(f"  File size: {file_size_mb} MB")
    print(f"  Columns: {list(df.columns)}")
    print(f"  Nulls per column:")
    for col, n in nulls.items():
        if n > 0:
            print(f"    {col:50s} {n:>12,}")
    print(f"  Cardinality (key columns):")
    for col, c in cardinality.items():
        print(f"    {col:50s} {c:>12,}")

# Save JSON report
report_path = os.path.join(OUTPUT_DIR, "preprocessing_report.json")
with open(report_path, "w") as f:
    json.dump(report, f, indent=2, default=str)
print(f"\nSaved: {report_path}")

# Summary table
print("\n" + "=" * 70)
print(f"{'Dataset':20s} {'Rows':>12s} {'Cols':>6s} {'Size (MB)':>10s} {'Key NPIs':>12s}")
print("-" * 60)
for name in ["pecos", "medicare", "open_payments"]:
    d = report["datasets"][name]
    npi_col = {"pecos": "NPI", "medicare": "Rndrng_NPI", "open_payments": "Covered_Recipient_NPI"}[name]
    npi_count = d["cardinality"].get(npi_col, "N/A")
    print(f"{name:20s} {d['rows']:>12,} {d['cols']:>6d} {d['file_size_mb']:>10.1f} {npi_count:>12,}")

# Cleanup
del pecos_v, medicare_v, op_v
import gc
gc.collect()
print("\n✓ Phase 2 validation complete.")


2.5 POST-PREPROCESSING VALIDATION REPORT

--- PECOS ---
  Shape: (2936748, 24)
  File size: 117.18 MB
  Columns: ['NPI', 'MULTIPLE_NPI_FLAG', 'PECOS_ASCT_CNTL_ID', 'ENRLMT_ID', 'PROVIDER_TYPE_CD', 'PROVIDER_TYPE_DESC', 'STATE_CD', 'FIRST_NAME', 'MDL_NAME', 'LAST_NAME', 'ORG_NAME', 'ENRLMT_ENTITY', 'ENRLMT_DATE', 'ENRLMT_YEAR', 'ENRLMT_SEQ', 'FIRST_NAME_SOUNDEX', 'LAST_NAME_SOUNDEX', 'FIRST_NAME_METAPHONE', 'LAST_NAME_METAPHONE', 'NPI_VALID', 'DUAL_ENTITY_FLAG', 'ENROLLED_BY_2023', 'MULTI_NPI_ORG', 'PARENT_NPI']
  Nulls per column:
    MDL_NAME                                                     10
    FIRST_NAME_SOUNDEX                                      434,372
    LAST_NAME_SOUNDEX                                       434,372
    FIRST_NAME_METAPHONE                                    434,372
    LAST_NAME_METAPHONE                                     434,372
    PARENT_NPI                                            2,502,376
  Cardinality (key columns):
    NPI                   