# CSV to Athena

## Dependencies

In [137]:
import boto3
from botocore.exceptions import ClientError
import pandas as pd
import io
from dotenv import load_dotenv
import os
import time

## Functions

##### CSV to parquet partitions

In [138]:
def create_boto3_session():
    """Create a boto3 session from the AWS settings found in the environment.

    The ``.env`` file is loaded first, so values stored there are visible
    through the process environment.

    Returns:
        tuple: ``(session, region)`` where ``region`` is the value of
        ``AWS_DEFAULT_REGION`` (``None`` when it is not set).

    Raises:
        ValueError: If the access key id or the secret access key is missing
            or empty.
    """
    load_dotenv()

    region = os.environ.get("AWS_DEFAULT_REGION")
    credentials = {
        "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    }

    # Both halves of the key pair are required; fail early with a clear message.
    if not all(credentials.values()):
        raise ValueError("Missing AWS credentials in .env file.")

    return boto3.Session(region_name=region, **credentials), region

In [139]:
# Builds the S3 client and the S3 resource from an existing boto3 session
def connect_to_s3(boto3_session):
    """Return ``(s3_client, s3_resource)`` created from ``boto3_session``.

    The client is created first, then the resource; both are requested for
    the ``"s3"`` service.
    """
    service_name = "s3"
    return boto3_session.client(service_name), boto3_session.resource(service_name)

In [140]:
def bucket_exists(bucket_name: str, s3_client=None) -> bool:
    """Report whether an S3 bucket with the given name exists.

    Args:
        bucket_name: Name of the bucket to probe with ``head_bucket``.
        s3_client: Optional S3 client to use. When omitted, a client is
            created with ``boto3.client("s3")`` as before.

    Returns:
        ``True`` when ``head_bucket`` succeeds or fails with error code 403
        (the bucket exists but is not accessible to the caller); ``False``
        for error code 404 and for any other ``ClientError``.
    """
    if s3_client is None:
        s3_client = boto3.client("s3")

    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        # Compare as text: the code is not guaranteed to be numeric, and
        # int() on a symbolic code would raise ValueError instead of answering.
        error_code = str(e.response.get("Error", {}).get("Code", ""))
        # 403 -> exists but not accessible; 404 and everything else -> treat as absent.
        return error_code == "403"

    return True

In [141]:
def create_bucket(s3_client, bucket_name, region):
    """Create an S3 bucket, tolerating the case where the caller already owns it.

    Args:
        s3_client: S3 client used to issue ``create_bucket``.
        bucket_name: Name of the bucket to create.
        region: Region for the bucket. ``None``, an empty string or
            ``"us-east-1"`` means no ``LocationConstraint`` is sent.

    Raises:
        ClientError: For any failure other than ``BucketAlreadyOwnedByYou``.
    """
    request = {"Bucket": bucket_name}

    # us-east-1 is the default location and must not be passed as an explicit
    # LocationConstraint; a missing region cannot be passed either.
    if region and region != "us-east-1":
        request["CreateBucketConfiguration"] = {"LocationConstraint": region}

    try:
        s3_client.create_bucket(**request)
    except ClientError as e:
        if e.response["Error"]["Code"] != "BucketAlreadyOwnedByYou":
            raise
        print(f"✅ Bucket already exists and owned by you: {bucket_name}")
    else:
        print(f"✅ Created bucket: {bucket_name}")

In [142]:
def upload_file_to_s3(s3_client, local_csv_file_path, bucket_name, file_name):
    """Upload a local file to ``s3://bucket_name/file_name`` and report the outcome.

    A ``ClientError`` raised by the upload is printed and then re-raised
    unchanged, so the caller still sees the failure.
    """
    try:
        s3_client.upload_file(local_csv_file_path, bucket_name, file_name)
    except ClientError as upload_error:
        print(f"❌ Upload failed: {upload_error}")
        raise
    else:
        print(f"✅ Uploaded {local_csv_file_path} to s3://{bucket_name}/{file_name}")

In [143]:
# Fetches one CSV object from S3 and parses it into a pandas DataFrame
def s3_csv_to_df(s3_client, s3_bucket, file_name):
    """Return the CSV stored at ``file_name`` in ``s3_bucket`` as a DataFrame.

    The object's ``Body`` stream is handed straight to ``pd.read_csv``.
    """
    response = s3_client.get_object(Key=file_name, Bucket=s3_bucket)
    return pd.read_csv(response["Body"])

In [144]:
# Handle data types for athena
def convert_dtypes_for_athena(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with column dtypes normalised before export.

    Rules, applied per column in this order:

    * ``int64`` becomes nullable ``Int64``.
    * ``float64`` becomes nullable ``Int64`` only when every non-null value is
      a whole number (the usual shape of an integer column that contains
      missing values). Columns holding real fractions stay ``float64``;
      casting them to ``Int64`` would raise.
    * Any other column whose name contains ``"date"`` is cast to ``object``.
    * Remaining ``object`` columns are cast to the pandas ``string`` dtype.

    The input frame is not modified.
    """
    df = df.copy()

    for col in df.columns:
        series = df[col]
        dtype = series.dtype

        if dtype == "int64":
            df[col] = series.astype("Int64")  # Nullable integer
        elif dtype == "float64":
            present = series.dropna()
            # `% 1 == 0` is False for fractions and for +/-inf, so only truly
            # integral columns are converted; an all-null column also converts.
            if (present % 1 == 0).all():
                df[col] = series.astype("Int64")
        elif "date" in str(col).lower():
            # Kept as plain Python objects; no datetime parsing happens here.
            df[col] = series.astype("object")
        elif dtype == "object":
            df[col] = series.astype("string")  # Use StringDtype

    return df

In [145]:
# Creates Parquet files from a df with specific partitioning and destination location
def df_to_s3_parquet(s3_resource, df, s3_bucket, partition_by, partitioned_folder, local_parquet_file_path=None):
    """Write one Parquet object per distinct value of ``partition_by`` to S3.

    Each object is stored under
    ``{partitioned_folder}{partition_by}={value}/data.parquet`` and does not
    contain the partition column itself.

    Args:
        s3_resource: S3 resource used to create the target objects.
        df: Source DataFrame; must contain the ``partition_by`` column.
        s3_bucket: Name of the destination bucket.
        partition_by: Column whose distinct values define the partitions.
        partitioned_folder: Key prefix for the partition folders (used as-is,
            so it should end with ``/``).
        local_parquet_file_path: Optional path; when given, the first
            partition written is also saved there.
    """
    local_copy_pending = bool(local_parquet_file_path)

    # unique() keeps first-appearance order, so the "first" partition is stable.
    for value in df[partition_by].unique():
        if pd.isna(value):
            # A null never compares equal to itself, so this partition would be
            # an empty file under a meaningless key; skip it and say so.
            print(f"Skipped rows with missing {partition_by}")
            continue

        partition_df = df[df[partition_by] == value].drop(columns=[partition_by])

        # Parquet is binary, so serialise through BytesIO (not StringIO).
        buffer = io.BytesIO()
        partition_df.to_parquet(buffer, index=False, engine="pyarrow")
        payload = buffer.getvalue()

        # The key is built from partition_by rather than a fixed column name.
        parquet_key = f"{partitioned_folder}{partition_by}={value}/data.parquet"
        s3_resource.Object(s3_bucket, parquet_key).put(Body=payload)
        print(f"Uploaded {parquet_key} to S3 ✅")

        # Keep a local copy of the first partition only.
        if local_copy_pending:
            with open(local_parquet_file_path, "wb") as f:
                f.write(payload)
            print(f"Also saved first Parquet file locally to {local_parquet_file_path} ✅")
            local_copy_pending = False

##### Create Athena table

In [146]:
def query_athena(query, athena, athena_database, athena_output_location):
    """Run a query on Athena and block until it reaches a final state.

    Args:
        query: SQL text to execute.
        athena: Athena client used to start and poll the query.
        athena_database: Database set as the query execution context.
        athena_output_location: S3 location where Athena writes the results.

    Returns:
        The ``QueryExecutionId`` of the finished query.

    Raises:
        RuntimeError: If the query ends in any state other than ``SUCCEEDED``.
            The message includes the final state and the reason reported by
            Athena, when one is present.
    """
    response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': athena_database},
        ResultConfiguration={'OutputLocation': athena_output_location}
    )
    query_execution_id = response['QueryExecutionId']

    # Poll once per second until the query reaches a final state.
    final_states = ('SUCCEEDED', 'FAILED', 'CANCELLED')
    while True:
        status_info = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']
        status = status_info['State']
        if status in final_states:
            print(f"Query finished with status: {status}")
            break
        time.sleep(1)

    if status != 'SUCCEEDED':
        # Surface Athena's own explanation instead of discarding it.
        reason = status_info.get('StateChangeReason', 'no reason reported')
        raise RuntimeError(f"Failed to query database. Status: {status}. Reason: {reason}")

    return query_execution_id

In [147]:
def create_df_from_athena_query(query_execution_id, athena_client):
    """Collect all result pages of a finished Athena query into a DataFrame.

    Args:
        query_execution_id: Id of the query whose results are fetched.
        athena_client: Athena client used to call ``get_query_results``.

    Returns:
        A DataFrame of string values. Missing cells are filled with ``"NA"``
        and column labels are prettified (underscores become spaces, words
        are title-cased).
    """
    columns = None
    rows = []
    request = {"QueryExecutionId": query_execution_id}

    # Results are paged; keep requesting until no continuation token is returned,
    # otherwise large results would be silently truncated to the first page.
    while True:
        page = athena_client.get_query_results(**request)
        result_set = page["ResultSet"]
        page_rows = result_set["Rows"]

        if columns is None:
            columns = [col["Label"] for col in result_set["ResultSetMetadata"]["ColumnInfo"]]
            # The first row of the first page is dropped as the header row.
            page_rows = page_rows[1:]

        # A cell without "VarCharValue" is a null value.
        rows.extend(
            [cell.get("VarCharValue", None) for cell in row["Data"]]
            for row in page_rows
        )

        next_token = page.get("NextToken")
        if not next_token:
            break
        request["NextToken"] = next_token

    # Convert to Pandas DataFrame
    df = pd.DataFrame(rows, columns=columns)
    df = df.fillna("NA")
    df.columns = df.columns.str.replace('_', ' ').str.title()

    return df

In [148]:
def generate_table_schema_sql(local_parquet_file_path):
    """Build the column-definition section of a CREATE TABLE statement.

    The columns and dtypes are read from a local Parquet file. Each column
    becomes one ```name` TYPE`` line, indented by four spaces and joined with
    commas; the result ends with a newline.

    Args:
        local_parquet_file_path: Path of the Parquet file to inspect.

    Returns:
        The column definitions as a single string.
    """
    df = pd.read_parquet(local_parquet_file_path)

    # Numeric and boolean dtypes get matching SQL types instead of falling
    # back to VARCHAR, so the declared schema agrees with what Parquet stores.
    dtype_mapping = {
        'Int64': 'BIGINT',
        'int64': 'BIGINT',
        'Int32': 'INT',
        'int32': 'INT',
        'Float64': 'DOUBLE',
        'float64': 'DOUBLE',
        'Float32': 'FLOAT',
        'float32': 'FLOAT',
        'boolean': 'BOOLEAN',
        'bool': 'BOOLEAN',
        'object': 'VARCHAR(100)',
        'string': 'VARCHAR(100)',
    }

    # Backticks keep reserved words and unusual column names valid in the DDL.
    columns_sql = [
        f"`{col}` {dtype_mapping.get(str(df[col].dtype), 'VARCHAR(100)')}"  # default fallback
        for col in df.columns
    ]

    # Join into a single string for the CREATE TABLE query
    return '    ' + ",\n    ".join(columns_sql) + '\n'

## Custom variables

In [151]:
## CSV to parquet partitions
# Where the source CSV lives: S3 bucket, object key, and the local folder holding a copy
s3_bucket = 'chalkjuice-backend'
file_name = 'nfl_games_all.csv'
data_folder_name = "data"

# How the data is split: the partition column and the S3 prefix for the partition folders
partition_by = 'season'
partitioned_folder = 'nfl_games_all_partitions/'

In [152]:
## Create Athena Table From Parquet
# Database and table to (re)create, plus the S3 prefix that receives Athena query output
athena_database = "nfl"
athena_table = "nfl_games_all"
athena_output_folder = "nfl_games_all_athena_parquet/"

## CSV to parquet partitions

### Connect to S3 with boto3

In [153]:
# Build one session from the .env credentials, then derive the S3 client and resource from it.
boto3_session, aws_region = create_boto3_session()
s3_client, s3_resource = connect_to_s3(boto3_session)

### If the S3 bucket already exists, continue. Otherwise create the bucket and upload the local CSV file from the repo to S3.

In [154]:
# Reuse the bucket when it already exists; otherwise create it and seed it with the local CSV.
# The check uses the configured bucket name so it cannot drift from the bucket created below.
if bucket_exists(s3_bucket):
    print('Bucket Exists')
else:
    # Create a new s3 bucket
    create_bucket(s3_client, s3_bucket, aws_region)

    # Upload the CSV to s3
    local_csv_file_path = os.path.join("..", data_folder_name, file_name)
    upload_file_to_s3(s3_client, local_csv_file_path, s3_bucket, file_name)

✅ Created bucket: chalkjuice-backend
✅ Uploaded ..\data\nfl_games_all.csv to s3://chalkjuice-backend/nfl_games_all.csv


### Return the s3 csv as a pandas df

In [155]:
# Read the CSV object from the bucket into a pandas DataFrame.
df = s3_csv_to_df(s3_client, s3_bucket, file_name)

### Configure column datatypes for Athena - it's fussy.

In [156]:
# Rebind df to the converted copy; convert_dtypes_for_athena works on a copy of its input.
df = convert_dtypes_for_athena(df)
# Uncomment to inspect the resulting dtypes:
#df.dtypes

### Create partitioned Parquet files

In [157]:
# Local path reuses the CSV name with a .parquet extension; file_name[:-4] drops the last four characters (".csv" here).
local_parquet_file_path = os.path.join("..", data_folder_name, f"{file_name[:-4]}.parquet") # OPTIONAL save one parquet file locally to autogenerate athena table schema
# Upload one Parquet object per distinct partition value and keep the first one locally.
df_to_s3_parquet(s3_resource, df, s3_bucket, partition_by, partitioned_folder, local_parquet_file_path)

Uploaded nfl_games_all_partitions/season=1967/data.parquet to S3 ✅
Also saved first Parquet file locally to ..\data\nfl_games_all.parquet ✅
Uploaded nfl_games_all_partitions/season=1968/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1969/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1970/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1971/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1972/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1973/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1974/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1975/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1976/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1977/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1978/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1979/data.parquet to S3 ✅
Uploaded nfl_games_all_partitions/season=1980/data.parqu

## Create Athena Table From Parquet

### Connect to Athena

In [158]:
# S3 location passed to query_athena as the OutputLocation for query results.
athena_output_location = f's3://{s3_bucket}/{athena_output_folder}'
# Athena client created from the same session as the S3 client and resource.
athena_client = boto3_session.client('athena')

### Create database if not exist

In [159]:
# IF NOT EXISTS keeps this cell safe to re-run.
create_db_query = f"CREATE DATABASE IF NOT EXISTS {athena_database}"
# query_athena blocks until the statement finishes and returns its execution id.
query_athena(create_db_query, athena_client, athena_database, athena_output_location)

Query finished with status: SUCCEEDED


'56984afd-ba5f-4970-b7a0-1eb576d5dff1'

### Remove table if exists

In [160]:
# Drop any previous definition so the CREATE statement in the next step starts from scratch.
remove_table_query = f'DROP TABLE IF EXISTS {athena_database}.{athena_table};'
query_athena(remove_table_query, athena_client, athena_database, athena_output_location)

Query finished with status: SUCCEEDED


'27c5f3c0-dc1b-45b7-9d32-90af57a83c6d'

### Create Table

In [161]:
# Column definitions are derived from the Parquet file saved locally in the partitioning step.
table_schema_sql = generate_table_schema_sql(local_parquet_file_path)
# External table over the partition folders, partitioned by the partition_by column.
# The doubled braces in the storage template render as a literal ${<partition_by>} placeholder.
# NOTE(review): the projection range 1967,2023 is hard-coded; presumably partitions outside
# that range will not be visible through the table - confirm and widen when new data arrives.
create_and_fill_table_query = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {athena_database}.{athena_table} (
    {table_schema_sql}
    )
    PARTITIONED BY ({partition_by} INT)
    STORED AS PARQUET
    LOCATION 's3://{s3_bucket}/{partitioned_folder}'
    TBLPROPERTIES (
        'parquet.compression'='SNAPPY',
        'projection.enabled'='true',
        'projection.{partition_by}.type'='integer',
        'projection.{partition_by}.range'='1967,2023',
        'storage.location.template'='s3://{s3_bucket}/{partitioned_folder}{partition_by}=${{{partition_by}}}/'
    );
"""
query_athena(create_and_fill_table_query, athena_client, athena_database, athena_output_location)

Query finished with status: SUCCEEDED


'32fccf62-4820-4f97-a6e7-6c6e3a16e347'

### Load data into the table and map the partitions

In [162]:
# Ask Athena to scan the table location and register the partition folders it finds.
# NOTE(review): the table is created with 'projection.enabled'='true'; this repair step may be
# redundant in that setup - confirm against the Athena partition projection documentation.
map_partitions_query = f'MSCK REPAIR TABLE {athena_database}.{athena_table};'
query_athena(map_partitions_query, athena_client, athena_database, athena_output_location)

Query finished with status: SUCCEEDED


'e446c3d0-9886-46d2-9bf6-8ed682ce3416'

# Query Athena

In [163]:
# Sample filter values for an ad-hoc query.
# NOTE(review): team_name is not referenced by any visible cell - confirm it is still needed.
team_name = "MIN"
year1, year2, year3 = 2021, 2022, 2023  # Adjust years as needed
# Selects every column for the three seasons above; this cell only builds the SQL text.
test_athena_connection_query = f'''
    SELECT * FROM "{athena_database}"."{athena_table}" 
    WHERE season IN ({year1}, {year2}, {year3});
'''

In [167]:
# Smoke test: count every row visible through the table that was just created.
# The query is built from the configured names so it follows any rename of the database or table.
row_count_query = f'SELECT COUNT(*) FROM "{athena_database}"."{athena_table}";'
query_execution_id = query_athena(row_count_query, athena_client, athena_database, athena_output_location)
df = create_df_from_athena_query(query_execution_id, athena_client)
df.head(10)

Query finished with status: SUCCEEDED


Unnamed: 0,Col0
0,26206
