In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, count, when, col

In [4]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('new-york-taxi')
    .getOrCreate()
)
# With eager evaluation on, a bare DataFrame expression at the end of a cell
# is rendered as a table instead of only its schema summary.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [3]:
# Display the SparkSession object as the cell output.
spark

### Trip Data

In [5]:
# Load the trip CSV: the first row is treated as the header and Spark infers
# the column types from the data.
# The relative path uses forward slashes so it resolves on Windows, Linux and
# macOS alike (a backslash-separated path is only split into folders on Windows).
trip_data = spark.read.csv(
    'data/trip_data/trip_data_test.csv',
    header=True,
    inferSchema=True,
)

In [6]:
# Number of rows in the trip DataFrame.
trip_data.count()

10000

In [7]:
# Trip columns whose names carry a leading space and therefore need renaming.
old_column_names = [
    ' hack_license',
    ' vendor_id',
    ' rate_code',
    ' store_and_fwd_flag',
    ' pickup_datetime',
    ' dropoff_datetime',
    ' passenger_count',
    ' trip_time_in_secs',
    ' trip_distance',
    ' pickup_longitude',
    ' pickup_latitude',
    ' dropoff_longitude',
    ' dropoff_latitude',
]

In [8]:
# Echo the list as the cell output (each name starts with a space).
old_column_names

[' hack_license',
 ' vendor_id',
 ' rate_code',
 ' store_and_fwd_flag',
 ' pickup_datetime',
 ' dropoff_datetime',
 ' passenger_count',
 ' trip_time_in_secs',
 ' trip_distance',
 ' pickup_longitude',
 ' pickup_latitude',
 ' dropoff_longitude',
 ' dropoff_latitude']

In [9]:
# Replacement trip column names without the leading space, listed in the
# same order as the names they replace.
new_column_names = [
    'hack_license',
    'vendor_id',
    'rate_code',
    'store_and_fwd_flag',
    'pickup_datetime',
    'dropoff_datetime',
    'passenger_count',
    'trip_time_in_secs',
    'trip_distance',
    'pickup_longitude',
    'pickup_latitude',
    'dropoff_longitude',
    'dropoff_latitude',
]

In [10]:
# Rename each trip column from its old name to the matching new name.
# The two lists are paired by position, so verify they line up before touching
# the frame: a mismatch then fails up front instead of part-way through.
assert len(old_column_names) == len(new_column_names), "column name lists differ in length"
for stale_name, clean_name in zip(old_column_names, new_column_names):
    # withColumnRenamed leaves the frame unchanged when `stale_name` is absent.
    trip_data = trip_data.withColumnRenamed(stale_name, clean_name)

In [11]:
#string to datetime
trip_data = trip_data.withColumn('pickup_datetime', to_timestamp('pickup_datetime')) \
            .withColumn('dropoff_datetime', to_timestamp('dropoff_datetime'))

In [12]:
# Print the trip DataFrame's column names and types.
trip_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- medallion: integer (nullable = true)
 |-- hack_license: integer (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)



In [13]:
# Preview at most five trip rows.
trip_data.limit(5)

id,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,2013000001,2013000001,VTS,1,,2013-01-01 00:00:00,2013-01-01 00:28:00,1,1680,3.94,-73.990784,40.76088,-73.954185,40.778847
1,2013000002,2013000002,VTS,1,,2013-01-01 00:00:00,2013-01-01 00:06:00,5,360,0.98,-73.978325,40.778091,-73.981834,40.768639
2,2013000003,2013000003,VTS,1,,2013-01-01 00:00:00,2013-01-01 00:10:00,1,600,2.77,-73.989616,40.729988,-74.013779,40.705036
3,2013000004,2013000004,VTS,1,,2013-01-01 00:00:00,2013-01-01 00:08:00,2,480,1.68,-73.981575,40.767632,-73.977737,40.757927
4,2013000005,2013000005,VTS,1,,2013-01-01 00:00:00,2013-01-01 00:16:00,4,960,4.05,-74.000526,40.737343,-73.977226,40.783607


### Fare Data

In [14]:
# Load the fare CSV: the first row is treated as the header and Spark infers
# the column types from the data.
# The relative path uses forward slashes so it resolves on Windows, Linux and
# macOS alike (a backslash-separated path is only split into folders on Windows).
fare_data = spark.read.csv(
    'data/fare_data/fare_data_test.csv',
    header=True,
    inferSchema=True,
)

In [15]:
# Number of rows in the fare DataFrame.
fare_data.count()

10000

In [16]:
# List the fare column names as read from the CSV header.
fare_data.columns

['id',
 'medallion',
 ' hack_license',
 ' vendor_id',
 ' pickup_datetime',
 ' payment_type',
 ' fare_amount',
 ' surcharge',
 ' mta_tax',
 ' tip_amount',
 ' tolls_amount',
 ' total_amount']

In [17]:
# Fare columns whose names carry a leading space and therefore need renaming.
# This rebinds `old_column_names`, replacing any earlier value of that name.
old_column_names = [
    ' hack_license',
    ' vendor_id',
    ' pickup_datetime',
    ' payment_type',
    ' fare_amount',
    ' surcharge',
    ' mta_tax',
    ' tip_amount',
    ' tolls_amount',
    ' total_amount',
]

