# Athena SQL Model

This example will create an athena table for [Jan 2017 taxi dataset](https://aws.amazon.com/blogs/big-data/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/).  You can improve performance if you convert into a parquet format.

Configure your notebook role with permissions to [query data from athena](https://aws.amazon.com/blogs/machine-learning/run-sql-queries-from-your-sagemaker-notebooks-using-amazon-athena/) and access the s3 staging bucket.

## Install libraries

Install the [Athena library](https://pypi.org/project/PyAthena/) for python and [tqdm](https://tqdm.github.io/)

In [None]:
import sys
!{sys.executable} -m pip install -U pip
!{sys.executable} -m pip install -U pandas
!{sys.executable} -m pip install -U PyAthena[Pandas]==1.11.2
!{sys.executable} -m pip install -U tqdm
!{sys.executable} -m pip install -U sagemaker

In [2]:
!pip install pyathena[Pandas]==1.11.2




### Restart Kernel

Now that you have upgraded SageMaker you need to restart the kernel by clicking menu: `Kernel -> Restart & Clear Output`.

Once restarted, run the next cell to check you have version starting with `2.x`

In [1]:
import sys
!{sys.executable} -m pip show sagemaker

Name: sagemaker
Version: 2.19.0
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: None
License: Apache License 2.0
Location: /opt/conda/lib/python3.7/site-packages
Requires: protobuf3-to-dict, importlib-metadata, protobuf, packaging, google-pasta, numpy, attrs, boto3, smdebug-rulesconfig
Required-by: 


## Import Data

Create an anthena database and external table for the imported nyc bit dataset.

In [3]:
import boto3
import sagemaker

# Initialize the boto session in us-east-1 region
boto_session = boto3.session.Session(region_name='us-east-1')
region = boto_session.region_name
bucket = sagemaker.session.Session(boto_session).default_bucket()

# Get the athena staging dir andtable
s3_staging_dir = 's3://{}/athena'.format(bucket)
db_name = 'nyc_taxi'
table_name = '{}.taxi_csv'.format(db_name)

print('s3 staging dir: {}'.format(s3_staging_dir))
print('athena table: {}'.format(table_name))

s3 staging dir: s3://sagemaker-us-east-1-840276314986/athena
athena table: nyc_taxi.taxi_csv


Make the bucket if it doesn't exist

In [4]:
!aws s3 mb s3://$bucket --region $region

make_bucket: sagemaker-us-east-1-840276314986


Query the nyc taxi dataset using [PandasCursor](https://pypi.org/project/PyAthena/#pandascursor) for improved performance

In [5]:
from pyathena import connect
from pyathena.pandas_cursor import PandasCursor
import pandas as pd

cursor = connect(s3_staging_dir=s3_staging_dir,
                 region_name=region,
                 cursor_class=PandasCursor).cursor()

In [6]:
sql_ddl_create_table = 'CREATE DATABASE IF NOT EXISTS {};'.format(db_name)

cursor.execute(sql_ddl_create_table)
print('Status: {}, Run time: {:.2f}s'.format(cursor.state, 
    cursor.execution_time_in_millis/1000.0))

Status: SUCCEEDED, Run time: 0.40s


In [7]:
sql_create_table = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `{}` (
    `vendorid` bigint, 
    `lpep_pickup_datetime` string, 
    `lpep_dropoff_datetime` string, 
    `store_and_fwd_flag` string, 
    `ratecodeid` bigint, 
    `pulocationid` bigint, 
    `dolocationid` bigint, 
    `passenger_count` bigint, 
    `trip_distance` double, 
    `fare_amount` double, 
    `extra` double, 
    `mta_tax` double, 
    `tip_amount` double, 
    `tolls_amount` double, 
    `ehail_fee` string, 
    `improvement_surcharge` double, 
    `total_amount` double, 
    `payment_type` bigint, 
    `trip_type` bigint)
ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
    'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
    's3://aws-bigdata-blog/artifacts/glue-data-lake/data/'
TBLPROPERTIES (
    'columnsOrdered'='true', 
    'compressionType'='none', 
    'skip.header.line.count'='1')
'''.format(table_name)

cursor.execute(sql_create_table)
print('Status: {}, Run time: {:.2f}s'.format(cursor.state, 
    cursor.execution_time_in_millis/1000.0))

Status: SUCCEEDED, Run time: 0.45s


In [9]:
data_sql = '''
SELECT 
    total_amount, fare_amount, lpep_pickup_datetime, lpep_dropoff_datetime, trip_distance 
FROM {} WHERE total_amount is not null;
'''.format(table_name)
print('Querying...', data_sql)

data_df = cursor.execute(data_sql).as_pandas()
print('Status: {}, Run time: {:.2f}s, Data scanned: {:.2f}MB, Records: {:,}'.format(cursor.state, 
    cursor.execution_time_in_millis/1000.0, cursor.data_scanned_in_bytes/1024.0/1024.0, data_df.shape[0]))

data_df.head()

Querying... 
SELECT 
    total_amount, fare_amount, lpep_pickup_datetime, lpep_dropoff_datetime, trip_distance 
FROM nyc_taxi.taxi_csv WHERE total_amount is not null;

Status: SUCCEEDED, Run time: 5.76s, Data scanned: 91.34MB, Records: 1,070,261


Unnamed: 0,total_amount,fare_amount,lpep_pickup_datetime,lpep_dropoff_datetime,trip_distance
0,20.3,16.5,2017-01-22 21:49:27,2017-01-22 22:07:02,4.74
1,26.16,20.5,2017-01-22 21:52:32,2017-01-22 22:15:40,5.56
2,10.56,7.5,2017-01-22 21:07:23,2017-01-22 21:14:19,1.61
3,12.96,9.5,2017-01-22 21:37:01,2017-01-22 21:46:48,2.28
4,11.16,8.0,2017-01-22 21:55:06,2017-01-22 22:03:13,1.71


Performance some simple feature engineering

In [10]:
# Add some date features
data_df['lpep_pickup_datetime'] = data_df['lpep_pickup_datetime'].astype('datetime64[ns]')
data_df['lpep_dropoff_datetime'] = data_df['lpep_dropoff_datetime'].astype('datetime64[ns]')
data_df['duration_minutes'] = (data_df['lpep_dropoff_datetime'] - data_df['lpep_pickup_datetime']).dt.seconds/60
data_df['hour_of_day'] = data_df['lpep_pickup_datetime'].dt.hour
data_df['day_of_week'] = data_df['lpep_pickup_datetime'].dt.dayofweek
data_df['week_of_year'] = data_df['lpep_pickup_datetime'].dt.weekofyear
data_df['month_of_year'] = data_df['lpep_pickup_datetime'].dt.month

In [11]:
# Exclude any outliers
data_df = data_df[(data_df.total_amount > 0) & (data_df.total_amount < 200) & 
                  (data_df.duration_minutes > 0) & (data_df.duration_minutes < 120) & 
                  (data_df.trip_distance > 0) & (data_df.trip_distance < 1000)].dropna()
print(data_df.shape)
data_df.head()

(1046381, 10)


Unnamed: 0,total_amount,fare_amount,lpep_pickup_datetime,lpep_dropoff_datetime,trip_distance,duration_minutes,hour_of_day,day_of_week,week_of_year,month_of_year
0,20.3,16.5,2017-01-22 21:49:27,2017-01-22 22:07:02,4.74,17.583333,21,6,3,1
1,26.16,20.5,2017-01-22 21:52:32,2017-01-22 22:15:40,5.56,23.133333,21,6,3,1
2,10.56,7.5,2017-01-22 21:07:23,2017-01-22 21:14:19,1.61,6.933333,21,6,3,1
3,12.96,9.5,2017-01-22 21:37:01,2017-01-22 21:46:48,2.28,9.783333,21,6,3,1
4,11.16,8.0,2017-01-22 21:55:06,2017-01-22 22:03:13,1.71,8.116667,21,6,3,1


## Train Model

Build an XGBoost model to predict the total amount based on some fields

In [19]:
import boto3 
import sagemaker

sagemaker_session = sagemaker.session.Session(boto_session)
role = sagemaker.get_execution_role()
prefix = 'nyc-taxi'

print('bucket: {}, prefix: {}'.format(bucket, prefix))

bucket: sagemaker-us-east-1-840276314986, prefix: nyc-taxi


In [20]:
# Trip test split
from sklearn.model_selection import train_test_split

train_cols = ['total_amount', 'duration_minutes', 'trip_distance', 'hour_of_day']
train_df, val_df = train_test_split(data_df[train_cols], test_size=0.20, random_state=42)
val_df, test_df = train_test_split(val_df, test_size=0.50, random_state=42)

print('split train: {}, val: {}, test: {} '.format(train_df.shape[0], val_df.shape[0], test_df.shape[0]))

split train: 837104, val: 104638, test: 104639 


In [21]:
# Reset index and save files with target as first column
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

### Upload Data

Save train and validation as CSV with `total_amount` as first col but no headers

In [22]:
# Drop the tpep_pickup_datetime and save
train_df.to_csv('train.csv', index=False, header=False)
val_df.to_csv('validation.csv', index=False, header=False)

In [23]:
%%time

# Uplaod the files to s3 
s3_train_uri = sagemaker_session.upload_data('train.csv', bucket, prefix + '/data/training')
s3_val_uri = sagemaker_session.upload_data('validation.csv', bucket, prefix + '/data/validation')

CPU times: user 189 ms, sys: 58.6 ms, total: 247 ms
Wall time: 1.01 s


Validate that we have uploaded these files succesfully

In [24]:
!aws s3 ls $s3_train_uri 
!aws s3 ls $s3_val_uri

2020-12-13 09:44:20   22030287 train.csv
2020-12-13 09:44:21    2751797 validation.csv


### Get estimator

In [25]:
container = sagemaker.image_uris.retrieve(region=region, framework="xgboost", version="latest")
print('container: {}'.format(container))

container: 811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:latest


In [26]:
output_path = 's3://{}/{}/output'.format(bucket, prefix)
print('output: {}'.format(output_path))

xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type='ml.m4.xlarge',
                                    output_path=output_path,
                                    sagemaker_session=sagemaker_session)

output: s3://sagemaker-us-east-1-840276314986/nyc-taxi/output


In [27]:
xgb.set_hyperparameters(max_depth=9,
                        eta=0.2, 
                        gamma=4,
                        min_child_weight=300,
                        subsample=0.8,
                        silent=0,
                        objective='reg:linear',
                        early_stopping_rounds=10,
                        num_round=10000)

s3_input_train = sagemaker.inputs.TrainingInput(s3_data=s3_train_uri, content_type="csv")
s3_input_val = sagemaker.inputs.TrainingInput(s3_data=s3_val_uri, content_type="csv")

xgb.fit({'train': s3_input_train,  'validation': s3_input_val})

2020-12-13 09:44:41 Starting - Starting the training job...
2020-12-13 09:45:05 Starting - Launching requested ML instancesProfilerReport-1607852681: InProgress
.........
2020-12-13 09:46:26 Starting - Preparing the instances for training...
2020-12-13 09:47:09 Downloading - Downloading input data.........
2020-12-13 09:48:36 Training - Training image download completed. Training in progress..[34mArguments: train[0m
[34m[2020-12-13:09:48:37:INFO] Running standalone xgboost training.[0m
[34m[2020-12-13:09:48:37:INFO] File size need to be processed in the node: 23.63mb. Available memory size in the node: 8424.12mb[0m
[34m[2020-12-13:09:48:37:INFO] Determined delimiter of CSV input is ','[0m
[34m[09:48:37] S3DistributionType set as FullyReplicated[0m
[34m[09:48:38] 837104x3 matrix with 2511312 entries loaded from /opt/ml/input/data/train?format=csv&label_column=0&delimiter=,[0m
[34m[2020-12-13:09:48:38:INFO] Determined delimiter of CSV input is ','[0m
[34m[09:48:38] S3Distr

### Deploy model

In [28]:
xgb_predictor = xgb.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge', endpoint_name='xgb-athena-integration-endpoint')

-------------!

### Evalulate Model

Get predicitons for the validation set

In [29]:
from sagemaker.serializers import CSVSerializer
xgb_predictor.serializer = CSVSerializer()

In [30]:
%%time

import numpy as np
from tqdm import tqdm

def predict(data, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in tqdm(split_array):
        predictions = ','.join([predictions, xgb_predictor.predict(array).decode('utf-8')])
    return np.fromstring(predictions[1:], sep=',')

# Get predictions and store in df
predictions = predict(val_df[train_cols[1:]].values)
predictions = pd.DataFrame({'total_amount_predictions': predictions })

100%|██████████| 210/210 [00:04<00:00, 47.20it/s]

CPU times: user 1.42 s, sys: 166 ms, total: 1.59 s
Wall time: 4.5 s





In [37]:
# Get the abs error between predictions
pred_df = val_df.join(predictions)
pred_df['error'] = abs(pred_df['total_amount']-pred_df['total_amount_predictions'])
pred_df.sort_values('error', ascending=True).head(10)

Unnamed: 0,total_amount,duration_minutes,trip_distance,hour_of_day,total_amount_predictions,error
45359,20.76,18.05,5.0,8,20.76,2.289e-07
9445,5.38,1.15,0.22,20,5.379971,2.944946e-05
42849,14.08,14.716667,2.68,13,14.080075,7.5264e-05
25607,8.8,7.833333,0.65,19,8.799897,0.0001028061
77665,14.3,16.55,2.4,7,14.300106,0.0001060486
19321,16.3,15.483333,3.31,1,16.299704,0.000296402
100119,36.36,15.933333,10.88,10,36.360481,0.0004812622
86318,59.16,32.1,17.39,20,59.159405,0.0005952454
64403,19.8,22.8,3.63,18,19.800652,0.0006515503
40555,14.8,12.6,3.32,9,14.800687,0.0006868362


## Create Athena UDF 

Create a [User Defined Function](https://aws.amazon.com/blogs/big-data/prepare-data-for-model-training-and-invoke-machine-learning-models-with-amazon-athena/) for the deployed endpoint so you can query directly in Athena.

In [34]:
endpoint_name = xgb_predictor.endpoint_name
print('endpoint: {}'.format(endpoint_name))

endpoint: xgb-athena-integration-endpoint


`NOTE`: Athena ML is [in preview](https://aws.amazon.com/athena/faqs/#Preview_features).   To enable this Preview feature you need to create an Athena workgroup named `AmazonAthenaPreviewFunctionality` and run any queries attempting to federate to this connector, use a UDF, or SageMaker inference from that workgroup.

In [35]:
workgroup_name = 'AmazonAthenaPreviewFunctionality'

!aws athena create-work-group --name $workgroup_name --region $region

Using presto [datetime](https://prestodb.io/docs/0.172/functions/datetime.html) functions with inline query, rank by absolute error.

In [36]:
query_sql  = '''
USING FUNCTION predict_total(
  duration_minutes DOUBLE, 
  trip_distance DOUBLE, 
  hour_of_day DOUBLE) returns DOUBLE type SAGEMAKER_INVOKE_ENDPOINT
WITH (sagemaker_endpoint='{}')

SELECT 
    *, ABS(predicted_total_amount-total_amount) as error
FROM ( 
    SELECT
        *,
        predict_total(duration_minutes, trip_distance, hour_of_day) as predicted_total_amount
    FROM 
    (
        SELECT 
            total_amount,
            CAST(date_diff('minute', 
                CAST(lpep_pickup_datetime as timestamp), 
                CAST(lpep_dropoff_datetime as timestamp)) as DOUBLE) as duration_minutes,
            CAST(trip_distance as DOUBLE) as trip_distance,
            CAST(hour(CAST(lpep_pickup_datetime as timestamp)) as double) as hour_of_day
        FROM {}
        WHERE DAY(CAST(lpep_pickup_datetime as timestamp)) = {} -- Filter by day
    )
)
ORDER BY error DESC
LIMIT {};
'''.format(endpoint_name, table_name, 1, 10)
print('Querying...', query_sql)

query_df = cursor.execute(query_sql, work_group=workgroup_name).as_pandas()
print('Status: {}, Run time: {:.2f}s, Data scanned: {:.2f}MB, Records: {:,}'.format(cursor.state, 
    cursor.execution_time_in_millis/1000.0, cursor.data_scanned_in_bytes/1024.0/1024.0, query_df.shape[0]))

query_df

Querying... 
USING FUNCTION predict_total(
  duration_minutes DOUBLE, 
  trip_distance DOUBLE, 
  hour_of_day DOUBLE) returns DOUBLE type SAGEMAKER_INVOKE_ENDPOINT
WITH (sagemaker_endpoint='xgb-athena-integration-endpoint')

SELECT 
    *, ABS(predicted_total_amount-total_amount) as error
FROM ( 
    SELECT
        *,
        predict_total(duration_minutes, trip_distance, hour_of_day) as predicted_total_amount
    FROM 
    (
        SELECT 
            total_amount,
            CAST(date_diff('minute', 
                CAST(lpep_pickup_datetime as timestamp), 
                CAST(lpep_dropoff_datetime as timestamp)) as DOUBLE) as duration_minutes,
            CAST(trip_distance as DOUBLE) as trip_distance,
            CAST(hour(CAST(lpep_pickup_datetime as timestamp)) as double) as hour_of_day
        FROM nyc_taxi.taxi_csv
        WHERE DAY(CAST(lpep_pickup_datetime as timestamp)) = 1 -- Filter by day
    )
)
ORDER BY error DESC
LIMIT 10;

Status: SUCCEEDED, Run time: 13.28s, Data s

Unnamed: 0,total_amount,duration_minutes,trip_distance,hour_of_day,predicted_total_amount,error
0,240.0,0.0,0.0,4.0,17.547598,222.452402
1,276.64,49.0,48.2,8.0,96.674095,179.965905
2,140.8,1.0,0.2,10.0,4.677799,136.122201
3,203.16,43.0,38.59,11.0,96.3936,106.7664
4,7.0,193.0,52.96,22.0,104.396317,97.396317
5,141.3,46.0,12.14,16.0,47.895618,93.404382
6,-68.31,0.0,0.01,17.0,22.884491,91.194491
7,4.8,5.0,26.4,14.0,92.397316,87.597316
8,10.0,52.0,26.49,7.0,96.674095,86.674095
9,104.0,0.0,0.0,6.0,17.547598,86.452402
