# EMR Data Prep + SageMaker Deep Learning

This notebook is tested using `Studio SparkMagic - PySpark Kernel` running on a `ml.t3.medium` instance and connected to an EMR clsuter with an `m5.xlarge` Master node and 2 `m5.xlarge` Core nodes. Please ensure that you see `PySpark (SparkMagic)` in the top right on your notebook.

In this 3 part notebook lesson, we'll see how to use EMR for data prep and serialization to S3. Next we'll prototype a deep learning architecture using SageMaker Studio notebooks, and lastly we'll scale the training using SageMaker ephemeral training jobs.

In [None]:
# %load_ext sagemaker_studio_analytics_extension.magics
# %sm_analytics emr connect --cluster_id j-xxxxxxxxxxxx --auth-type None 

## Inspect the public NYC Taxi Dataset

In [2]:
%%local
!aws s3 ls "s3://nyc-tlc/trip data/green" --human-readable | grep green_tripdata_2016

2016-08-12 12:07:19  220.3 MiB green_tripdata_2016-01.csv
2016-08-12 12:08:24  230.2 MiB green_tripdata_2016-02.csv
2016-08-12 12:09:18  240.3 MiB green_tripdata_2016-03.csv
2016-08-12 12:04:43  235.5 MiB green_tripdata_2016-04.csv
2016-08-12 12:05:31  234.5 MiB green_tripdata_2016-05.csv
2016-08-12 12:06:26  214.2 MiB green_tripdata_2016-06.csv
2017-01-17 19:33:38  116.6 MiB green_tripdata_2016-07.csv
2017-01-17 19:34:13  109.1 MiB green_tripdata_2016-08.csv
2017-01-17 19:31:30  101.6 MiB green_tripdata_2016-09.csv
2017-01-17 19:31:58  109.5 MiB green_tripdata_2016-10.csv
2017-01-17 19:32:30  100.3 MiB green_tripdata_2016-11.csv
2017-01-17 19:33:07  106.8 MiB green_tripdata_2016-12.csv


In [3]:
df = spark.read.csv("s3://nyc-tlc/trip data/green_tripdata_2016*.csv", header=True, inferSchema=True, timestampFormat='yyyy-MM-dd HH:mm:ss').cache()
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

16385532

## Format the dataset

In [4]:
%%pretty
from pyspark.sql.functions import col, dayofweek, month, hour
df_dt = df.select(dayofweek(col('lpep_pickup_datetime')).alias('day_of_week'),
                   month(col('lpep_pickup_datetime')).alias('month'),
                   hour(col('lpep_pickup_datetime')).alias('hour'),
                   col("Pickup_latitude").alias("pickup_latitude"),
                   col("Pickup_longitude").alias("pickup_longitude"),
                   col("Dropoff_latitude").alias("dropoff_latitude"),
                   col("Dropoff_latitude").alias("dropoff_longitude"),
                   col("Trip_distance").alias("trip_distance"),
                   col("Fare_amount").alias("fare_amount")
                  )
df_dt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_of_week,month,hour,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_distance,fare_amount
6,1,0,40.68061065673828,-73.92864227294923,40.69804382324219,40.69804382324219,1.46,8.0
6,1,0,40.723175048828125,-73.95267486572266,40.76137924194336,40.76137924194336,3.56,15.5
6,1,0,40.67610549926758,-73.97161102294923,40.64607238769531,40.64607238769531,3.79,16.5
6,1,0,40.669578552246094,-73.989501953125,40.68903350830078,40.68903350830078,3.01,13.5
6,1,0,40.68285369873047,-73.96472930908203,40.66301345825195,40.66301345825195,2.55,12.0
6,1,0,40.74645614624024,-73.89114379882812,40.74211120605469,40.74211120605469,1.37,7.0
6,1,0,40.746196746826165,-73.89667510986328,40.74568939208984,40.74568939208984,0.57,5.0
6,1,0,40.803558349609375,-73.95335388183594,40.79412078857422,40.79412078857422,1.01,7.0
6,1,0,40.70281600952149,-73.99406433105469,40.679725646972656,40.679725646972656,2.46,12.0
6,1,0,40.756641387939446,-73.91413116455078,40.73965835571289,40.73965835571289,1.61,9.0


## Run Data Clean Up at Scale on the Cluster

In [5]:
df_dt = df_dt[
    (df_dt.fare_amount > 0)
    & (df_dt.fare_amount < 200)    
]
df_dt.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

8973394

In [8]:
%%pretty
df_dt = df_dt[
    (df_dt.pickup_latitude != 0)    
]
df_dt.show()
df_dt.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_of_week,month,hour,pickup_latitude,pickup_longitude,dropoff_latitude,dropoff_longitude,trip_distance,fare_amount
6,1,0,40.68061065673828,-73.92864227294923,40.69804382324219,40.69804382324219,1.46,8.0
6,1,0,40.723175048828125,-73.95267486572266,40.76137924194336,40.76137924194336,3.56,15.5
6,1,0,40.67610549926758,-73.97161102294923,40.64607238769531,40.64607238769531,3.79,16.5
6,1,0,40.669578552246094,-73.989501953125,40.68903350830078,40.68903350830078,3.01,13.5
6,1,0,40.68285369873047,-73.96472930908203,40.66301345825195,40.66301345825195,2.55,12.0
6,1,0,40.74645614624024,-73.89114379882812,40.74211120605469,40.74211120605469,1.37,7.0
6,1,0,40.746196746826165,-73.89667510986328,40.74568939208984,40.74568939208984,0.57,5.0
6,1,0,40.803558349609375,-73.95335388183594,40.79412078857422,40.79412078857422,1.01,7.0
6,1,0,40.70281600952149,-73.99406433105469,40.679725646972656,40.679725646972656,2.46,12.0
6,1,0,40.756641387939446,-73.91413116455078,40.73965835571289,40.73965835571289,1.61,9.0


In [9]:
train_df, val_df = df_dt.randomSplit([0.8, 0.2], seed=42)
val_df, test_df = val_df.randomSplit([0.05, 0.95], seed=42)

print("Train Count:", train_df.count())
print("Validation Count:", val_df.count())
print("Test Count:", test_df.count())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Train Count: 7169617
Validation Count: 89392
Test Count: 1700967

In [10]:
%%local 
import sagemaker

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()

data_bucket = f"{bucket}/nyc-taxi/data/processed"
print(data_bucket)

sagemaker-us-west-1-176842773820/nyc-taxi/data/processed


In [11]:
%%send_to_spark -i data_bucket -t str -n data_bucket

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Successfully passed 'data_bucket' as 'data_bucket' to Spark kernel

In [12]:
train_df.write.csv(f"s3://{data_bucket}/train", mode='overwrite')
test_df.write.csv(f"s3://{data_bucket}/test", mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Store data location for next notebook

In [13]:
%store data_bucket

Stored 'data_bucket' (str)
