In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, count

# Configure a session named 'spark' against the "local[*]" master, then
# obtain it via getOrCreate() so re-running this cell does not fail.
session_builder = SparkSession.builder.master("local[*]").appName('spark')
spark = session_builder.getOrCreate()

# Read Green Taxi, Yellow Taxi, and FHVHV trip data

In [4]:
# Load the three 2021-02 trip files (green, yellow, FHVHV) from the working
# directory into one DataFrame each; paths are relative to where the
# notebook is run.
df_green, df_yellow, df_fhvhv = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        'green_tripdata_2021-02.parquet',
        'yellow_tripdata_2021-02.parquet',
        'fhvhv_tripdata_2021-02.parquet',
    )
)

In [5]:
# Print the column names and types of the yellow-taxi data
# (pickup/drop-off columns are prefixed `tpep_`).
df_yellow.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [7]:
# Print the column names and types of the green-taxi data
# (pickup/drop-off columns are prefixed `lpep_`).
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
# Print the column names and types of the FHVHV data
# (distance is `trip_miles`; pickup time is `pickup_datetime`).
df_fhvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp (nullable = true)
 |-- on_scene_datetime: timestamp (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_flag: string (nul

In [9]:
# Preview the first 10 yellow-taxi rows as a sanity check on the load.
df_yellow.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 07:40:47|  2021-02-01 07:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [10]:
# Preview the first 10 green-taxi rows as a sanity check on the load.
df_green.show(10)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 07:34:03|  2021-02-01 07:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

In [11]:
# Preview the first 10 FHVHV rows as a sanity check on the load.
df_fhvhv.show(10)

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

# How many taxi trips were there on February 15?

In [12]:
# Day of interest, as an ISO yyyy-MM-dd string.
TARGET_DAY = "2021-02-15"


def _trips_on(trips, pickup_col, day=TARGET_DAY):
    """Return the rows of ``trips`` whose pickup timestamp falls on ``day``.

    The timestamp is cast to a date explicitly and compared with ``day``,
    instead of pattern-matching the timestamp's implicit string rendering
    with ``like("2021-02-15%")``. This keeps the filter independent of how
    timestamps are formatted as text.

    Args:
        trips: Spark DataFrame holding the trip records.
        pickup_col: Name of the pickup timestamp column in ``trips``.
        day: Calendar day to keep, formatted as ``yyyy-MM-dd``.

    Returns:
        A DataFrame with only the trips picked up on ``day``.
    """
    return trips.filter(col(pickup_col).cast("date") == day)


# Each service names its pickup column differently (see the schemas above).
yellow_trip = _trips_on(df_yellow, "tpep_pickup_datetime")
yell_count = yellow_trip.count()

green_trip = _trips_on(df_green, "lpep_pickup_datetime")
green_count = green_trip.count()

# NOTE: the `fhvhc_*` names (sic) are kept so any later cell that refers to
# them keeps working; they hold FHVHV data.
fhvhc_trip = _trips_on(df_fhvhv, "pickup_datetime")
fhvhc_count = fhvhc_trip.count()

print("There were", yell_count, "Yellow Taxi Trips on February 15.")
print("There were", green_count, "Green Taxi Trips on February 15.")
print("There were", fhvhc_count, "Fhvhc Trips on February 15.")

There were 43686 Yellow Taxi Trips on February 15.
There were 1811 Green Taxi Trips on February 15.
There were 425928 Fhvhc Trips on February 15.


# What was the longest trip for each day?

In [13]:
# Longest yellow-taxi trip (max trip_distance) per pickup day.
# The day key is the first 10 characters of the pickup timestamp, i.e. the
# yyyy-MM-dd part. `max` is pyspark.sql.functions.max (imported at the top
# of the notebook), not the Python builtin.
# NOTE(review): the recorded output contains 2009 dates and six-figure
# distances, which look like data-quality outliers -- confirm whether they
# should be filtered out before reporting.
yellow_by_day = df_yellow.withColumn(
    "date", col("tpep_pickup_datetime").substr(1, 10)
).groupBy("date")
yellow_longest = yellow_by_day.agg(
    max("trip_distance").alias("longest_trip")
).orderBy("date")

yellow_longest.show()

+----------+------------+
|      date|longest_trip|
+----------+------------+
|2009-01-01|        2.89|
|2009-01-02|        0.84|
|2021-02-01|       38.89|
|2021-02-02|       73.24|
|2021-02-03|   186079.73|
|2021-02-04|       82.19|
|2021-02-05|    91134.16|
|2021-02-06|       48.35|
|2021-02-07|   186510.67|
|2021-02-08|   186617.92|
|2021-02-09|    89416.24|
|2021-02-10|       99.96|
|2021-02-11|        54.4|
|2021-02-12|    34346.07|
|2021-02-13|    54381.65|
|2021-02-14|   115928.92|
|2021-02-15|       52.89|
|2021-02-16|   221188.25|
|2021-02-17|   140145.44|
|2021-02-18|       900.0|
+----------+------------+
only showing top 20 rows



In [14]:
# Longest green-taxi trip (max trip_distance) per pickup day.
# The day key is the first 10 characters of the pickup timestamp, i.e. the
# yyyy-MM-dd part. `max` is pyspark.sql.functions.max (imported at the top
# of the notebook), not the Python builtin.
# NOTE(review): the recorded output contains a 2009 date and five/six-figure
# distances, which look like data-quality outliers -- confirm whether they
# should be filtered out before reporting.
green_by_day = df_green.withColumn(
    "date", col("lpep_pickup_datetime").substr(1, 10)
).groupBy("date")
green_longest = green_by_day.agg(
    max("trip_distance").alias("longest_trip")
).orderBy("date")

green_longest.show()

+----------+------------+
|      date|longest_trip|
+----------+------------+
|2009-01-01|         0.0|
|2021-02-01|       27.52|
|2021-02-02|        48.1|
|2021-02-03|       36.33|
|2021-02-04|   102620.98|
|2021-02-05|       36.37|
|2021-02-06|       38.75|
|2021-02-07|        90.0|
|2021-02-08|      5634.0|
|2021-02-09|       34.64|
|2021-02-10|     60382.7|
|2021-02-11|    43174.56|
|2021-02-12|    66659.27|
|2021-02-13|       47.79|
|2021-02-14|       58.03|
|2021-02-15|       44.04|
|2021-02-16|    16191.56|
|2021-02-17|    16240.75|
|2021-02-18|    29501.25|
|2021-02-19|       34.95|
+----------+------------+
only showing top 20 rows



In [15]:
# Longest FHVHV trip per pickup day. This dataset stores distance in
# `trip_miles` rather than `trip_distance`.
# The day key is the first 10 characters of the pickup timestamp, i.e. the
# yyyy-MM-dd part. `max` is pyspark.sql.functions.max (imported at the top
# of the notebook), not the Python builtin.
fhvhv_by_day = df_fhvhv.withColumn(
    "date", col("pickup_datetime").substr(1, 10)
).groupBy("date")
fhvhv_longest = fhvhv_by_day.agg(
    max("trip_miles").alias("longest_trip")
).orderBy("date")

fhvhv_longest.show()

+----------+------------+
|      date|longest_trip|
+----------+------------+
|2021-02-01|      212.43|
|2021-02-02|      282.78|
|2021-02-03|      184.26|
|2021-02-04|      203.97|
|2021-02-05|      245.35|
|2021-02-06|      275.32|
|2021-02-07|      216.36|
|2021-02-08|       253.5|
|2021-02-09|      480.73|
|2021-02-10|       512.5|
|2021-02-11|      240.66|
|2021-02-12|      250.11|
|2021-02-13|      226.24|
|2021-02-14|      207.44|
|2021-02-15|     173.582|
|2021-02-16|     307.661|
|2021-02-17|      324.19|
|2021-02-18|      527.11|
|2021-02-19|      224.33|
|2021-02-20|      329.16|
+----------+------------+
only showing top 20 rows



# Top 5 Most frequent dispatching_base_num

In [16]:
# Expose the FHVHV DataFrame to Spark SQL under the name 'fhvhv'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; re-running this cell simply replaces the view.
df_fhvhv.createOrReplaceTempView('fhvhv')



In [17]:
# Five dispatching bases with the most FHVHV trips, queried from the
# 'fhvhv' temp view registered in the previous cell.
# A triple-quoted string replaces the backslash-continued literal: a stray
# space after any backslash would have broken the cell, and the old literal
# carried the source indentation into the SQL text.
# NOTE: the alias `trip_day` is kept so the output schema is unchanged; the
# value is the total number of trips per base in the file, not a per-day figure.
top5_disp_bases = spark.sql("""
    SELECT dispatching_base_num, COUNT(*) AS trip_day
    FROM fhvhv
    GROUP BY dispatching_base_num
    ORDER BY COUNT(*) DESC
    LIMIT 5
""")
top5_disp_bases.show()

+--------------------+--------+
|dispatching_base_num|trip_day|
+--------------------+--------+
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
+--------------------+--------+



# Top 5 most common location pairs (PULocationID and DOLocationID)

In [18]:
# Count yellow-taxi trips per (pickup, drop-off) location-ID pair, most
# frequent pair first; only the top 5 rows are displayed.
yellow_pair_counts = df_yellow.groupBy("PULocationID", "DOLocationID").count()
yell_top_loc = yellow_pair_counts.orderBy("count", ascending=False)

yell_top_loc.show(5)

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|         237|         236|11455|
|         236|         237| 9901|
|         236|         236| 8819|
|         237|         237| 7324|
|         264|         264| 5732|
+------------+------------+-----+
only showing top 5 rows



In [19]:
# Count green-taxi trips per (pickup, drop-off) location-ID pair, most
# frequent pair first; only the top 5 rows are displayed.
green_pair_counts = df_green.groupBy("PULocationID", "DOLocationID").count()
green_top_loc = green_pair_counts.orderBy("count", ascending=False)

green_top_loc.show(5)

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|          74|          75|  994|
|          75|          74|  949|
|          74|          74|  651|
|          41|          42|  535|
|          74|          42|  497|
+------------+------------+-----+
only showing top 5 rows



In [20]:
# Count FHVHV trips per (pickup, drop-off) location-ID pair, most frequent
# pair first; only the top 5 rows are displayed.
# The `fhvhc_top_loc` name (sic) is kept as originally spelled.
fhvhv_pair_counts = df_fhvhv.groupBy("PULocationID", "DOLocationID").count()
fhvhc_top_loc = fhvhv_pair_counts.orderBy("count", ascending=False)

fhvhc_top_loc.show(5)

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|          76|          76|45041|
|          26|          26|37329|
|          39|          39|28026|
|          61|          61|25976|
|          14|          14|17934|
+------------+------------+-----+
only showing top 5 rows

