# Create Athena Tables for Flight Data

**AAI-540 Group 1 - Flight Delay Prediction Project**

## Objective
Set up AWS Athena to catalog and query our flight delay dataset:
1. Create Athena database
2. Create external tables for flights, airlines, and airports
3. Verify tables with test queries

## Why Athena?
- Query S3 data directly without loading into a database
- Standard SQL interface
- Serverless and cost-effective
- Integrates with AWS Glue Data Catalog

---

## 1. Setup Environment

Import required libraries and load project configuration.

In [1]:
import sys
import boto3
import pandas as pd
from pathlib import Path
from datetime import datetime

# Add project root to Python path
project_root = Path.cwd().parent.parent
sys.path.insert(0, str(project_root))

# Import project configuration
from config import settings

# Display configuration
settings.print_config()

print(f"\nNotebook executed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
AAI-540 Group 1 Project Configuration
Region: us-east-1
S3 Bucket: sagemaker-us-east-1-786869526001
Project Prefix: aai540-group1/
Athena Database: aai540_group1_db

S3 Base URI: s3://sagemaker-us-east-1-786869526001/aai540-group1/

Notebook executed at: 2026-01-23 08:03:05


## 2. Initialize Athena Client

Set up boto3 Athena client and verify S3 data location.

In [14]:
# Initialize AWS clients
athena_client = boto3.client('athena', region_name=settings.REGION)
s3_client = boto3.client('s3', region_name=settings.REGION)

# Get configuration values
bucket_name = settings.DEFAULT_BUCKET
database_name = settings.ATHENA_DATABASE
raw_data_uri = settings.S3_PATHS['raw_data']
athena_staging_uri = settings.S3_PATHS['athena_staging']

# Define table locations (directories, not files)
tables_base_uri = f"s3://{bucket_name}/aai540-group1/data/tables"

print("Athena Configuration:")
print("=" * 70)
print(f"Database Name: {database_name}")
print(f"Raw Data Location: {raw_data_uri}")
print(f"Tables Location: {tables_base_uri}/")
print(f"Athena Staging: {athena_staging_uri}")
print("=" * 70)

# Verify S3 raw data exists
print(f"\nVerifying S3 raw data location...")
s3_prefix = raw_data_uri.replace(f's3://{bucket_name}/', '')

response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=s3_prefix, MaxKeys=5)

if 'Contents' in response:
    print(f"✓ Found {len(response['Contents'])} files in {raw_data_uri}")
    for obj in response['Contents']:
        print(f"  - {obj['Key'].split('/')[-1]}")
else:
    print(f"✗ No files found in {raw_data_uri}")
    print("⚠ Please run 01_setup_s3_and_ingest.ipynb first!")

# Copy CSV files to table-specific directories (Athena works better with directories)
print(f"\nOrganizing data into table directories...")
file_table_map = {
    'airlines.csv': 'airlines',
    'airports.csv': 'airports', 
    'flights.csv': 'flights'
}

for file_name, table_name in file_table_map.items():
    source_key = f"aai540-group1/data/raw/{file_name}"
    dest_key = f"aai540-group1/data/tables/{table_name}/{file_name}"
    
    # Check if already copied
    try:
        s3_client.head_object(Bucket=bucket_name, Key=dest_key)
        print(f"  ✓ {table_name}/ already exists")
    except:
        print(f"  Copying {file_name} -> tables/{table_name}/")
        s3_client.copy_object(
            Bucket=bucket_name,
            CopySource={'Bucket': bucket_name, 'Key': source_key},
            Key=dest_key
        )
        print(f"  ✓ Created tables/{table_name}/")

print(f"\n✓ Data organized for Athena tables!")

Athena Configuration:
Database Name: aai540_group1_db
Raw Data Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/raw/
Tables Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/tables/
Athena Staging: s3://sagemaker-us-east-1-786869526001/aai540-group1/athena/staging/

Verifying S3 raw data location...
✓ Found 3 files in s3://sagemaker-us-east-1-786869526001/aai540-group1/data/raw/
  - airlines.csv
  - airports.csv
  - flights.csv

Organizing data into table directories...
  Copying airlines.csv -> tables/airlines/
  ✓ Created tables/airlines/
  Copying airports.csv -> tables/airports/
  ✓ Created tables/airports/
  Copying flights.csv -> tables/flights/
  ✓ Created tables/flights/

✓ Data organized for Athena tables!


## 3. Create Athena Database

Create a database in the AWS Glue Data Catalog for organizing our tables.

In [15]:
import awswrangler as wr

