In [1]:
# We first need to import PySpark
import pyspark
from pyspark.sql import SparkSession

In [2]:
# download the dataset from web

# data_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz"
# data_loc = "data/fhvhv_tripdata_2021-06.csv.gz"
# !wget {data_url} -O {data_loc}

In [3]:
# unzip the data file
import gzip, shutil

with gzip.open('data/fhvhv_tripdata_2021-06.csv.gz', 'r') as f_in, \
     open('data/fhvhv_tripdata_2021-06.csv', 'wb') as f_out:
     shutil.copyfileobj(f_in, f_out)


In [4]:
# check that the unzipped file exists
!ls -lh data/

total 2162944
-rw-r--r--  1 pllee  staff   878M Mar  8 07:10 fhvhv_tripdata_2021-06.csv
-rw-r--r--@ 1 pllee  staff   168M Dec 20 08:13 fhvhv_tripdata_2021-06.csv.gz


In [5]:
# Instantiate a Spark session, an object that we use to interact with Spark.

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

df = spark.read \
    .option("header", "true") \
    .csv('data/fhvhv_tripdata_2021-06.csv')

df.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 07:10:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B0

In [6]:
# execute spark.version for question 1
spark.version

'3.3.2'

In [7]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [8]:
# get the first 1000 rows 
unzipped = "data/fhvhv_tripdata_2021-06.csv"
!head -n 1001 {unzipped} > head.csv

In [12]:
# use Pandas to read head.csv and make a schema inferring the datatypes

import pandas as pd

df_pandas = pd.read_csv('head.csv')
df_pandas.dtypes

dispatching_base_num      object
pickup_datetime           object
dropoff_datetime          object
PULocationID               int64
DOLocationID               int64
SR_Flag                   object
Affiliated_base_number    object
dtype: object

In [13]:
# create schema for the spark datafram

from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [17]:
# read in the dataset

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-06.csv')

In [18]:
# repartition to 12 partitions

df = df.repartition(12)

In [19]:
# save to parquet

df.write.parquet('fhvhv/2021/06/')

                                                                                

In [20]:
!ls -lh fhvhv/2021/06/

total 566784
-rw-r--r--  1 pllee  staff     0B Mar  8 07:29 _SUCCESS
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00000-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00001-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00002-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00003-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00004-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00005-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00006-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-rw-r--r--  1 pllee  staff    23M Mar  8 07:29 part-00007-014b8501-6e33-40a4-82c1-9563c2035622-c000.snappy.parquet
-

In [21]:
# Question 2: Average size of parquet files is 24MB

In [23]:
# Question 3: How many taxi trips were there on June 15? 
# Consider only trips that started on June 15.

# read the parguet files
df = spark.read.parquet('fhvhv/2021/06/')
# check the schema
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [24]:
from pyspark.sql import functions as F

In [33]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .select('pickup_date') \
    .filter(F.to_date(df.pickup_datetime) == '2021-06-15') \
    .count()


                                                                                

452470

In [66]:
# Question 4: How long was the longest trip in hours?

df_tmp = df \
  .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
  .select('pickup_date', 'pickup_datetime', 'dropoff_datetime') \
  .where( ( F.to_date(df.pickup_datetime) <= '2021-12-31') &( F.to_date(df.pickup_datetime) >= '2021-01-01') ) \
  .withColumn("DiffInHours", F.round( (df.dropoff_datetime.cast("long") - df.pickup_datetime.cast("long"))/3600 ) ) 
 # .show()

In [67]:
df_tmp.columns

['pickup_date', 'pickup_datetime', 'dropoff_datetime', 'DiffInHours']

In [70]:
df_tmp.sort(F.desc(df_tmp.DiffInHours)).show()

[Stage 27:>                                                         (0 + 4) / 4]

