## Lakehouse Architecture using NYC Yellow Taxi Data

Steps:
1. Parameterize the notebook: use widgets so the notebook can easily be run with different values.
2. Import dependencies. This includes the configuration file and the utilities notebook.
3. Read data: fetch the fact table from the API and the lookup table from a CSV file, then run data quality checks.
4. Write the raw data to Delta tables.
5. Transform data: enforce the schema.
6. Join the fact and dimension tables to create an analytics Delta table.

### 1. Parameterize Notebook

In [0]:
dbutils.widgets.removeAll()

In [0]:
dbutils.widgets.dropdown("date", "2020-09-21", ["2020-09-21", "2020-09-22", "2020-09-23", "2020-09-24"])

In [0]:
batch_date = dbutils.widgets.get("date")
print(batch_date)
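
Widget values come back as strings, so it can help to validate the value before it is used in a query. The helper below is a minimal sketch; `parse_batch_date` is a hypothetical name and is not part of the notebook's utilities.

```python
from datetime import date


def parse_batch_date(value: str) -> date:
    """Parse an ISO date string such as 2020-09-21, raising ValueError if it is malformed."""
    try:
        return date.fromisoformat(value)
    except ValueError as exc:
        raise ValueError(f"batch date must be an ISO date, got {value!r}") from exc


print(parse_batch_date("2020-09-21"))
```

In the notebook this could wrap the widget lookup, for example `parse_batch_date(dbutils.widgets.get("date"))`.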

### 2. Import Dependencies

In [0]:
from pyspark.sql.functions import *

In [0]:
%run ./includes/configuration

### 3. Read data

In [0]:
# Create the API client object.
# NYC Open Data uses the Socrata Open Data API to publish datasets.
client = create_client(app_token)

In [0]:
# Show the docstring of the client's get method, which is used to extract data from the API
client.get?

#### 3.a. Fact Table

Get 2020 Yellow Taxi data from NYC Open Data
- https://data.cityofnewyork.us/Transportation/2020-Yellow-Taxi-Trip-Data/kxp8-n2sj
- These records are generated from the trip record submissions made by yellow taxi Technology Service Providers (TSPs). 
- Each row represents a single trip in a yellow taxi in 2020. The trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

- Create a dynamic query to extract data from the API.
 - Use the notebook parameter to update the API query so that it runs for a given date.

In [0]:
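# Create a dynamic query from the notebook parameter. The notebook can be run daily as a batch job with the parameter updated for each run.
# The system field :created_at is left unquoted; in SoQL a single-quoted name is treated as a string literal.
query = f":created_at > '{batch_date}'"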
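
The filter above is meant to select every row created after the batch date. For a daily batch job, a bounded one-day window is one way to keep consecutive runs from overlapping. The sketch below builds such a filter; `build_where_clause` is a hypothetical helper, and it assumes the API accepts ISO 8601 timestamp literals when comparing against the `:created_at` system field.

```python
from datetime import date, timedelta


def build_where_clause(batch_date: str) -> str:
    """Return a filter covering the 24 hours that start at midnight on batch_date."""
    start = date.fromisoformat(batch_date)
    end = start + timedelta(days=1)
    return (
        f":created_at >= '{start.isoformat()}T00:00:00' "
        f"AND :created_at < '{end.isoformat()}T00:00:00'"
    )


print(build_where_clause("2020-09-21"))
```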

In [0]:
# The get_data function in utilities.py uses **kwargs to accept optional keywords for filtering data.
# This keeps the function flexible enough to be used for multiple use cases.
taxi_df = get_data(client, "kxp8-n2sj", exclude_system_fields=False, limit=100, where=query)
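
The notebook does not show `get_data` itself. As a rough illustration of the `**kwargs` pattern the comment describes, the sketch below forwards optional filters to a client's `get` method. `fetch_records` and `FakeClient` are hypothetical names; the real helper presumably also converts the result to a Spark DataFrame, which is omitted here.

```python
from typing import Any, Dict, List


def fetch_records(client: Any, dataset_id: str, **filters: Any) -> List[Dict[str, Any]]:
    """Forward optional keyword filters (limit, where, ...) to client.get."""
    # Drop filters left as None so only the options the caller set are sent.
    active = {key: value for key, value in filters.items() if value is not None}
    return client.get(dataset_id, **active)


class FakeClient:
    """Stand-in for the API client so the sketch runs without network access."""

    def get(self, dataset_id: str, **kwargs: Any) -> List[Dict[str, Any]]:
        # Echo the request back so the forwarded options are visible.
        return [{"dataset": dataset_id, **kwargs}]


records = fetch_records(FakeClient(), "kxp8-n2sj", limit=100, where=None)
print(records)
```

Because unset options are dropped before the call, the same function can serve an unfiltered pull, a row-limited sample, or a date-filtered batch.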

In [0]:
display(taxi_df)

:created_at,:id,:updated_at,congestion_surcharge,dolocationid,extra,fare_amount,improvement_surcharge,mta_tax,passenger_count,payment_type,pulocationid,ratecodeid,store_and_fwd_flag,tip_amount,tolls_amount,total_amount,tpep_dropoff_datetime,tpep_pickup_datetime,trip_distance,vendorid
2020-09-24T18:32:41.448Z,row-yhyc-8ch4_4nr4,2020-09-24T18:32:41.448Z,2.5,239,3.0,6.0,0.3,0.5,1,1,238,1,N,1.47,0.0,11.27,2020-01-01T00:33:03.000,2020-01-01T00:28:15.000,1.2,1
2020-09-24T18:32:41.448Z,row-bi8h.smty-srvv,2020-09-24T18:32:41.448Z,2.5,238,3.0,7.0,0.3,0.5,1,1,239,1,N,1.5,0.0,12.3,2020-01-01T00:43:04.000,2020-01-01T00:35:39.000,1.2,1
2020-09-24T18:32:41.448Z,row-mzwp_53j7_3mgx,2020-09-24T18:32:41.448Z,2.5,238,3.0,6.0,0.3,0.5,1,1,238,1,N,1.0,0.0,10.8,2020-01-01T00:53:52.000,2020-01-01T00:47:41.000,0.6,1
2020-09-24T18:32:41.448Z,row-8846_s59t.hpdh,2020-09-24T18:32:41.448Z,0.0,151,0.5,5.5,0.3,0.5,1,1,238,1,N,1.36,0.0,8.16,2020-01-01T01:00:14.000,2020-01-01T00:55:23.000,0.8,1
2020-09-24T18:32:41.448Z,row-mc2u~fkij-yv4w,2020-09-24T18:32:41.448Z,0.0,193,0.5,3.5,0.3,0.5,1,2,193,1,N,0.0,0.0,4.8,2020-01-01T00:04:16.000,2020-01-01T00:01:58.000,0.0,2
2020-09-24T18:32:41.448Z,row-qpuz_gz4u~xtxu,2020-09-24T18:32:41.448Z,0.0,193,0.5,2.5,0.3,0.5,1,2,7,1,N,0.0,0.0,3.8,2020-01-01T00:10:37.000,2020-01-01T00:09:44.000,0.03,2
2020-09-24T18:32:41.448Z,row-gizt_4tdv-5id2,2020-09-24T18:32:41.448Z,0.0,193,0.5,2.5,0.3,0.5,1,1,193,1,N,0.01,0.0,3.81,2020-01-01T00:39:29.000,2020-01-01T00:39:25.000,0.0,2
2020-09-24T18:32:41.448Z,row-sxcu_gfqc_y64i,2020-09-24T18:32:41.448Z,2.5,193,0.0,0.01,0.3,0.0,1,1,193,5,N,0.0,0.0,2.81,2019-12-18T15:28:59.000,2019-12-18T15:27:49.000,0.0,2
2020-09-24T18:32:41.448Z,row-cpi7_r2i2_e54k,2020-09-24T18:32:41.448Z,2.5,193,0.5,2.5,0.3,0.5,4,1,193,1,N,0.0,0.0,6.3,2019-12-18T15:31:35.000,2019-12-18T15:30:35.000,0.0,2
2020-09-24T18:32:41.448Z,row-whyb-878k~xwxv,2020-09-24T18:32:41.448Z,2.5,48,3.0,8.0,0.3,0.5,2,1,246,1,N,2.35,0.0,14.15,2020-01-01T00:40:28.000,2020-01-01T00:29:01.000,0.7,1


In [0]:
# Look at the Schema
taxi_df.printSchema()

##### Data Quality Check:

In [0]:
assert row_check(taxi_df), "API did not return any data"
print("Assertion Passed.")

In [0]:
# check for nans and nulls in the dataframe
null_check(taxi_df)
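
`row_check` and `null_check` come from the utilities notebook and are not shown. The sketch below illustrates the same two ideas on plain Python records (a list of dictionaries) rather than on a Spark DataFrame; `has_rows` and `count_missing` are hypothetical names, not the notebook's helpers.

```python
from typing import Any, Dict, List


def has_rows(records: List[Dict[str, Any]]) -> bool:
    """Return True when at least one record was returned."""
    return len(records) > 0


def count_missing(records: List[Dict[str, Any]], columns: List[str]) -> Dict[str, int]:
    """Count, per column, the records where the value is absent or None."""
    missing = {column: 0 for column in columns}
    for record in records:
        for column in columns:
            if record.get(column) is None:
                missing[column] += 1
    return missing


sample = [
    {"vendorid": "1", "tip_amount": "1.47"},
    {"vendorid": "2", "tip_amount": None},
    {"vendorid": "2"},  # tip_amount key is absent entirely
]
print(has_rows(sample))
print(count_missing(sample, ["vendorid", "tip_amount"]))
```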

#### 3.b. Zone and Borough Lookup Table

Get Lookup Table to Augment data
 - The table is available as a CSV file on a webpage
 - The lookup table contains borough and zone information
 - The lookup table can be joined with Taxi data using LocationID

In [0]:
lookup_table = get_lookup_data()

In [0]:
display(lookup_table)

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [0]:
assert row_check(lookup_table), "Lookup table did not return any data"
print("Assertion Passed.")

### 4. Write data to a Delta table

- To make the notebook idempotent, delete the table and its underlying files before writing.
- Register the Delta table in the Metastore
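
Idempotent reruns depend on every cell resolving a table to the same storage location. The cells in this notebook build paths in several ways (`taxi_data + "processed"`, `taxi_data + "/processed"`, `"{taxi_data}/processed"`), which agree only if `taxi_data` ends with a slash and the file system tolerates a doubled one. One possible way to remove that dependency is a small path helper; `table_path` is a hypothetical name and `dbfs:/tmp/taxi` is an illustrative location, not the notebook's configured value.

```python
def table_path(base: str, name: str) -> str:
    """Join a base location and a table name with exactly one slash between them."""
    return base.rstrip("/") + "/" + name.lstrip("/")


# Both spellings resolve to the same location.
print(table_path("dbfs:/tmp/taxi/", "processed"))
print(table_path("dbfs:/tmp/taxi", "/processed"))
```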

In [0]:
dbutils.fs.rm(taxi_data + "processed", recurse=True)

spark.sql(
    f"""
DROP TABLE IF EXISTS taxi_data_processed
"""
)

In [0]:
(taxi_df.write
 .mode("overwrite")
 .format("delta")
 .save(taxi_data + "processed"))

In [0]:
#Register the Table in the Metastore
spark.sql(f"""
DROP TABLE IF EXISTS taxi_data_processed
""")

spark.sql(f"""
CREATE TABLE taxi_data_processed
USING DELTA
LOCATION "{taxi_data}/processed" 
""")

In [0]:
taxi_data_processed = spark.read.table("taxi_data_processed")

In [0]:
assert taxi_data_processed.count() == taxi_df.count(), "Row count does not match between raw and processed."
print("Assertion passed.")

In [0]:
dbutils.fs.rm(taxi_data + "dim_location", recurse=True)

spark.sql(
    f"""
DROP TABLE IF EXISTS dim_location
"""
)

In [0]:
(lookup_table.write
 .mode("overwrite")
 .format("delta")
 .save(taxi_data + "dim_location"))

In [0]:
#Register the Table in the Metastore
spark.sql(f"""
DROP TABLE IF EXISTS dim_location
""")

spark.sql(f"""
CREATE TABLE dim_location
USING DELTA
LOCATION "{taxi_data}/dim_location" 
""")

### 5. Transform Data

In [0]:
processed_taxi_df = (spark.read
                     .format("delta")
                     .load(taxi_data + "/processed"))

Apply schema to the extracted data.

In [0]:
# apply schema
processed_taxi_df = process_taxi_data(processed_taxi_df)
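
`process_taxi_data` is defined in the utilities notebook and is not shown. As a Spark-free illustration of what applying a schema means, the sketch below converts string values to typed ones and keeps only the listed columns, which is consistent with the analytics output containing no system fields. `SCHEMA` and `apply_schema` are hypothetical, the column types are illustrative, and the sketch assumes values arrive from the API as strings.

```python
from datetime import datetime
from typing import Any, Callable, Dict

# Illustrative target types for a few columns; the notebook's real schema may differ.
SCHEMA: Dict[str, Callable[[str], Any]] = {
    "vendorid": int,
    "passenger_count": int,
    "fare_amount": float,
    "tpep_pickup_datetime": datetime.fromisoformat,
}


def apply_schema(record: Dict[str, str]) -> Dict[str, Any]:
    """Keep only the columns named in SCHEMA and convert each value to its target type."""
    return {column: convert(record[column]) for column, convert in SCHEMA.items()}


raw = {
    ":id": "row-yhyc-8ch4_4nr4",
    "vendorid": "1",
    "passenger_count": "1",
    "fare_amount": "6.0",
    "tpep_pickup_datetime": "2020-01-01T00:28:15.000",
}
typed = apply_schema(raw)
print(typed)
```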

In [0]:
processed_taxi_df.printSchema()

In [0]:
processed_taxi_df.head(1)

In [0]:
dim_location = (spark.read
                .format("delta")
                .load(taxi_data + "/dim_location"))

### 6. Create Table for Analytics

Join the taxi fact table and the location dimension table on the pickup location ID to create a table for analytics.

In [0]:
analytics_taxi_df = processed_taxi_df.join(dim_location, processed_taxi_df.pulocationid == dim_location.LocationID)
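
`DataFrame.join` defaults to an inner join, so a trip whose `pulocationid` has no match in the lookup table is dropped from the analytics table. The plain-Python sketch below mimics that behavior on two small in-memory collections; `inner_join` and the location ID 999 are illustrative only.

```python
from typing import Any, Dict, List


def inner_join(trips: List[Dict[str, Any]], zones: Dict[int, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Attach zone attributes to each trip; trips without a matching zone are dropped."""
    return [
        {**trip, **zones[trip["pulocationid"]]}
        for trip in trips
        if trip["pulocationid"] in zones
    ]


zones = {
    4: {"Borough": "Manhattan", "Zone": "Alphabet City"},
    7: {"Borough": "Queens", "Zone": "Astoria"},
}
trips = [
    {"pulocationid": 7, "total_amount": 3.8},
    {"pulocationid": 999, "total_amount": 11.27},  # no such zone in the lookup
]
joined = inner_join(trips, zones)
print(joined)
```

If unmatched trips should be kept, passing `how="left"` to `join` is the usual alternative.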

Create a new Delta table from the analytics DataFrame for use by downstream users and apps.

In [0]:
dbutils.fs.rm(taxi_data + "taxi_analytics", recurse=True)

spark.sql(
    f"""
DROP TABLE IF EXISTS taxi_analytics
"""
)

In [0]:
(analytics_taxi_df.write
 .mode("overwrite")
 .format("delta")
 .save(taxi_data + "taxi_analytics"))

In [0]:
#Register the Table in the Metastore
spark.sql(f"""
DROP TABLE IF EXISTS taxi_analytics
""")

spark.sql(f"""
CREATE TABLE taxi_analytics
USING DELTA
LOCATION "{taxi_data}/taxi_analytics" 
""")

In [0]:
(spark.read
 .format("delta")
 .load(taxi_data + "taxi_analytics")
 .display())

vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,trip_distance,pulocationid,dolocationid,ratecodeid,store_and_fwd_flag,passenger_count,payment_type,fare_amount,extra,mta_tax,improvement_surcharge,congestion_surcharge,tip_amount,tolls_amount,total_amount,LocationID,Borough,Zone,service_zone
2,2020-01-01T00:22:30.000+0000,2020-01-01T00:43:08.000+0000,3,211,246,1,N,1,2,15.0,0.5,0.5,0.3,2.5,0.0,0.0,18.8,211,Manhattan,SoHo,Yellow Zone
2,2020-01-01T00:11:46.000+0000,2020-01-01T00:17:54.000+0000,0,211,231,1,N,1,1,5.5,0.5,0.5,0.3,2.5,1.86,0.0,11.16,211,Manhattan,SoHo,Yellow Zone
2,2020-01-01T00:08:40.000+0000,2020-01-01T00:21:20.000+0000,1,211,107,1,N,2,2,10.0,0.5,0.5,0.3,2.5,0.0,0.0,13.8,211,Manhattan,SoHo,Yellow Zone
1,2020-01-01T00:15:35.000+0000,2020-01-01T00:27:06.000+0000,1,211,234,1,N,3,2,9.0,3.0,0.5,0.3,2.5,0.0,0.0,12.8,211,Manhattan,SoHo,Yellow Zone
1,2020-01-01T00:23:37.000+0000,2020-01-01T00:26:18.000+0000,0,229,140,1,N,1,1,4.5,3.0,0.5,0.3,2.5,1.65,0.0,9.95,229,Manhattan,Sutton Place/Turtle Bay North,Yellow Zone
1,2020-01-01T00:19:19.000+0000,2020-01-01T00:34:22.000+0000,1,229,142,1,N,1,2,11.0,3.0,0.5,0.3,2.5,0.0,0.0,14.8,229,Manhattan,Sutton Place/Turtle Bay North,Yellow Zone
2,2020-01-01T00:50:47.000+0000,2020-01-01T01:08:23.000+0000,3,229,239,1,N,2,1,13.5,0.5,0.5,0.3,2.5,0.0,0.0,17.3,229,Manhattan,Sutton Place/Turtle Bay North,Yellow Zone
1,2020-01-01T00:03:02.000+0000,2020-01-01T00:09:22.000+0000,1,232,79,1,N,1,2,6.5,3.0,0.5,0.3,2.5,0.0,0.0,10.3,232,Manhattan,Two Bridges/Seward Park,Yellow Zone
1,2020-01-01T00:58:34.000+0000,2020-01-01T01:07:17.000+0000,1,233,229,1,N,1,1,7.0,3.0,0.5,0.3,2.5,2.15,0.0,12.95,233,Manhattan,UN/Turtle Bay South,Yellow Zone
1,2020-01-01T00:47:27.000+0000,2020-01-01T00:55:40.000+0000,0,233,170,1,N,1,1,6.5,3.0,0.5,0.3,2.5,2.05,0.0,12.35,233,Manhattan,UN/Turtle Bay South,Yellow Zone


Future Considerations:

 - Create a Notebook for batch processing
 - Use an object store for storing data (for example, S3 or ADLS)
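
As a starting point for the batch-processing notebook, a driver could generate the list of dates and run this notebook once per date, for example with `dbutils.notebook.run`, passing each value as the `date` parameter. The sketch below covers only the date generation; `batch_dates` is a hypothetical helper.

```python
from datetime import date, timedelta
from typing import List


def batch_dates(start: str, end: str) -> List[str]:
    """Return every date from start to end (inclusive) as ISO date strings."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    days = (last - first).days
    return [(first + timedelta(days=offset)).isoformat() for offset in range(days + 1)]


print(batch_dates("2020-09-21", "2020-09-24"))
```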