# Data Engineering Capstone Project

## Import all necessary libraries and packages

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
import os
import configparser
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, \
                            date_format, dayofweek, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, \
                                IntegerType, DateType, TimestampType
import pyspark.sql.functions as F
import json
import boto3
import findspark
findspark.init()

## Set environment variables (AWS credentials) so Spark can access the S3 buckets

In [2]:
config = configparser.ConfigParser()
config.read('aws_credentials.cfg')

os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']

## Initialize SparkSession instance, with hadoop-aws package to process S3 buckets

In [3]:
spark = SparkSession \
        .builder \
        .appName('capstone') \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.1.0") \
        .getOrCreate()

# Settings to allow the parsing of some timestamps
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

## Set up paths to csv files containing the data. There is also a json file, which will be handled separately.

These files can be found in the ```data``` folder of this repository.

In [4]:
input_data = "s3a://tung99-bucket/"

zone_data = os.path.join(input_data, "taxi+_zone_lookup.csv")
temp_data = os.path.join(input_data, "Hyperlocal_Temperature_Monitoring.csv")
trips_data = os.path.join(input_data, "yellow_tripdata_2018-07.csv")

## Process each of the csv file above into a DataFrame

After a brief inspection on the data files, these issues have been observed:
- Some of the entries in the ```nta_name``` column of the taxi zone lookup table contain more than one NTA name, making it impossible to match the names with the correct NTA code. To circumvent this, the problematic rows are split into identical ones using the ```/``` delimiter.
- Similarly, some NTA names were combined by ```-``` in the ```nta_codes.json``` file, requiring the use of splitting to separate the names.
While these fixes cannot guarantee that all location IDs will be mapped to an NTA code, it does significantly increases the number of matches (from 108 to 238 after the fixes).
- There is an empty row in the beginning of the trips file, but it will go away as we do the ```JOIN``` operations.

### Taxi zone lookup file (this connects the location ID to an NTA name)

In [5]:
# Pre-populate schema
zoneSchema = StructType([
    StructField("location_id", IntegerType(), nullable=False),
    StructField("boro_name", StringType(), nullable=False),
    StructField("nta_name", StringType(), nullable=False),
    StructField("service_zone", StringType(), nullable=False)
])

zone_df = spark.read.csv(zone_data, header=True, schema=zoneSchema)
# Splitting happens here, see that the 3rd and 4th rows are almost the same.
zone_df = zone_df.withColumn('nta_name', F.explode(F.split('nta_name', '/')))

zone_df.createOrReplaceTempView("zones")
zone_df.show(5)

+-----------+---------+--------------+------------+
|location_id|boro_name|      nta_name|service_zone|
+-----------+---------+--------------+------------+
|          1|      EWR|Newark Airport|         EWR|
|          2|   Queens|   Jamaica Bay|   Boro Zone|
|          3|    Bronx|      Allerton|   Boro Zone|
|          3|    Bronx|Pelham Gardens|   Boro Zone|
|          4|Manhattan| Alphabet City| Yellow Zone|
+-----------+---------+--------------+------------+
only showing top 5 rows



### Temperature information file

In [6]:
# Pre-populate the schema
tempSchema = StructType([
    StructField("sensor_id", StringType(), nullable=False),
    StructField("air_temp", DoubleType(), nullable=False),
    StructField("date", StringType(), nullable=False),
    StructField("hour", IntegerType(), nullable=False),
    StructField("latitude", DoubleType(), nullable=False),
    StructField("longitude", DoubleType(), nullable=False),
    StructField("year", IntegerType(), nullable=False),
    StructField("install_type", StringType(), nullable=False),
    StructField("boro_name", StringType(), nullable=False),
    StructField("nta_code", StringType(), nullable=False)
])

temp_df = spark.read.csv(temp_data, header=True, schema=tempSchema)
# Convert original dates (in string) to timestamp format
temp_df = temp_df.withColumn("date", F.to_timestamp("date", "M/dd/yyyy"))
# Additional columns added to make comparisons with other DataFrames easier
temp_df = temp_df.withColumn("month", F.month("date"))
temp_df = temp_df.withColumn("day", F.dayofmonth("date"))

temp_df.createOrReplaceTempView("temperatures")
temp_df.show(5)

+---------+-----------+-------------------+----+-----------+------------+----+------------+---------+--------+-----+---+
|sensor_id|   air_temp|               date|hour|   latitude|   longitude|year|install_type|boro_name|nta_code|month|day|
+---------+-----------+-------------------+----+-----------+------------+----+------------+---------+--------+-----+---+
| Bk-BR_01|     71.189|2018-06-15 00:00:00|   1|40.66620508|-73.91691035|2018| Street Tree| Brooklyn|    BK81|    6| 15|
| Bk-BR_01|70.24333333|2018-06-15 00:00:00|   2|40.66620508|-73.91691035|2018| Street Tree| Brooklyn|    BK81|    6| 15|
| Bk-BR_01|69.39266667|2018-06-15 00:00:00|   3|40.66620508|-73.91691035|2018| Street Tree| Brooklyn|    BK81|    6| 15|
| Bk-BR_01|68.26316667|2018-06-15 00:00:00|   4|40.66620508|-73.91691035|2018| Street Tree| Brooklyn|    BK81|    6| 15|
| Bk-BR_01|     67.114|2018-06-15 00:00:00|   5|40.66620508|-73.91691035|2018| Street Tree| Brooklyn|    BK81|    6| 15|
+---------+-----------+---------