print("Creating Athena Database...")
print("=" * 70)
print(f"Database: {database_name}")
print(f"Location: {settings.S3_PATHS['raw_data']}")
print("=" * 70)

try:
    # Create database using awswrangler (handles all the polling automatically)
    wr.catalog.create_database(
        name=database_name,
        exist_ok=True,  # Don't fail if database already exists
        description='AAI-540 Group 1 Flight Delay Prediction Database'
    )
    print(f"\n✓ Database '{database_name}' created successfully!")
    
except Exception as e:
    print(f"\n✗ Error creating database: {e}")

Creating Athena Database...
Database: aai540_group1_db
Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/raw/

✓ Database 'aai540_group1_db' created successfully!


## 4. Create External Tables

Create external tables pointing to the CSV files in S3. We'll start with the simplest table first.

### 4.1 Create Airlines Table

2 columns: IATA_CODE (string), AIRLINE (string)

In [16]:
# Create airlines table using raw DDL with OpenCSVSerde
airlines_location = f"{tables_base_uri}/airlines/"

print("Creating airlines table...")
print(f"S3 Location: {airlines_location}")

# Drop existing table first
wr.catalog.delete_table_if_exists(database=database_name, table='airlines')

# Create table with OpenCSVSerde for proper CSV parsing
airlines_ddl = f"""
CREATE EXTERNAL TABLE {database_name}.airlines (
    IATA_CODE STRING,
    AIRLINE STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"'
)
STORED AS TEXTFILE
LOCATION '{airlines_location}'
TBLPROPERTIES ('skip.header.line.count' = '1')
"""

wr.athena.read_sql_query(
    sql=airlines_ddl,
    database=database_name,
    ctas_approach=False
)

print("✓ Airlines table created successfully!")

# Verify with sample
df_airlines_sample = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {database_name}.airlines LIMIT 5",
    database=database_name,
    ctas_approach=False
)
print(f"\nSample data ({len(df_airlines_sample)} rows):")
print(df_airlines_sample)

Creating airlines table...
S3 Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/tables/airlines/
✓ Airlines table created successfully!

Sample data (5 rows):
  iata_code                 airline
0        UA   United Air Lines Inc.
1        AA  American Airlines Inc.
2        US         US Airways Inc.
3        F9  Frontier Airlines Inc.
4        B6         JetBlue Airways


### 4.2 Create Airports Table

7 columns: IATA_CODE, AIRPORT, CITY, STATE, COUNTRY, LATITUDE, LONGITUDE

In [17]:
# Create airports table using raw DDL with OpenCSVSerde
airports_location = f"{tables_base_uri}/airports/"

print("Creating airports table...")
print(f"S3 Location: {airports_location}")

# Drop existing table first
wr.catalog.delete_table_if_exists(database=database_name, table='airports')

# Create table with OpenCSVSerde for proper CSV parsing
airports_ddl = f"""
CREATE EXTERNAL TABLE {database_name}.airports (
    IATA_CODE STRING,
    AIRPORT STRING,
    CITY STRING,
    STATE STRING,
    COUNTRY STRING,
    LATITUDE STRING,
    LONGITUDE STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"'
)
STORED AS TEXTFILE
LOCATION '{airports_location}'
TBLPROPERTIES ('skip.header.line.count' = '1')
"""

wr.athena.read_sql_query(
    sql=airports_ddl,
    database=database_name,
    ctas_approach=False
)

print("✓ Airports table created successfully!")

# Verify with sample
df_airports_sample = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {database_name}.airports LIMIT 5",
    database=database_name,
    ctas_approach=False
)
print(f"\nSample data ({len(df_airports_sample)} rows):")
print(df_airports_sample)

Creating airports table...
S3 Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/tables/airports/
✓ Airports table created successfully!

Sample data (5 rows):
  iata_code                              airport         city state country  \
0       ABE  Lehigh Valley International Airport    Allentown    PA     USA   
1       ABI             Abilene Regional Airport      Abilene    TX     USA   
2       ABQ    Albuquerque International Sunport  Albuquerque    NM     USA   
3       ABR            Aberdeen Regional Airport     Aberdeen    SD     USA   
4       ABY   Southwest Georgia Regional Airport       Albany    GA     USA   

   latitude   longitude  
0  40.65236   -75.44040  
1  32.41132   -99.68190  
2  35.04022  -106.60919  
3  45.44906   -98.42183  
4  31.53552   -84.19447  


### 4.3 Create Flights Table

The main table with ~5.8M rows and 31 columns.

In [19]:
# Create flights table using raw DDL with OpenCSVSerde
flights_location = f"{tables_base_uri}/flights/"

print("Creating flights table...")
print(f"S3 Location: {flights_location}")

