# 5 â€“ Convert CSV Dataset to Partitioned Parquet Format

This notebook converts the registered CSV dataset into partitioned Parquet format
using an Athena CTAS (Create Table As Select) query.

Parquet improves query performance and reduces storage costs.


In [24]:
pd.read_sql(
    f"DROP TABLE IF EXISTS {database_name}.{parquet_table}",
    conn
)


  pd.read_sql(


## Import Required Libraries and Initialize AWS Session


In [25]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from pyathena import connect
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
region = boto3.Session().region_name
role = get_execution_role()

print("Bucket:", bucket)
print("Region:", region)


Bucket: sagemaker-us-east-1-083422367993
Region: us-east-1


## Define Database, Table Names, and S3 Output Location


In [26]:
database_name = "ghcn_extreme_precip_db"
csv_table = "extreme_precip_csv"
parquet_table = "extreme_precip_parquet"

project_prefix = "ghcn-extreme"
parquet_s3_location = f"s3://{bucket}/{project_prefix}/parquet/"

print("Parquet output location:", parquet_s3_location)


Parquet output location: s3://sagemaker-us-east-1-083422367993/ghcn-extreme/parquet/


## Establish Athena Connection


In [27]:
athena_staging_dir = f"s3://{bucket}/athena/staging/"

conn = connect(
    region_name=region,
    s3_staging_dir=athena_staging_dir
)

print("Connected to Athena.")


Connected to Athena.


## Create Partitioned Parquet Table Using CTAS


In [28]:
ctas_query = f"""
CREATE TABLE {database_name}.{parquet_table}
WITH (
    format = 'PARQUET',
    external_location = '{parquet_s3_location}',
    partitioned_by = ARRAY['year']
) AS
SELECT
    station_id,
    date,
    TMAX,
    TMIN,
    prcp_lag_1,
    prcp_roll_7,
    extreme_precip_tomorrow,
    month,
    year
FROM {database_name}.{csv_table};
"""

pd.read_sql(ctas_query, conn)

print("Parquet table created.")


  pd.read_sql(ctas_query, conn)


Parquet table created.


## Repair Partitions


In [29]:
repair_query = f"MSCK REPAIR TABLE {database_name}.{parquet_table};"
pd.read_sql(repair_query, conn)

print("Partitions repaired.")


  pd.read_sql(repair_query, conn)


Partitions repaired.


## Verify Parquet Table Registration


In [30]:
pd.read_sql(f"SHOW TABLES IN {database_name}", conn)


  pd.read_sql(f"SHOW TABLES IN {database_name}", conn)


Unnamed: 0,tab_name
0,extreme_precip_csv
1,extreme_precip_parquet


## Verify Partition Structure


In [31]:
pd.read_sql(f"SHOW PARTITIONS {database_name}.{parquet_table}", conn).head()


  pd.read_sql(f"SHOW PARTITIONS {database_name}.{parquet_table}", conn).head()


Unnamed: 0,partition
0,year=2010
1,year=2022
2,year=2014
3,year=2021
4,year=2011


## Validate Parquet Table Query


In [32]:
pd.read_sql(
    f"SELECT * FROM {database_name}.{parquet_table} LIMIT 10",
    conn
)


  pd.read_sql(


Unnamed: 0,station_id,date,tmax,tmin,prcp_lag_1,prcp_roll_7,extreme_precip_tomorrow,month,year
0,USW00012921,2026-01-01,23.3,5.0,0.0,0.042857,0,1,2026
1,USW00012921,2026-01-02,31.7,15.0,0.0,0.042857,0,1,2026
2,USW00012921,2026-01-03,26.1,11.1,0.0,0.042857,0,1,2026
3,USW00012921,2023-01-01,25.6,8.3,0.0,0.0,0,1,2023
4,USW00012921,2026-01-04,21.7,8.9,0.0,0.042857,0,1,2026
5,USW00012921,2026-01-05,23.3,12.2,0.0,0.0,0,1,2026
6,USW00012921,2026-01-06,28.3,13.9,0.0,0.0,0,1,2026
7,USW00012921,2026-01-07,27.2,11.1,0.0,0.0,0,1,2026
8,USW00012921,2023-01-02,27.8,17.8,0.0,0.042857,0,1,2023
9,USW00012921,2023-01-03,26.1,11.7,0.3,0.042857,0,1,2023


## Confirm Parquet Dataset is Ready for Feature Store Integration

The dataset is now stored in optimized, partitioned Parquet format.
It is ready for integration with SageMaker Feature Store
and downstream model training pipelines.


In [33]:
pd.read_sql(
    f"SELECT COUNT(*) FROM {database_name}.{parquet_table}",
    conn
)


  pd.read_sql(


Unnamed: 0,_col0
0,36444
