### Data Engineering Capstone Project

#### Project Summary
The goal of this project is to build a data pipeline that produces a clean relational database. The database will feed a data visualization of the number of yellow taxi, green taxi, Uber, and Lyft trips from February to June 2020, alongside the COVID-19 case count in New York City.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
import configparser
import os
from datetime import datetime

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, date_format, dayofmonth, from_unixtime, hour, lit, month,
    to_date, udf, unix_timestamp, weekofyear, year,
)
from pyspark.sql.types import DateType, DoubleType, IntegerType, TimestampType

Starting Spark application

SparkSession available as 'spark'.

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

The goal of this project is to build a data pipeline that produces a clean relational database. The database will feed a data visualization of the number of yellow taxi, green taxi, Uber, and Lyft trips from February to June 2020, alongside the COVID-19 case count in New York City. We think that the visualization could surface insights from the data, and that this approach can be extended to other economic activities.

We used Apache Spark for building the data pipeline and saved the clean data in a SQLite database. 
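As a rough illustration of the last step, the sketch below stores a few daily counts in SQLite using Python's built-in `sqlite3` module. The table name `daily_trips`, its columns, and the in-memory database are assumptions made for this example, not the schema used in the project; the two sample counts are taken from the outputs shown later in this notebook.

```python
import sqlite3

# Two daily counts for 2020-03-07, as they appear in the notebook outputs.
rows = [
    ("2020-03-07", "yellow", 208200),
    ("2020-03-07", "green", 13230),
]

# An in-memory database keeps the sketch self-contained; passing a file
# path instead would persist the data to disk.
conn = sqlite3.connect(":memory:")

# Hypothetical table layout: one row per (date, service) pair.
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS daily_trips (
        trip_date TEXT    NOT NULL,
        service   TEXT    NOT NULL,
        trips     INTEGER NOT NULL,
        PRIMARY KEY (trip_date, service)
    )
    """
)
conn.executemany("INSERT INTO daily_trips VALUES (?, ?, ?)", rows)
conn.commit()

# Total trips across services for one day.
total = conn.execute(
    "SELECT SUM(trips) FROM daily_trips WHERE trip_date = ?",
    ("2020-03-07",),
).fetchone()[0]
print(total)
```

Keying the table on date and service is one possible design; it lets the visualization join the trip counts with the COVID-19 case counts by date.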

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 


The data that we are going to use comes from the following web sites:

[TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

"The yellow taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. This dataset is provided by NYC Taxi and Limousine Commission". 

[NYC Department of Health and Mental Hygiene](https://github.com/nychealth/coronavirus-data)

From the above repository we used only the file case-hosp-death.csv which contains daily counts of confirmed cases, hospitalizations, and deaths.

To download the data, we used the bash script download.sh, which contains the following code:

```bash
#!/bin/bash

echo "Downloading yellow trip data"
for i in {2..6}; 
do
    wget  https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-0$i.csv 
done


echo "Downloading green trip data"
for i in {2..6}; 
do
    wget  https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-0$i.csv 
done


echo "Downloading uber and lyft trip data"
for i in {2..6}; 
do
    wget  https://s3.amazonaws.com/nyc-tlc/trip+data/fhvhv_tripdata_2020-0$i.csv 
done



echo "Downloading covid data"
wget https://raw.githubusercontent.com/nychealth/coronavirus-data/master/case-hosp-death.csv
```
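The URL pattern used by the three loops above can also be expressed in a few lines of Python. This is only a sketch of the naming scheme; the function name `trip_data_urls` and its parameters are hypothetical, and the code builds the URLs without downloading anything.

```python
# Base location and file prefixes, copied from the download script above.
BASE_URL = "https://s3.amazonaws.com/nyc-tlc/trip+data"
PREFIXES = ["yellow_tripdata", "green_tripdata", "fhvhv_tripdata"]


def trip_data_urls(year=2020, months=range(2, 7)):
    """Return one CSV URL per dataset prefix and month."""
    return [
        f"{BASE_URL}/{prefix}_{year}-{month:02d}.csv"
        for prefix in PREFIXES
        for month in months
    ]


urls = trip_data_urls()
for url in urls:
    print(url)
```

Formatting the month with `:02d` also handles months 10 to 12, which the `0$i` pattern in the bash script would not.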

This script was executed on an m5.xlarge AWS EC2 instance running Ubuntu Server 16.04. Next, we copied the CSV files to an S3 bucket using the following Python script:

```python
#!/usr/bin/env python3

import configparser
import glob

import boto3

# Read the AWS credentials from dwh.cfg.
config = configparser.ConfigParser()
with open('dwh.cfg') as cfg_file:
    config.read_file(cfg_file)

KEY = config.get('AWS', 'KEY')
SECRET = config.get('AWS', 'SECRET')

s3 = boto3.resource(
    's3',
    region_name="us-west-2",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

# Upload approach adapted from:
# https://stackoverflow.com/questions/37017244/uploading-a-file-to-a-s3-bucket-with-a-prefix-using-boto3

# Upload every CSV file in the current directory to the "ricrio" bucket,
# using the file name as the object key.
for file in glob.glob("*.csv"):
    print("Uploading file {} to s3 bucket...".format(file))
    s3.meta.client.upload_file(file, "ricrio", file)
```

The file dwh.cfg has the following structure: 


```ini
[AWS]
KEY=put_your_access_key_id_here
SECRET=put_your_secret_access_key_here 
```




Next, we will read the datasets from S3. The total size of the datasets is 4.3 GB.

In [2]:
# the following code does not work.
#config = configparser.ConfigParser()
#config.read('dl.cfg')


#os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
#os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

os.environ['AWS_ACCESS_KEY_ID']="put_your_credentials_here"
os.environ['AWS_SECRET_ACCESS_KEY']="put_your_credentials_here"
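
The notebook does not say why the commented-out configuration code failed. One possibility, and this is only an assumption, is that `dl.cfg` was not present on the machine running the Spark driver: `ConfigParser.read` silently skips files it cannot open, so the error only surfaces later as a `KeyError`. The sketch below checks the return value of `read` to fail early. The helper name `load_aws_credentials` and the temporary demo file are hypothetical.

```python
import configparser
import os
import tempfile


def load_aws_credentials(path):
    """Return (access_key_id, secret_access_key) read from an INI file."""
    config = configparser.ConfigParser()
    # read() returns the list of files it parsed; an empty list means
    # the file was missing or unreadable.
    if not config.read(path):
        raise FileNotFoundError(f"Could not read config file: {path}")
    return (
        config["AWS"]["AWS_ACCESS_KEY_ID"],
        config["AWS"]["AWS_SECRET_ACCESS_KEY"],
    )


# Demonstration with a temporary file holding placeholder values.
with tempfile.TemporaryDirectory() as tmp:
    cfg_path = os.path.join(tmp, "dl.cfg")
    with open(cfg_path, "w") as fh:
        fh.write(
            "[AWS]\n"
            "AWS_ACCESS_KEY_ID=example_key\n"
            "AWS_SECRET_ACCESS_KEY=example_secret\n"
        )
    key, secret = load_aws_credentials(cfg_path)

    # A missing file now raises immediately instead of failing later.
    try:
        load_aws_credentials(os.path.join(tmp, "missing.cfg"))
        missing_detected = False
    except FileNotFoundError:
        missing_detected = True
```

Reading the values from a file rather than pasting them into a cell also keeps the credentials out of the saved notebook.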

In [3]:
# This code is not necessary, since the spark session variable is already available.
#spark = SparkSession \
#        .builder \
#        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
#        .getOrCreate()

In [4]:
# We read the yellow trip data, which is stored in an S3 bucket.
df_spark_yellow_taxi = spark.read.csv("s3a://ricrio/yellow_tripdata_*", header=True)

In [5]:
df_spark_green_taxi = spark.read.csv("s3a://ricrio/green_tripdata_*", header=True)

In [6]:
df_spark_uber_lyft_taxi = spark.read.csv("s3a://ricrio/fhvhv_tripdata_*", header=True)

In [7]:
df_spark_covid = spark.read.csv("s3://ricrio/case-hosp-death.csv", header=True)

### Step 2: Explore and Assess the Data
#### Explore and Clean the Data
Identify data quality issues, like missing values, duplicate data, etc.

First of all, we will explore the yellow taxi trips dataset.

##### Yellow taxi trips dataset


In [8]:
df_spark_yellow_taxi.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)

The printSchema method shows that there are 18 variables. All of them are read as strings, because no schema was supplied and schema inference was not enabled.
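
One way to avoid casting columns after the fact is to supply a schema when reading the CSV files. The sketch below builds a DDL-style schema string in plain Python for the columns printed above. The type chosen for each column is an assumption based on the sample rows and the data dictionary, not something the notebook specifies.

```python
# Column name -> Spark SQL type. The types are assumptions for illustration.
YELLOW_COLUMNS = [
    ("VendorID", "INT"),
    ("tpep_pickup_datetime", "TIMESTAMP"),
    ("tpep_dropoff_datetime", "TIMESTAMP"),
    ("passenger_count", "INT"),
    ("trip_distance", "DOUBLE"),
    ("RatecodeID", "INT"),
    ("store_and_fwd_flag", "STRING"),
    ("PULocationID", "INT"),
    ("DOLocationID", "INT"),
    ("payment_type", "INT"),
    ("fare_amount", "DOUBLE"),
    ("extra", "DOUBLE"),
    ("mta_tax", "DOUBLE"),
    ("tip_amount", "DOUBLE"),
    ("tolls_amount", "DOUBLE"),
    ("improvement_surcharge", "DOUBLE"),
    ("total_amount", "DOUBLE"),
    ("congestion_surcharge", "DOUBLE"),
]

# Join the pairs into a single "name TYPE, name TYPE, ..." string.
yellow_schema_ddl = ", ".join(
    f"{name} {sql_type}" for name, sql_type in YELLOW_COLUMNS
)
print(yellow_schema_ddl)
```

The resulting string could then be passed as the `schema` argument of `spark.read.csv`, for example `spark.read.csv("s3a://ricrio/yellow_tripdata_*", header=True, schema=yellow_schema_ddl)`. Depending on the Spark version, the `timestampFormat` option may need to be set to `yyyy-MM-dd HH:mm:ss` for the two datetime columns to parse.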

In [9]:
df_spark_yellow_taxi.count()

10442770

There are nearly 10.5 million rows.

In [10]:
df_spark_yellow_taxi.take(5)

[Row(VendorID='1', tpep_pickup_datetime='2020-02-01 00:17:35', tpep_dropoff_datetime='2020-02-01 00:30:32', passenger_count='1', trip_distance='2.60', RatecodeID='1', store_and_fwd_flag='N', PULocationID='145', DOLocationID='7', payment_type='1', fare_amount='11', extra='0.5', mta_tax='0.5', tip_amount='2.45', tolls_amount='0', improvement_surcharge='0.3', total_amount='14.75', congestion_surcharge='0'), Row(VendorID='1', tpep_pickup_datetime='2020-02-01 00:32:47', tpep_dropoff_datetime='2020-02-01 01:05:36', passenger_count='1', trip_distance='4.80', RatecodeID='1', store_and_fwd_flag='N', PULocationID='45', DOLocationID='61', payment_type='1', fare_amount='21.5', extra='3', mta_tax='0.5', tip_amount='6.3', tolls_amount='0', improvement_surcharge='0.3', total_amount='31.6', congestion_surcharge='2.5'), Row(VendorID='1', tpep_pickup_datetime='2020-02-01 00:31:44', tpep_dropoff_datetime='2020-02-01 00:43:28', passenger_count='1', trip_distance='3.20', RatecodeID='1', store_and_fwd_flag=

We get the first five rows of the dataframe. The data dictionary is shown below:


| Field Name            | Description                                                                                                                                                                                                                                               |
|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| VendorID              | A code indicating the TPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.                                                                                                                                      |
| tpep_pickup_datetime  | The date and time when the meter was engaged.                                                                                                                                                                                                             |
| tpep_dropoff_datetime | The date and time when the meter was disengaged.                                                                                                                                                                                                          |
| Passenger_count       | The number of passengers in the vehicle. This is a driver-entered value                                                                                                                                                                                   |
| Trip_distance         | The elapsed trip distance in miles reported by the taximeter.                                                                                                                                                                                             |
| PULocationID          | TLC Taxi Zone in which the taximeter was engaged                                                                                                                                                                                                          |
| DOLocationID          | TLC Taxi Zone in which the taximeter was disengaged                                                                                                                                                                                                       |
| RateCodeID            | The final rate code in effect at the end of the trip.  1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride                                                                                                             |
| Store_and_fwd_flag    | This flag indicates whether the trip record was held in vehicle   memory before sending to the vendor, aka “store and forward,”   because the vehicle did not have a connection to the server.  Y=store and forward trip  N= not a store and forward trip |
| Payment_type          | A numeric code signifying how the passenger paid for the trip.    1= Credit card  2= Cash  3= No charge  4= Dispute  5= Unknown  6= Voided trip                                                                                                           |
| Fare_amount           | The time-and-distance fare calculated by the meter.                                                                                                                                                                                                       |
| Extra                 | Miscellaneous extras and surcharges. Currently, this only includes the \$0.50 and \$1 rush hour and overnight charges. |
| MTA_tax               | \$0.50 MTA tax that is automatically triggered based on the metered rate in use.                                                                                                                                                                           |
| Improvement_surcharge | \$0.30 improvement surcharge assessed trips at the flag drop. The improvement surcharge began being levied in 2015. |
| Tip_amount            | Tip amount –This field is automatically populated for credit card tips. Cash tips are not included.                                                                                                                                                       |
| Tolls_amount          | Total amount of all tolls paid in trip.                                                                                                                                                                                                                   |
| Total_amount          | The total amount charged to passengers. Does not include cash tips.                                                                                                                                                                                       |


Next, we will count the yellow taxi trips per pickup date.


In [11]:
df_spark_yellow_taxi_count = df_spark_yellow_taxi.select("tpep_pickup_datetime")

In [12]:
df_spark_yellow_taxi_count.printSchema()

root
 |-- tpep_pickup_datetime: string (nullable = true)

We will cast the tpep_pickup_datetime column from string to TimestampType with the following code.

In [13]:
df_spark_yellow_taxi_count = df_spark_yellow_taxi_count.withColumn("tpep_pickup_datetime", df_spark_yellow_taxi_count["tpep_pickup_datetime"].cast(TimestampType()))

In [14]:
df_spark_yellow_taxi_count.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)

In [15]:
df_spark_yellow_taxi_count.take(5)

[Row(tpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 17, 35)), Row(tpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 32, 47)), Row(tpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 31, 44)), Row(tpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 7, 35)), Row(tpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 51, 43))]

We will then cast the tpep_pickup_datetime column from TimestampType to DateType with the following code.

In [16]:
df_spark_yellow_taxi_count = df_spark_yellow_taxi_count.withColumn("tpep_pickup_datetime", df_spark_yellow_taxi_count["tpep_pickup_datetime"].cast(DateType()))

In [17]:
df_spark_yellow_taxi_count.printSchema()

root
 |-- tpep_pickup_datetime: date (nullable = true)

In [18]:
df_spark_yellow_taxi_count.take(5)

[Row(tpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(tpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(tpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(tpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(tpep_pickup_datetime=datetime.date(2020, 2, 1))]

We will identify missing values with the following code.

In [19]:
df_spark_yellow_taxi_count.where(col("tpep_pickup_datetime").isNull()).take(5)

[]

There are no null values in the tpep_pickup_datetime column, so no rows need to be dropped. Next, we count the yellow taxi trips per pickup date with the following code.

In [20]:
df_spark_yellow_taxi_count =  df_spark_yellow_taxi_count.groupBy('tpep_pickup_datetime').count()
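
To make the logic of the cast and group-by steps concrete, here is a plain-Python sketch (not Spark code) that performs the same transformation on a handful of values: parse the timestamp string, truncate it to a calendar date, and count rows per date. The first three strings come from the sample rows shown earlier; the fourth is a hypothetical value added so that the result has more than one group.

```python
from collections import Counter
from datetime import date, datetime

pickup_strings = [
    "2020-02-01 00:17:35",
    "2020-02-01 00:32:47",
    "2020-02-01 00:31:44",
    "2020-02-02 08:00:00",  # hypothetical row, not from the dataset
]


def daily_counts(timestamps):
    """Count how many timestamp strings fall on each calendar date."""
    days = (
        datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
        for ts in timestamps
    )
    return Counter(days)


counts = daily_counts(pickup_strings)
for day, n in sorted(counts.items()):
    print(day, n)
```

In Spark, the `to_date` function imported in the first cell is one candidate for collapsing the two casts into a single step.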

In [21]:
df_spark_yellow_taxi_count.show()

+--------------------+------+
|tpep_pickup_datetime| count|
+--------------------+------+
|          2020-04-30| 10283|
|          2020-03-07|208200|
|          2020-03-13|134702|
|          2020-02-04|216049|
|          2020-02-15|202201|
|          2009-01-01|    44|
|          2020-05-23|  8009|
|          2020-02-12|236425|
|          2020-05-08| 13136|
|          2020-05-24|  7984|
|          2020-06-04| 15836|
|          2020-04-29|  9683|
|          2020-05-10|  7605|
|          2020-04-26|  4992|
|          2020-04-21|  8085|
|          2020-03-09|175572|
|          2020-03-10|184068|
|          2020-03-11|182630|
|          2020-06-05| 16501|
|          2020-04-19|  5122|
+--------------------+------+
only showing top 20 rows
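
The output above includes a pickup date of 2009-01-01, which falls outside the February to June 2020 window that the project targets. The plain-Python sketch below shows one way to restrict daily counts to that window. The window bounds follow the project summary, while the helper name `in_window` is hypothetical. The three sample rows are copied from the table above.

```python
from datetime import date

# Study window taken from the project summary: February to June 2020.
WINDOW_START = date(2020, 2, 1)
WINDOW_END = date(2020, 6, 30)


def in_window(day):
    """Return True if the date lies inside the study window (inclusive)."""
    return WINDOW_START <= day <= WINDOW_END


# Three (date, count) rows copied from the output above.
daily_counts = [
    (date(2020, 4, 30), 10283),
    (date(2009, 1, 1), 44),
    (date(2020, 3, 7), 208200),
]

kept = [(day, n) for day, n in daily_counts if in_window(day)]
dropped = [(day, n) for day, n in daily_counts if not in_window(day)]
print(kept)
print(dropped)
```

On the Spark dataframe, a comparable filter could be written with something like `col("tpep_pickup_datetime").between("2020-02-01", "2020-06-30")` inside `where`.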

##### Green taxi trips dataset

In [22]:
df_spark_green_taxi.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)

The printSchema method shows that there are 20 variables. All of them are read as strings, because no schema was supplied and schema inference was not enabled.

In [23]:
df_spark_green_taxi.count()

778119

There are about 778 thousand rows.

In [24]:
df_spark_green_taxi.take(5)

[Row(VendorID='2', lpep_pickup_datetime='2020-02-01 00:10:25', lpep_dropoff_datetime='2020-02-01 00:14:34', store_and_fwd_flag='N', RatecodeID='1', PULocationID='74', DOLocationID='41', passenger_count='1', trip_distance='.76', fare_amount='4.5', extra='0.5', mta_tax='0.5', tip_amount='0', tolls_amount='0', ehail_fee=None, improvement_surcharge='0.3', total_amount='5.8', payment_type='2', trip_type='1', congestion_surcharge='0'), Row(VendorID='2', lpep_pickup_datetime='2020-02-01 00:16:59', lpep_dropoff_datetime='2020-02-01 00:21:35', store_and_fwd_flag='N', RatecodeID='1', PULocationID='74', DOLocationID='74', passenger_count='1', trip_distance='.72', fare_amount='5', extra='0.5', mta_tax='0.5', tip_amount='0', tolls_amount='0', ehail_fee=None, improvement_surcharge='0.3', total_amount='6.3', payment_type='1', trip_type='1', congestion_surcharge='0'), Row(VendorID='2', lpep_pickup_datetime='2020-02-01 00:19:31', lpep_dropoff_datetime='2020-02-01 00:25:29', store_and_fwd_flag='N', Rate

We get the first five rows of the dataframe. The data dictionary is shown below:

| Field Name            | Description                                                                                                                                                                                                                                               |
|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| VendorID              | A code indicating the LPEP provider that provided the record. 1= Creative Mobile Technologies, LLC; 2= VeriFone Inc. |
| lpep_pickup_datetime | The date and time when the meter was engaged. |
| lpep_dropoff_datetime  | The date and time when the meter was disengaged.   |
| Passenger_count  | The number of passengers in the vehicle.  This is a driver-entered value.   |
| Trip_distance  | The elapsed trip distance in miles reported by the taximeter.  |
| PULocationID  | TLC Taxi Zone in which the taximeter was engaged  |
| DOLocationID  | TLC Taxi Zone in which the taximeter was disengaged  |
| RateCodeID  | The final rate code in effect at the end of the trip. 1= Standard rate 2=JFK 3=Newark 4=Nassau or Westchester 5=Negotiated fare 6=Group ride  |
| Store_and_fwd_flag  | This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server. Y= store and forward trip N= not a store and forward trip |
| Payment_type  | A numeric code signifying how the passenger paid for the trip. 1= Credit card 2= Cash 3= No charge 4= Dispute 5= Unknown 6= Voided trip  |
| Fare_amount  | The time-and-distance fare calculated by the meter.  |
| Extra  | Miscellaneous extras and surcharges. Currently, this only includes the \$0.50 and \$1 rush hour and overnight charges.  |
| MTA_tax  | \$0.50 MTA tax that is automatically triggered based on the metered rate in use.  |
| Improvement_surcharge  | \$0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.  |
| Tip_amount  | Tip amount –This field is automatically populated for credit card tips. Cash tips are not included.  |
| Tolls_amount  | Total amount of all tolls paid in trip.   |
| Total_amount  | The total amount charged to passengers. Does not include cash tips. |

Next, we will count the green taxi trips per pickup date.


In [25]:
df_spark_green_taxi_count = df_spark_green_taxi.select("lpep_pickup_datetime")

In [26]:
df_spark_green_taxi_count.printSchema()

root
 |-- lpep_pickup_datetime: string (nullable = true)

We will cast the lpep_pickup_datetime column from string to TimestampType with the following code.

In [27]:
df_spark_green_taxi_count = df_spark_green_taxi_count.withColumn("lpep_pickup_datetime", df_spark_green_taxi_count["lpep_pickup_datetime"].cast(TimestampType()))


In [28]:
df_spark_green_taxi_count.printSchema()

root
 |-- lpep_pickup_datetime: timestamp (nullable = true)

In [29]:
df_spark_green_taxi_count.take(5)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 10, 25)), Row(lpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 16, 59)), Row(lpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 19, 31)), Row(lpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 43, 52)), Row(lpep_pickup_datetime=datetime.datetime(2020, 2, 1, 0, 32, 53))]

We will then cast the lpep_pickup_datetime column from TimestampType to DateType with the following code.

In [30]:
df_spark_green_taxi_count = df_spark_green_taxi_count.withColumn("lpep_pickup_datetime", df_spark_green_taxi_count["lpep_pickup_datetime"].cast(DateType()))

In [31]:
df_spark_green_taxi_count.printSchema()

root
 |-- lpep_pickup_datetime: date (nullable = true)

In [32]:
df_spark_green_taxi_count.take(5)

[Row(lpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(lpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(lpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(lpep_pickup_datetime=datetime.date(2020, 2, 1)), Row(lpep_pickup_datetime=datetime.date(2020, 2, 1))]

We will identify missing values with the following code.

In [33]:
df_spark_green_taxi_count.where(col("lpep_pickup_datetime").isNull()).take(5)

[]

There are no null values in the lpep_pickup_datetime column, so no rows need to be dropped. Next, we count the green taxi trips per pickup date with the following code.

In [34]:
df_spark_green_taxi_count =  df_spark_green_taxi_count.groupBy('lpep_pickup_datetime').count()

In [35]:
df_spark_green_taxi_count.show()

+--------------------+-----+
|lpep_pickup_datetime|count|
+--------------------+-----+
|          2020-04-30| 1677|
|          2020-03-13|10783|
|          2020-03-07|13230|
|          2020-02-15|12786|
|          2020-02-04|13981|
|          2009-01-01|    6|
|          2020-05-23| 1399|
|          2020-02-12|14964|
|          2020-05-08| 2167|
|          2020-06-04| 2157|
|          2020-05-24| 1001|
|          2020-04-29| 1576|
|          2020-05-10| 1341|
|          2020-04-26|  666|
|          2020-04-21| 1079|
|          2020-03-09|12002|
|          2020-03-10|12258|
|          2020-03-11|12268|
|          2020-06-05| 2323|
|          2020-04-19|  692|
+--------------------+-----+
only showing top 20 rows

##### Uber and Lyft taxi trips dataset


In [36]:
df_spark_uber_lyft_taxi.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)

The printSchema method shows that there are 7 variables. All of them are read as strings, because no schema was supplied and schema inference was not enabled.

In [37]:
df_spark_uber_lyft_taxi.take(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime='2020-02-01 00:06:20', dropoff_datetime='2020-02-01 00:11:41', PULocationID='107', DOLocationID='114', SR_Flag=None), Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime='2020-02-01 00:27:49', dropoff_datetime='2020-02-01 00:46:37', PULocationID='249', DOLocationID='263', SR_Flag=None), Row(hvfhs_license_num='HV0003', dispatching_base_num='B02872', pickup_datetime='2020-02-01 00:47:37', dropoff_datetime='2020-02-01 01:03:17', PULocationID='263', DOLocationID='41', SR_Flag=None), Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2020-02-01 00:10:26', dropoff_datetime='2020-02-01 00:34:57', PULocationID='4', DOLocationID='88', SR_Flag=None), Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2020-02-01 00:48:56', dropoff_datetime='2020-02-01 00:57:37', PULocationID='161', DOLocationID='162', SR_Flag=None)]

We get the first five rows of the dataframe. The data dictionary is shown below:

| Field Name            | Description                                                                                                                                                                                                                                               |
|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Hvfhs_license_num | The TLC license number of the HVFHS base or business. As of September 2019, the HVFHS licensees are the following: HV0002: Juno, HV0003: Uber, HV0004: Via, HV0005: Lyft |
| Dispatching_base_num | The TLC Base License Number of the base that dispatched the trip. |
| Pickup_datetime | The date and time of the trip pick-up | 
| DropOff_datetime  | The date and time of the trip drop-off |
| PULocationID  | TLC Taxi Zone in which the trip began | 
| DOLocationID  | TLC Taxi Zone in which the trip ended |
| SR_Flag  | Indicates if the trip was a part of a shared ride chain offered by a High Volume FHV company (e.g. Uber Pool, Lyft Line). For shared trips, the value is 1. For non-shared rides, this field is null. |


With the following code we will get the unique values of the hvfhs_license_num column.

In [38]:
df_spark_uber_lyft_taxi.select("hvfhs_license_num").distinct().show()

+-----------------+
|hvfhs_license_num|
+-----------------+
|           HV0004|
|           HV0005|
|           HV0003|
+-----------------+

The output contains codes for Via (HV0004), Lyft (HV0005), and Uber (HV0003), but none for Juno (HV0002).
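
As a small illustration, the license codes from the data dictionary can be kept in a lookup table so that the codes returned by `distinct()` are compared against it programmatically. The sketch below is plain Python; the names `HVFHS_LICENSEES` and `missing_licensees` are hypothetical and are not part of the notebook.

```python
# License codes and company names, copied from the HVFHS data dictionary above.
HVFHS_LICENSEES = {
    "HV0002": "Juno",
    "HV0003": "Uber",
    "HV0004": "Via",
    "HV0005": "Lyft",
}


def missing_licensees(observed_codes):
    """Return the names of licensees that have no rows in the data."""
    observed = set(observed_codes)
    return sorted(
        name for code, name in HVFHS_LICENSEES.items() if code not in observed
    )


# Codes returned by the distinct() query above.
observed = ["HV0004", "HV0005", "HV0003"]
print(missing_licensees(observed))  # ['Juno']
```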

We will create two dataframes, one for Uber and one for Lyft.

In [39]:
df_spark_uber_taxi = df_spark_uber_lyft_taxi.where(col("hvfhs_license_num") == "HV0003").select("pickup_datetime")

In [40]:
df_spark_uber_taxi.printSchema()

root
 |-- pickup_datetime: string (nullable = true)

In [41]:
df_spark_uber_taxi.take(5)

[Row(pickup_datetime='2020-02-01 00:06:20'), Row(pickup_datetime='2020-02-01 00:27:49'), Row(pickup_datetime='2020-02-01 00:47:37'), Row(pickup_datetime='2020-02-01 00:10:26'), Row(pickup_datetime='2020-02-01 00:48:56')]

In [42]:
df_spark_uber_taxi.count()

38152707

There are about 38 million rows.

We will change the pickup_datetime column from string type to TimestampType with the following code.

In [43]:
df_spark_uber_taxi_count = df_spark_uber_taxi.withColumn("pickup_datetime", df_spark_uber_taxi["pickup_datetime"].cast(TimestampType()))

In [44]:
df_spark_uber_taxi_count.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)

In [45]:
df_spark_uber_taxi_count.take(5)

[Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 6, 20)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 27, 49)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 47, 37)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 10, 26)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 48, 56))]

We will change the pickup_datetime column from TimestampType to DateType with the following code.

In [46]:
df_spark_uber_taxi_count = df_spark_uber_taxi_count.withColumn("pickup_datetime", df_spark_uber_taxi_count["pickup_datetime"].cast(DateType()))

In [47]:
df_spark_uber_taxi_count.printSchema()

root
 |-- pickup_datetime: date (nullable = true)

In [48]:
df_spark_uber_taxi_count.take(5)

[Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1))]

We will identify missing values with the following code.

In [49]:
df_spark_uber_taxi_count.where(col("pickup_datetime").isNull()).take(5)

[]

There are no null values in the pickup_datetime column, so no rows need to be removed. Next, we will calculate the number of Uber trips per day with the following code.

In [50]:
df_spark_uber_taxi_count =  df_spark_uber_taxi_count.groupBy('pickup_datetime').count()

In [51]:
df_spark_uber_taxi_count.show()

+---------------+------+
|pickup_datetime| count|
+---------------+------+
|     2020-04-30|128397|
|     2020-03-07|641273|
|     2020-03-13|466869|
|     2020-02-15|627477|
|     2020-02-04|455434|
|     2020-05-23|143010|
|     2020-02-12|493095|
|     2020-05-08|150325|
|     2020-06-04|134098|
|     2020-05-24|134702|
|     2020-04-29|118377|
|     2020-05-10|131154|
|     2020-04-26| 94219|
|     2020-04-21|105946|
|     2020-03-09|459584|
|     2020-03-10|458998|
|     2020-03-11|448862|
|     2020-04-19| 92133|
|     2020-06-05|144314|
|     2020-02-11|474745|
+---------------+------+
only showing top 20 rows
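
The steps above (string to timestamp, timestamp to date, null check, count per day) can be sketched in plain Python for a handful of rows. This only illustrates the logic and is not the Spark code used in the notebook; the helper name `daily_trip_counts` is hypothetical, and the sample rows are the five Uber pickup times shown in the `take(5)` output above.

```python
from collections import Counter
from datetime import datetime


def daily_trip_counts(pickup_strings):
    """Count trips per calendar day from 'YYYY-MM-DD HH:MM:SS' strings."""
    counts = Counter()
    for raw in pickup_strings:
        if raw is None:
            # Assumption: rows without a pickup time are skipped
            # (the notebook found no such rows).
            continue
        day = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S").date()
        counts[day] += 1
    return dict(counts)


sample = [
    "2020-02-01 00:06:20",
    "2020-02-01 00:27:49",
    "2020-02-01 00:47:37",
    "2020-02-01 00:10:26",
    "2020-02-01 00:48:56",
]
print(daily_trip_counts(sample))  # {datetime.date(2020, 2, 1): 5}
```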

Next, we will calculate the count of lyft taxi trips.

In [52]:
df_spark_lyft_taxi = df_spark_uber_lyft_taxi.where(col("hvfhs_license_num") == "HV0005").select("pickup_datetime")

In [53]:
df_spark_lyft_taxi.printSchema()

root
 |-- pickup_datetime: string (nullable = true)

In [54]:
df_spark_lyft_taxi.take(5)

[Row(pickup_datetime='2020-02-01 00:10:24'), Row(pickup_datetime='2020-02-01 00:10:32'), Row(pickup_datetime='2020-02-01 00:12:56'), Row(pickup_datetime='2020-02-01 00:31:21'), Row(pickup_datetime='2020-02-01 00:44:41')]

In [55]:
df_spark_lyft_taxi.count()

13635111

There are about 13.6 million rows.

We will change the pickup_datetime column from string type to TimestampType with the following code.

In [56]:
df_spark_lyft_taxi_count = df_spark_lyft_taxi.withColumn("pickup_datetime", df_spark_lyft_taxi["pickup_datetime"].cast(TimestampType()))

In [57]:
df_spark_lyft_taxi_count.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)

In [58]:
df_spark_lyft_taxi_count.take(5)

[Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 10, 24)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 10, 32)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 12, 56)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 31, 21)), Row(pickup_datetime=datetime.datetime(2020, 2, 1, 0, 44, 41))]

We will change the pickup_datetime column from TimestampType to DateType with the following code.

In [59]:
df_spark_lyft_taxi_count = df_spark_lyft_taxi_count.withColumn("pickup_datetime", df_spark_lyft_taxi_count["pickup_datetime"].cast(DateType()))

In [60]:
df_spark_lyft_taxi_count.printSchema()

root
 |-- pickup_datetime: date (nullable = true)

In [61]:
df_spark_lyft_taxi_count.take(5)

[Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1)), Row(pickup_datetime=datetime.date(2020, 2, 1))]

We will identify missing values with the following code.

In [62]:
df_spark_lyft_taxi_count.where(col("pickup_datetime").isNull()).take(5)

[]

There are no null values in the pickup_datetime column, so no rows need to be removed. Next, we will calculate the number of Lyft trips per day with the following code.


In [63]:
df_spark_lyft_taxi_count =  df_spark_lyft_taxi_count.groupBy('pickup_datetime').count()

In [64]:
df_spark_lyft_taxi_count.show()

+---------------+------+
|pickup_datetime| count|
+---------------+------+
|     2020-04-30| 43460|
|     2020-03-13|178447|
|     2020-03-07|226778|
|     2020-02-15|243454|
|     2020-02-04|140767|
|     2020-05-23| 53421|
|     2020-02-12|157806|
|     2020-05-08| 51373|
|     2020-06-04| 60824|
|     2020-05-24| 53527|
|     2020-04-29| 41142|
|     2020-05-10| 51271|
|     2020-04-26| 39685|
|     2020-04-21| 40546|
|     2020-03-09|147601|
|     2020-03-10|144810|
|     2020-03-11|158213|
|     2020-04-19| 38481|
|     2020-06-05| 60517|
|     2020-06-17| 82148|
+---------------+------+
only showing top 20 rows
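
Several days appear in both the Uber and the Lyft output, so the two daily series can be compared directly. As a small worked example in plain Python, the snippet below computes the relative change in trips between 2020-03-07 and 2020-04-30 from the counts printed above; the helper `percent_change` is hypothetical.

```python
def percent_change(before, after):
    """Relative change from `before` to `after`, in percent."""
    return (after - before) / before * 100.0


# Daily counts copied from the two show() outputs above.
counts = {
    "uber": {"2020-03-07": 641273, "2020-04-30": 128397},
    "lyft": {"2020-03-07": 226778, "2020-04-30": 43460},
}

for company, per_day in counts.items():
    change = percent_change(per_day["2020-03-07"], per_day["2020-04-30"])
    print(f"{company}: {change:.1f}%")
```

Both services lose roughly 80% of their daily trips between these two dates.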

Finally, we will prepare the covid data.

In [65]:
df_spark_covid.printSchema()

root
 |-- DATE_OF_INTEREST: string (nullable = true)
 |-- CASE_COUNT: string (nullable = true)
 |-- HOSPITALIZED_COUNT: string (nullable = true)
 |-- DEATH_COUNT: string (nullable = true)
 |-- CASE_COUNT_7DAY_AVG: string (nullable = true)
 |-- INCOMPLETE: string (nullable = true)

In [66]:
df_spark_covid.take(5)

[Row(DATE_OF_INTEREST='02/29/2020', CASE_COUNT='1', HOSPITALIZED_COUNT='11', DEATH_COUNT='0', CASE_COUNT_7DAY_AVG=None, INCOMPLETE=None), Row(DATE_OF_INTEREST='03/01/2020', CASE_COUNT='0', HOSPITALIZED_COUNT='4', DEATH_COUNT='0', CASE_COUNT_7DAY_AVG=None, INCOMPLETE=None), Row(DATE_OF_INTEREST='03/02/2020', CASE_COUNT='0', HOSPITALIZED_COUNT='21', DEATH_COUNT='0', CASE_COUNT_7DAY_AVG=None, INCOMPLETE=None), Row(DATE_OF_INTEREST='03/03/2020', CASE_COUNT='2', HOSPITALIZED_COUNT='20', DEATH_COUNT='0', CASE_COUNT_7DAY_AVG=None, INCOMPLETE=None), Row(DATE_OF_INTEREST='03/04/2020', CASE_COUNT='5', HOSPITALIZED_COUNT='22', DEATH_COUNT='0', CASE_COUNT_7DAY_AVG=None, INCOMPLETE=None)]

In [67]:
df_spark_covid_count = df_spark_covid.select(["CASE_COUNT", "DATE_OF_INTEREST"])

In [68]:
df_spark_covid_count.printSchema()

root
 |-- CASE_COUNT: string (nullable = true)
 |-- DATE_OF_INTEREST: string (nullable = true)

In [69]:
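# Parse the MM/dd/yyyy strings; from_unixtime returns a 'yyyy-MM-dd HH:mm:ss' string.
df_spark_covid_count = df_spark_covid_count.withColumn("DATE_OF_INTEREST", from_unixtime(unix_timestamp('DATE_OF_INTEREST', 'MM/dd/yyyy')) )
# Cast the case counts from string to integer.
df_spark_covid_count = df_spark_covid_count.withColumn("CASE_COUNT", df_spark_covid_count["CASE_COUNT"].cast(IntegerType()))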

In [70]:
df_spark_covid_count.printSchema()

root
 |-- CASE_COUNT: integer (nullable = true)
 |-- DATE_OF_INTEREST: string (nullable = true)

In [71]:
df_spark_covid_count.take(5)

[Row(CASE_COUNT=1, DATE_OF_INTEREST='2020-02-29 00:00:00'), Row(CASE_COUNT=0, DATE_OF_INTEREST='2020-03-01 00:00:00'), Row(CASE_COUNT=0, DATE_OF_INTEREST='2020-03-02 00:00:00'), Row(CASE_COUNT=2, DATE_OF_INTEREST='2020-03-03 00:00:00'), Row(CASE_COUNT=5, DATE_OF_INTEREST='2020-03-04 00:00:00')]

In [72]:
df_spark_covid_count = df_spark_covid_count.withColumn("DATE_OF_INTEREST", df_spark_covid_count["DATE_OF_INTEREST"].cast(TimestampType()))
df_spark_covid_count = df_spark_covid_count.withColumn("DATE_OF_INTEREST", df_spark_covid_count["DATE_OF_INTEREST"].cast(DateType()))

In [73]:
df_spark_covid_count.printSchema()

root
 |-- CASE_COUNT: integer (nullable = true)
 |-- DATE_OF_INTEREST: date (nullable = true)

In [74]:
df_spark_covid_count.take(5)

[Row(CASE_COUNT=1, DATE_OF_INTEREST=datetime.date(2020, 2, 29)), Row(CASE_COUNT=0, DATE_OF_INTEREST=datetime.date(2020, 3, 1)), Row(CASE_COUNT=0, DATE_OF_INTEREST=datetime.date(2020, 3, 2)), Row(CASE_COUNT=2, DATE_OF_INTEREST=datetime.date(2020, 3, 3)), Row(CASE_COUNT=5, DATE_OF_INTEREST=datetime.date(2020, 3, 4))]
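
The COVID-19 file stores both the dates (as `MM/dd/yyyy`) and the counts as strings, which is why both columns are converted. A minimal plain-Python sketch of the same conversion, applied to three of the rows shown above, is given below; `parse_covid_row` is a hypothetical helper and not part of the notebook.

```python
from datetime import datetime


def parse_covid_row(date_of_interest, case_count):
    """Convert a ('MM/dd/yyyy', 'N') pair of strings to (date, int)."""
    day = datetime.strptime(date_of_interest, "%m/%d/%Y").date()
    return day, int(case_count)


rows = [("02/29/2020", "1"), ("03/01/2020", "0"), ("03/04/2020", "5")]
for raw_date, raw_count in rows:
    day, cases = parse_covid_row(raw_date, raw_count)
    print(day.isoformat(), cases)
```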

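In all datasets we keep only the dates strictly between 2020-02-29 and 2020-06-30. Because the filter below uses strict inequalities, both boundary dates are excluded and the retained range is 2020-03-01 to 2020-06-29.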

In [75]:
# good reference https://stackoverflow.com/questions/31407461/datetime-range-filter-in-pyspark-sql

dates = ("2020-02-29",  "2020-06-30")
# to_date already returns a DateType column, so no extra cast is needed.
date_from, date_to = [to_date(lit(s)) for s in dates]

# Strict comparisons: both boundary dates are excluded.
df_spark_yellow_taxi_count = df_spark_yellow_taxi_count.where( (df_spark_yellow_taxi_count.tpep_pickup_datetime > date_from) & (df_spark_yellow_taxi_count.tpep_pickup_datetime < date_to) )
df_spark_green_taxi_count = df_spark_green_taxi_count.where( (df_spark_green_taxi_count.lpep_pickup_datetime > date_from) & (df_spark_green_taxi_count.lpep_pickup_datetime < date_to) )
df_spark_uber_taxi_count = df_spark_uber_taxi_count.where( (df_spark_uber_taxi_count.pickup_datetime > date_from) & (df_spark_uber_taxi_count.pickup_datetime < date_to) )
df_spark_lyft_taxi_count = df_spark_lyft_taxi_count.where( (df_spark_lyft_taxi_count.pickup_datetime > date_from) & (df_spark_lyft_taxi_count.pickup_datetime < date_to) )
df_spark_covid_count = df_spark_covid_count.where( (df_spark_covid_count.DATE_OF_INTEREST > date_from)  & ( df_spark_covid_count.DATE_OF_INTEREST < date_to) )

We will verify that the number of rows in each of df_spark_yellow_taxi_count, df_spark_green_taxi_count, df_spark_uber_taxi_count, and df_spark_lyft_taxi_count is greater than zero.

In [76]:
print(df_spark_yellow_taxi_count.count())

121

In [77]:
print(df_spark_green_taxi_count.count())

121

In [78]:
print(df_spark_uber_taxi_count.count())

121

In [79]:
print(df_spark_lyft_taxi_count.count())

121
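
The count of 121 is what the strict inequalities in the date filter predict: both boundary dates are excluded, so the retained days run from 2020-03-01 to 2020-06-29. The following plain-Python check reproduces that arithmetic; `days_strictly_between` is a hypothetical helper.

```python
from datetime import date, timedelta


def days_strictly_between(start, end):
    """List every date d with start < d < end."""
    n = (end - start).days
    return [start + timedelta(days=i) for i in range(1, n)]


days = days_strictly_between(date(2020, 2, 29), date(2020, 6, 30))
print(len(days), days[0], days[-1])  # 121 2020-03-01 2020-06-29
```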

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model. 

In order to combine the information in df_spark_yellow_taxi_count, df_spark_green_taxi_count, df_spark_uber_taxi_count, and df_spark_lyft_taxi_count, we designed the following entity relationship model. It is an abstract data model that defines an information structure which can be implemented in a database, typically a relational database.

![entity relationship diagram](./erd.png)

There are three entities and one relation between them. The following table describes the elements of the entity relationship model in more detail.

| Name | Type  | Description  |
|------|-------|--------------|
|     event|  entity    |  records the information of the event of interest which is covid-19. |
|     economic_item |  entity    |  records the information of the economic item which can be: yellow taxi, green taxi, uber or lyft. |
|     day_of_interest|  entity    |  a particular day of interest. |
|     economic_item_event_day|  relation    |  represents how the entities share information. |


The entity relationship model can be translated into a more operational model, the relational model, which is shown below.

![relational model](./relational.png)

The reasons for choosing the relational model are:

* Simplicity 
* We can guarantee the data integrity which is an assurance of the accuracy and consistency of the data over its entire life-cycle.

From the relational model we can write the DDL shown below. It uses standard SQL that runs on PostgreSQL as well as on SQLite, the database used in this project.

```sql

CREATE TABLE economic_item(
name varchar(100) not null primary key, 
description text default null
);


CREATE TABLE day_of_interest( 
day date not null primary key, 
comment text default null  
); 


CREATE TABLE event(
name varchar(100) not null primary key, 
description text  default null  
); 


CREATE TABLE economic_item_event_day(
name_economic_item varchar(100) not null,
day_of_interest date not null, 
name_event varchar(100) not null,
count_economic_item integer not null, 
count_event integer not null, 
PRIMARY KEY(name_economic_item, day_of_interest, name_event), 
   FOREIGN KEY(name_economic_item) 
      REFERENCES economic_item(name), 
   FOREIGN KEY(day_of_interest) 
      REFERENCES day_of_interest(day), 
   FOREIGN KEY(name_event) 
      REFERENCES event(name) 
); 

```

The primary and foreign keys in this DDL provide two guarantees:
* We can guarantee entity integrity through primary keys.
* We can guarantee referential integrity through foreign keys.


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

Using the cleaned data from Step 2, we can insert the data into the model as follows:

* Populate the economic_item table with the values: yellow_taxi, green_taxi, uber_taxi, and lyft_taxi.
* Populate the day_of_interest table with dates from 2020-02-29 to 2020-06-30.
* Populate the event table with the value COVID-19.
* Populate the economic_item_event_day table by combining the information in the PySpark dataframes df_spark_yellow_taxi_count, df_spark_green_taxi_count, df_spark_uber_taxi_count, df_spark_lyft_taxi_count, and df_spark_covid_count.
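
The four steps above can be sketched with Python's built-in `sqlite3` module and an in-memory database. This is an illustration under assumptions, not the project's `etl.py`: the table definitions are abbreviated from the DDL in section 3.1, the trip counts are made-up placeholder values, and only two days are loaded.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE economic_item(name varchar(100) NOT NULL PRIMARY KEY, description text);
    CREATE TABLE day_of_interest(day date NOT NULL PRIMARY KEY, comment text);
    CREATE TABLE event(name varchar(100) NOT NULL PRIMARY KEY, description text);
    CREATE TABLE economic_item_event_day(
        name_economic_item varchar(100) NOT NULL REFERENCES economic_item(name),
        day_of_interest date NOT NULL REFERENCES day_of_interest(day),
        name_event varchar(100) NOT NULL REFERENCES event(name),
        count_economic_item integer NOT NULL,
        count_event integer NOT NULL,
        PRIMARY KEY(name_economic_item, day_of_interest, name_event)
    );
    """
)

# Placeholder trip counts per day (illustrative values, not taken from the data).
trips = {
    "yellow_taxi": {"2020-03-01": 100, "2020-03-02": 90},
    "green_taxi": {"2020-03-01": 10, "2020-03-02": 9},
    "uber_taxi": {"2020-03-01": 300, "2020-03-02": 280},
    "lyft_taxi": {"2020-03-01": 120, "2020-03-02": 110},
}
# Case counts for the same two days, as shown in the covid output in Step 2.
covid_cases = {"2020-03-01": 0, "2020-03-02": 0}

with conn:
    # Steps 1 to 3: economic items, days, and the event.
    conn.executemany("INSERT INTO economic_item(name) VALUES (?)", [(n,) for n in trips])
    conn.executemany("INSERT INTO day_of_interest(day) VALUES (?)", [(d,) for d in covid_cases])
    conn.execute("INSERT INTO event(name) VALUES ('COVID-19')")
    # Step 4: one row per (economic item, day, event).
    conn.executemany(
        "INSERT INTO economic_item_event_day VALUES (?, ?, 'COVID-19', ?, ?)",
        [
            (item, day, count, covid_cases[day])
            for item, per_day in trips.items()
            for day, count in per_day.items()
        ],
    )

row_count = conn.execute("SELECT COUNT(*) FROM economic_item_event_day").fetchone()[0]
print(row_count)  # 8
```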

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

To build the ETL pipeline that creates the data model, the following Python script was created:

[ETL Pipeline](https://raw.githubusercontent.com/ricardoues/data-engineering-capstone-project/master/etl.py)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
In order to guarantee integrity in the data we define primary and foreign keys in the SQLite database. The DDL is shown below:


```sql


CREATE TABLE economic_item(
name varchar(100) not null primary key, 
description text default null
);


CREATE TABLE day_of_interest( 
day date not null primary key, 
comment text default null  
); 


CREATE TABLE event(
name varchar(100) not null primary key, 
description text  default null  
); 


CREATE TABLE economic_item_event_day(
name_economic_item varchar(100) not null,
day_of_interest date not null, 
name_event varchar(100) not null,
count_economic_item integer not null, 
count_event integer not null, 
PRIMARY KEY(name_economic_item, day_of_interest, name_event), 
   FOREIGN KEY(name_economic_item) 
      REFERENCES economic_item(name), 
   FOREIGN KEY(day_of_interest) 
      REFERENCES day_of_interest(day), 
   FOREIGN KEY(name_event) 
      REFERENCES event(name) 
); 

```
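
One caveat worth checking: SQLite parses foreign key constraints but does not enforce them by default; enforcement has to be switched on for each connection with `PRAGMA foreign_keys = ON`. The sketch below uses Python's built-in `sqlite3` module and an abbreviated two-table schema (the `event_day` table is a hypothetical stand-in for economic_item_event_day) to show a primary key violation and a foreign key violation both being rejected. Whether `etl.py` sets this pragma is not shown here, so treat it as an assumption to verify.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# SQLite enforces foreign keys only when this pragma is enabled on the connection.
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript(
    """
    CREATE TABLE event(name varchar(100) NOT NULL PRIMARY KEY, description text);
    CREATE TABLE event_day(
        name_event varchar(100) NOT NULL,
        count_event integer NOT NULL,
        FOREIGN KEY(name_event) REFERENCES event(name)
    );
    INSERT INTO event(name) VALUES ('COVID-19');
    """
)


def is_rejected(sql):
    """Return True if the statement violates an integrity constraint."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(sql)
    except sqlite3.IntegrityError:
        return True
    return False


# Entity integrity: a duplicate primary key is rejected.
print(is_rejected("INSERT INTO event(name) VALUES ('COVID-19')"))
# Referential integrity: a row that points at an unknown event is rejected.
print(is_rejected("INSERT INTO event_day VALUES ('unknown', 1)"))
# A valid row is accepted.
print(is_rejected("INSERT INTO event_day VALUES ('COVID-19', 1)"))
```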

Moreover, in the ETL pipeline we defined a try/except block to catch any errors. We also created the following validation rules to guarantee consistency in the data:


```python

# All taxi and covid dataframes must have the same number of rows.
if not (yellow.shape[0] == green.shape[0] == uber.shape[0] == lyft.shape[0] == covid.shape[0]):
    raise Exception("The number of rows in the dataframes that represent taxi and covid data must be equal.")

# The dataframes must not be empty.
if yellow.shape[0] == 0:
    raise Exception("The number of rows must be greater than zero")

```
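
The same two rules can be wrapped in a small function so that they are easy to unit test. The sketch below works on plain row counts instead of dataframes; the function name `validate_row_counts` is hypothetical and is not taken from `etl.py`.

```python
def validate_row_counts(row_counts):
    """Raise ValueError unless all datasets share the same non-zero row count.

    row_counts maps a dataset name to its number of rows.
    """
    distinct = set(row_counts.values())
    if len(distinct) != 1:
        raise ValueError(f"Row counts differ between datasets: {row_counts}")
    if distinct == {0}:
        raise ValueError("The number of rows must be greater than zero.")


# The four taxi dataframes each have 121 rows after filtering.
# Assumption: the covid dataframe has the same number of rows.
validate_row_counts(
    {"yellow": 121, "green": 121, "uber": 121, "lyft": 121, "covid": 121}
)
print("row counts are consistent")
```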


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Table: economic_item

| Field       | Description                                                            |
|-------------|------------------------------------------------------------------------|
| name        | The name of the economic item: yellow taxi, green taxi, uber, or lyft. |
| description | Additional information.                                                |

Table: day_of_interest

| Field   | Description             |
|---------|-------------------------|
| day     | The day of interest.    |
| comment | Additional information. |

Table: event

| Field       | Description                                                                       |
|-------------|-----------------------------------------------------------------------------------|
| name        | The event of interest. In our case it is COVID-19, but other events can be added. |
| description | Additional information.                                                           |

Table: economic_item_event_day

| Field               | Description                                                            |
|---------------------|------------------------------------------------------------------------|
| name_economic_item  | The name of the economic item: yellow taxi, green taxi, uber, or lyft. |
| day_of_interest     | The day of interest.                                                   |
| name_event          | The event of interest.                                                 |
| count_economic_item | In our case this is the number of taxi trips.                          |
| count_event         | In our case this is the number of covid cases.                         |


One advantage of this data model is that it can accommodate other economic activities and events.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

**Clearly state the rationale for the choice of tools and technologies for the project**: Apache Spark is a good choice for this project because the raw data is substantial (4.3 GB). SQLite was chosen because it is easy to install and use.

**Propose how often the data should be updated and why.**: According to the following web site:

[TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)


The data published as of September 5 goes up to June. To answer questions about economic activity in the taxi sector, we think that the data should be updated every month. According to Accenture, "consumers attitudes, behaviors and purchasing habits are changing" and many of these new ways will remain post-pandemic, which is why a long observation period is needed to detect changes in the patterns of taxi trips.

**Write a description of how you would approach the problem differently under the following scenarios**:
    
    

* The data was increased by 100x

In this case we would need more worker nodes in the Apache Spark cluster.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

In this situation we need to take these actions:

1. Use Apache Airflow to schedule daily updates of the data.
2. Use PostgreSQL or Redshift, depending on web traffic.


* The database needed to be accessed by 100+ people.

In this case it is preferable to use Redshift, since this data warehouse technology scales well.

#### References 

https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf

https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf

https://www.freetutes.com/systemanalysis/sa8-integrity-rules-relational-database-model.html

https://en.wikipedia.org/wiki/Data_integrity

https://en.wikipedia.org/wiki/Entity–relationship_model

https://www.sqlite.org/whentouse.html

https://www.accenture.com/us-en/insights/consumer-goods-services/coronavirus-consumer-behavior-research