# Drop existing table first
wr.catalog.delete_table_if_exists(database=database_name, table='flights')

# Create table with OpenCSVSerde for proper CSV parsing
# Note: OpenCSVSerde reads all columns as STRING - we'll cast in queries as needed
flights_ddl = f"""
CREATE EXTERNAL TABLE {database_name}.flights (
    YEAR STRING,
    MONTH STRING,
    DAY STRING,
    DAY_OF_WEEK STRING,
    AIRLINE STRING,
    FLIGHT_NUMBER STRING,
    TAIL_NUMBER STRING,
    ORIGIN_AIRPORT STRING,
    DESTINATION_AIRPORT STRING,
    SCHEDULED_DEPARTURE STRING,
    DEPARTURE_TIME STRING,
    DEPARTURE_DELAY STRING,
    TAXI_OUT STRING,
    WHEELS_OFF STRING,
    SCHEDULED_TIME STRING,
    ELAPSED_TIME STRING,
    AIR_TIME STRING,
    DISTANCE STRING,
    WHEELS_ON STRING,
    TAXI_IN STRING,
    SCHEDULED_ARRIVAL STRING,
    ARRIVAL_TIME STRING,
    ARRIVAL_DELAY STRING,
    DIVERTED STRING,
    CANCELLED STRING,
    CANCELLATION_REASON STRING,
    AIR_SYSTEM_DELAY STRING,
    SECURITY_DELAY STRING,
    AIRLINE_DELAY STRING,
    LATE_AIRCRAFT_DELAY STRING,
    WEATHER_DELAY STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
    'separatorChar' = ',',
    'quoteChar' = '"'
)
STORED AS TEXTFILE
LOCATION '{flights_location}'
TBLPROPERTIES ('skip.header.line.count' = '1')
"""

print("⚠ Creating table (may take a few seconds)...")

wr.athena.read_sql_query(
    sql=flights_ddl,
    database=database_name,
    ctas_approach=False
)

print("✓ Flights table created successfully!")

# Verify with sample (Athena returns lowercase column names)
df_flights_sample = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {database_name}.flights LIMIT 5",
    database=database_name,
    ctas_approach=False
)
print(f"\nSample data ({len(df_flights_sample)} rows):")
print(df_flights_sample[['year', 'month', 'day', 'airline', 'origin_airport', 'destination_airport', 'departure_delay']])

Creating flights table...
S3 Location: s3://sagemaker-us-east-1-786869526001/aai540-group1/data/tables/flights/
⚠ Creating table (may take a few seconds)...
✓ Flights table created successfully!

Sample data (5 rows):
   year month day airline origin_airport destination_airport departure_delay
0  2015     1   1      AS            ANC                 SEA             -11
1  2015     1   1      AA            LAX                 PBI              -8
2  2015     1   1      US            SFO                 CLT              -2
3  2015     1   1      AA            LAX                 MIA              -5
4  2015     1   1      AS            SEA                 ANC              -1


## 5. Verify Tables with Test Queries

Run sample queries to verify all tables are accessible and queryable via Athena.

In [20]:
# Test query: Count rows in each table
print("Verifying table row counts...")
print("=" * 70)

# Airlines
df_airlines_count = wr.athena.read_sql_query(
    sql=f"SELECT COUNT(*) as total FROM {database_name}.airlines",
    database=database_name,
    ctas_approach=False
)
print(f"✓ Airlines table: {df_airlines_count['total'][0]} rows")

# Airports  
df_airports_count = wr.athena.read_sql_query(
    sql=f"SELECT COUNT(*) as total FROM {database_name}.airports",
    database=database_name,
    ctas_approach=False
)
print(f"✓ Airports table: {df_airports_count['total'][0]} rows")

# Flights (full count - may take a few seconds)
print("\n⚠ Counting flights rows (may take 10-15 seconds)...")
df_flights_count = wr.athena.read_sql_query(
    sql=f"SELECT COUNT(*) as total FROM {database_name}.flights",
    database=database_name,
    ctas_approach=False
)
print(f"✓ Flights table: {df_flights_count['total'][0]:,} rows")

print("\n" + "=" * 70)
print("✓ All tables created and queryable via Athena!")
print("\nTask 2 Complete: Athena tables are set up for cataloging and querying data.")

Verifying table row counts...
✓ Airlines table: 14 rows
✓ Airports table: 322 rows

⚠ Counting flights rows (may take 10-15 seconds)...
✓ Flights table: 5,819,079 rows

✓ All tables created and queryable via Athena!

Task 2 Complete: Athena tables are set up for cataloging and querying data.
