# Prerequisites
Sign up at https://ngrok.com/ to get an authtoken; it is required to open the tunnel used to reach the Spark UI.

In [65]:
%%capture
!pip install pyspark
!pip install findspark
!pip install pyngrok

In [66]:
import findspark
# findspark.init() is run before the pyspark import below: it is findspark's
# setup step for making the pyspark package importable.
findspark.init()
from pyspark.sql import SparkSession

In [67]:
# Create the SparkSession used by every cell below; getOrCreate() returns the
# already-running session when this cell is executed again.
spark = (
    SparkSession.builder
    .appName('testColab')
    .getOrCreate()
)

# Start a tunnel to access SparkUI

Open an ngrok tunnel to the Spark UI HTTP server

In [18]:
import getpass

from pyngrok import conf, ngrok

# The authtoken is read with getpass so it is not echoed into the cell output.
print(
    "Enter your authtoken, which can be copied "
    "from https://dashboard.ngrok.com/get-started/your-authtoken"
)
pyngrok_config = conf.get_default()
pyngrok_config.auth_token = getpass.getpass()

# Local port to expose through the tunnel (4040 is the Spark UI's default port).
ui_port = 4040
tunnel = ngrok.connect(ui_port)
public_url = tunnel.public_url
print(f' * ngrok tunnel "{public_url}" -> "http://127.0.0.1:{ui_port}"')

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········
 * ngrok tunnel "https://c513-104-155-217-65.ngrok-free.app" -> "http://127.0.0.1:4040"


## Download Yellow Taxi Trip records data, read it in with Spark, and count the number of rows

Data source: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [68]:
from pyspark import SparkFiles
# Import only the column functions this notebook uses instead of `*`: the
# wildcard import silently replaces many Python builtins (sum, min, max, abs,
# filter, ...) with Spark column functions.
# NOTE: `round` below is still Spark's column function, not the Python builtin.
from pyspark.sql.functions import col, round, unix_timestamp

# October 2024 Yellow Taxi trip records published by NYC TLC.
file_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet'
# Register the remote file with Spark; SparkFiles.get() then resolves the local
# path of the fetched copy by file name.
spark.sparkContext.addFile(file_url)

df = spark.read.parquet(SparkFiles.get('yellow_tripdata_2024-10.parquet'))

# Total number of trip records in the file.
df.count()

3833771

In [69]:
# Redistribute the trips into 4 partitions and write them as parquet under
# "hw05/", replacing the output of any previous run.
# Only repartition() is used here; no partitionBy() is applied.
(
    df.repartition(4)
    .write
    .mode("overwrite")
    .parquet("hw05/")
)

In [70]:
# Lower and upper bound of the pickup window.
# NOTE(review): Column.between() is inclusive on both ends, so a pickup stamped
# exactly at the upper bound is counted as well — confirm that is intended.
dates = ('2024-10-15', '2024-10-16')
start_date, end_date = dates

# Count the trips whose pickup time lies within the window.
pickup_time = df['tpep_pickup_datetime']
df.where(pickup_time.between(start_date, end_date)).count()

128895

In [71]:
# Print the column names, types and nullability of the trips DataFrame.
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [72]:
# Show the 5 trips with the longest duration.
(
    df
    # Trip duration in seconds: drop-off time minus pickup time.
    .withColumn(
        'DiffInSeconds',
        unix_timestamp('tpep_dropoff_datetime') - unix_timestamp('tpep_pickup_datetime'),
    )
    # Same duration expressed in hours, rounded to a whole number for display.
    .withColumn('Longest', round(col('DiffInSeconds') / 3600))
    # Rank on the exact duration rather than on the rounded hours: trips that
    # round to the same hour would otherwise come out in arbitrary order.
    .sort(col('DiffInSeconds').desc())
    .show(5)
)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|DiffInSeconds|Longest|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------------+-------+
|       2| 2024-10-16 13:03:49|  2024-10-23 07:40:53|              1|        32.37|         3|                 N|        

In [73]:
# Taxi zone lookup table (LocationID -> Borough / Zone / service_zone).
lookup_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
spark.sparkContext.addFile(lookup_url)

# The first CSV line holds the column names. No schema is supplied or inferred,
# so every column is read as a string.
lookup_path = SparkFiles.get('taxi_zone_lookup.csv')
df_lu = spark.read.csv(lookup_path, header=True)

# Number of rows in the lookup table.
df_lu.count()

265

In [74]:
# Preview the first 5 rows of the zone lookup table.
df_lu.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [75]:
# Print the lookup table's schema (the CSV was read without a schema, so the
# columns are strings).
df_lu.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [76]:
# Attach the pickup zone name to every trip; the left join keeps trips whose
# PULocationID has no match in the lookup table.
trips_with_zone = df.join(df_lu, on=df.PULocationID == df_lu.LocationID, how='left')

# Count trips per pickup zone and list the least frequent zones first
# (show() prints only the first 20 rows by default).
(
    trips_with_zone
    .groupBy('Zone')
    .count()
    .orderBy(col('count').asc())
    .show(truncate=False)
)

+---------------------------------------------+-----+
|Zone                                         |count|
+---------------------------------------------+-----+
|Governor's Island/Ellis Island/Liberty Island|1    |
|Rikers Island                                |2    |
|Arden Heights                                |2    |
|Jamaica Bay                                  |3    |
|Green-Wood Cemetery                          |3    |
|Charleston/Tottenville                       |4    |
|Rossville/Woodrow                            |4    |
|West Brighton                                |4    |
|Port Richmond                                |4    |
|Eltingville/Annadale/Prince's Bay            |4    |
|Great Kills                                  |6    |
|Crotona Park                                 |6    |
|Mariners Harbor                              |7    |
|Heartland Village/Todt Hill                  |7    |
|Saint George/New Brighton                    |9    |
|Oakwood                    

In [None]:
# Optionally tear the tunnel down when finished (uncomment to run)
# ngrok.disconnect(public_url)