### Install Pyspark

In [141]:
!pip install -q findspark

!pip install pyspark

import findspark

findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Iniate Spark Session and Import Library

In [142]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

spark = SparkSession.builder \
    .master('local') \
    .appName('assignment_6') \
    .config('spark.executor.memory', '5gb') \
    .config("spark.cores.max", "6") \
    .getOrCreate()

spark.version

'3.3.0'

### Connect To Bigquery


In [188]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')


Authenticated


In [191]:
project_id = "data-fellowship-de"
from google.cloud import bigquery
import humanize
client = bigquery.Client(project=project_id)

In [193]:
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = "iykra-fellowship" # change with your bucket name
spark.conf.set('temporaryGcsBucket', bucket)

### Download Dataset

In [143]:
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet

--2022-10-03 11:54:00--  https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.23.72, 13.226.23.119, 13.226.23.168, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.23.72|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10645466 (10M) [binary/octet-stream]
Saving to: ‘fhv_tripdata_2021-02.parquet.1’


2022-10-03 11:54:00 (65.2 MB/s) - ‘fhv_tripdata_2021-02.parquet.1’ saved [10645466/10645466]



Create Table Schema

In [144]:
from pyspark.sql import types

In [145]:
schema = types.StructType(
    [
        types.StructField('hvfhs_license_num', types.StringType(), True),
        types.StructField('dispatching_base_num', types.StringType(), True),
        types.StructField('pickup_datetime', types.TimestampType(), True),
        types.StructField('dropoff_datetime', types.TimestampType(), True),
        types.StructField('PULocationID', types.DoubleType(), True),
        types.StructField('DOLocationID', types.DoubleType(), True),
        types.StructField('SR_Flag', types.IntegerType(), True)
    ]
)

In [146]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .parquet('/content/fhv_tripdata_2021-02.parquet')

In [147]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|             null|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|
|             null|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|
|             null|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|
|             null|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|
|             null|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|
|             null|              B00037|2021-02-01 00:00:37|2021-02-01 00:09:35|

Repartition it to 24 partitions.

In [148]:
df= df.repartition(24)

### How many taxi in trips where in the 15 February?

#### Method 1

In [149]:
from pyspark.sql import functions as F

trips_15_feb = df \
                    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
                    .filter("pickup_date = '2021-02-15'")      

trips_15_feb.show()         
print(f"total taxi trips in 15 February 2022: {trips_15_feb.count()} trips") 

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|             null|              B02292|2021-02-15 10:00:48|2021-02-15 10:13:45|        null|        35.0|   null| 2021-02-15|
|             null|              B00856|2021-02-15 14:39:46|2021-02-15 14:49:40|        null|        35.0|   null| 2021-02-15|
|             null|              B00160|2021-02-15 23:08:00|2021-02-15 23:24:00|        null|        null|   null| 2021-02-15|
|             null|              B00445|2021-02-15 12:05:26|2021-02-15 12:18:49|       173.0|        95.0|   null| 2021-02-15|
|             null|              B01338|2021-02-15 02:56:38|2021-02-15 03:10:51|        null|        42.0|   nu

#### Method 2

In [150]:
df.registerTempTable('fhvhv_2_2021')

trips_15_feb = spark.sql("""
SELECT
    *
FROM 
    fhvhv_2_2021
WHERE
    to_date(pickup_datetime) = '2021-02-15';
""").show()





+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|             null|              B02292|2021-02-15 10:00:48|2021-02-15 10:13:45|        null|        35.0|   null|
|             null|              B00856|2021-02-15 14:39:46|2021-02-15 14:49:40|        null|        35.0|   null|
|             null|              B00160|2021-02-15 23:08:00|2021-02-15 23:24:00|        null|        null|   null|
|             null|              B00445|2021-02-15 12:05:26|2021-02-15 12:18:49|       173.0|        95.0|   null|
|             null|              B01338|2021-02-15 02:56:38|2021-02-15 03:10:51|        null|        42.0|   null|
|             null|              B01016|2021-02-15 06:49:20|2021-02-15 06:51:33|

In [196]:
spark.sql("""
SELECT
    COUNT(1) as trips_15_feb
FROM 
    fhvhv_2_2021
WHERE
    to_date(pickup_datetime) = '2021-02-15';
""").show()

+------------+
|trips_15_feb|
+------------+
|       34814|
+------------+



Send to Bigquery

In [None]:
trips_15_feb.write.format('com.google.cloud.spark.bigquery') \
  .option('table', 'nyc_taxi_record.trips_15_feb') \
  .save()

### Find Longest Time Each Day?

Calculate the duration for each trip

#### Method 1

In [152]:
from pyspark.sql.functions import col, asc, desc

df \
    .withColumn('duration_hours', df.dropoff_datetime-df.pickup_datetime) \
    .orderBy(col('duration_hours').desc()) \
    .show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|      duration_hours|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+--------------------+
|             null|              B03297|2021-02-04 18:45:00|2021-04-22 19:24:00|        null|        null|   null|INTERVAL '77 00:3...|
|             null|              B00477|2021-02-01 08:00:00|2021-03-05 11:30:00|        null|        null|   null|INTERVAL '32 03:3...|
|             null|              B00477|2021-02-01 12:00:00|2021-03-05 11:29:00|        null|        null|   null|INTERVAL '31 23:2...|
|             null|              B02925|2021-02-25 15:00:00|2021-03-25 17:49:00|       143.0|       143.0|   null|INTERVAL '28 02:4...|
|             null|              B02925|2021-02-

#### Method 2

