# Setup Environment

In [13]:
import os

# Point the kernel at the local Java and Spark installs.
# Raw strings keep the Windows backslashes literal: sequences such as "\P",
# "\J" and "\S" are invalid escapes in ordinary string literals.
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-19"
os.environ["SPARK_HOME"] = r"C:\Spark"

In [14]:
import findspark
# Initialise findspark before pyspark is imported further down.
# NOTE(review): presumably this locates Spark through the SPARK_HOME value set
# earlier in the notebook — confirm against the findspark documentation.
findspark.init()
# Last expression of the cell: displays the Spark location findspark resolved.
findspark.find()

'C:\\Spark'

In [27]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [16]:
# Create the SparkSession used by every later cell (or reuse an existing one),
# running against a local master under the application name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [26]:
# Download the yellow-taxi trip file for 2021-02 via the `wget` Python module.
!python -m wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet


Saved under yellow_tripdata_2021-02.parquet


In [29]:
# Load the yellow-taxi trips. Parquet files carry their own schema, so the
# CSV-style `header` / `inferSchema` arguments are not passed here.
df = spark.read.parquet("yellow_tripdata_2021-02.parquet")
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [31]:
# Preview the first 10 rows of the yellow-taxi DataFrame.
df.show(n=10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 07:40:47|  2021-02-01 07:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [32]:
# Drop the "tpep_" prefix from the two timestamp columns, then expose the
# DataFrame to Spark SQL under the name `data_table`.
df = (
    df
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run: an existing view with the same name is simply replaced.
df.createOrReplaceTempView('data_table')



# How many taxi trips were there on February 15?

In [51]:
# Count the trips whose pickup time falls on 2021-02-15. The half-open range
# [2021-02-15 00:00:00, 2021-02-16 00:00:00) covers the whole day.
feb15_query = """ 
    SELECT COUNT(pickup_datetime) AS taxi_trips_15feb
    FROM data_table
    WHERE pickup_datetime >= '2021-02-15 00:00:00' AND pickup_datetime < '2021-02-16 00:00:00'
"""
taxi_trips_15feb = spark.sql(feb15_query)
taxi_trips_15feb.show()

+----------------+
|taxi_trips_15feb|
+----------------+
|           43686|
+----------------+



# Find the longest trip for each day

In [37]:
# Longest trip_distance per pickup day, sorted with the largest value first.
# Every Spark function is referenced through the `F` alias so the cell does not
# depend on names injected by a wildcard import.
# NOTE(review): only a lower date bound is applied; rows dated after February,
# if the file contains any, are kept — confirm whether that is intended.
longest_trip = (
    df
    .withColumn("pickup_datetime", F.to_date(F.col("pickup_datetime")))
    .select("pickup_datetime", "trip_distance")
    .where("pickup_datetime >= '2021-02-01' ")
    .groupBy("pickup_datetime")
    .agg(F.max("trip_distance").alias("longest_trip"))
    .sort(F.desc("longest_trip"))
)
longest_trip.show(10)

+---------------+------------+
|pickup_datetime|longest_trip|
+---------------+------------+
|     2021-02-16|   221188.25|
|     2021-02-20|   188054.03|
|     2021-02-08|   186617.92|
|     2021-02-07|   186510.67|
|     2021-02-03|   186079.73|
|     2021-02-17|   140145.44|
|     2021-02-14|   115928.92|
|     2021-02-05|    91134.16|
|     2021-02-26|    90796.21|
|     2021-02-24|    90073.44|
+---------------+------------+
only showing top 10 rows



# Find the top 5 most frequent _dispatching_base_num_ values

In [39]:
# Download the FHV trip file for 2021-02 via the `wget` Python module.
!python -m wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet


Saved under fhv_tripdata_2021-02.parquet


In [40]:
# Load the FHV trips. Parquet files carry their own schema, so the CSV-style
# `header` / `inferSchema` arguments are not passed here.
df_fhv = spark.read.parquet("fhv_tripdata_2021-02.parquet")
df_fhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: double (nullable = true)
 |-- DOlocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [41]:
# Preview the first 10 rows of the FHV DataFrame.
df_fhv.show(n=10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 07:01:00|2021-02-01 08:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 07:55:40|2021-02-01 08:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 07:14:03|2021-02-01 07:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 07:27:48|2021-02-01 07:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 07:12:50|2021-02-01 07:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

In [47]:
# Trip count per dispatching base, most frequent first.
# The row preview shows some dispatching_base_num values padded with
# whitespace, so the key is trimmed before grouping; otherwise a padded and an
# unpadded spelling of the same base would be counted as separate groups.
most_freq = (
    df_fhv
    .groupBy(F.trim(F.col("dispatching_base_num")).alias("dispatching_base_num"))
    .count()
    .orderBy(F.col("count").desc())
)
most_freq.show(5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+
only showing top 5 rows



# Find the top 5 most common location pairs (_PUlocationID_ and _DOlocationID_)

In [52]:
# Yellow-taxi trips: count every (pickup, drop-off) location pair that has
# both IDs present, ordered from the most to the least frequent pair.
common_loc = (
    df
    .filter("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL")
    .groupBy("PUlocationID", 'DOlocationID')
    .count()
    .orderBy(F.col('count').desc())
)
common_loc.show(n=10)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|         237|         236|11455|
|         236|         237| 9901|
|         236|         236| 8819|
|         237|         237| 7324|
|         264|         264| 5732|
|         239|         238| 4991|
|         141|         236| 4756|
|         239|         142| 4243|
|         238|         239| 4199|
|         236|         141| 4048|
+------------+------------+-----+
only showing top 10 rows



In [54]:
# FHV trips: count every (pickup, drop-off) location pair that has both IDs
# present, ordered from the most to the least frequent pair.
common_loc_fhv = (
    df_fhv
    .filter("PUlocationID IS NOT NULL AND DOlocationID IS NOT NULL")
    .groupBy("PUlocationID", 'DOlocationID')
    .count()
    .orderBy(F.col('count').desc())
)
common_loc_fhv.show(n=10)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
|       221.0|       221.0| 1562|
|       223.0|       223.0| 1522|
|        92.0|        92.0| 1383|
|       206.0|       221.0| 1309|
|        56.0|        56.0| 1073|
+------------+------------+-----+
only showing top 10 rows



# Save to BigQuery

In [None]:
# Write the yellow-taxi DataFrame to the BigQuery table
# "<bq_dataset>.<bq_table>", overwriting any existing contents and using the
# GCS bucket below as the temporary staging area.
# NOTE(review): the SparkSession builder visible in this notebook configures no
# BigQuery connector package — confirm the "bigquery" format is available.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = ".google/credentials/google_credentials.json"
bq_dataset = 'taxi_trips'
bq_table = 'yellow_tripdata_2021-02'
gcs_bucket = 'playgrounddata-bucket'

bq_writer = (
    df.write
    .format("bigquery")
    .mode('overwrite')
    .option("temporaryGcsBucket", gcs_bucket)
    .option("table", "{}.{}".format(bq_dataset, bq_table))
)
bq_writer.save()