#Install Pyspark & Preparation

In [119]:
!pip install -q findspark

In [120]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [121]:
import findspark
findspark.init()

In [122]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

In [123]:
spark

#Download Data & Preparation

In [124]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet

--2022-10-03 15:37:36--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.11, 65.9.84.37, 65.9.84.167, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21777258 (21M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2021-02.parquet.2’


2022-10-03 15:37:36 (71.6 MB/s) - ‘yellow_tripdata_2021-02.parquet.2’ saved [21777258/21777258]



In [125]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet

--2022-10-03 15:37:36--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.11, 65.9.84.37, 65.9.84.167, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1145679 (1.1M) [binary/octet-stream]
Saving to: ‘green_tripdata_2021-02.parquet.3’


2022-10-03 15:37:36 (43.9 MB/s) - ‘green_tripdata_2021-02.parquet.3’ saved [1145679/1145679]



In [126]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-10-03 15:37:37--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.9.84.11, 65.9.84.37, 65.9.84.167, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.9.84.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet.2’


2022-10-03 15:37:37 (52.5 MB/s) - ‘fhv_tripdata_2021-02.parquet.2’ saved [10645466/10645466]



In [127]:
df_yellow = spark.read.parquet('yellow_tripdata_2021-02.parquet')
df_green = spark.read.parquet('green_tripdata_2021-02.parquet')
df_fhv = spark.read.parquet('fhv_tripdata_2021-02.parquet')

In [128]:
df_yellow.show()
df_green.show()
df_fhv.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 00:40:47|  2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [129]:
df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
df_green = df_green.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [130]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [131]:
common_colums

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [132]:
from pyspark.sql import functions as f

In [133]:
df_yellow_sel = df_yellow.select(common_colums).withColumn('service_type', f.lit('yellow'))
df_green_sel = df_green.select(common_colums).withColumn('service_type', f.lit('green'))

In [134]:
df_join = df_yellow_sel.unionAll(df_green_sel)

In [135]:
df_join.show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       1|2021-02-01 00:40:47|2021-02-01 00:48:28|                 N|       1.0|         141|         226|            1.0|          2.3|        8.5|  3.0|    0.5|       0.0|         0.0|       

#1. How many taxi trips were there on February 15 ?

In [136]:
df_join

DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, payment_type: double, congestion_surcharge: double, service_type: string]

In [137]:
df1 = df_join.withColumn('dropoff_datetime',f.to_date(df_join.dropoff_datetime)).filter((df_join['dropoff_datetime']>='2021-02-15')&(df_join['dropoff_datetime']<'2021-02-16')).select('dropoff_datetime').groupBy('dropoff_datetime').count()

In [138]:
df1.show()

+----------------+-----+
|dropoff_datetime|count|
+----------------+-----+
|      2021-02-15|42170|
+----------------+-----+



#2. Find the longest trip for each day ?

In [139]:
df2 = df_join.withColumn('pickup_datetime',f.to_date(df_join.pickup_datetime)).select('pickup_datetime','trip_distance').filter(df_join['pickup_datetime']>='2021-02-01').groupBy('pickup_datetime').max('trip_distance').sort('pickup_datetime')

In [140]:
df2.show(28)

+---------------+------------------+
|pickup_datetime|max(trip_distance)|
+---------------+------------------+
|     2021-02-01|             38.89|
|     2021-02-02|             73.24|
|     2021-02-03|         186079.73|
|     2021-02-04|             82.19|
|     2021-02-05|          91134.16|
|     2021-02-06|              43.5|
|     2021-02-07|         186510.67|
|     2021-02-08|         186617.92|
|     2021-02-09|           60382.7|
|     2021-02-10|           60382.7|
|     2021-02-11|          43174.56|
|     2021-02-12|          66659.27|
|     2021-02-13|         115928.92|
|     2021-02-14|             58.03|
|     2021-02-15|             52.89|
|     2021-02-16|         221188.25|
|     2021-02-17|         140145.44|
|     2021-02-18|             75.81|
|     2021-02-19|              70.4|
|     2021-02-20|         188054.03|
|     2021-02-21|             55.87|
|     2021-02-22|             49.56|
|     2021-02-23|          30195.95|
|     2021-02-24|          90073.44|
|

#3. Find Top 5 Most frequent `dispatching_base_num` ?

In [141]:
df3 = df_fhv.groupBy('dispatching_base_num').count().sort('count',ascending=False)

In [142]:
df3.show(5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+
only showing top 5 rows



#4. Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?

In [143]:
df4 = df_fhv.filter('PUlocationID is not NULL AND DOlocationID is not NULL').groupBy(['PUlocationID','DOlocationID']).count().sort('count',ascending=False)

In [144]:
df4.show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+
only showing top 5 rows



#5. Write all of the result to BigQuery table (additional - point plus)

In [None]:
df1.write.mode('overwrite').format('bigquery').save('Data Fellowship 7.schema.df_answer_1')

In [None]:
df2.write.mode('overwrite').format('bigquery').save('Data Fellowship 7.schema.df_answer_2')

In [None]:
df3.write.mode('overwrite').format('bigquery').save('Data Fellowship 7.schema.df_answer_3')

In [None]:
df4.write.mode('overwrite').format('bigquery').save('Data Fellowship 7.schema.df_answer_4')