# Green Taxi Trip Records


In this notebook, we will perform the ETL process for the [Green Taxi Trip Records](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

**Note:** To assess the data, we will use the [Data Dictionary – Green Taxi Trip Records](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf), provided by the NYC TLC website. It describes each field, including its possible values and valid ranges, which we will keep in mind throughout the assessment.

## Step 1: Import Dependencies

In [1]:
import pandas as pd
import numpy as np
import datetime
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType

## Step 2: Load the Data

Since all the files share the same schema, we can read them in a single call:

```spark.read.parquet()```

passing it the folder that contains them:

In [2]:
df = spark.read.parquet("gs://mobilab-tech-task-bucket/green-taxi")

We will also define the first year of interest and the current year, which bound the date filters later on.

In [3]:
begin = 2020

now = datetime.datetime.now()
until = now.year

## Step 3: Exploratory Data Analysis

In order to get to know our data, we will perform a basic exploratory analysis of it:

In [4]:
print(f"There are {df.count()} rows in the data frame")

There are 3370228 rows in the data frame


In [5]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [6]:
df.select('passenger_count','trip_distance', 'total_amount', 'payment_type', 'tip_amount', 'trip_type').describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|summary|   passenger_count|     trip_distance|     total_amount|      payment_type|        tip_amount|         trip_type|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+
|  count|           2366544|           3370228|          3370228|           2366544|           3370228|           2366533|
|   mean|1.2826940889330603| 70.45998095084565|21.15086389113421|1.4293070401395453| 1.344507695028476|1.0334062529447086|
| stddev|0.9282045229762351|2811.1694111225156| 16.2021957291594|0.5170671378178895|2.5232576853447246| 0.179694988392713|
|    min|               0.0|            -33.69|           -300.8|               1.0|             -86.0|               1.0|
|    max|              48.0|         360068.14|          2113.55|               5.0|             641.2|               2.0|
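+-------+------------------+------------------+-----------------+------------------+------------------+------------------+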

In [7]:
df.select('lpep_pickup_datetime','lpep_dropoff_datetime').show(10)

+--------------------+---------------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|
+--------------------+---------------------+
| 2019-12-18 15:52:30|  2019-12-18 15:54:39|
| 2020-01-01 00:45:58|  2020-01-01 00:56:39|
| 2020-01-01 00:41:38|  2020-01-01 00:52:49|
| 2020-01-01 00:52:46|  2020-01-01 01:14:21|
| 2020-01-01 00:19:57|  2020-01-01 00:30:56|
| 2020-01-01 00:52:33|  2020-01-01 01:09:54|
| 2020-01-01 00:10:18|  2020-01-01 00:22:16|
| 2020-01-01 01:03:14|  2020-01-01 01:29:45|
| 2020-01-01 00:04:11|  2020-01-01 00:09:48|
| 2020-01-01 00:25:52|  2020-01-01 00:32:16|
+--------------------+---------------------+
only showing top 10 rows



**Issues**

*   `trip_distance` < 0
*   `total_amount` < 0
*   `tip_amount` < 0
*   `passenger_count` > 10
* `lpep_pickup_datetime` < 2020

**Possible Issues**

* `passenger_count` < 0
* `payment_type` out of the range
* `lpep_dropoff_datetime` > 2022
* `lpep_dropoff_datetime` - `lpep_pickup_datetime` < 0

We will handle these issues in the transformation step.
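
The checks listed above can be collected in one place before they are applied. The sketch below is one possible arrangement, not the code this notebook runs: `RULES`, `violation_filter`, and `combined_filter` are hypothetical names, and each predicate is a Spark SQL expression string describing what a valid row looks like.

```python
# Hypothetical rule table: each entry is a SQL predicate that a valid row satisfies.
RULES = {
    "trip_distance": "trip_distance >= 0",
    "total_amount": "total_amount >= 0",
    "tip_amount": "tip_amount >= 0",
    "passenger_count": "passenger_count <= 10",
    "trip_duration": "lpep_dropoff_datetime > lpep_pickup_datetime",
}


def violation_filter(rule_name):
    """Return a predicate that selects the rows breaking the named rule."""
    return f"NOT ({RULES[rule_name]})"


def combined_filter():
    """Return a single predicate that keeps only rows passing every rule."""
    return " AND ".join(f"({predicate})" for predicate in RULES.values())


print(violation_filter("tip_amount"))
print(combined_filter())
```

With a data frame at hand, `df.filter(violation_filter("tip_amount")).count()` would report how many rows break a rule, and `df.filter(combined_filter())` would apply all of the rules at once. Rows for which a predicate evaluates to null are excluded by both filters.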

## Step 4: Data Transformation

Here, we will apply a series of transformations, such as filtering, type conversion, and row dropping, with the goal of building a more robust dataset.

**Trip Distance Filtering**

In [8]:
df = df.filter('trip_distance >= 0')

In [9]:
df.filter('trip_distance < 0').count()

0

**Passenger Count Filtering**

In [10]:
df = df.filter('passenger_count <= 10')
df.agg({'passenger_count': 'max' }).show()

+--------------------+
|max(passenger_count)|
+--------------------+
|                 9.0|
+--------------------+
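
One side effect of this filter is worth noting: in SQL, a comparison against a null value is neither true nor false, so rows with a null `passenger_count` are removed as well. In the summary from step 3, `passenger_count` has 2366544 non-null values out of 3370228 rows. The sketch below illustrates the behaviour with Python's built-in `sqlite3` as a stand-in, on the assumption that Spark SQL applies the same three-valued logic to this comparison; the table and its sample values are made up.

```python
import sqlite3

# In-memory table with two valid counts, one outlier, and one null.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (passenger_count REAL)")
conn.executemany(
    "INSERT INTO trips VALUES (?)", [(1.0,), (48.0,), (None,), (9.0,)]
)


def count_where(predicate):
    """Count the rows that satisfy a SQL predicate."""
    query = f"SELECT COUNT(*) FROM trips WHERE {predicate}"
    return conn.execute(query).fetchone()[0]


# The plain comparison drops the outlier and the null row.
kept_strict = count_where("passenger_count <= 10")
# Keeping nulls has to be requested explicitly.
kept_with_nulls = count_where("passenger_count <= 10 OR passenger_count IS NULL")

print(kept_strict, kept_with_nulls)  # 2 3
```

If rows with an unknown passenger count should survive, the filter would need the explicit `OR passenger_count IS NULL` clause.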



**Total Amount Filtering**

In [11]:
df = df.filter('total_amount >= 0')
df.agg({'total_amount': 'min' }).show()

+-----------------+
|min(total_amount)|
+-----------------+
|              0.0|
+-----------------+



**Tip Amount Filtering**

In [12]:
df = df.filter('tip_amount >= 0')
df.agg({'tip_amount': 'min' }).show()

+---------------+
|min(tip_amount)|
+---------------+
|            0.0|
+---------------+



**Payment Type Analysis**

Check whether the payment type is consistent with the total amount.

In [13]:
df.select('total_amount','payment_type').filter('total_amount == 0 and payment_type != 3').show(10)

+------------+------------+
|total_amount|payment_type|
+------------+------------+
|         0.0|         4.0|
|         0.0|         1.0|
|         0.0|         4.0|
|         0.0|         2.0|
|         0.0|         2.0|
|         0.0|         2.0|
|         0.0|         2.0|
|         0.0|         2.0|
|         0.0|         4.0|
|         0.0|         2.0|
+------------+------------+
only showing top 10 rows



In [14]:
df.filter('total_amount == 0 and payment_type != 3').count()

7097

In [15]:
df = df.withColumn("payment_type", f.when(df["total_amount"] == 0, 3).otherwise(df["payment_type"]))

df.filter('total_amount == 0 and payment_type != 3').count()

0

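Replace the value `0` in `payment_type` with a null, since it is outside the range defined in the data dictionary.

In [16]:
# Use a null rather than NaN so the value stays missing after the integer cast in step 5
df = df.withColumn("payment_type", f.when(df["payment_type"] == 0, f.lit(None)).otherwise(df["payment_type"]))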
df.filter('payment_type == 0').count()

0
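
Taken together, cells 15 and 16 amount to a small decision rule. A plain-Python sketch of it follows, intended only to make the order of the checks explicit; `normalize_payment_type` is a hypothetical helper, and `None` stands in for the missing value.

```python
def normalize_payment_type(total_amount, payment_type):
    """Mirror the two when/otherwise steps on a single record."""
    # Step 1: a trip with a total of zero is recorded as payment type 3.
    if total_amount == 0:
        return 3
    # Step 2: code 0 is treated as out of range, so mark it as missing.
    if payment_type == 0:
        return None
    # Otherwise the original code is kept unchanged.
    return payment_type


print(normalize_payment_type(0.0, 2.0))   # 3
print(normalize_payment_type(12.5, 0.0))  # None
print(normalize_payment_type(12.5, 1.0))  # 1.0
```

The order matters: a zero-amount trip with code 0 ends up as 3, not as a missing value.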

**Trip Type Analysis**

Check whether the trip type is within the range defined in the data dictionary.

In [17]:
df.select("trip_type").distinct().show()

+---------+
|trip_type|
+---------+
|     null|
|      1.0|
|      2.0|
+---------+



In [18]:
df.select([f.count(f.when(f.col('trip_type').isNull(),True))]).show()

+--------------------------------------------------+
|count(CASE WHEN (trip_type IS NULL) THEN true END)|
+--------------------------------------------------+
|                                                11|
+--------------------------------------------------+



We will deal with these `Null` values at the end, writing a general rule for the dataset.

**Time Period**

**Pickup Datetime**

- 1.0 Check whether `lpep_pickup_datetime` is within the range of years defined earlier.

In [19]:
df.select('lpep_pickup_datetime').sort(f.col("lpep_pickup_datetime")).show(5)

+--------------------+
|lpep_pickup_datetime|
+--------------------+
| 2008-12-31 17:04:15|
| 2008-12-31 19:16:53|
| 2008-12-31 22:06:48|
| 2008-12-31 23:05:21|
| 2008-12-31 23:05:26|
+--------------------+
only showing top 5 rows



- 1.1 Drop the out-of-range rows

In [20]:
df = df.withColumn("year", f.year(f.col("lpep_pickup_datetime")))

In [21]:
df = df.filter(f'year >= {begin} and year <= {until}')
df = df.drop('year')

In [22]:
df.select('lpep_pickup_datetime').sort(f.col("lpep_pickup_datetime")).show(5)

+--------------------+
|lpep_pickup_datetime|
+--------------------+
| 2020-01-01 00:00:07|
| 2020-01-01 00:00:21|
| 2020-01-01 00:00:44|
| 2020-01-01 00:01:04|
| 2020-01-01 00:01:11|
+--------------------+
only showing top 5 rows



**Dropoff Datetime**

- 2.0 Check whether `lpep_dropoff_datetime` is within the range of years defined earlier.

In [23]:
df.select('lpep_dropoff_datetime').sort(f.col("lpep_dropoff_datetime").desc()).show(5)

+---------------------+
|lpep_dropoff_datetime|
+---------------------+
|  2022-09-01 16:47:53|
|  2022-09-01 09:13:18|
|  2022-09-01 08:56:55|
|  2022-09-01 08:02:58|
|  2022-09-01 05:14:10|
+---------------------+
only showing top 5 rows



In this particular case, the data frame already meets the requirement, but we will apply the filter anyway with future use cases in mind.

- 2.1 Drop the out-of-range rows

In [24]:
df = df.withColumn("year", f.year(f.col("lpep_dropoff_datetime")))

In [25]:
df = df.filter(f'year >= {begin} and year <= {until}')
df = df.drop('year')
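
The pickup and dropoff filters follow the same pattern: add a `year` column, filter on it, then drop it. As a sketch of an alternative, the hypothetical helper below builds a Spark SQL predicate that calls `year()` inline, so no temporary column is needed. It only builds the string; applying it would look like `df.filter(year_range_predicate("lpep_dropoff_datetime", begin, until))`.

```python
def year_range_predicate(column, begin, until):
    """Build a SQL predicate keeping rows whose year lies in [begin, until]."""
    if begin > until:
        raise ValueError("begin must not be later than until")
    return f"year({column}) BETWEEN {begin} AND {until}"


# Example bounds; in the notebook they come from `begin` and `until`.
print(year_range_predicate("lpep_pickup_datetime", 2020, 2022))
```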

In [26]:
df.select('lpep_dropoff_datetime').sort(f.col("lpep_dropoff_datetime").desc()).show(5)

+---------------------+
|lpep_dropoff_datetime|
+---------------------+
|  2022-09-01 16:47:53|
|  2022-09-01 09:13:18|
|  2022-09-01 08:56:55|
|  2022-09-01 08:02:58|
|  2022-09-01 05:14:10|
+---------------------+
only showing top 5 rows



**Timestamps Analysis**

The difference between `lpep_dropoff_datetime` and `lpep_pickup_datetime` must be greater than zero.


In [27]:
df = df.withColumn('DiffInSeconds', f.unix_timestamp("lpep_dropoff_datetime") - f.unix_timestamp('lpep_pickup_datetime'))

In [28]:
df = df.filter('DiffInSeconds > 0')
df = df.drop('DiffInSeconds')
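
To make the rule concrete, the sketch below repeats the same subtraction in plain Python for one of the trips shown in step 3 and for a made-up zero-length trip. `duration_seconds` is a hypothetical helper; the Spark cells above remain the actual implementation.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def duration_seconds(pickup, dropoff):
    """Return dropoff minus pickup in whole seconds."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return int(delta.total_seconds())


real_trip = duration_seconds("2020-01-01 00:45:58", "2020-01-01 00:56:39")
zero_trip = duration_seconds("2020-01-01 00:45:58", "2020-01-01 00:45:58")

print(real_trip, real_trip > 0)  # 641 True
print(zero_trip, zero_trip > 0)  # 0 False
```

Because the filter uses a strict `> 0`, trips whose pickup and dropoff timestamps are identical are dropped along with those that end before they start.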

### 4.1: Timestamp Requirement

Since the data science team also wants to evaluate the data by hour and by day of the week, we will add extra columns for both.

Our date and time values are already stored as timestamps, so the transformation is quick and will save the team time later.

**Hours**

In 24-hour format.

In [29]:
df = df.withColumn("lpep_pickup_hour", f.hour(f.col("lpep_pickup_datetime"))) \
       .withColumn("lpep_dropoff_hour", f.hour(f.col("lpep_dropoff_datetime")))

In [30]:
df.select('lpep_pickup_hour', 'lpep_dropoff_hour').show(5)

+----------------+-----------------+
|lpep_pickup_hour|lpep_dropoff_hour|
+----------------+-----------------+
|               0|                0|
|               0|                0|
|               0|                1|
|               0|                0|
|               0|                1|
+----------------+-----------------+
only showing top 5 rows



**Day of the week**

This transformation will generate a column with the first three letters of the respective day of the week based on the timestamps.

In [31]:
df = df.withColumn("pickup_day", f.date_format('lpep_pickup_datetime', 'E')) \
       .withColumn("dropoff_day", f.date_format('lpep_dropoff_datetime', 'E'))


In [32]:
df.select('lpep_pickup_datetime','pickup_day', 'lpep_dropoff_datetime', 'dropoff_day').show(5)

+--------------------+----------+---------------------+-----------+
|lpep_pickup_datetime|pickup_day|lpep_dropoff_datetime|dropoff_day|
+--------------------+----------+---------------------+-----------+
| 2020-01-01 00:45:58|       Wed|  2020-01-01 00:56:39|        Wed|
| 2020-01-01 00:41:38|       Wed|  2020-01-01 00:52:49|        Wed|
| 2020-01-01 00:52:46|       Wed|  2020-01-01 01:14:21|        Wed|
| 2020-01-01 00:19:57|       Wed|  2020-01-01 00:30:56|        Wed|
| 2020-01-01 00:52:33|       Wed|  2020-01-01 01:09:54|        Wed|
+--------------------+----------+---------------------+-----------+
only showing top 5 rows
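
As a quick cross-check of the values above, 1 January 2020 was indeed a Wednesday. The sketch below reproduces the abbreviation in plain Python and pairs it with a numeric index, which can be handy for sorting because the text labels sort alphabetically rather than chronologically. `DAY_ABBR` and `day_label` are hypothetical names, and the Monday-first numbering is an arbitrary choice.

```python
from datetime import datetime

DAY_ABBR = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def day_label(timestamp):
    """Return (index, abbreviation) with Monday as index 0."""
    index = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").weekday()
    return index, DAY_ABBR[index]


print(day_label("2020-01-01 00:56:39"))  # (2, 'Wed')
```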



## Step 5: Data Schema Check

First, let us take a look at the current data schema:

In [33]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- lpep_pickup_hour: integer (nullable = true)
 |-- lpep_dropoff_hour: integer (nullable = true)

Some of the data types above do not match the values they hold. For example, `passenger_count` is stored as a `double` (float) even though it is a whole number.

In the code cell below, we cast these columns to more suitable types for the final dataset schema.

In [34]:
df = df \
.withColumn("VendorID" ,df["VendorID"].cast(IntegerType())) \
.withColumn("passenger_count" ,df["passenger_count"].cast(IntegerType())) \
.withColumn("payment_type",df["payment_type"].cast(IntegerType())) \
.withColumn("trip_type" ,df["trip_type"].cast(IntegerType()))

df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- lpep_pickup_hour: integer (nullable = true)
 |-- lpep_dropoff_hour: integer (nullable = true)

In [35]:
df.select('passenger_count').distinct().show(100)

+---------------+
|passenger_count|
+---------------+
|              1|
|              6|
|              3|
|              5|
|              9|
|              4|
|              8|
|              7|
|              2|
|              0|
+---------------+



## Step 6: Data Transformation Check

Now, we will repeat the same exploratory data analysis as before, in order to evaluate the results of the data transformation step.

The goal here is to confirm that we dealt properly with the spotted issues.

In [36]:
df.select('passenger_count','trip_distance', 'total_amount', 'payment_type', 'tip_amount', 'trip_type').describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|   passenger_count|     trip_distance|      total_amount|      payment_type|        tip_amount|         trip_type|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|           2355539|           2355539|           2355539|           2355539|           2355539|           2355538|
|   mean|1.2826486846534912|3.5308017358232973|17.140716269415297|1.4267991317486146|1.3911471726857954|1.0331334073150167|
| stddev|   0.9277478304743|237.77371887603908|14.889035858848855|0.5133153338395617|2.6982737601472744|0.1789849106344823|
|    min|                 0|               0.0|               0.0|                 1|               0.0|                 1|
|    max|                 9|         244152.01|           2113.55|                 5|             641.2|                 2|
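+-------+------------------+------------------+------------------+------------------+------------------+------------------+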

In [37]:
df.select('lpep_pickup_datetime','lpep_dropoff_datetime').show(5)

+--------------------+---------------------+
|lpep_pickup_datetime|lpep_dropoff_datetime|
+--------------------+---------------------+
| 2020-01-01 00:45:58|  2020-01-01 00:56:39|
| 2020-01-01 00:41:38|  2020-01-01 00:52:49|
| 2020-01-01 00:52:46|  2020-01-01 01:14:21|
| 2020-01-01 00:19:57|  2020-01-01 00:30:56|
| 2020-01-01 00:52:33|  2020-01-01 01:09:54|
+--------------------+---------------------+
only showing top 5 rows



In [38]:
print(f"There are {df.count()} rows in the transformed data frame")

There are 2355539 rows in the transformed data frame
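
Comparing this with the 3370228 rows loaded in step 3 shows how much the transformation removed. The arithmetic below uses only counts printed in this notebook; attributing rows to a null `passenger_count` relies on the assumption that the `passenger_count <= 10` filter discards null values, as a SQL comparison normally does.

```python
rows_before = 3370228          # df.count() in step 3
rows_after = 2355539           # df.count() after the transformation
passengers_non_null = 2366544  # passenger_count "count" in the step 3 summary

# Rows removed by all transformation filters combined.
removed = rows_before - rows_after
# Rows whose passenger_count was null in the raw data.
null_passengers = rows_before - passengers_non_null

print(f"removed rows: {removed} ({removed / rows_before:.1%})")
print(f"rows with null passenger_count: {null_passengers}")
print(f"share of removed rows: {null_passengers / removed:.1%}")
```

Roughly 30% of the rows were dropped, and almost all of them are rows whose `passenger_count` was null in the raw data.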


## Step 7: Outputs

According to the pipeline requirements defined by our data science team, the output datasets must be delivered in:

1. **Column-oriented format**
2. **Row-oriented format**
3. **Delta lake format**

Since we are working on the Google Cloud (GC) platform, we will use GC resources to meet these requirements:

1. **Column-oriented format**

    Export to Google Cloud Storage as `.parquet` files, then load the files into a BigQuery table. BigQuery stores table data in a columnar format. You can find more information in [Overview of BigQuery storage](https://cloud.google.com/bigquery/docs/storage_overview).


2. **Row-oriented format**

    Export to Google Cloud Storage as `.csv` files. CSV is a plain-text, row-oriented format, and the files can later be loaded into a SQL database (e.g. MySQL, Postgres, or Google Cloud SQL).


3. **Delta lake format**

    Not available due to library issues.

**1. Column-oriented format**

Since the data does not include a vehicle ID, we select the passenger count as the partition column. This column has fewer distinct values than the others, so partitioning by it generates a manageable number of files.

In [41]:
df.write.mode('overwrite').format('parquet').partitionBy('passenger_count').save('gs://mobilab-tech-task-bucket/outputs/green_trip/teste')
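
With `partitionBy`, Spark writes one subdirectory per distinct value of the partition column, named `column=value`, and stores the remaining columns in the files beneath it. The sketch below only prints the directory names to expect for the passenger counts seen in step 5; `partition_dirs` is a hypothetical helper, and the part-file names inside each directory are generated by Spark and not shown.

```python
def partition_dirs(base_path, column, values):
    """Return the Hive-style partition directories for the given values."""
    root = base_path.rstrip("/")
    return [f"{root}/{column}={value}" for value in sorted(values)]


base = "gs://mobilab-tech-task-bucket/outputs/green_trip/teste"
# Step 5 showed passenger counts from 0 to 9 after filtering.
dirs = partition_dirs(base, "passenger_count", range(10))
for path in dirs:
    print(path)
```

Reading the output folder back with `spark.read.parquet` restores `passenger_count` as a column inferred from these directory names.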

**2. Row-oriented format**

In [86]:
df.write.mode('overwrite').format('csv').partitionBy('passenger_count').save('gs://mobilab-tech-task-bucket/outputs/green_trip/csv')

                                                                                