# AAI-540 Group 6 Final Project

Authors: Alden Caterio, Gary Takahashi, Paul Parks

In [2]:
!pip install --disable-pip-version-check -q PyAthena
!pip install --disable-pip-version-check -q awswrangler

[0m

# Imports

In [19]:
import os
import pandas as pd
import numpy as np
import boto3
from sagemaker.session import Session
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker import get_execution_role
from time import gmtime, strftime, time
from botocore.client import ClientError
from pyathena import connect
import pandas as pd
import boto3
import sagemaker
import awswrangler as wr
from sklearn.model_selection import train_test_split

## AWS S3 and Athena Setup

### Collect a raw data set and store it in an S3 Datalake.

In [4]:
# Resolve the region of the active boto3 session and the default SageMaker bucket.
session = boto3.session.Session()
region = session.region_name
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

# Build the S3 client from the session created above instead of a second,
# throw-away boto3.Session(), so region and client come from the same session.
s3 = session.client(service_name="s3", region_name=region)

print("Default bucket: {}".format(bucket))

Default bucket: sagemaker-us-east-1-692501163596


In [5]:
# Confirm the default bucket is reachable with the current credentials.
# The flag starts as False so it is always defined, even when the check fails.
setup_s3_bucket_passed = False
response = None
try:
    response = s3.head_bucket(Bucket=bucket)
    print(response)
    setup_s3_bucket_passed = True
except ClientError as e:
    # Report the region that was searched; `response` is still None on this path.
    print("[ERROR] Cannot find bucket {} in {} due to {}.".format(bucket, region, e))

{'ResponseMetadata': {'RequestId': 'P3K3XQNJ0YSS5N34', 'HostId': 'qcBc3G5zGV/Hpr15OmrliMjare5+vKSP2GpGtGyu9bqalK+Ti35mMNnis6yu3nfqSoThzND0cfc=', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amz-id-2': 'qcBc3G5zGV/Hpr15OmrliMjare5+vKSP2GpGtGyu9bqalK+Ti35mMNnis6yu3nfqSoThzND0cfc=', 'x-amz-request-id': 'P3K3XQNJ0YSS5N34', 'date': 'Thu, 19 Sep 2024 00:43:33 GMT', 'x-amz-bucket-region': 'us-east-1', 'x-amz-access-point-alias': 'false', 'content-type': 'application/xml', 'server': 'AmazonS3'}, 'RetryAttempts': 0}, 'BucketRegion': 'us-east-1', 'AccessPointAlias': False}


In [6]:
# Local source (CSV) and converted (Parquet) file locations.
filename = 'CEAS_08.csv'
local_csv_path = './dataset/' + filename
parquet_filename = filename.replace('.csv', '.parquet')
local_parquet_path = os.path.join('./dataset/', parquet_filename)

# Destination inside the default bucket: a folder URI plus the object itself.
folder_in_s3 = 'Dataset/'
s3_destination_dir = f's3://{bucket}/{folder_in_s3}'
s3_destination_path = f's3://{bucket}/{folder_in_s3}{parquet_filename}'
bucket_name = bucket
key = f'{folder_in_s3}{parquet_filename}'

# Convert the CSV to Parquet. `urls` is cast to str before writing
# (presumably to match the `urls string` column of the Athena table -- confirm).
df = pd.read_csv(local_csv_path)
df['urls'] = df['urls'].astype(str)
df.to_parquet(local_parquet_path, engine='pyarrow')

# Upload, then list the key back to confirm the object is present.
s3 = boto3.client('s3')
s3.upload_file(local_parquet_path, bucket_name, key)
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=key)

if 'Contents' not in response:
    print(f"Failed to upload file to {s3_destination_path}")
else:
    print(f"File '{local_parquet_path}' uploaded successfully to '{s3_destination_path}'")

File './dataset/CEAS_08.parquet' uploaded successfully to 's3://sagemaker-us-east-1-692501163596/Dataset/CEAS_08.parquet'


### Set up Athena tables to enable cataloging and querying of your data.

In [7]:
# Database that will hold the emails table, and the S3 staging directory
# handed to the PyAthena connection.
database_name = "spam_detection_db"
s3_staging_dir = f"s3://{bucket}/athena/staging"

conn = connect(s3_staging_dir=s3_staging_dir, region_name=region)

# Create the database only if it does not exist yet.
statement = f"CREATE DATABASE IF NOT EXISTS {database_name}"
pd.read_sql(statement, conn)

  pd.read_sql(statement, conn)


In [8]:
# List the databases visible through the connection and show the first five.
statement = "SHOW DATABASES"
df_show = pd.read_sql(sql=statement, con=conn)
df_show.head(5)

  df_show = pd.read_sql(statement, conn)


Unnamed: 0,database_name
0,default
1,dsoaws
2,music_db
3,sagemaker_featurestore
4,spam_detection_db


In [9]:
# Drop any earlier definition of the table so the CREATE below starts clean.
table_name = "emails"
statement = f"DROP TABLE IF EXISTS {database_name}.{table_name}"
pd.read_sql(sql=statement, con=conn)

  pd.read_sql(statement, conn)


In [10]:
# Register the Parquet data under `s3_destination_dir` as an external Athena table.
#
# No `skip.header.line.count` table property is set: that property is meant for
# text/CSV data with a header row, and the data here is Parquet (which has no
# header line). It was a leftover from an earlier CSV-based definition, which
# has been removed together with its commented-out DDL.
statement = f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {database_name}.{table_name}(
    sender string,
    receiver string,
    date string,
    subject string,
    body string,
    label int,
    urls string
)
STORED AS PARQUET
LOCATION '{s3_destination_dir}'
"""

pd.read_sql(statement, conn)

  pd.read_sql(statement, conn)


In [11]:
# List the tables of the database and check that the new table is among them.
statement = f"SHOW TABLES in {database_name}"
df_show = pd.read_sql(sql=statement, con=conn)
print(df_show.head(5))

# Membership is tested against every cell of the result, not only the printed rows.
print(
    "[OK] Table created successfully."
    if table_name in df_show.values
    else "[ERROR] Table creation failed."
)

  df_show = pd.read_sql(statement, conn)


  tab_name
0   emails
[OK] Table created successfully.


In [12]:
# Count the rows Athena reads from the table.
statement = (
    "SELECT COUNT(*) AS total_rows\n"
    f"FROM {database_name}.{table_name}"
)
result = pd.read_sql(sql=statement, con=conn)
result.head(5)

  result = pd.read_sql(statement, conn)


Unnamed: 0,total_rows
0,39154


In [13]:
# Fetch a ten-row sample of the table to eyeball the columns.
statement = f"SELECT * FROM {database_name}.{table_name} LIMIT 10"
result = pd.read_sql(sql=statement, con=conn)
result.head(10)

  result = pd.read_sql(statement, conn)


Unnamed: 0,sender,receiver,date,subject,body,label,urls
0,Young Esposito <Young@iworld.de>,user4@gvc.ceas-challenge.cc,"Tue, 05 Aug 2008 16:31:02 -0700",Never agree to be a loser,"Buck up, your troubles caused by small dimensi...",1,1


## Exploratory Data Analysis

## AWS FeatureStore

### Test Train Split
Split the feature data into training (~40%), test (~10%), and validation (~10%) datasets.
Reserve the remaining data (~40%) as “production data”.

In [25]:
# Read the whole emails table into a DataFrame through awswrangler;
# this frame is what gets registered with the Feature Store below.
feature_store_data = wr.athena.read_sql_query(
    sql=f"SELECT * FROM {database_name}.{table_name}",
    database=database_name,
)

In [21]:
# AWS clients and the SageMaker session used for all Feature Store calls
boto_session = boto3.Session()
region = boto_session.region_name
sagemaker_client = boto_session.client("sagemaker", region_name=region)
featurestore_runtime = boto_session.client("sagemaker-featurestore-runtime", region_name=region)
feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime,
)

# S3 bucket and key prefix for the offline store
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = "sagemaker-featurestore-spamdetection"

# IAM role for SageMaker, taken from the notebook's execution role
role = get_execution_role()

In [26]:
# Build the FeatureGroup handle. The name ends with a day-hour-minute-second
# stamp taken from gmtime(), appended directly to the base name.
spamdetection_feature_group_name = (
    "spamdetection-feature-group"
    + strftime("%d-%H-%M-%S", gmtime())
)
spamdetection_feature_group = FeatureGroup(
    name=spamdetection_feature_group_name,
    sagemaker_session=feature_store_session,
)

# Cast necessary columns to the right types
def cast_object_to_string(data_frame):
    """Convert every object-dtype column of ``data_frame`` to pandas' string dtype.

    The frame is modified in place and nothing is returned. Each affected
    column is passed through ``astype("str")`` and then ``astype("string")``;
    columns of any other dtype are left untouched.
    """
    object_columns = [
        column for column in data_frame.columns
        if data_frame[column].dtype == "object"
    ]
    for column in object_columns:
        as_text = data_frame[column].astype("str")
        data_frame[column] = as_text.astype("string")

# Convert the object columns of the Athena result in place (the helper returns nothing).
cast_object_to_string(feature_store_data)

In [33]:
from time import sleep
import uuid

# Record identifier and event time feature names
record_identifier_feature_name = 'primary_key'
event_time_feature_name = 'event_time'

# One timestamp (whole seconds since the epoch) shared by every record.
current_time_sec = int(time())

# A random UUID per row serves as the record identifier.
feature_store_data[record_identifier_feature_name] = [
    str(uuid.uuid4()) for _ in range(len(feature_store_data))
]
# Assign a plain float64 array rather than a freshly built pd.Series: a Series
# is aligned on the index during assignment, which would leave NaN event times
# whenever the frame's index is not the default 0..n-1 range.
feature_store_data[event_time_feature_name] = np.full(
    len(feature_store_data), current_time_sec, dtype='float64'
)

# Load feature definitions from the DataFrame
spamdetection_feature_group.load_feature_definitions(data_frame=feature_store_data)

# Create the FeatureGroup in the SageMaker FeatureStore
# (offline store under the S3 prefix, online store enabled).
spamdetection_feature_group.create(
    s3_uri=f"s3://{default_s3_bucket_name}/{prefix}",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name=event_time_feature_name,
    role_arn=role,
    enable_online_store=True,
)


# Wait for the FeatureGroup to be created
def wait_for_feature_group_creation_complete(feature_group, poll_seconds=5, timeout_seconds=600):
    """Block until ``feature_group`` leaves the "Creating" status.

    Args:
        feature_group: Object exposing ``describe()`` (returning a mapping with
            a "FeatureGroupStatus" entry) and a ``name`` attribute.
        poll_seconds: Pause between two ``describe()`` calls.
        timeout_seconds: Upper bound on the total waiting time, so a group that
            never leaves "Creating" cannot hang the notebook forever.

    Raises:
        RuntimeError: If the final status is anything other than "Created", or
            if the group is still "Creating" once ``timeout_seconds`` elapsed.
    """
    deadline = time() + timeout_seconds
    description = feature_group.describe()
    status = description.get("FeatureGroupStatus")
    while status == "Creating":
        if time() >= deadline:
            raise RuntimeError(
                f"Timed out after {timeout_seconds}s waiting for feature group "
                f"{feature_group.name} to be created"
            )
        print("Waiting for Feature Group Creation")
        sleep(poll_seconds)
        description = feature_group.describe()
        status = description.get("FeatureGroupStatus")
    if status != "Created":
        # Surface the status and, when the service reports one, the failure reason.
        details = f"status: {status}"
        failure_reason = description.get("FailureReason")
        if failure_reason:
            details += f", reason: {failure_reason}"
        raise RuntimeError(f"Failed to create feature group {feature_group.name} ({details})")
    print(f"FeatureGroup {feature_group.name} successfully created.")

# Poll the newly requested feature group until it is created; raises RuntimeError otherwise.
wait_for_feature_group_creation_complete(feature_group=spamdetection_feature_group)

Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
Waiting for Feature Group Creation
FeatureGroup spamdetection-feature-group19-00-50-49 successfully created.


In [35]:
# TODO: read the records back from the Feature Store and build the train/test/validation/production splits from them.

In [None]:
# # Assuming df is already loaded with your Athena query result
# # Split 40% of the data for production and the remaining 60% for further splitting
# df_remaining, df_production = train_test_split(df, test_size=0.40, random_state=42)

# # From the remaining 60%, split 66.67% for training (which is 40% of total) and 33.33% for test + validation (20% of total)
# df_train, df_test_validation = train_test_split(df_remaining, test_size=0.33, random_state=42)

# # Split the remaining test + validation data into test (10%) and validation (10%)
# df_test, df_validation = train_test_split(df_test_validation, test_size=0.50, random_state=42)

# # Print the sizes of each split to confirm
# print(f"Training data: {len(df_train)} rows")
# print(f"Test data: {len(df_test)} rows")
# print(f"Validation data: {len(df_validation)} rows")
# print(f"Production data: {len(df_production)} rows")
