In [1]:
# Setup AWS and SageMaker session
import boto3
import sagemaker
import pandas as pd
from pyathena import connect
import warnings
# NOTE(review): with no category/message/module filter this suppresses *every*
# Python warning for the rest of the kernel session. Presumably intended to hide
# the warning pandas emits when `pd.read_sql` is given a non-SQLAlchemy
# connection -- TODO confirm and narrow the filter to that warning.
warnings.filterwarnings('ignore')

In [2]:
# Resolve the AWS region and build a low-level SageMaker API client for it
region = boto3.Session().region_name
sm = boto3.client("sagemaker", region_name=region)

# High-level SageMaker session, its default S3 bucket, and the execution role
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()

In [3]:
# S3 locations: the shared public source data and the private copy under `bucket`
s3_public_path = "s3://usd-team1-ads508/ld_data/"
s3_private_path = "s3://{}/ld_data".format(bucket)

# Only the public location is echoed; the private-path print is left disabled.
print("Public path: " + s3_public_path)
# print(f"Private path: {s3_private_path}")

Public path: s3://usd-team1-ads508/ld_data/


In [4]:
# Persist both S3 paths with IPython's %store magic so they can be restored
# (via `%store -r`) in another notebook or after a kernel restart.
%store s3_public_path
%store s3_private_path

Stored 's3_public_path' (str)
Stored 's3_private_path' (str)


In [5]:
# Copy CSV files from public to private bucket
!aws s3 cp --recursive {s3_public_path} {s3_private_path}/ --include "*.csv" > /dev/null 2>&1
# !aws s3 cp --recursive {s3_public_path} {s3_private_path}/ --include "*.csv"

In [6]:
# --- Create Athena Database ---
# Name of the Athena database to create, and the S3 prefix passed to the
# Athena connection as its staging directory.
database_name = "demo2"
s3_staging_dir = "s3://" + bucket + "/athena/staging"

In [7]:
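# Install PyAthena if it is not already available (uncomment to run).
# Note: this must be executed before the import cell at the top of the notebook.
# %pip install --disable-pip-version-check -q PyAthena==2.1.0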

In [8]:
# Open a PyAthena connection in the session's region, using the staging prefix
conn = connect(
    s3_staging_dir=s3_staging_dir,
    region_name=region,
)

In [9]:
# Build the CREATE DATABASE statement, echo it, then run it through Athena
create_db_statement = "CREATE DATABASE IF NOT EXISTS %s" % database_name
print(create_db_statement)
pd.read_sql(sql=create_db_statement, con=conn)


CREATE DATABASE IF NOT EXISTS demo2


In [10]:
# List the databases Athena reports so the new one can be checked for
show_db_statement = "SHOW DATABASES"
df_show = pd.read_sql(sql=show_db_statement, con=conn)
print(df_show.iloc[:5])

  database_name
0       default
1         demo1
2         demo2


In [11]:
# Mark database creation as successful if database exists
ingest_create_athena_db_passed = database_name in df_show.values
print(f"Database creation successful: {ingest_create_athena_db_passed}")
%store ingest_create_athena_db_passed

Database creation successful: True
Stored 'ingest_create_athena_db_passed' (bool)


#### Key Variables

See link for variable descriptions: https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Dataset_Documentation/NHIS/2023/adult-summary.pdf


| Variable         | Description                                        |
|-----------------|----------------------------------------------------|
| EVERCOVD_A      | Ever had COVID-19                                  |
| SHTCVD191_A     | COVID-19 vaccination                                |
| EMPDYSMSS2_A    | Days missed work (2019-2020), past 12 months (top-coded)      |
| EMPDYSMSS3_A    | Days missed work (2021-2023), past 12 months (top-coded)      |
| HICOV_A         | Have health insurance                             |
| EMDINDSTN1_A    | Detailed 2-digit recode for sample adult's industry |
| SEX_A           | Sex of Sample Adult                               |
| AGEP_A          | Age of sample adult (top-coded)                   |
| EDUCP_A         | Educational level of sample adult                |
| REGION          | Household region                                 |


In [12]:
# --- Create Athena Table ---
if not ingest_create_athena_db_passed:
    print("[ERROR] You need to create the Athena database first")
else:
    # Define table
    table_name = "ld_data"
    
    # Print
    print_statement = f"""CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name}(
        EVERCOVD_A int,
        SHTCVD191_A int,
        EMPDYSMSS2_A int,
        EMPDYSMSS3_A int,
        HICOV_A int,
        EMDINDSTN1_A int,
        SEX_A int,
        AGEP_A int,
        EDUCP_A int,
        REGION int
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION 's3://[REDACTED_BUCKET]/[REDACTED_PATH]'
    TBLPROPERTIES ('skip.header.line.count'='1')"""
    
    # Statement to execute
    create_table_statement = f"""CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name}(
        EVERCOVD_A int,
        SHTCVD191_A int,
        EMPDYSMSS2_A int,
        EMPDYSMSS3_A int,
        HICOV_A int,
        EMDINDSTN1_A int,
        SEX_A int,
        AGEP_A int,
        EDUCP_A int,
        REGION int
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '{s3_private_path}'
    TBLPROPERTIES ('skip.header.line.count'='1')"""
    
    # Print
    print(print_statement)
    
    # Execute statement
    pd.read_sql(create_table_statement, conn)
    
    # Verify table creation
    show_tables_statement = f"SHOW TABLES in {database_name}"
    df_show = pd.read_sql(show_tables_statement, conn)
    print(df_show.head(5))
    
    # Mark table creation as successful if table exists
    ingest_create_athena_table_passed = table_name in df_show.values
    print(f"Table creation successful: {ingest_create_athena_table_passed}")
    %store ingest_create_athena_table_passed

CREATE EXTERNAL TABLE IF NOT EXISTS demo2.ld_data(
        EVERCOVD_A int,
        SHTCVD191_A int,
        EMPDYSMSS2_A int,
        EMPDYSMSS3_A int,
        HICOV_A int,
        EMDINDSTN1_A int,
        SEX_A int,
        AGEP_A int,
        EDUCP_A int,
        REGION int
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' LOCATION 's3://[REDACTED_BUCKET]/[REDACTED_PATH]'
    TBLPROPERTIES ('skip.header.line.count'='1')
  tab_name
0  ld_data
Table creation successful: True
Stored 'ingest_create_athena_table_passed' (bool)


In [13]:
# Build a COUNT(*) query against the table (text kept identical to the
# triple-quoted form, including the leading and trailing newlines)
count_query = (
    "\n"
    "SELECT COUNT(*) as total_records \n"
    f"FROM {database_name}.{table_name}\n"
)
print("Query to count total records:")
print(count_query)

# Run the query through the Athena connection
df_count = pd.read_sql(sql=count_query, con=conn)

# Report the single value in the first row of the result
total_records = df_count['total_records'].iloc[0]
print(f"Total records in {table_name}: {total_records}")

Query to count total records:

SELECT COUNT(*) as total_records 
FROM demo2.ld_data

Total records in ld_data: 150220
