# Import Dependencies

In [1]:
import os

import pyspark
import wget
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import asc, desc
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Extracting Data

In [2]:
# Remote location of the February 2021 trip-data parquet file.
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet'

# Download only when the file is not already on disk, so that re-running the
# notebook neither fetches the file again nor leaves a second, renamed copy
# next to the one the read cell expects.
local_name = os.path.basename(url)

# NOTE: despite its name, `df` holds the local file *path* (a string),
# not a DataFrame.
df = local_name if os.path.exists(local_name) else wget.download(url, out=local_name)

In [3]:
# Create (or reuse) a Spark session running in local mode under the
# application name 'project1'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('project1')
    .getOrCreate()
)

In [4]:
# Load the trip-data parquet file from the working directory into a Spark DataFrame.
df1 = spark.read.format('parquet').load('fhvhv_tripdata_2021-02.parquet')

In [5]:
# Display the column names and data types of the loaded DataFrame.
df1.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampType(), True), StructField('on_scene_datetime', TimestampType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('shared_request_flag',

# How many taxi trips were there on February 15?

Trips "on February 15" can be counted in several ways, depending on which timestamp is used:
Requested on February 15:
A trip can be requested on February 15 but picked up on February 16.
Picked up on February 15:
A trip can be picked up on February 15 but dropped off on February 16.
Dropped off on February 15:
A trip can be picked up on February 14 but dropped off on February 15.
The query below counts only trips whose request, pick-up and drop-off all fall on February 15.

In [6]:
# Half-open window [day_start, day_end) covering February 15.
day_start = '2021-02-15 00:00:00 UTC'
day_end = '2021-02-16 00:00:00 UTC'

# Keep a trip only when its request, pickup and drop-off timestamps all fall
# inside the window.
q1 = df1.filter(
    (col('request_datetime') >= day_start) & (col('request_datetime') < day_end)
    & (col('pickup_datetime') >= day_start) & (col('pickup_datetime') < day_end)
    & (col('dropoff_datetime') >= day_start) & (col('dropoff_datetime') < day_end)
)

In [7]:
# Number of rows in q1, i.e. trips that passed the February 15 filter.
q1.count()

363019

There were 363019 taxi trips on February 15.

# Find the longest trip for each day ?

Because `pickup_datetime` contains both a date and a time, we convert it to a date only so that trips can be grouped by day. Note that the dataset has not been cleaned, so null values may appear.

In [8]:
# Reduce the pickup timestamp to a calendar date, then take the largest
# `trip_miles` per date, ordered chronologically.
q2 = (
    df1.withColumn('pickup_datetime', functions.to_date(df1.pickup_datetime))
    .groupBy('pickup_datetime')
    .max('trip_miles')
    .sort(asc('pickup_datetime'))
)

# `show()` only prints and returns None, so it is called separately: `q2` must
# stay a DataFrame because it is written out later in the notebook.
# NOTE(review): the aggregate column is named `max(trip_miles)`; confirm the
# write target accepts parentheses in column names, otherwise alias it.
q2.show()

+---------------+---------------+
|pickup_datetime|max(trip_miles)|
+---------------+---------------+
|     2021-02-01|         212.43|
|     2021-02-02|         282.78|
|     2021-02-03|         184.26|
|     2021-02-04|         203.97|
|     2021-02-05|         245.35|
|     2021-02-06|         275.32|
|     2021-02-07|         216.36|
|     2021-02-08|          253.5|
|     2021-02-09|         480.73|
|     2021-02-10|          512.5|
|     2021-02-11|         240.66|
|     2021-02-12|         250.11|
|     2021-02-13|         226.24|
|     2021-02-14|         207.44|
|     2021-02-15|        173.582|
|     2021-02-16|        307.661|
|     2021-02-17|         324.19|
|     2021-02-18|         527.11|
|     2021-02-19|         224.33|
|     2021-02-20|         329.16|
+---------------+---------------+
only showing top 20 rows



The table above shows the longest trip (maximum `trip_miles`) for each day. The full result covers February 1 to March 1; only the first 20 rows are displayed.

# Find Top 5 Most frequent `dispatching_base_num` !

In [9]:
# Count the frequency of every dispatching_base_num, most frequent first.
# `cube` also emits a grand-total row whose key is null, which sorts to the
# top; that is why six rows are displayed to see the top five base numbers.
q3 = (
    df1.cube('dispatching_base_num')
    .count()
    .orderBy(functions.col('count').desc())
)

# `show()` only prints and returns None, so it is called separately: `q3` must
# stay a DataFrame because it is written out later in the notebook.
q3.show(6)

+--------------------+--------+
|dispatching_base_num|   count|
+--------------------+--------+
|                null|11613942|
|              B02510| 3233664|
|              B02764|  965568|
|              B02872|  882689|
|              B02875|  685390|
|              B02765|  559768|
+--------------------+--------+
only showing top 6 rows



The table above shows the top 5 most frequent `dispatching_base_num` values. Ignore the `null` row: it is the grand-total row added by `cube`, and 11,613,942 is the total number of rows in the whole table (fhvhv_tripdata_2021-02-1).

# Find Top 5 Most common location pairs (`PUlocationID` and `DOlocationID`) !

In [10]:
# Count trips per (pickup location, drop-off location) pair, most frequent first.
q4 = (
    df1.groupBy(['PUlocationID', 'DOlocationID'])
    .count()
    .orderBy(functions.col('count').desc())
)

# `show()` only prints and returns None, so it is called separately: `q4` must
# stay a DataFrame because it is written out later in the notebook.
q4.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|          76|          76|45041|
|          26|          26|37329|
|          39|          39|28026|
|          61|          61|25976|
|          14|          14|17934|
+------------+------------+-----+
only showing top 5 rows



The table above shows the top 5 most frequent pairs of `PUlocationID` and `DOlocationID`.

# Write to BigQuery

In [None]:
# Write q1 to the BigQuery table `ny_taxi.q1` in project `firm-pentameter-363006`.
(
    q1.write
    .format('bigquery')
    .option('table', 'firm-pentameter-363006.ny_taxi.q1')
    .save()
)

In [None]:
# Write q2 to the BigQuery table `ny_taxi.q2` in project `firm-pentameter-363006`.
(
    q2.write
    .format('bigquery')
    .option('table', 'firm-pentameter-363006.ny_taxi.q2')
    .save()
)

In [None]:
# Write q3 to the BigQuery table `ny_taxi.q3` in project `firm-pentameter-363006`.
(
    q3.write
    .format('bigquery')
    .option('table', 'firm-pentameter-363006.ny_taxi.q3')
    .save()
)

In [None]:
# Write q4 to the BigQuery table `ny_taxi.q4` in project `firm-pentameter-363006`.
(
    q4.write
    .format('bigquery')
    .option('table', 'firm-pentameter-363006.ny_taxi.q4')
    .save()
)