In [1]:
import findspark

findspark.find()
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_timestamp, 
    date_format,
    year, 
    month, 
    hour, 
    col, 
    round,
    ceil,
    floor,
    minute, 
    second,
    when
)

In [3]:
import plotly.express as px

## Initialize SparkSession

SparkSession is created using local thread of four and can be seen in `localhost:4050` with "NYC Green Taxi" as the application name.

In [4]:
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("NYC Green Taxi") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

22/04/07 11:32:55 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.43.60 instead (on interface wlp0s20f3)
22/04/07 11:32:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/07 11:32:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

## Read the data

The dataset that will be used is NYC green taxi data for all months in 2020.

In [6]:
df = spark.read \
    .format('csv') \
    .option("inferSchema", "true") \
    .option("header","true") \
    .load("/home/thomas/data/nyc_taxi/green_taxi/*2020*.csv")
df.printSchema()



root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



                                                                                

The data dictionary of this dataset is available [here](https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_green.pdf). There are some codes identifying the columns, such as:

1. `VendorID` contains a code indicating the LPEP provider
    * `1` : Creative Mobile Technologies, LLC
    * `2` : VeriFone Inc.


2. `RateCodeID` is the final rate code in effect at the end of the trip.
    * `1` : Standard rate
    * `2` : JFK
    * `3` : Newark
    * `4` : Nassau or Westchester
    * `5` : Negotiated fare
    * `6` : Group ride
    
    
3. `payment_type` is a numeric code signifying how the passenger paid for the trip.
    * 1 : Credit card
    * 2 : Cash
    * 3 : No charge
    * 4 : Dispute
    * 5 : Unknown
    * 6 : Voided trip
    

4. `PULocationID` and `DOLocationID` contain a code of pick up and drop off location zones which is referencing to this [file](https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv).

In [7]:
df \
    .write \
    .mode('overwrite') \
    .parquet('staging/nyc-green-2020')

                                                                                

In [8]:
df = spark.read \
    .format('parquet') \
    .option("header","true") \
    .load("staging/nyc-green-2020")
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



See the size of the dataset

In [9]:
df.count(), len(df.columns)

(1734051, 20)

See the summary of the dataset

In [10]:
df.summary()

                                                                                

summary,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
count,1205959.0,1734051,1734051,1205959,1205959.0,1734051.0,1734051.0,1205959.0,1734051.0,1734051.0,1734051.0,1734051.0,1734051.0,1734051.0,0.0,1734051.0,1734051.0,1205959.0,1205954.0,1205959.0
mean,1.8298316941123205,,,,1.118096054675159,108.24825682750968,128.7862756055041,1.2913266537253754,17.292239957186986,16.984788048339592,0.5418312033498438,0.3480491346563625,1.263317930095531,0.4129914979431395,,0.2932526205994669,20.16313390433051,1.4599285713693415,1.0262953644998067,0.4842113206170359
stddev,0.3757807481145078,,,,0.7140994719763627,70.80139561161255,76.42576845637467,0.9547199115132876,1198.1647047946235,13.805149119347435,0.9482827432384792,0.2325785709880442,2.2282006210319514,1.6595631387406184,,0.0491476535218622,15.273077679603343,0.5219627145750482,0.1600123105792274,1.047321940025361
min,1.0,2008-12-31 22:06:48,2008-12-31 23:12:08,N,1.0,1.0,1.0,0.0,-33.69,-210.0,-4.5,-0.5,-10.56,-6.12,,-0.3,-210.3,1.0,1.0,-2.75
25%,2.0,,,,1.0,52.0,63.0,1.0,1.13,7.5,0.0,0.0,0.0,0.0,,0.3,9.36,1.0,1.0,0.0
50%,2.0,,,,1.0,81.0,127.0,1.0,2.21,12.5,0.0,0.5,0.0,0.0,,0.3,15.7,1.0,1.0,0.0
75%,2.0,,,,1.0,166.0,194.0,1.0,4.87,22.11,1.0,0.5,2.75,0.0,,0.3,26.0,2.0,1.0,0.0
max,2.0,2041-08-17 16:24:38,2041-08-17 16:27:20,Y,99.0,265.0,265.0,9.0,205654.12,803.0,16.74,3.55,641.2,96.12,,0.3,803.8,5.0,2.0,2.75