### Taxi trips file. For simplicity reason, only trips happening in July 2018 were included, so a lot of assumptions have been made.

In [7]:
# Pre-populate the schema
tripsSchema = StructType([
    StructField("vendor_id", IntegerType(), nullable=True),
    StructField("PU_date", StringType(), nullable=True),
    StructField("DO_date", StringType(), nullable=True),
    StructField("passenger_count", IntegerType(), nullable=True),
    StructField("trip_distance", DoubleType(), nullable=True),
    StructField("ratecode_id", IntegerType(), nullable=True),
    StructField("store_and_fwd_flag", StringType(), nullable=True),
    StructField("PU_location_id", IntegerType(), nullable=True),
    StructField("DO_location_id", IntegerType(), nullable=True),
    StructField("payment_type", IntegerType(), nullable=True),
    StructField("fare_amount", DoubleType(), nullable=True),
    StructField("extra", DoubleType(), nullable=True),
    StructField("mta_tax", DoubleType(), nullable=True),
    StructField("tip_amount", DoubleType(), nullable=True),
    StructField("tolls_amount", DoubleType(), nullable=True),
    StructField("improvement_surcharge", DoubleType(), nullable=True),
    StructField("total_amount", DoubleType(), nullable=True)
])

trips_df = spark.read.csv(trips_data, header=True, schema=tripsSchema)
# Convert strings to timestamps
trips_df = trips_df.withColumn("PU_date", F.to_timestamp("PU_date", "M/d/yyyy H:mm"))
trips_df = trips_df.withColumn("DO_date", F.to_timestamp("DO_date", "M/d/yyyy H:mm"))
# Add additional columns
trips_df = trips_df.withColumn("month", F.month("PU_date"))
trips_df = trips_df.withColumn("PU_day", F.dayofmonth("PU_date"))
trips_df = trips_df.withColumn("DO_day", F.dayofmonth("DO_date"))
trips_df = trips_df.withColumn("PU_hour", F.hour("PU_date"))
trips_df = trips_df.withColumn("DO_hour", F.hour("DO_date"))

trips_df.createOrReplaceTempView("trips")
trips_df.show(5)

+---------+-------------------+-------------------+---------------+-------------+-----------+------------------+--------------+--------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+------+------+-------+-------+
|vendor_id|            PU_date|            DO_date|passenger_count|trip_distance|ratecode_id|store_and_fwd_flag|PU_location_id|DO_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|month|PU_day|DO_day|PU_hour|DO_hour|
+---------+-------------------+-------------------+---------------+-------------+-----------+------------------+--------------+--------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+------+------+-------+-------+
|     null|               null|               null|           null|         null|       null|              null|          null|          null|        null|       null| n

### Process the nta_codes.json file. The NTA names that contain ```-``` are split into separate rows, but they have the same NTA code.

In [8]:
s3 = boto3.resource('s3')