In [153]:
trips_duration = spark.sql("""
                              SELECT
                                  pickup_datetime, dropoff_datetime,
                                  (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) AS duration_seconds
                              FROM
                                  fhvhv_2_2021
                              SORT BY
                                  duration_seconds DESC
                              """)

trips_duration.show()

+-------------------+-------------------+----------------+
|    pickup_datetime|   dropoff_datetime|duration_seconds|
+-------------------+-------------------+----------------+
|2021-02-13 13:16:55|2021-02-19 09:39:36|          505361|
|2021-02-05 05:35:51|2021-02-07 10:40:32|          191081|
|2021-02-12 16:19:26|2021-02-14 13:29:11|          162585|
|2021-02-18 06:43:14|2021-02-19 14:58:00|          116086|
|2021-02-25 13:45:02|2021-02-26 14:15:35|           88233|
|2021-02-17 00:08:18|2021-02-18 00:33:04|           87886|
|2021-02-09 00:07:57|2021-02-10 00:27:56|           87599|
|2021-02-11 11:26:38|2021-02-12 08:50:26|           77028|
|2021-02-02 18:01:40|2021-02-03 13:28:06|           69986|
|2021-02-27 11:31:00|2021-02-28 06:41:53|           69053|
|2021-02-20 18:22:18|2021-02-21 13:30:31|           68893|
|2021-02-07 20:37:50|2021-02-08 15:19:10|           67280|
|2021-02-13 17:48:14|2021-02-14 10:39:50|           60696|
|2021-02-11 14:07:05|2021-02-12 05:37:05|           5580

Send To Bigquery

In [None]:
trips_duration.write.format('com.google.cloud.spark.bigquery') \
  .option('table', 'nyc_taxi_record.trips_duration') \
  .save()

### Most frequent dispatching_base_num

Find the most frequently occurring dispatching_base_num in this dataset.

#### Method 1

In [154]:
df \
    .groupBy('dispatching_base_num') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



#### Method 2

In [155]:
most_dispatching_base_num = spark.sql("""
    SELECT 
          dispatching_base_num,
          COUNT(1) as count_dispatching_base_num
    FROM 
          fhvhv_2_2021
    GROUP BY
          1
    ORDER BY
          2 DESC
    LIMIT 
          5
""")

most_dispatching_base_num.show()

+--------------------+--------------------------+
|dispatching_base_num|count_dispatching_base_num|
+--------------------+--------------------------+
|              B00856|                     35077|
|              B01312|                     33089|
|              B01145|                     31114|
|              B02794|                     30397|
|              B03016|                     29794|
+--------------------+--------------------------+



Send to BigQuery

In [None]:
most_dispatching_base_num.write.format('com.google.cloud.spark.bigquery') \
  .option('table', 'nyc_taxi_record.most_dispatching_base_num') \
  .save()

### 5 Most common location pairs

Find the most common pickup-dropoff pair.

In [156]:
# Import Zone Lookup dataset

!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2022-10-03 11:55:50--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 13.226.23.168, 13.226.23.47, 13.226.23.72, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|13.226.23.168|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv.3’


2022-10-03 11:55:50 (30.0 MB/s) - ‘taxi+_zone_lookup.csv.3’ saved [12322/12322]



In [157]:
# Create schema dataset
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [None]:
df_zones = spark.read.option('header', 'true').schema(zones_schema).csv('/content/taxi+_zone_lookup.csv')

In [170]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: double (nullable = true)
 |-- DOLocationID: double (nullable = true)
 |-- SR_Flag: integer (nullable = true)



In [165]:
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [171]:
# create schema for zones pick up table
zpu = df_zones \
    .withColumnRenamed('Zone', 'PUzone') \
    .withColumnRenamed('LocationID', 'zPULocationID') \
    .withColumnRenamed('Borough', 'PUBorough') \
    .drop('service_zone')

# create schema for zones drop down table
zdo = df_zones \
    .withColumnRenamed('Zone', 'DOzone') \
    .withColumnRenamed('LocationID', 'zDOLocationID') \
    .withColumnRenamed('Borough', 'DOBorough') \
    .drop('service_zone')

In [181]:
# Join the table
df_join_temp = df.join(zpu, df.PULocationID == zpu.zPULocationID)
df_join = df_join_temp.join(zdo, df_join_temp.DOLocationID == zdo.zDOLocationID)

In [173]:
df_join.drop('PULocationID', 'DOLocationID', 'zPULocationID', 'zDOLocationID').write.parquet('zones')

In [176]:
df_join = spark.read.parquet('zones')
df_join.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- PUBorough: string (nullable = true)
 |-- PUzone: string (nullable = true)
 |-- DOBorough: string (nullable = true)
 |-- DOzone: string (nullable = true)



In [177]:
df_join.registerTempTable('zones_table_lookup')



In [187]:
pair_location = spark.sql("""
SELECT
    CONCAT(coalesce(PUzone, 'Unknown'), '/', coalesce(DOzone, 'Unknown')) AS zone_pair,
    COUNT(1) as total_count
FROM
    zones_table_lookup
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT
    5
;
""")

pair_location.show()

+--------------------+-----------+
|           zone_pair|total_count|
+--------------------+-----------+
|Saint George/New ...|       2374|
|Stapleton/Saint G...|       2112|
|Jackson Heights/J...|       1902|
|     Astoria/Astoria|       1829|
|Old Astoria/Old A...|       1736|
+--------------------+-----------+



Send to Bigquery

In [None]:
pair_location.write.format('com.google.cloud.spark.bigquery') \
  .option('table', 'nyc_taxi_record.pair_location') \
  .save()