In [18]:
# Replacement fare column names without the leading space, listed in the
# same order as the names they replace.
new_column_names = [
    'hack_license',
    'vendor_id',
    'pickup_datetime',
    'payment_type',
    'fare_amount',
    'surcharge',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'total_amount',
]

In [19]:
# Rename each fare column from its old name to the matching new name.
# The two lists are paired by position, so verify they line up before touching
# the frame: a mismatch then fails up front instead of part-way through.
assert len(old_column_names) == len(new_column_names), "column name lists differ in length"
for stale_name, clean_name in zip(old_column_names, new_column_names):
    # withColumnRenamed leaves the frame unchanged when `stale_name` is absent.
    fare_data = fare_data.withColumnRenamed(stale_name, clean_name)

In [20]:
# Print the fare DataFrame's column names and types.
fare_data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- medallion: integer (nullable = true)
 |-- hack_license: integer (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



### Join Trip Data and Fare Data

In [21]:
# Register both frames as temp views so Spark SQL can query them by name.
for view_name, frame in (("trip_data", trip_data), ("fare_data", fare_data)):
    frame.createOrReplaceTempView(view_name)

In [22]:
# List the current column names of fare_data.
fare_data.columns

['id',
 'medallion',
 'hack_license',
 'vendor_id',
 'pickup_datetime',
 'payment_type',
 'fare_amount',
 'surcharge',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'total_amount']

In [23]:
# Combine trips with their fares through Spark SQL over the temp views
# `trip_data` (alias a) and `fare_data` (alias b).
# - LEFT JOIN keeps every trip row; the fare columns are NULL when no fare row
#   has the same id.
# - Identifier, time and location columns come from `a`; only the payment and
#   amount columns are taken from `b`.
# - `store_and_fwd_flag` is not in the select list, so it is absent from final_df.
# NOTE(review): the join key is `id` alone -- presumably it identifies the same
# ride in both files; confirm, since medallion / hack_license / pickup_datetime
# are not compared.
final_df = spark.sql('''
select
    a.id, 
    a.medallion,
    a.hack_license,
    a.vendor_id,
    a.rate_code,
    a.pickup_datetime,
    a.dropoff_datetime,
    a.passenger_count,
    a.trip_time_in_secs,
    a.trip_distance,
    a.pickup_longitude,
    a.pickup_latitude,
    a.dropoff_longitude,
    a.dropoff_latitude,
    b.payment_type,
    b.fare_amount,
    b.surcharge,
    b.mta_tax,
    b.tip_amount,
    b.tolls_amount,
    b.total_amount
from trip_data as a 
    left join fare_data as b
        on a.id = b.id
''')

In [24]:
# Number of columns in the joined frame.
len(final_df.columns)

21

In [25]:
# Preview at most five rows of the joined frame.
final_df.limit(5)

id,medallion,hack_license,vendor_id,rate_code,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,surcharge,mta_tax,tip_amount,tolls_amount,total_amount
0,2013000001,2013000001,VTS,1,2013-01-01 00:00:00,2013-01-01 00:28:00,1,1680,3.94,-73.990784,40.76088,-73.954185,40.778847,CSH,20.5,0.5,0.5,0.0,0.0,21.5
1,2013000002,2013000002,VTS,1,2013-01-01 00:00:00,2013-01-01 00:06:00,5,360,0.98,-73.978325,40.778091,-73.981834,40.768639,CSH,6.0,0.5,0.5,0.0,0.0,7.0
2,2013000003,2013000003,VTS,1,2013-01-01 00:00:00,2013-01-01 00:10:00,1,600,2.77,-73.989616,40.729988,-74.013779,40.705036,CRD,10.5,0.5,0.5,2.2,0.0,13.7
3,2013000004,2013000004,VTS,1,2013-01-01 00:00:00,2013-01-01 00:08:00,2,480,1.68,-73.981575,40.767632,-73.977737,40.757927,CRD,8.0,0.5,0.5,1.7,0.0,10.7
4,2013000005,2013000005,VTS,1,2013-01-01 00:00:00,2013-01-01 00:16:00,4,960,4.05,-74.000526,40.737343,-73.977226,40.783607,CRD,14.5,0.5,0.5,4.65,0.0,20.15


### Data Validation and Data Accuracy

In [50]:
#check for null values
null_counts = final_df.select([count(when(col(c).isNull(), c)).alias(c) for c in final_df.columns])
null_counts.show()

+---+---------+------------+---------+---------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
| id|medallion|hack_license|vendor_id|rate_code|pickup_datetime|dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+---+---------+------------+---------+---------+---------------+----------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|  0|        0|           0|        0|        0|              0|               0|              0|                0|            0|               0|              0|    

In [51]:
# List the joined frame's column names.
final_df.columns

['id',
 'medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'payment_type',
 'fare_amount',
 'surcharge',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'total_amount']

In [52]:
# Drop rows that have a NULL in any of the core ride measures.
required_measures = ['passenger_count', 'trip_time_in_secs', 'trip_distance', 'fare_amount']
final_df = final_df.na.drop(subset=required_measures)

In [55]:
# Keep only rows where passenger_count, trip_time_in_secs, trip_distance and
# fare_amount are all strictly greater than 0.
all_measures_positive = (
    (col('passenger_count') > 0)
    & (col('trip_time_in_secs') > 0)
    & (col('trip_distance') > 0)
    & (col('fare_amount') > 0)
)
final_df = final_df.filter(all_measures_positive)