content_object = s3.Object('tung99-bucket', 'nta_codes.json')
file_content = content_object.get()['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
nta_data = json_content["data"]
nta_list = []

for nta in nta_data:
    for sub_nta in nta[13].split('-'):
        nta_list.append((nta[11], nta[12], sub_nta))
    
rdd = spark.sparkContext.parallelize(nta_list)
ntaSchema = StructType([
    StructField("boro_name", StringType(), nullable=False),
    StructField("nta_code", StringType(), nullable=False),
    StructField("nta_name", StringType(), nullable=False)
])
nta_df = spark.createDataFrame(rdd, schema=ntaSchema)

nta_df.createOrReplaceTempView("ntas")
nta_df.show(5)

+---------+--------+-------------+
|boro_name|nta_code|     nta_name|
+---------+--------+-------------+
| Brooklyn|    BK88| Borough Park|
|   Queens|    QN51|  Murray Hill|
|   Queens|    QN27|East Elmhurst|
| Brooklyn|    BK23|West Brighton|
|   Queens|    QN41|Fresh Meadows|
+---------+--------+-------------+
only showing top 5 rows



## Convert information gathered from the above DataFrames into specific tables.
Each code cell shows the processing of one table (with 4 in total).

In [9]:
time_table = spark.sql('''
    SELECT year(date) AS year, month(date) AS month, dayofmonth(date) AS day, dayofweek(date) AS weekday, hour
    FROM temperatures
    WHERE year(date)=2018 AND month(date)=7
''').distinct().withColumn('time_id', monotonically_increasing_id())

time_table.createOrReplaceTempView("times")
time_table.show(5)

+----+-----+---+-------+----+-------+
|year|month|day|weekday|hour|time_id|
+----+-----+---+-------+----+-------+
|2018|    7|  4|      4|   2|      0|
|2018|    7|  4|      4|  17|      1|
|2018|    7|  5|      5|  11|      2|
|2018|    7|  6|      6|  12|      3|
|2018|    7| 25|      4|   0|      4|
+----+-----+---+-------+----+-------+
only showing top 5 rows



In [10]:
trips_table = spark.sql('''
    SELECT vendor_id, times.time_id AS PU_date_id, DO_day, t.month, 
            DO_hour, passenger_count, trip_distance, PU_location_id, 
            DO_location_id, payment_type, fare_amount, extra, mta_tax,
            tip_amount, tolls_amount, improvement_surcharge, total_amount
    FROM trips t
    JOIN times ON t.PU_day=times.day AND t.month=times.month AND t.PU_hour=times.hour
''')

trips_table.createOrReplaceTempView("trips")
trips_table = spark.sql('''
    SELECT vendor_id, PU_date_id, times.time_id AS DO_date_id, passenger_count, 
            trip_distance, PU_location_id, DO_location_id, payment_type, 
            fare_amount, extra, mta_tax, tip_amount, tolls_amount, 
            improvement_surcharge, total_amount
    FROM trips t
    JOIN times ON t.DO_day=times.day AND t.month=times.month AND t.DO_hour=times.hour
''')

trips_table.show(5)

+---------+------------+-------------+---------------+-------------+--------------+--------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|vendor_id|  PU_date_id|   DO_date_id|passenger_count|trip_distance|PU_location_id|DO_location_id|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+---------+------------+-------------+---------------+-------------+--------------+--------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|        2|283467841536|1125281431552|              3|         1.28|           246|           234|           1|        7.5|  0.5|    0.5|      1.76|         0.0|                  0.3|       10.56|
|        2| 25769803778|1125281431552|              1|         2.01|           164|            79|           2|        8.0|  0.5|    0.5|       0.0|         0.0|                  0.3|         9.3|
|        2| 257

In [11]:
loc_table = spark.sql('''
    SELECT location_id, z.boro_name, nta_code, z.nta_name, service_zone
    FROM zones z
    JOIN ntas ON z.nta_name=ntas.nta_name
''')

loc_table.createOrReplaceTempView("locations")
loc_table.show(5)

+-----------+-------------+--------+-------------+------------+
|location_id|    boro_name|nta_code|     nta_name|service_zone|
+-----------+-------------+--------+-------------+------------+
|         26|     Brooklyn|    BK88| Borough Park|   Boro Zone|
|        170|    Manhattan|    QN51|  Murray Hill| Yellow Zone|
|         70|       Queens|    QN27|East Elmhurst|   Boro Zone|
|        245|Staten Island|    BK23|West Brighton|   Boro Zone|
|         98|       Queens|    QN41|Fresh Meadows|   Boro Zone|
+-----------+-------------+--------+-------------+------------+
only showing top 5 rows



In [12]:
temps_table = spark.sql('''
    SELECT times.time_id, l.location_id, t.air_temp, t.install_type
    FROM temperatures t
    JOIN times ON times.month=t.month AND times.day=t.day AND times.hour=t.hour
    JOIN locations l ON l.nta_code=t.nta_code
''')

temps_table.show(5)

+-------------+-----------+-----------+------------+
|      time_id|location_id|   air_temp|install_type|
+-------------+-----------+-----------+------------+
|1022202216448|         75|73.18433333| Street Tree|
|1022202216448|         75|72.95483333| Street Tree|
|1022202216448|         75|71.95583333| Street Tree|
|1022202216448|         75|73.24216667| Street Tree|
|1022202216448|         75|    72.7315| Street Tree|
+-------------+-----------+-----------+------------+
only showing top 5 rows



## Data quality checks happen after all tables have been made:
- Since the data only comes from the month of July, there should be exactly 744 entries corresponding to 31 days of July (and each day contains 24 hours).
- All the tables should not contain any ```NULL```s, as there was only one ```NULL``` row in the beginning, which should be eliminated after the ```JOIN``` on the ```trips``` table.

In [13]:
if time_table.count() != 31*24:
    raise ValueError('Some hours are missing!')

table_list = [trips_table, time_table, temps_table, loc_table]

for table in table_list:
    # Check for NULL and NaN in every column of each table
    null_count = table.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) \
                               for c in table.columns]).rdd.map(lambda x: (1,x[1])).reduceByKey(lambda x,y: x + y).collect()[0][1]
    if null_count != 0:
        raise ValueError('Null values(s) found!')
print('No null values found.')

No null values found.


## Finally, the resulting tables are written back to an S3 bucket in the form of parquet files.

In [None]:
output_bucket = "s3a://tung99-bucket/"

trips_output = os.path.join(output_bucket, "trips")
temperatures_output = os.path.join(output_bucket, "temperatures")
locations_output = os.path.join(output_bucket, "locations")
times_output = os.path.join(output_bucket, "times")

trips_table.write.parquet(trips_output, mode='overwrite', partitionBy=['PU_location_id', 'DO_location_id'])
temps_table.write.parquet(temperatures_output, mode='overwrite', partitionBy='location_id')
loc_table.write.parquet(locations_output, mode='overwrite', partitionBy='service_zone')
time_table.write.parquet(times_output, mode='overwrite', partitionBy='weekday')