+-----------+-------------------+-------------------+-----------+
|pickup_date|    pickup_datetime|   dropoff_datetime|DiffInHours|
+-----------+-------------------+-------------------+-----------+
| 2021-06-25|2021-06-25 13:55:41|2021-06-28 08:48:25|       67.0|
| 2021-06-22|2021-06-22 12:09:45|2021-06-23 13:42:44|       26.0|
| 2021-06-27|2021-06-27 10:32:29|2021-06-28 06:31:20|       20.0|
| 2021-06-26|2021-06-26 22:37:11|2021-06-27 16:49:01|       18.0|
| 2021-06-23|2021-06-23 20:40:43|2021-06-24 13:08:44|       16.0|
| 2021-06-24|2021-06-24 23:11:00|2021-06-25 13:05:35|       14.0|
| 2021-06-23|2021-06-23 22:03:31|2021-06-24 12:19:39|       14.0|
| 2021-06-04|2021-06-04 20:56:02|2021-06-05 08:36:14|       12.0|
| 2021-06-27|2021-06-27 07:45:19|2021-06-27 19:07:16|       11.0|
| 2021-06-20|2021-06-20 17:05:12|2021-06-21 04:04:16|       11.0|
| 2021-06-01|2021-06-01 12:01:46|2021-06-01 21:59:45|       10.0|
| 2021-06-18|2021-06-18 08:50:29|2021-06-18 18:27:57|       10.0|
| 2021-06-

                                                                                

In [71]:
# Get the zones data
zones_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"


In [72]:
!wget {zones_url} -O data/taxi_zone_lookup.csv

--2023-03-08 10:23:58--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230308%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230308T022358Z&X-Amz-Expires=300&X-Amz-Signature=f0a2243c6a371432ff3c223f57f6c02d436fdb9a6f314c59fbaae05acd2fb3a8&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-08 10:23:58--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62

In [73]:
# check the zones download
!ls -lh data/

total 2162976
-rw-r--r--  1 pllee  staff   878M Mar  8 07:10 fhvhv_tripdata_2021-06.csv
-rw-r--r--@ 1 pllee  staff   168M Dec 20 08:13 fhvhv_tripdata_2021-06.csv.gz
-rw-r--r--  1 pllee  staff    12K Jul 19  2022 taxi_zone_lookup.csv


In [74]:
# read the zones data into spark
df_zones = spark.read \
           .option("header", "true") \
           .csv('data/taxi_zone_lookup.csv')
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [75]:
df_zones.write.parquet('zones')

In [76]:
!ls -lh

total 752
drwxr-xr-x  5 pllee  staff   160B Mar  8 10:23 [34mdata[m[m
drwxr-xr-x  4 pllee  staff   128B Mar  8 07:30 [34mfhvhv[m[m
-rw-r--r--@ 1 pllee  staff    60K Mar  8 07:11 head.csv
-rw-r--r--  1 pllee  staff    21K Mar  8 10:26 hw5-batch-processing.ipynb
-rw-r--r--@ 1 pllee  staff   285K Mar  4 20:12 hw5-questions.pdf
drwxr-xr-x  6 pllee  staff   192B Mar  8 10:27 [34mzones[m[m


In [79]:
df_zones = spark.read.parquet('zones/')


In [80]:

df_result = df.join(df_zones, df.PULocationID == df_zones.LocationID)
df_result.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B02878|2021-06-05 00:23:56|2021-06-05 00:51:46|          20|         233|      N|                B02878|        20|    Bronx|             Belmont|   Boro Zone|
|              B02510|2021-06-04 21:14:22|2021-06-04 21:25:13|         162|         263|      N|                  null|       162|Manhattan|        Midtown East| Yellow Zone|
|              B02866|2021-06-05 12:07:28|2021-06-05 12:12:41|          35|          35|      N|                B02866|      

[Stage 35:>                                                         (0 + 1) / 1]                                                                                

In [85]:
df_result \
    .drop('PULocationID', 'LocationID') \
    .groupBy('Zone') \
    .count() \
    .sort(F.desc('Count')) \
    .show()

[Stage 45:>                                                         (0 + 4) / 4]

+--------------------+------+
|                Zone| count|
+--------------------+------+
| Crown Heights North|231279|
|        East Village|221244|
|         JFK Airport|188867|
|      Bushwick South|187929|
|       East New York|186780|
|TriBeCa/Civic Center|164344|
|   LaGuardia Airport|161596|
|            Union Sq|158937|
|        West Village|154698|
|             Astoria|152493|
|     Lower East Side|151020|
|        East Chelsea|147673|
|Central Harlem North|146402|
|Williamsburg (Nor...|143683|
|          Park Slope|143594|
|  Stuyvesant Heights|141427|
|        Clinton East|139611|
|West Chelsea/Huds...|139431|
|             Bedford|138428|
|         Murray Hill|137879|
+--------------------+------+
only showing top 20 rows



                                                                                