# Create an optimized NYC Taxi trips dataset


### 1. Getting started

We will write a script that applies the following transformations:

1. Join the NYC Taxi trips fact table, **yellow**, with the look-up tables (denormalization)
2. Filter out (drop) unnecessary columns, rename existing columns, and create new columns
3. Partition the dataset
4. Convert the dataset to a columnar format (Parquet)

Let's begin by running some boilerplate code to import the AWS Glue and PySpark classes and functions we'll need.

```python
import sys

# AWS Glue imports
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# Spark imports (only the SQL functions used below, to avoid shadowing built-ins such as sum or max)
from pyspark.context import SparkContext
from pyspark.sql.functions import broadcast, col, dayofmonth, month, to_timestamp, year

# PLEASE SET THIS VARIABLE to your own S3 bucket name. We'll store the output dataset in it.
your_bucket_name = 'ant313-1-087687018744'
```

### 2. Load NYC Taxi dataset tables as DynamicFrames

Next, we'll set up a single `GlueContext` and then load the NYC Taxi dataset tables into DynamicFrames, to which we'll apply our transformations. We'll print the schemas as an extra check before performing further operations. These tables are stored as CSV files in Amazon S3 and cataloged in the AWS Glue Data Catalog.

The tables we'll load are:
* **Yellow** taxi table. This is the biggest table of all, with millions of rows.
* **Taxi Zone** table. This is a look-up table with 265 rows.
* **Payment Type** table. Very small look-up table with 6 rows.
* **Rate Code** table. Also a very small look-up table with 6 rows.

```python
# Set up an AWS Glue context
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Read our primary Yellow taxi table. This is the biggest table of all with millions of rows.
yellow_dyf = glueContext.create_dynamic_frame.from_catalog(database="nyctaxi", table_name="yellow")
yellow_dyf.printSchema()

# Read the look-up tables
taxi_zone_dyf = glueContext.create_dynamic_frame.from_catalog(database="nyctaxi", table_name="taxizone")
taxi_zone_dyf.printSchema()

payment_type_dyf = glueContext.create_dynamic_frame.from_catalog(database="nyctaxi", table_name="paymenttype")
payment_type_dyf.printSchema()

rate_code_dyf = glueContext.create_dynamic_frame.from_catalog(database="nyctaxi", table_name="ratecode")
rate_code_dyf.printSchema()
```
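
Printing schemas is a visual check; a programmatic check can complement it. The sketch below, written in plain Python so it runs without a Glue endpoint, compares a list of column names against the Yellow columns that later cells reference. The `missing_columns` helper and the sample column list are hypothetical; on the endpoint you could pass `yellow_dyf.toDF().columns` as the first argument.

```python
# Yellow columns referenced later in this notebook (join keys, timestamps, dropped flag).
REQUIRED_YELLOW_COLUMNS = [
    "pulocationid",
    "dolocationid",
    "payment_type",
    "ratecodeid",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "store_and_fwd_flag",
]


def missing_columns(actual_columns, required_columns):
    """Return the required column names absent from actual_columns (case-insensitive)."""
    present = {name.lower() for name in actual_columns}
    return [name for name in required_columns if name.lower() not in present]


# Hypothetical schema that lacks the rate code column.
sample_columns = ["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
                  "PULocationID", "DOLocationID", "payment_type", "store_and_fwd_flag"]
print(missing_columns(sample_columns, REQUIRED_YELLOW_COLUMNS))  # ['ratecodeid']
```

An empty list means every column the joins, timestamp parsing, and drops rely on is present.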

### 3. Apply Mapping transformations and convert to Spark DataFrames

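This is where we write and test our ETL code until we achieve the results we're looking for.

In the ETL code below, we apply AWS Glue's ApplyMapping transformation to the look-up tables' DynamicFrames, mapping each existing column name and data type to a new name and data type. The taxi zone table is mapped twice, once with a `pu_` prefix for pick-up locations and once with a `do_` prefix for drop-off locations, so that it can be joined to the Yellow table on both location columns without column-name collisions. Each mapped DynamicFrame is then converted into a Spark DataFrame with `toDF()`.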

```python
# Rename look-up columns and change data types as needed, then convert into Spark DataFrames.

# Pick-up (PU) copy of the taxi zone look-up table
pu_taxi_zone_df = taxi_zone_dyf.apply_mapping([
    ("locationid", "bigint", "pu_locationid", "bigint"),
    ("borough", "string", "pu_borough", "string"),
    ("zone", "string", "pu_zone", "string"),
    ("service_zone", "string", "pu_service_zone", "string")]).toDF()

# Drop-off (DO) copy of the taxi zone look-up table
do_taxi_zone_df = taxi_zone_dyf.apply_mapping([
    ("locationid", "bigint", "do_locationid", "bigint"),
    ("borough", "string", "do_borough", "string"),
    ("zone", "string", "do_zone", "string"),
    ("service_zone", "string", "do_service_zone", "string")]).toDF()

# Payment type look-up table
payment_type_df = payment_type_dyf.apply_mapping([
    ("id", "bigint", "payment_type_id", "bigint"),
    ("name", "string", "payment_type_name", "string")]).toDF()

# Rate code look-up table
rate_code_df = rate_code_dyf.apply_mapping([
    ("id", "bigint", "ratecode_id", "bigint"),
    ("name", "string", "ratecode_name", "string")]).toDF()

# Convert the Yellow DynamicFrame into a Spark DataFrame
yellow_df = yellow_dyf.toDF()
```
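
Each ApplyMapping tuple reads as `(source name, source type, target name, target type)`. The plain-Python model below applies such tuples to a single record to make that concrete. It is an illustration under stated assumptions, not Glue's implementation: the `apply_mapping_to_record` helper and the `CASTS` table are hypothetical, and the model keeps only the fields named in the mapping list.

```python
# Hypothetical converters from the type names used in the mappings to Python types.
CASTS = {"bigint": int, "string": str}


def apply_mapping_to_record(record, mappings):
    """Rename and cast the fields of one record according to mapping tuples."""
    out = {}
    for source_name, _source_type, target_name, target_type in mappings:
        if source_name in record:
            value = record[source_name]
            # Cast to the target type; keep missing values as None.
            out[target_name] = None if value is None else CASTS[target_type](value)
    return out


pu_mappings = [
    ("locationid", "bigint", "pu_locationid", "bigint"),
    ("borough", "string", "pu_borough", "string"),
    ("zone", "string", "pu_zone", "string"),
    ("service_zone", "string", "pu_service_zone", "string"),
]

# Illustrative taxi zone record with the ID still stored as a string.
zone_record = {"locationid": "132", "borough": "Queens",
               "zone": "JFK Airport", "service_zone": "Airports"}
print(apply_mapping_to_record(zone_record, pu_mappings))
# {'pu_locationid': 132, 'pu_borough': 'Queens', 'pu_zone': 'JFK Airport', 'pu_service_zone': 'Airports'}
```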

### 4. Repartition and cache the Yellow table

When loaded, the Yellow table has 4 partitions. We increase that to 40 partitions (4 vCPUs × 10 DPUs = 40) to better leverage Spark parallelism on the Glue development endpoint. We also cache these partitions in memory to reduce the execution time of subsequent Spark actions on the Yellow table. We use `repartition()` rather than `coalesce()` because we're increasing the number of partitions; `coalesce()` can only reduce it.

```python
print('Yellow table parts: {}'.format(yellow_df.rdd.getNumPartitions()))
yellow_df = yellow_df.repartition(4 * 10).cache()  # 40 partitions to make use of the Glue dev endpoint's 10 DPUs
print('Repartitioned Yellow table parts: {}'.format(yellow_df.rdd.getNumPartitions()))
```
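
The partition count above is simple arithmetic: 4 vCPUs per DPU times 10 DPUs gives 40 cores, so 40 partitions let every core work on one task at a time. The plain-Python sketch below reproduces that arithmetic and models how a repartition without key columns spreads rows evenly, round-robin, across the target partitions. The helper names are hypothetical, and Spark's implementation differs in detail (for example, in which partition it starts dealing rows to); only the even spread is the point.

```python
def target_partitions(dpus, vcpus_per_dpu=4):
    """Partitions needed to give every vCPU one task at a time."""
    return dpus * vcpus_per_dpu


def round_robin_sizes(num_rows, num_partitions):
    """Count the rows per partition when rows are dealt out one at a time in turn."""
    sizes = [0] * num_partitions
    for row_index in range(num_rows):
        sizes[row_index % num_partitions] += 1
    return sizes


n = target_partitions(dpus=10)          # 4 vCPUs * 10 DPUs
sizes = round_robin_sizes(100_003, n)
print(n, min(sizes), max(sizes))        # 40 2500 2501
```

Note that `cache()` is lazy: the repartitioned data is only stored in memory when the first action that needs it runs.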

### 5. Joining and Filtering 

With the Yellow taxi table and the look-up tables now available as Spark DataFrames, we perform the following transformations:

1. Create a denormalized table by joining the Yellow taxi table with the look-up tables.
2. Create new pick-up (PU) and drop-off (DO) timestamp columns by parsing the string values in the original PU and DO columns.
3. Create dedicated partition columns, *pick-up year* and *month*, because we anticipate that reporting queries will filter on those two values. A *pick-up day* column is also added and used for sorting.
4. Drop unneeded columns.
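
Steps 2 and 3 can be checked on a single value in plain Python before running them on the cluster. The Spark pattern `yyyy-MM-dd HH:mm:ss` passed to `to_timestamp()` corresponds to the `strptime` format `%Y-%m-%d %H:%M:%S`, and the sketch below derives the same year, month, and day values that `year()`, `month()`, and `dayofmonth()` extract. The `pickup_partition_values` helper and the sample strings are hypothetical, and returning `None` for a malformed string is only a simple way to model rows that cannot be parsed.

```python
from datetime import datetime

# strptime equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss".
PICKUP_FORMAT = "%Y-%m-%d %H:%M:%S"


def pickup_partition_values(pickup_string):
    """Return (pu_year, pu_month, pu_day) for a pick-up string, or None if it is malformed."""
    try:
        ts = datetime.strptime(pickup_string, PICKUP_FORMAT)
    except ValueError:
        return None
    return ts.year, ts.month, ts.day


print(pickup_partition_values("2019-03-07 14:25:09"))  # (2019, 3, 7)
print(pickup_partition_values("03/07/2019 14:25"))     # None
```

Partitioning on year and month rather than on the full timestamp keeps the number of partition directories small: one per month of data.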

```python
# Join with look-ups, add new columns, sort by pick-up year, month, and day, then drop unneeded columns
yellow_opt_df = (yellow_df
    .join(broadcast(pu_taxi_zone_df), yellow_df["pulocationid"] == pu_taxi_zone_df["pu_locationid"])
    .join(broadcast(do_taxi_zone_df), yellow_df["dolocationid"] == do_taxi_zone_df["do_locationid"])
    .join(broadcast(payment_type_df), yellow_df["payment_type"] == payment_type_df["payment_type_id"])
    .join(broadcast(rate_code_df), yellow_df["ratecodeid"] == rate_code_df["ratecode_id"])
    # Parse the original string timestamps into timestamp columns
    .withColumn("pu_datetime", to_timestamp(col("tpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("do_datetime", to_timestamp(col("tpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))
    # Derive the partition (year, month) and sort (day) columns from the pick-up timestamp
    .withColumn("pu_year", year("pu_datetime"))
    .withColumn("pu_month", month("pu_datetime"))
    .withColumn("pu_day", dayofmonth("pu_datetime"))
    .orderBy("pu_year", "pu_month", "pu_day")
    # Drop one copy of each join key, the original string timestamps, and the store-and-forward flag
    .drop("store_and_fwd_flag",
          "pulocationid",
          "dolocationid",
          "payment_type_id",
          "ratecodeid",
          "tpep_pickup_datetime",
          "tpep_dropoff_datetime")
)
```
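
`broadcast()` hints Spark to ship each small look-up DataFrame to every executor, so the large Yellow table can be joined without being shuffled. The plain-Python sketch below models that broadcast hash join for one look-up: build a dictionary from the small table, then probe it with each row of the large table. Like the default inner join used above, rows whose key has no match are dropped, so trips whose `payment_type` is missing from the look-up table would not appear in the output. The function name and sample rows are hypothetical, and unlike Spark the dictionary keeps only one look-up row per key.

```python
def broadcast_hash_join(large_rows, small_rows, large_key, small_key):
    """Inner-join two lists of dicts by hashing the small side (one look-up row per key)."""
    # Build phase: index the small look-up table by its key.
    lookup = {row[small_key]: row for row in small_rows}
    # Probe phase: stream the large table and emit merged rows for matching keys only.
    joined = []
    for row in large_rows:
        match = lookup.get(row[large_key])
        if match is not None:
            joined.append({**row, **match})
    return joined


payment_types = [
    {"payment_type_id": 1, "payment_type_name": "Credit card"},
    {"payment_type_id": 2, "payment_type_name": "Cash"},
]
trips = [
    {"trip": "a", "payment_type": 1},
    {"trip": "b", "payment_type": 2},
    {"trip": "c", "payment_type": 9},  # no matching look-up row, so it is dropped
]
for row in broadcast_hash_join(trips, payment_types, "payment_type", "payment_type_id"):
    print(row)
```

If dropping unmatched trips is not what you want, a left join (`how="left"` in Spark's `join()`) keeps them with null look-up columns.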


### 6. Repartition the data in Spark

The previous transformations on the Yellow table result in 60 partitions.
You can experiment with repartitioning or coalescing the optimized Yellow DataFrame and observe the effect on notebook execution time, as well as on the number and size of the output files.

The number and size of the output files ultimately affect query execution time and the amount of data scanned in Amazon Athena.

```python
# Uncomment the lines below to experiment with repartitioning or coalescing before the write.
#yellow_opt_df = yellow_opt_df.coalesce(40)
#yellow_opt_df = yellow_opt_df.repartition("pu_year", "pu_month", "pu_day")
#print('OPTIMIZED Yellow table parts: {}'.format(yellow_opt_df.rdd.getNumPartitions()))

# Use Spark's explain() to inspect the physical plan Spark uses to apply our transformations.
# In this case, explain() was used to help optimize the join process.
# yellow_opt_df.explain()
```
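
Why does the partition layout change the number of output files? When `partitionBy()` is used, each Spark task writes a separate file for every partition directory it holds rows for. The plain-Python model below counts files that way for two layouts of the same rows: rows spread so that every task sees every month, versus rows grouped by month, as a repartition on the partition columns would do. It ignores empty tasks and file splitting limits such as `maxRecordsPerFile`; the function name and data are hypothetical.

```python
def count_output_files(tasks):
    """Count files written when each task writes one file per distinct partition key it holds.

    tasks: a list of Spark-partition stand-ins, each a list of (pu_year, pu_month) keys.
    """
    return sum(len(set(task_keys)) for task_keys in tasks)


months = [(2019, m) for m in range(1, 13)]

# Layout A: 40 tasks, each holding rows from all 12 months (e.g., after a round-robin repartition).
spread = [months[:] for _ in range(40)]

# Layout B: rows grouped by (year, month), so each month lives in exactly one task.
grouped = [[key] for key in months]

print(count_output_files(spread))   # 480
print(count_output_files(grouped))  # 12
```

Fewer, larger files generally mean less per-file overhead for Athena, at the cost of less write parallelism when a single month holds a lot of data.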

### 7. Optimized Write to S3

Finally, we physically partition the output data in Amazon S3 into Hive-style partitions by *pick-up year* and *month* and convert the data into Parquet format.
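
Hive-style partitioning encodes each partition column as a `name=value` directory. The sketch below builds the prefix under which rows for a given pick-up year and month would land, following that convention. `hive_partition_prefix` is a hypothetical helper, the bucket name is a placeholder, and Spark itself chooses the data file names inside each directory.

```python
def hive_partition_prefix(base_path, partition_values):
    """Build a Hive-style prefix such as base/pu_year=2019/pu_month=3/."""
    parts = [f"{name}={value}" for name, value in partition_values]
    return base_path.rstrip("/") + "/" + "/".join(parts) + "/"


base = "s3://example-bucket/data/staging/nyctaxi/yellow_opt/"
print(hive_partition_prefix(base, [("pu_year", 2019), ("pu_month", 3)]))
# s3://example-bucket/data/staging/nyctaxi/yellow_opt/pu_year=2019/pu_month=3/
```

Once these partitions are registered in the Data Catalog, a query that filters on `pu_year` and `pu_month` lets Athena read only the matching prefixes instead of scanning the whole table.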

```python
# Set Parquet's row group size: 24, 32, and 64 MB are reasonable values.
blockSize = 1024 * 1024 * 32  # 32 MB

(yellow_opt_df
 .write
 .mode("overwrite")
 .format("parquet")
 .option("parquet.block.size", blockSize)
 .partitionBy("pu_year", "pu_month")
 .save("s3://{}/data/staging/nyctaxi/yellow_opt/".format(your_bucket_name)))
```
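
The `parquet.block.size` option sets the target row group size in bytes, so `1024 * 1024 * 32` is 33,554,432 bytes. The back-of-the-envelope sketch below estimates how many row groups a file of a given size would hold at the 24, 32, and 64 MB values suggested in the comment. It assumes each row group holds at most the configured size and ignores compression and footer overhead, so treat the numbers as rough; the 256 MB file size is hypothetical.

```python
import math

MB = 1024 * 1024


def estimated_row_groups(file_size_bytes, row_group_bytes):
    """Rough count of row groups needed if each holds at most row_group_bytes."""
    return math.ceil(file_size_bytes / row_group_bytes)


print(32 * MB)  # 33554432
file_size = 256 * MB  # hypothetical output file size
for size_mb in (24, 32, 64):
    print(size_mb, "MB ->", estimated_row_groups(file_size, size_mb * MB), "row groups")
# 24 MB -> 11 row groups
# 32 MB -> 8 row groups
# 64 MB -> 4 row groups
```

Larger row groups generally mean fewer, bigger units for Athena to read, while smaller ones allow finer-grained skipping based on row group statistics.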