In this summary, I find something interesting. 
* The vendor is dominating by `VendorID` = 2, that is Verifone Inc. Also, it has many null values.
* Both pick up and drop off dates contain typos as this is just a dataset for 2020
* The `RateCodeID` contains a number exceeds 6
* The number of passenger exceeds 4 for green taxies, based on this [information](https://freetoursbyfoot.com/how-to-get-a-taxi-in-nyc/). The zero in passenger maybe a cancelled trip or mistyped by the driver.
* The `trip_distance` has a nonsense minimum and maximum value.
* The `fare_amount`, `extra`, `mta_tax`,`tip_amount`, `total_amount`,`improvement_surcharge`, and `congestion_surcharge` minimum value has a negative value.
* The `ehall_fee` are null for all records.
* Null values also come in `store_and_fwd_flag`, `RateCodeID`, `passenger_count`, `payment_type`, `trip_type`, and `congestion_surcharge`.

In [11]:
df.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|         1|         264|         264|              5|          0.0|        3.5|  0.5|    0.

In [12]:
df1 = df.filter(df.VendorID.isNull())
df1.show(100)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|    null| 2020-01-01 00:39:00|  2020-01-01 01:11:00|              null|      null|         143|          45|           null|         5.57|      25.78| 2.75|    0.

Interesting! I see a pattern here. The datetime, pick up and drop off location id, and `trip_distance` value is doubled with variation in `fare amount`. Also, `extra` column is filled with etiher 0.00 or 2.75. I might delete all these strange records.

In [13]:
df_vendor = df.filter(df.VendorID.isNotNull())

In [14]:
df_year = df_vendor.withColumn('do_year', year('lpep_dropoff_datetime'))

In [15]:
df_year.filter(col('do_year') < 2020).count()

43

In [16]:
df_year.filter(col('do_year') < 2020).show(43)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|do_year|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|         1|         264|         264|              5|          0.0|

In [17]:
df_year.filter(col('do_year') > 2020).count()

12

In [18]:
df_year.filter(col('do_year') > 2020).show(12)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|do_year|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+
|       2| 2041-08-17 16:24:38|  2041-08-17 16:27:20|                 N|         1|         193|         193|              1|          0.0|

Many of the records happened around midnight in the new year's eve. I will delete the records since I only focused on the orders that finished in 2020.

In [19]:
df_2020 = df_year.filter(col('do_year') == 2020)

In [20]:
df_2020.summary()

                                                                                

summary,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,do_year
count,1205904.0,1205904,1205904,1205904,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,1205904.0,0.0,1205904.0,1205904.0,1205904.0,1205899.0,1205904.0,1205904.0
mean,1.8298255914235293,,,,1.1180948068834666,101.92901259138372,129.5955689673473,1.2913333067972244,3.110250733060034,12.695061795965692,0.3870120755881065,0.4820250616964535,1.223018158991176,0.1925023136170222,,0.2954926760353935,15.70616053180022,1.4599172073398878,1.026294905294722,0.4842220027464873,2020.0
stddev,0.3757861045133283,,,,0.7140978282483215,67.68064102267269,76.80990467860724,0.9547321259846556,161.04397025641256,11.531079643126713,0.6042345131895063,0.1019517456175017,2.4728695132901706,1.162974270747563,,0.0442195127545533,13.14700925707999,0.5219613040779757,0.1600109511319311,1.0473310203454935,0.0
min,1.0,2019-12-31 20:27:53,2020-01-01 00:01:09,N,1.0,1.0,1.0,0.0,-3.05,-210.0,-4.5,-0.5,-10.56,-6.12,,-0.3,-210.3,1.0,1.0,-2.75,2020.0
25%,2.0,,,,1.0,49.0,65.0,1.0,0.95,6.5,0.0,0.5,0.0,0.0,,0.3,8.3,1.0,1.0,0.0,2020.0
50%,2.0,,,,1.0,75.0,129.0,1.0,1.69,9.5,0.0,0.5,0.0,0.0,,0.3,11.8,1.0,1.0,0.0,2020.0
75%,2.0,,,,1.0,136.0,195.0,1.0,3.2,15.0,0.5,0.5,2.0,0.0,,0.3,18.55,2.0,1.0,0.0,2020.0
max,2.0,2020-12-31 23:50:13,2020-12-31 23:58:14,Y,99.0,265.0,265.0,9.0,134121.5,803.0,14.26,3.55,641.2,96.12,,0.3,803.8,5.0,2.0,2.75,2020.0


In [21]:
df_2020.withColumn('pu_year', year('lpep_pickup_datetime')).filter(col('pu_year') == 2019).show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|do_year|pu_year|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------+
|       2| 2019-12-31 23:59:39|  2020-01-01 00:06:24|                 N|         1|         179|         179|      

I see a possibly bad record with a trip duration almost 24 hours for pick up datetime at 2019-12-31 20:27:53 and drop off datetime at 2020-01-01 19:45:52 while the trip distance is only 10.19 miles with total fare is only $36.96. Did the cab goes extremely slow? It is nonsense to order with this kind of trip.

In [22]:
df_extract_time = df_2020 \
    .withColumn('pu_datetime', to_timestamp('lpep_pickup_datetime')) \
    .withColumn('do_datetime', to_timestamp('lpep_dropoff_datetime')) \
    .withColumn('pu_day', date_format('pu_datetime', 'EEE')) \
    .withColumn('pu_hour', hour('pu_datetime')) \
    .withColumn('pu_month', month('pu_datetime')) \
    .withColumn('trip_duration_second', col('do_datetime').cast('long') - col('pu_datetime').cast('long')) \
    .withColumn('trip_duration_minute', round(col('trip_duration_second')/60))

In [23]:
df_extract_time.show()

22/04/07 11:34:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------------------+-------------------+------+-------+--------+--------------------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|do_year|        pu_datetime|        do_datetime|pu_day|pu_hour|pu_month|trip_duration_second|trip_duration_minute|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------

In [24]:
df_extract_time.filter((col('fare_amount') < 0) & (col('total_amount') < 0)).count()

4195

In [25]:
df_extract_time \
    .filter((col('fare_amount') < 0) & (col('total_amount') < 0)) \
    .show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------------------+-------------------+------+-------+--------+--------------------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|do_year|        pu_datetime|        do_datetime|pu_day|pu_hour|pu_month|trip_duration_second|trip_duration_minute|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------

With 4195 records, I thought that negative sign in all columns about money charges to the passenger are mistyped. So, the values will be multiplied with -1 to make them positive. In addition, I will only select some columns.

In [27]:
df_fare = df_extract_time \
    .withColumn('fare_amount', 
                when(col('fare_amount') < 0, col('fare_amount') * -1).otherwise(col('fare_amount'))) \
    .withColumn('total_amount', 
                when(col('total_amount') < 0, col('total_amount') * -1).otherwise(col('total_amount'))) \
    .filter(col('fare_amount') >= 2.5) \
    .select(
        'VendorID',
        'pu_datetime',
        'do_datetime',
        'pu_month',
        'pu_day',
        'pu_hour',
        'do_year',
        'PULocationID',
        'DOLocationID',
        'passenger_count',
        'trip_duration_minute',
        'trip_duration_second',
        'trip_distance',
        'trip_type',
        'payment_type',
        'fare_amount',
        'total_amount'
    )

In [28]:
df_fare.columns

['VendorID',
 'pu_datetime',
 'do_datetime',
 'pu_month',
 'pu_day',
 'pu_hour',
 'do_year',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_duration_minute',
 'trip_duration_second',
 'trip_distance',
 'trip_type',
 'payment_type',
 'fare_amount',
 'total_amount']

In [29]:
df_fare.summary()

                                                                                

summary,VendorID,pu_month,pu_day,pu_hour,do_year,PULocationID,DOLocationID,passenger_count,trip_duration_minute,trip_duration_second,trip_distance,trip_type,payment_type,fare_amount,total_amount
count,1201084.0,1201084.0,1201084,1201084.0,1201084.0,1201084.0,1201084.0,1201084.0,1201084.0,1201084.0,1201084.0,1201079.0,1201084.0,1201084.0,1201084.0
mean,1.8317444908099685,3.901328300102241,,13.928607824265413,2020.0,101.7200254103793,129.48718490963162,1.2919595964978303,19.117044270009423,1146.7513071525389,3.1091364550689438,1.0238935157470908,1.458087028051327,12.78680814997305,15.815568053474337
stddev,0.3740931826876909,3.427111901426461,,5.50196284113043,0.0,67.58014501191036,76.79134419338351,0.955757039628988,87.83660966300663,5270.139124471364,161.36634389403045,0.1527175008648437,0.5211181528910742,11.480755017768512,13.077991620655377
min,1.0,1.0,Fri,0.0,2020.0,1.0,1.0,0.0,-5436.0,-326180.0,-3.05,1.0,1.0,2.5,2.5
25%,2.0,1.0,,10.0,2020.0,49.0,65.0,1.0,6.0,370.0,0.95,1.0,1.0,6.5,8.3
50%,2.0,2.0,,15.0,2020.0,75.0,129.0,1.0,10.0,619.0,1.69,1.0,1.0,9.5,11.8
75%,2.0,6.0,,18.0,2020.0,134.0,195.0,1.0,17.0,1008.0,3.2,1.0,2.0,15.0,18.55
max,2.0,12.0,Wed,23.0,2020.0,265.0,265.0,9.0,1511.0,90684.0,134121.5,2.0,5.0,803.0,803.8


In [30]:
df_time_pos = df_fare.filter(col('trip_duration_second') > 0)

The negative value of trip duration maybe caused from the malfunction of the meter while engaged and/or disengaged. So the records will be deleted.

The strange behaviours happen between trip duration and the amount the passenger pay.

Many trip durations have values larger than six hours, which is also nonsense, with trip distances and amount to pay are fairly normal.

In [31]:
df_time_pos \
    .filter(
#         (col('trip_distance') > 12) & 
#         (col('trip_duration_minute') > 60) & 
        (hour('do_datetime') == 0) &
        (minute('do_datetime') == 0) &
        (second('do_datetime') == 0)
    ) \
    .select(
        'pu_datetime',
        'do_datetime',
        'trip_duration_minute',
        'trip_distance',
        'fare_amount',
        'total_amount'
    ) \
    .count()

                                                                                

1456

In [32]:
df_time_pos \
    .filter(
#         (col('trip_distance') > 12) & 
#         (col('trip_duration_minute') > 60) & 
        (hour('do_datetime') == 0) &
        (minute('do_datetime') == 0) &
        (second('do_datetime') == 0)
    ) \
    .select(
        'pu_datetime',
        'do_datetime',
        'trip_duration_minute',
        'trip_distance',
        'fare_amount',
        'total_amount'
    ) \
    .show(100)

+-------------------+-------------------+--------------------+-------------+-----------+------------+
|        pu_datetime|        do_datetime|trip_duration_minute|trip_distance|fare_amount|total_amount|
+-------------------+-------------------+--------------------+-------------+-----------+------------+
|2020-01-01 00:33:09|2020-01-02 00:00:00|              1407.0|         1.87|       22.0|        23.3|
|2020-01-01 02:46:42|2020-01-02 00:00:00|              1273.0|        27.13|      225.0|       225.3|
|2020-01-01 05:11:55|2020-01-02 00:00:00|              1128.0|         0.27|       20.0|       23.34|
|2020-01-01 05:14:56|2020-01-02 00:00:00|              1125.0|          0.0|       10.0|       12.36|
|2020-01-01 07:03:17|2020-01-02 00:00:00|              1017.0|         1.13|       80.0|        80.3|
|2020-01-01 08:09:15|2020-01-02 00:00:00|               951.0|        13.66|       38.0|        38.8|
|2020-01-01 10:05:31|2020-01-02 00:00:00|               834.0|          1.3|      

For now, I will delete all the records with drop off timestamp value `00:00:00`. I think these are automatically done by the meter but the resulting timestamp made it wrong to calculate trip duration though the trip distance and fare amount are normal.

In [33]:
## Assumption
df_timestamp = df_time_pos.filter(
                    (hour('lpep_dropoff_datetime') != 0) &
                    (minute('lpep_dropoff_datetime') != 0) &
                    (second('lpep_dropoff_datetime') != 0)
               )

In [34]:
df_timestamp.filter((col('trip_duration_minute') > 500) & (col('trip_duration_minute') < 1000)).show(38)



+--------+-------------------+-------------------+--------+------+-------+-------+------------+------------+---------------+--------------------+--------------------+-------------+---------+------------+-----------+------------+
|VendorID|        pu_datetime|        do_datetime|pu_month|pu_day|pu_hour|do_year|PULocationID|DOLocationID|passenger_count|trip_duration_minute|trip_duration_second|trip_distance|trip_type|payment_type|fare_amount|total_amount|
+--------+-------------------+-------------------+--------+------+-------+-------+------------+------------+---------------+--------------------+--------------------+-------------+---------+------------+-----------+------------+
|       2|2020-01-11 19:26:47|2020-01-12 10:14:46|       1|   Sat|     19|   2020|          51|         265|              1|               888.0|               53279|        16.58|        1|           2|      25.05|       25.35|
|       2|2020-01-13 18:57:30|2020-01-14 07:12:48|       1|   Mon|     18|   2020|  



In [35]:
df_timestamp.filter(col('trip_distance') <= 0).count()

                                                                                

53973

In [36]:
df_timestamp.filter(col('trip_distance') <= 0).show()

+--------+-------------------+-------------------+--------+------+-------+-------+------------+------------+---------------+--------------------+--------------------+-------------+---------+------------+-----------+------------+
|VendorID|        pu_datetime|        do_datetime|pu_month|pu_day|pu_hour|do_year|PULocationID|DOLocationID|passenger_count|trip_duration_minute|trip_duration_second|trip_distance|trip_type|payment_type|fare_amount|total_amount|
+--------+-------------------+-------------------+--------+------+-------+-------+------------+------------+---------------+--------------------+--------------------+-------------+---------+------------+-----------+------------+
|       1|2020-01-01 00:43:43|2020-01-01 01:14:39|       1|   Wed|      0|   2020|          37|         150|              1|                31.0|                1856|          0.0|        1|           1|       28.2|        29.0|
|       1|2020-01-01 00:46:42|2020-01-01 01:06:21|       1|   Wed|      0|   2020|  

Many records are also have distance less than or equal to zero. These could be happened maybe by error sourced from the meter. For now, I will exclude them from the valid data.

In [37]:
df_distance = df_timestamp.filter(col('trip_distance') > 0)
# df_distance.summary()

In [38]:
## Check trip distance again
filters = df_distance \
        .filter(
            (col('fare_amount') > 0) &
            (col('trip_distance') > 60) &
    #         (col('trip_distance')/(col('trip_duration_minute')/60) > 50) &
            (col('fare_amount') < 2.5 + (floor(col('trip_distance')/0.2)*0.5))  # standard metering assumption
        ) \
        .select(
            'trip_duration_minute',
            'trip_distance',
            'fare_amount',
            'total_amount'
        )

filters.show()



+--------------------+-------------+-----------+------------+
|trip_duration_minute|trip_distance|fare_amount|total_amount|
+--------------------+-------------+-----------+------------+
|               384.0|        66.24|       10.0|        10.3|
|                40.0|       124.13|       40.0|        43.1|
|               176.0|     35757.46|      12.38|       12.68|
|                39.0|     25731.59|      20.25|       20.55|
|               222.0|       108.83|      120.0|      138.17|
|               100.0|       5388.1|      36.17|       38.76|
|               145.0|     12467.88|      22.79|       23.09|
|               139.0|       103.02|      200.0|      206.42|
|               194.0|        85.45|      175.0|       175.3|
|                48.0|     24430.46|      31.18|       31.48|
|               365.0|        78.26|      23.67|       30.09|
|                75.0|        79.69|      100.0|       140.3|
|                12.0|     134121.5|      21.88|       22.18|
|       

                                                                                

I filtered the records based on `trip_distance` and `fare_amount` because I saw strange values between the distance and the amount passenger paid. The assumption here was by using a standard metered fare, that is, \\$2.50 for initial charge and \\$0.50 every 1/5 mile. I filtered from the trip distance that larger than 60 miles, though the less distance trips have this behaviour too.

In [39]:
df_dist_fare = df_distance.join(filters,['trip_duration_minute','trip_distance','fare_amount','total_amount'],'leftanti')
# df_dist_fare.summary()

The cab can only afford maximum passengers of four. However, the dataset tells that there are records with passenger larger than four. Inspect using boxplot.

In [40]:
# df_pandas = df_dist_fare.filter('passenger_count > 4').toPandas()
# fig = px.box(df_pandas,x='passenger_count', y='trip_duration_minute', color='pu_day')
# fig.show()

In [41]:
# fig = px.box(df_pandas,x='passenger_count', y='total_amount', color='pu_day')
# fig.show()

In [42]:
# df_pandas = df_dist_fare.filter('passenger_count == 0').toPandas()
# fig = px.box(df_pandas,x='passenger_count', y='trip_duration_minute')
# fig.show()

Delete all records with number of passengers outside 1 and 4.

In [43]:
df_passenger = df_dist_fare.filter('passenger_count > 0 and passenger_count <= 4')
# df_passenger.summary()

In [44]:
# df_pandas = df_passenger.toPandas()
# fig = px.box(df_pandas,x='passenger_count', y='trip_duration_minute', color='pu_day')
# fig.show()

In [45]:
# fig = px.scatter(df_pandas, x='trip_distance',y='trip_duration_minute',color='total_amount')
# fig.show()

Based on the scatter plot between `trip_distance` and `trip_duration_minute`, it can be concluded that the duration recorded are doubtful, especially when the duration took hours but the fare still low. I would remove all the data that fulfilled all these following criterias:
* `trip_distance` < 60 miles
* `trip_duration_minute` > 360 minutes
* `fare_amount` < \\$152.5

In [47]:
df_final = df_passenger \
                .filter(
                        ~((col('trip_distance') < 60) &
                        (col('trip_duration_minute') > 360) &
                        (col('fare_amount') < 152.5)
                         )) \
                .select(
                        'VendorID',
                        'pu_month',
                        'pu_day',
                        'pu_hour',
                        'PULocationID',
                        'DOLocationID',
                        'passenger_count',
                        'trip_duration_minute',
                        'trip_distance',
                        'trip_type',
                        'payment_type',
                        'fare_amount',
                        'total_amount'
                )

In [48]:
vendor = [
    (1, 'Creative Mobile Technologies, LLC'),
    (2, 'VeriFone Inc')
]

vendor_cols = ['id','vendor']

In [49]:
payment = [
    (1, 'Credit card'),
    (2, 'Cash'),
    (3, 'No charge'),
    (4, 'Dispute'),
    (5, 'Unknown'),
    (6, 'Voided trip')
]

payment_cols = ['id','payment']

In [50]:
trip = [
    (1, 'Street-hail'),
    (2, 'Dispatch')
]

trip_cols = ['id','trip']

In [51]:
df_vendor = spark.createDataFrame(vendor, vendor_cols)
df_payment = spark.createDataFrame(payment, payment_cols)
df_trip = spark.createDataFrame(trip, trip_cols)

In [59]:
df_join = df_final \
    .join(df_vendor, df_final.VendorID == df_vendor.id, 'inner').drop(*('VendorID','id')) \
    .join(df_payment, df_final.payment_type == df_payment.id, 'inner').drop(*('payment_type','id')) \
    .join(df_trip, df_final.trip_type == df_trip.id, 'inner').drop(*('trip_type','id')) \
    .select(
        'vendor',
        'payment',
        'trip',
        'pu_month',
        'pu_day',
        'pu_hour',
        'PULocationID',
        'DOLocationID',
        'passenger_count',
        'trip_duration_minute',
        'trip_distance',
        'fare_amount',
        'total_amount'
    )

df_join.show(10)



+--------------------+-----------+-----------+--------+------+-------+------------+------------+---------------+--------------------+-------------+-----------+------------+
|              vendor|    payment|       trip|pu_month|pu_day|pu_hour|PULocationID|DOLocationID|passenger_count|trip_duration_minute|trip_distance|fare_amount|total_amount|
+--------------------+-----------+-----------+--------+------+-------+------------+------------+---------------+--------------------+-------------+-----------+------------+
|Creative Mobile T...|Credit card|Street-hail|       1|   Wed|      0|          35|          39|              1|                17.0|          3.0|       13.5|        14.8|
|Creative Mobile T...|Credit card|Street-hail|       1|   Wed|      0|         166|         236|              2|                15.0|          2.8|       13.0|       20.05|
|Creative Mobile T...|Credit card|Street-hail|       1|   Wed|      0|          41|         254|              2|                30.0|  

                                                                                

In [61]:
df_join.count(), len(df_join.columns)

                                                                                

(1023825, 13)

In [None]:
# df_pandas = df_final.toPandas()
# fig = px.scatter(df_pandas, x='trip_distance',y='trip_duration_minute',color='fare_amount')
# fig.show()

In [60]:
df_join \
    .write \
    .mode('overwrite') \
    .parquet('output/nyc-green-2020')

                                                                                

In [None]:
from pyspark import SparkContext as sc
spark.sparkContext._conf.getAll()