# CMS Open Payments Datalake Setup

**Project:** AAI-540 Machine Learning Operations - Final Team Project  
**Purpose:** Setup AWS S3 Datalake for CMS Open Payments Data  
**Dataset:** CMS Open Payments Program Year 2024

---

## Table of Contents
1. [Environment Setup](#setup)
2. [AWS Configuration & S3 Bucket Creation](#aws-config)
3. [Download CMS Open Payments Data](#download)
4. [Upload Data to S3](#upload)
5. [Create Athena Database](#athena)
6. [Register Data with Athena](#register)
7. [Convert CSV to Parquet](#parquet)
8. [Query Data with AWS Data Wrangler](#query)
9. [Validation & Verification](#validation)

---

## 1. Environment Setup

Install and import necessary libraries for AWS integration and data processing.

In [None]:
# Install required AWS packages (IPython "!" shell escape, not plain Python).
# boto3/sagemaker: AWS session and clients; awswrangler: Athena queries into
# pandas (section 8); pyathena: DB-API connection used with pandas.read_sql.
!pip install boto3 sagemaker awswrangler pyathena --quiet

In [None]:
# Import necessary libraries
import boto3
import sagemaker
import pandas as pd
import numpy as np
import os
import requests
from pathlib import Path
from datetime import datetime
from io import BytesIO, StringIO
import awswrangler as wr
from pyathena import connect
import warnings

# Silence every Python warning so notebook output stays readable.
warnings.filterwarnings('ignore')

# Raise pandas display limits; the CMS frames are very wide.
for option_name, option_value in (('display.max_columns', 50), ('display.max_rows', 100)):
    pd.set_option(option_name, option_value)

print("Libraries imported successfully")

## 2. AWS Configuration & S3 Bucket Creation

Configure AWS session and create S3 bucket for the datalake.

In [None]:
# Initialize AWS session and SageMaker
sess = sagemaker.Session()
bucket = "cmsopenpaymentsystems"
role = sagemaker.get_execution_role()
region = boto3.Session().region_name
account_id = boto3.client("sts").get_caller_identity().get("Account")

# Initialize AWS clients
s3_client = boto3.client('s3', region_name=region)
s3_resource = boto3.resource('s3')


def _ensure_bucket_exists(bucket_name, region_name):
    """Create ``bucket_name`` if it does not exist yet.

    Args:
        bucket_name: S3 bucket used as the datalake root.
        region_name: AWS region to create the bucket in (may be None).

    Returns:
        True if this call created the bucket; False if it already existed or
        its status could not be confirmed.
    """
    try:
        s3_client.head_bucket(Bucket=bucket_name)
        return False
    except s3_client.exceptions.ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in ('404', 'NoSuchBucket'):
            # e.g. 403: the bucket exists but this role cannot HEAD it, or it is
            # owned by another account. Do not try to create it; later S3 calls
            # will surface any real permission problem.
            print(f"Warning: could not verify bucket '{bucket_name}' (error code {error_code})")
            return False
    # us-east-1 is S3's default region and must not be sent as a LocationConstraint.
    create_args = {'Bucket': bucket_name}
    if region_name and region_name != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {'LocationConstraint': region_name}
    s3_client.create_bucket(**create_args)
    return True


# Create the datalake bucket on first run so later uploads have a destination.
bucket_created = _ensure_bucket_exists(bucket, region)

print(f"AWS Configuration:")
print(f"  Region: {region}")
print(f"  Account ID: {account_id}")
print(f"  S3 Bucket: {bucket}")
print(f"  Role: {role}")
if bucket_created:
    print(f"  Created new S3 bucket: {bucket}")

In [None]:
# Define S3 paths for CMS data
cms_data_prefix = "cms-open-payments"
raw_data_prefix = f"{cms_data_prefix}/raw"
processed_data_prefix = f"{cms_data_prefix}/processed"
parquet_data_prefix = f"{cms_data_prefix}/parquet"

s3_raw_path = f"s3://{bucket}/{raw_data_prefix}"
s3_processed_path = f"s3://{bucket}/{processed_data_prefix}"
s3_parquet_path = f"s3://{bucket}/{parquet_data_prefix}"

print(f"S3 Data Paths:")
print(f"  Raw Data: {s3_raw_path}")
print(f"  Processed Data: {s3_processed_path}")
print(f"  Parquet Data: {s3_parquet_path}")

# Store paths for use in other notebooks
%store bucket
%store region
%store s3_raw_path
%store s3_processed_path
%store s3_parquet_path

## 3. Download CMS Open Payments Data

Download the CMS Open Payments Program Year 2024 General Payments dataset.

**Data Source:** CMS Open Payments  
**Dataset:** Program Year 2024 General Payments  
**Published:** June 30, 2025  
**Coverage:** January 1, 2024 - December 31, 2024

In [None]:
# Direct CSV link for the Program Year 2024 General Payments file
# (publication tag P06302025 in the path).
cms_data_url = (
    "https://download.cms.gov/openpayments/PGYR2024_P06302025_06162025/"
    "OP_DTL_GNRL_PGYR2024_P06302025_06162025.csv"
)

# Fallback if the direct link stops working:
# 1. Go to https://openpaymentsdata.cms.gov/datasets
# 2. Select "Program Year 2024" and "General Payments"
# 3. Download the CSV file manually and place it in ../data/ directory

print("\n".join([
    f"CMS Data URL: {cms_data_url}",
    f"\nNote: This dataset is approximately 3-4 GB.",
    f"Download may take several minutes depending on your connection.",
]))

In [None]:
# Local staging directory for the download, one level above the notebook.
local_data_dir = Path("..") / "data"
local_data_dir.mkdir(exist_ok=True)

# Final on-disk location of the downloaded CSV.
csv_file_name = "OP_DTL_GNRL_PGYR2024_P06302025_06162025.csv"
local_csv_file = local_data_dir / csv_file_name

print(f"Local data directory: {local_data_dir.absolute()}")
print(f"Target CSV file: {local_csv_file.name}")

In [None]:
# Download CMS data if not already present.
# The response is streamed into a temporary ".part" file and renamed to the
# final name only after the transfer completes. A failed or interrupted download
# therefore never leaves a truncated CSV that the exists() check below would
# mistake for a finished file on the next run.
if local_csv_file.exists():
    print(f"CSV file already exists: {local_csv_file}")
    print(f"  File size: {local_csv_file.stat().st_size / (1024**3):.2f} GB")
else:
    print(f"Downloading CMS Open Payments data...")
    print(f"This may take 10-20 minutes depending on your connection.")

    partial_csv_file = local_csv_file.with_name(local_csv_file.name + ".part")
    try:
        # (connect, read) timeouts in seconds: without them a stalled server
        # would block the cell indefinitely. The with-block closes the connection.
        with requests.get(cms_data_url, stream=True, timeout=(30, 300)) as response:
            response.raise_for_status()

            total_size = int(response.headers.get('content-length', 0))
            print(f"Total download size: {total_size / (1024**3):.2f} GB")

            downloaded = 0
            last_reported = None
            with open(partial_csv_file, 'wb') as f:
                # 1 MiB chunks keep the Python loop short for a multi-GB file.
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if not chunk:  # skip keep-alive chunks
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        percent = (downloaded / total_size) * 100
                        # Redraw only when the displayed value changes.
                        if f"{percent:.1f}" != last_reported:
                            last_reported = f"{percent:.1f}"
                            print(f"\rProgress: {percent:.1f}%", end="")

            # Content-Length counts encoded bytes while iter_content yields
            # decoded bytes, so only compare them for an un-encoded body.
            if (total_size > 0
                    and 'content-encoding' not in response.headers
                    and downloaded < total_size):
                raise OSError(
                    f"incomplete download: received {downloaded} of {total_size} bytes"
                )

        # Publish the file under its final name only once it is complete.
        partial_csv_file.replace(local_csv_file)

        print(f"\nDownload complete: {local_csv_file}")
        print(f"  File size: {local_csv_file.stat().st_size / (1024**3):.2f} GB")

    except Exception as e:
        print(f"\nError downloading data: {e}")
        print(f"\nAlternative approach:")
        print(f"1. Visit: https://openpaymentsdata.cms.gov/datasets")
        print(f"2. Select 'Program Year 2024' and 'General Payments'")
        print(f"3. Download CSV and save to: {local_data_dir.absolute()}")
    finally:
        # Remove any leftover partial file (error or KeyboardInterrupt).
        if partial_csv_file.exists():
            partial_csv_file.unlink()

## 4. Upload Data to S3

Upload the downloaded CMS data to S3 for datalake storage.

In [None]:
# Peek at the first few rows locally before pushing the full file to S3.
print("Loading sample of data for preview...")
df_sample = pd.read_csv(local_csv_file, nrows=5)
column_names = list(df_sample.columns)

print(f"\nDataset Preview:")
print(f"  Columns: {len(column_names)}")
print(f"  Sample rows:")
display(df_sample.head())

# Numbered listing of every column header (1-based).
print(f"\nColumn names:")
for position, name in enumerate(column_names, start=1):
    print(f"  {position}. {name}")

In [None]:
# Upload raw CSV to S3
print(f"Uploading data to S3...")
print(f"  Source: {local_csv_file}")
print(f"  Destination: {s3_raw_path}/")

s3_raw_file_path = f"{s3_raw_path}/{local_csv_file.name}"

try:
    # Upload file with progress callback
    file_size = local_csv_file.stat().st_size
    
    def upload_progress(bytes_uploaded):
        percent = (bytes_uploaded / file_size) * 100
        print(f"\rUpload progress: {percent:.1f}%", end="")
    
    s3_client.upload_file(
        str(local_csv_file),
        bucket,
        f"{raw_data_prefix}/{local_csv_file.name}",
        Callback=upload_progress
    )
    
    print(f"\nUpload complete")
    print(f"  S3 URI: {s3_raw_file_path}")
    
    # Store the S3 file path
    %store s3_raw_file_path
    
except Exception as e:
    print(f"\nError uploading to S3: {e}")

In [None]:
# List what is stored under the raw prefix to confirm the upload landed.
print("Verifying S3 upload...")

response = s3_client.list_objects_v2(Bucket=bucket, Prefix=raw_data_prefix)

if 'Contents' not in response:
    print(f"\nNo files found in S3 bucket")
else:
    print(f"\nFiles in S3 bucket:")
    for s3_object in response['Contents']:
        size_in_gb = s3_object['Size'] / (1024**3)
        print(f"  {s3_object['Key']} ({size_in_gb:.2f} GB)")

## 5. Create Athena Database

Create an Amazon Athena database for querying CMS data using SQL.

In [None]:
# Define Athena database name
database_name = "cms_open_payments"

# Set S3 staging directory for Athena queries
s3_athena_staging = f"s3://{bucket}/athena/staging"

print(f"Athena Configuration:")
print(f"  Database: {database_name}")
print(f"  Staging Directory: {s3_athena_staging}")

# Store for use in other notebooks
%store database_name
%store s3_athena_staging

In [None]:
# Open a pyathena DB-API connection. Query results are written under
# s3_athena_staging, and the pandas.read_sql calls in later cells use it.
athena_conn = connect(s3_staging_dir=s3_athena_staging, region_name=region)

print("Athena connection established")

In [None]:
# Create the catalog database; IF NOT EXISTS keeps re-runs harmless.
create_db_query = "CREATE DATABASE IF NOT EXISTS " + database_name

print(f"Creating Athena database...")
print(f"  Query: {create_db_query}")

try:
    result = pd.read_sql(create_db_query, athena_conn)
except Exception as e:
    print(f"Error creating database: {e}")
else:
    print(f"Database created successfully")

In [None]:
# Confirm the database now appears in the Athena catalog listing.
show_db_query = "SHOW DATABASES"

print("Verifying database creation...")
databases = pd.read_sql(show_db_query, athena_conn)

print(f"\n Available Databases:")
display(databases)

database_found = database_name in databases.values
print(f"\nDatabase '{database_name}' exists" if database_found
      else f"\nDatabase '{database_name}' not found")

## 6. Register Data with Athena

Create an external table in Athena to query the CSV data stored in S3.

In [None]:
# Define table name
table_name_csv = "general_payments_csv"

print(f"Table Configuration:")
print(f"  Database: {database_name}")
print(f"  Table: {table_name_csv}")
print(f"  Location: {s3_raw_path}/")

%store table_name_csv

In [None]:
# Read the header plus one data row from the local CSV to get its columns.
df_schema = pd.read_csv(filepath_or_buffer=local_csv_file, nrows=1)

# Helper for building Athena column definitions from pandas dtypes.
def get_athena_type(dtype):
    """Return the Athena column type name for a pandas dtype.

    Integer dtypes map to BIGINT, floating dtypes to DOUBLE, and any
    datetime64 dtype (naive or tz-aware) to TIMESTAMP. Everything else,
    including bool and object, falls back to STRING. Checks run in that order.
    """
    type_rules = (
        (pd.api.types.is_integer_dtype, 'BIGINT'),
        (pd.api.types.is_float_dtype, 'DOUBLE'),
        (pd.api.types.is_datetime64_any_dtype, 'TIMESTAMP'),
    )
    for matches, athena_type in type_rules:
        if matches(dtype):
            return athena_type
    return 'STRING'

# Create column definitions.
# Every column of the raw CSV table is declared STRING. Types inferred from a
# single sampled row are unreliable: a field that happens to be blank in that
# row is read as float64 (-> DOUBLE) even for a text column, and typed numeric
# columns can reject blank CSV fields depending on the SerDe. Queries cast the
# fields they need, e.g. CAST(Total_Amount_of_Payment_USDollars AS DOUBLE).
columns_def = []
for col in df_schema.columns:
    # Clean column name for Athena (replace spaces and special chars) and use
    # the cleaned name in the DDL; the raw header may not be a valid identifier.
    clean_col = col.replace(' ', '_').replace('(', '').replace(')', '').replace('-', '_')
    athena_type = 'STRING'
    columns_def.append(f"`{clean_col}` {athena_type}")

columns_str = ',\n    '.join(columns_def)

print(f"Schema preview (first 10 columns):")
for i, col_def in enumerate(columns_def[:10], 1):
    print(f"  {i}. {col_def}")
print(f"  ... ({len(columns_def)} columns total)")

In [None]:
# Create external table for CSV data.
# Fields in this CSV may be double-quoted and contain embedded commas (e.g. in
# names or addresses). A plain ROW FORMAT DELIMITED / FIELDS TERMINATED BY ','
# table would split such values and keep the quote characters. OpenCSVSerde
# honours CSV quoting, so each quoted field maps to exactly one column.
create_table_query = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name_csv} (
    {columns_str}
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"',
    'escapeChar' = '\\\\'
)
STORED AS TEXTFILE
LOCATION '{s3_raw_path}/'
TBLPROPERTIES (
    'skip.header.line.count'='1',
    'serialization.null.format'=''
)
"""

print(f"Creating external table...")
print(f"\nQuery preview:")
print(create_table_query[:500] + "...")

# NOTE: because of IF NOT EXISTS, a table created earlier with a different
# definition is left as-is; drop it first to pick up this definition.
try:
    result = pd.read_sql(create_table_query, athena_conn)
    print(f"\nTable '{table_name_csv}' created successfully")
except Exception as e:
    print(f"\nError creating table: {e}")

In [None]:
# List the database's tables to confirm the CSV table is registered.
print("Verifying table creation...")
show_tables_query = f"SHOW TABLES IN {database_name}"
tables = pd.read_sql(show_tables_query, athena_conn)

print(f"\nTables in database '{database_name}':")
display(tables)

table_exists = table_name_csv in tables.values
status_message = (f"\nTable '{table_name_csv}' exists" if table_exists
                  else f"\nTable '{table_name_csv}' not found")
print(status_message)

In [None]:
# Smoke-test the CSV table by counting every row through Athena.
count_query = f"""
SELECT COUNT(*) as row_count
FROM {database_name}.{table_name_csv}
"""

for banner in ("Testing table access...", f"Query: {count_query}"):
    print(banner)

try:
    result = pd.read_sql(count_query, athena_conn)
    print(f"\nQuery successful")
    total_rows = result.at[0, 'row_count']
    print(f"  Total rows: {total_rows:,}")
except Exception as e:
    print(f"\nError querying table: {e}")

In [None]:
# Pull a handful of rows from the CSV table to eyeball parsing.
sample_query = f"""
SELECT *
FROM {database_name}.{table_name_csv}
LIMIT 5
"""

print("Fetching sample data...")

try:
    sample_data = pd.read_sql(sample_query, athena_conn)
    for message in (f"\nSample data retrieved", f"  Shape: {sample_data.shape}"):
        print(message)
    display(sample_data.head())
except Exception as e:
    print(f"\nError fetching sample data: {e}")

## 7. Convert CSV to Parquet

Convert the CSV data to Parquet format for better performance and compression.

In [None]:
# Define Parquet table name
table_name_parquet = "general_payments_parquet"

print(f"Parquet Conversion Configuration:")
print(f"  Source Table: {database_name}.{table_name_csv}")
print(f"  Target Table: {database_name}.{table_name_parquet}")
print(f"  Target Location: {s3_parquet_path}/")

%store table_name_parquet

In [None]:
# Create CTAS (Create Table As Select) query to convert CSV to Parquet.
#
# Athena requires partition columns to come last in the SELECT list and rejects
# a column name that appears twice (names are case-insensitive). A query like
# "SELECT *, '2024' AS program_year" therefore fails whenever the source table
# already has a program_year column. Instead, the source columns are listed
# explicitly, minus any existing program_year, and the constant partition
# column is appended at the end.
program_year_value = '2024'

print("Converting CSV to Parquet format...")
print("Note: This operation may take 15-30 minutes for large datasets")

try:
    # A zero-row query is a cheap way to get the source table's column names.
    source_columns = list(
        pd.read_sql(f"SELECT * FROM {database_name}.{table_name_csv} LIMIT 0", athena_conn).columns
    )
    data_columns = [name for name in source_columns if name.lower() != 'program_year']
    select_list = ',\n    '.join(
        [f'"{name}"' for name in data_columns]
        + [f"'{program_year_value}' as program_year"]
    )

    create_parquet_query = f"""
CREATE TABLE {database_name}.{table_name_parquet}
WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = '{s3_parquet_path}/',
    partitioned_by = ARRAY['program_year']
)
AS
SELECT
    {select_list}
FROM {database_name}.{table_name_csv}
"""
    print(f"\nQuery:")
    print(create_parquet_query)

    # Execute conversion
    result = pd.read_sql(create_parquet_query, athena_conn)
    print(f"\nConversion complete")
    print(f"  Parquet table '{table_name_parquet}' created successfully")
except Exception as e:
    print(f"\nError during conversion: {e}")
    print(f"\nNote: If table already exists, drop it first:")
    print(f"  DROP TABLE IF EXISTS {database_name}.{table_name_parquet}")
    # DROP TABLE does not delete the S3 objects, and CTAS needs an empty location.
    print(f"  and delete the objects under {s3_parquet_path}/ (CTAS requires an empty external_location)")

In [None]:
# Count rows in the Parquet table to confirm it is queryable.
count_parquet_query = f"""
SELECT COUNT(*) as row_count
FROM {database_name}.{table_name_parquet}
"""

print("Verifying Parquet table...")

try:
    result = pd.read_sql(count_parquet_query, athena_conn)
    print(f"\nParquet table verified")
    parquet_rows = result.at[0, 'row_count']
    print(f"  Total rows: {parquet_rows:,}")
except Exception as e:
    print(f"\nError verifying Parquet table: {e}")

In [None]:
# Compare file sizes
print("Comparing CSV vs Parquet storage:")


def _total_prefix_size(key_prefix):
    """Return the total size in bytes of all objects under ``key_prefix`` in ``bucket``.

    Uses a paginator because one list_objects_v2 call returns at most 1,000
    keys, and the CTAS Parquet output may be split across many files. A
    trailing "/" keeps e.g. ".../parquet" from also matching ".../parquet_old".
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    prefix = key_prefix.rstrip('/') + '/'
    return sum(
        obj['Size']
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
        for obj in page.get('Contents', [])
    )


csv_size = _total_prefix_size(raw_data_prefix)
parquet_size = _total_prefix_size(parquet_data_prefix)

print(f"\nStorage Comparison:")
print(f"  CSV Size: {csv_size / (1024**3):.2f} GB")
print(f"  Parquet Size: {parquet_size / (1024**3):.2f} GB")
# Both sizes must be non-zero; an empty CSV prefix would divide by zero.
if parquet_size > 0 and csv_size > 0:
    compression_ratio = (1 - parquet_size / csv_size) * 100
    print(f"  Compression: {compression_ratio:.1f}% reduction")
    print(f"  Space Saved: {(csv_size - parquet_size) / (1024**3):.2f} GB")

## 8. Query Data with AWS Data Wrangler

Use AWS Data Wrangler for more efficient data querying and analysis.

In [None]:
# Summary statistics over the Parquet table via awswrangler.
# Each aggregate CASTs the amount column to DOUBLE before aggregating.
sample_query_wr = f"""
SELECT 
    COUNT(*) as total_payments,
    SUM(CAST(Total_Amount_of_Payment_USDollars AS DOUBLE)) as total_amount,
    AVG(CAST(Total_Amount_of_Payment_USDollars AS DOUBLE)) as avg_amount,
    MIN(CAST(Total_Amount_of_Payment_USDollars AS DOUBLE)) as min_amount,
    MAX(CAST(Total_Amount_of_Payment_USDollars AS DOUBLE)) as max_amount
FROM {database_name}.{table_name_parquet}
"""
# ctas_approach=False: run the query directly rather than via a temporary CTAS table.
query_options = {'database': database_name, 'ctas_approach': False}

print("Querying payment statistics with AWS Data Wrangler...")
print(f"\nQuery: {sample_query_wr}")

try:
    df_stats = wr.athena.read_sql_query(sql=sample_query_wr, **query_options)
    for message in (f"\nQuery successful", f"\nPayment Statistics:"):
        print(message)
    display(df_stats)
except Exception as e:
    print(f"\nError querying data: {e}")

In [None]:
# Payment count and total amount per recipient type, largest total first.
recipient_query = f"""
SELECT 
    Covered_Recipient_Type,
    COUNT(*) as payment_count,
    SUM(CAST(Total_Amount_of_Payment_USDollars AS DOUBLE)) as total_amount
FROM {database_name}.{table_name_parquet}
GROUP BY Covered_Recipient_Type
ORDER BY total_amount DESC
"""
query_options = {'database': database_name, 'ctas_approach': False}

print("Analyzing payments by recipient type...")

try:
    df_recipients = wr.athena.read_sql_query(sql=recipient_query, **query_options)
    for message in (f"\nQuery successful", f"\nPayments by Recipient Type:"):
        print(message)
    display(df_recipients)
except Exception as e:
    print(f"\nError querying data: {e}")

## 9. Validation & Verification

Perform final validation checks on the datalake setup.

In [None]:
# Comprehensive validation
print("=" * 70)
print("DATALAKE SETUP VALIDATION")
print("=" * 70)

validation_passed = True

# Check 1: S3 Buckets
print("\n1. S3 Storage:")
try:
    for prefix in [raw_data_prefix, parquet_data_prefix]:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
        if 'Contents' in response:
            print(f"   [OK] {prefix}/")
        else:
            print(f"   [FAIL] {prefix}/ (empty or missing)")
            validation_passed = False
except Exception as e:
    print(f"   [FAIL] Error checking S3: {e}")
    validation_passed = False

# Check 2: Athena Database
print("\n2. Athena Database:")
try:
    databases = pd.read_sql("SHOW DATABASES", athena_conn)
    if database_name in databases.values:
        print(f"   [OK] Database '{database_name}' exists")
    else:
        print(f"   [FAIL] Database '{database_name}' not found")
        validation_passed = False
except Exception as e:
    print(f"   [FAIL] Error checking database: {e}")
    validation_passed = False

# Check 3: Tables
print("\n3. Athena Tables:")
try:
    tables = pd.read_sql(f"SHOW TABLES IN {database_name}", athena_conn)
    for table in [table_name_csv, table_name_parquet]:
        if table in tables.values:
            print(f"   [OK] Table '{table}' exists")
        else:
            print(f"   [FAIL] Table '{table}' not found")
            validation_passed = False
except Exception as e:
    print(f"   [FAIL] Error checking tables: {e}")
    validation_passed = False

# Check 4: Data Accessibility
print("\n4. Data Accessibility:")
try:
    count_result = pd.read_sql(
        f"SELECT COUNT(*) as cnt FROM {database_name}.{table_name_parquet}",
        athena_conn
    )
    row_count = count_result['cnt'][0]
    print(f"   [OK] Query successful ({row_count:,} rows)")
except Exception as e:
    print(f"   [FAIL] Error querying data: {e}")
    validation_passed = False

# Final result
print("\n" + "=" * 70)
if validation_passed:
    print("ALL VALIDATION CHECKS PASSED")
    print("Datalake setup complete and operational")
    setup_datalake_passed = True
else:
    print("SOME VALIDATION CHECKS FAILED")
    print("Please review the errors above and re-run failed steps")
    setup_datalake_passed = False

print("=" * 70)

# Store validation result
%store setup_datalake_passed