# PySpark Installation

In [174]:
!pip install -q findspark

In [175]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [176]:
import findspark 
findspark.init()

In [177]:
findspark.find()

'/usr/local/lib/python3.7/dist-packages/pyspark'

In [178]:
import pyspark

In [179]:
print(pyspark.__version__)

3.3.0


In [180]:
pyspark.__file__

'/usr/local/lib/python3.7/dist-packages/pyspark/__init__.py'

# Creating Session

In [181]:
from pyspark.sql import SparkSession

In [182]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('FirstSpark') \
    .getOrCreate()

# Data Preparation

In [183]:
#Download Data Green Taxi
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet

--2022-10-03 11:35:39--  https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.138.90.118, 108.138.90.225, 108.138.90.71, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.138.90.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1145679 (1.1M) [binary/octet-stream]
Saving to: ‘green_tripdata_2021-02.parquet.1’


2022-10-03 11:35:39 (31.2 MB/s) - ‘green_tripdata_2021-02.parquet.1’ saved [1145679/1145679]



In [184]:
#Download Data Green Yellow
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet

--2022-10-03 11:35:40--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.138.90.118, 108.138.90.225, 108.138.90.71, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.138.90.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21777258 (21M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2021-02.parquet.1’


2022-10-03 11:35:40 (70.0 MB/s) - ‘yellow_tripdata_2021-02.parquet.1’ saved [21777258/21777258]



In [185]:
df_green = spark.read.parquet('green_tripdata_2021-02.parquet')

In [186]:
df_yellow = spark.read.parquet('yellow_tripdata_2021-02.parquet')

In [187]:
# Get row count
rows = df_green.count()
print(f"DataFrame df_green Rows count : {rows}")

# Get columns count
cols = len(df_green.columns)
print(f"DataFrame df_green Columns count : {cols}")

DataFrame df_green Rows count : 64572
DataFrame df_green Columns count : 20


In [188]:
# Get row count
rows = df_yellow.count()
print(f"DataFrame df_yellow Rows count : {rows}")

# Get columns count
cols = len(df_yellow.columns)
print(f"DataFrame df_yellow Columns count : {cols}")

DataFrame df_yellow Rows count : 1371709
DataFrame df_yellow Columns count : 19


In [189]:
df_green.show(5)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 00:34:03|  2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

In [190]:
df_yellow.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 00:40:47|  2021-02-01 00:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

In [191]:
df_green = df_green.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
df_yellow = df_yellow.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [192]:
in_col = []
not_in_col = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
      in_col.append(col)
    else:
      not_in_col.append(col)

In [193]:
not_in_col

['ehail_fee', 'trip_type']

In [194]:
in_col

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [195]:
from pyspark.sql import functions as F

In [196]:
df_green_sel = df_green \
    .select(in_col) \
    .withColumn('service_type', F.lit('green'))

In [197]:
df_yellow_sel = df_yellow \
    .select(in_col) \
    .withColumn('service_type', F.lit('yellow'))

In [198]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [199]:
df_trips_data.show(5)

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2021-02-01 00:34:03|2021-02-01 00:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.5|      10.0|         0.0|       

In [200]:
# df_trips_data.withColumn("pickup_datetime", F.year(F.to_date(df_trips_data.pickup_datetime, "dd/MM/yyyy"))).show()

In [201]:
df_trips_data.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- service_type: string (nullable = false)



In [202]:
df_trips_data.groupBy('service_type').count().show()

+------------+-------+
|service_type|  count|
+------------+-------+
|       green|  64572|
|      yellow|1371709|
+------------+-------+



# How many taxi trips were there on February 15?

In [203]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col,lit

In [204]:
df_trips_data.select('pickup_datetime').where(F.to_date(df_trips_data.pickup_datetime)=='2021-02-15').count()

42100

# Find the longest trip for each day ?

In [205]:
df_trips_data.groupBy(F.to_date('pickup_datetime')) \
  .agg({'trip_distance':'max'}) \
  .where(F.to_date('pickup_datetime')>='2021-02-01') \
  .show()
  
  # .sort('pickup_datetime') \

+------------------------+------------------+
|to_date(pickup_datetime)|max(trip_distance)|
+------------------------+------------------+
|              2021-02-15|             52.89|
|              2021-02-02|             73.24|
|              2021-02-26|          90796.21|
|              2021-02-21|             55.87|
|              2021-02-05|          91134.16|
|              2021-02-10|           60382.7|
|              2021-02-01|             38.89|
|              2021-02-06|              43.5|
|              2021-02-19|              70.4|
|              2021-02-20|         188054.03|
|              2021-02-08|         186617.92|
|              2021-02-09|           60382.7|
|              2021-02-11|          43174.56|
|              2021-02-17|         140145.44|
|              2021-02-25|          50422.63|
|              2021-03-01|             28.34|
|              2021-02-27|             91.05|
|              2021-02-24|          90073.44|
|              2021-02-18|        

# Find Top 5 Most frequent `dispatching_base_num` ?

In [206]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-10-03 11:35:48--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 108.138.90.118, 108.138.90.225, 108.138.90.71, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|108.138.90.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet.2’


2022-10-03 11:35:48 (50.6 MB/s) - ‘fhv_tripdata_2021-02.parquet.2’ saved [10645466/10645466]



In [207]:
df_fhv = spark.read.parquet('/content/fhv_tripdata_2021-02.parquet')

In [208]:
# Get row count
rows = df_fhv.count()
print(f"DataFrame df_fhv Rows count : {rows}")

# Get columns count
cols = len(df_fhv.columns)
print(f"DataFrame df_fhv Columns count : {cols}")

DataFrame df_fhv Rows count : 1037692
DataFrame df_fhv Columns count : 7


In [209]:
df_fhv.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

In [210]:
df_fhv.groupBy('dispatching_base_num').count() \
  .sort('count', ascending=False) \
  .show(5)

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+
only showing top 5 rows



# Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?

In [211]:
df_fhv.filter('PUlocationID is not NULL AND DOlocationID is not NULL') \
  .groupBy(['PUlocationID','DOlocationID']) \
  .count().sort('count',ascending=False).show(5)

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+
only showing top 5 rows



# Write all of the result to BigQuery table (additional - point plus)

In [None]:
from google.cloud import bigquery
client = bigquery.Client()