In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# get the version
pyspark.__version__

'3.3.2'

In [3]:
# Build a Spark session against the "local[*]" master, named 'test';
# getOrCreate() reuses an already-running session instead of starting a second one.

session_builder = SparkSession.builder.master("local[*]").appName('test')
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/05 19:39:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark.version

'3.3.2'

In [5]:
# Download the October 2019 FHV trip data (a gzip-compressed CSV) from the
# DataTalksClub nyc-tlc-data GitHub release into the current working directory.

!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-05 19:39:09--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240305T193910Z&X-Amz-Expires=300&X-Amz-Signature=d68cd19efdffea35187bf73c382d7c75e61f56abdec74f08b52e436abc7af3bc&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-05 19:39:10--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [6]:
!ls -al

total 135936
drwxrwxr-x  3 patlee patlee      4096 Mar  5 19:39 .
drwxr-xr-x 17 patlee patlee      4096 Mar  5 17:57 ..
drwxrwxr-x  2 patlee patlee      4096 Mar  5 18:00 .ipynb_checkpoints
-rw-rw-r--  1 patlee patlee 119796110 Dec  2  2022 fhv_tripdata_2019-10.csv
-rw-rw-r--  1 patlee patlee  19375751 Dec  2  2022 fhv_tripdata_2019-10.csv.gz
-rw-rw-r--  1 patlee patlee      5533 Mar  5 19:35 spark_hw5.ipynb


In [7]:
import pandas as pd
# This pandas version no longer provides DataFrame.iteritems, but this version of Spark
# still relies on it, so alias it to DataFrame.items.
pd.DataFrame.iteritems = pd.DataFrame.items
pd.__version__

'2.1.4'

In [8]:
# Use pandas to infer the column data types from a sample: read only the first 1000 data rows
# (header=0 takes the column names from the first line of the gzip-compressed CSV).

df_pandas = pd.read_csv("fhv_tripdata_2019-10.csv.gz", compression="gzip", header=0, nrows=1000)
df_pandas.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [9]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [10]:
len(df_pandas)

1000

In [11]:
# Try to let Spark infer a schema from the pandas sample. This is expected to fail with a
# TypeError (mixed string/NaN values in a column), so catch and print the error instead of
# letting the exception stop a "Restart & Run All" of the notebook at this cell.
try:
    print(spark.createDataFrame(df_pandas).schema)
except TypeError as err:
    print(f"TypeError: {err}")

TypeError: field Affiliated_base_number: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>

In [12]:
# The above TypeError in the column Affiliated_base_number arose because the NaN's in the column
# are interpreted as DoubleType, while the values are StringType.  In view of this, I will write
# the schema directly instead of using schema inference via Pandas.

In [13]:
from pyspark.sql import types


In [31]:
# Explicit schema for the FHV trip CSV: (column name, Spark type) in file order.
# Every column is declared nullable.
fhv_columns = [
    ('dispatching_base_num', types.StringType()),
    ('pickup_datetime', types.TimestampType()),
    ('dropOff_datetime', types.TimestampType()),
    ('PUlocationID', types.IntegerType()),
    ('DOlocationID', types.IntegerType()),
    ('SR_Flag', types.StringType()),
    ('Affiliated_base_number', types.StringType()),
]

schema = types.StructType(
    [types.StructField(col_name, col_type, True) for col_name, col_type in fhv_columns]
)

In [32]:
# Read the FHV data using the explicit schema defined above.
# Read the downloaded .csv.gz directly (Spark decompresses gzip input on the fly): the notebook
# only downloads the compressed file and never unpacks it, so reading a plain .csv would depend
# on a file that was extracted by hand outside the notebook.

df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('fhv_tripdata_2019-10.csv.gz')
)

In [33]:
# Repartition the DataFrame into 6 partitions; the parquet write in the next cell
# then produces one part file per partition.

df = df.repartition(6)

In [34]:
# Save the repartitioned FHV data as parquet. The default save mode raises an error when the
# target path already exists, so use 'overwrite' to keep this cell re-runnable.
df.write.mode('overwrite').parquet('fhv/2019/10/')

                                                                                

In [35]:
!ls -al fhv/2019/10/

total 37964
drwxr-xr-x 2 patlee patlee    4096 Mar  5 20:23 .
drwxr-xr-x 3 patlee patlee    4096 Mar  5 20:23 ..
-rw-r--r-- 1 patlee patlee       8 Mar  5 20:23 ._SUCCESS.crc
-rw-r--r-- 1 patlee patlee   50204 Mar  5 20:23 .part-00000-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee   50144 Mar  5 20:23 .part-00001-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee   50152 Mar  5 20:23 .part-00002-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee   50144 Mar  5 20:23 .part-00003-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee   50176 Mar  5 20:23 .part-00004-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee   50312 Mar  5 20:23 .part-00005-9cea4082-890d-4e38-953c-9da8ea8d0a5f-c000.snappy.parquet.crc
-rw-r--r-- 1 patlee patlee       0 Mar  5 20:23 _SUCCESS
-rw-r--r-- 1 patlee patlee 6424988 Mar  5 

In [36]:
# How many taxi trips were there on 15 October?

from pyspark.sql import functions as F


In [37]:
# Derive the calendar date of each pickup, keep only trips picked up on 15 October 2019,
# and count them.
pickup_day = F.to_date(df.pickup_datetime)

(
    df
    .withColumn('pickup_date', pickup_day)
    .filter("pickup_date = '2019-10-15'")
    .count()
)

                                                                                

62610

In [39]:
# Longest trips, in hours: cast both timestamps to epoch seconds, take the difference,
# divide by 3600, and show the five largest durations.

pickup_seconds = df.pickup_datetime.cast('long')
dropoff_seconds = df.dropOff_datetime.cast('long')
trip_hours = (dropoff_seconds - pickup_seconds) / 3600

(
    df
    .withColumn('duration_hr', trip_hours)
    .orderBy('duration_hr', ascending=False)
    .limit(5)
    .show()
)



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|      duration_hr|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|         631152.5|
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   null|                B02832|         631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|        null|        null|   null|                B02416|87672.44083333333|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   null|       B00746         |70128.02805555555|
|              B02921|2019-10-17 1

                                                                                

In [40]:
# Download the taxi zone lookup CSV from the DataTalksClub nyc-tlc-data GitHub release
# into the current working directory.

!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

--2024-03-05 20:31:06--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240305T203106Z&X-Amz-Expires=300&X-Amz-Signature=bed6c57a1f66047e562c831b322be967cdafe419e2b16f358a686504f5d51d3e&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2024-03-05 20:31:06--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6e

In [48]:
# Read the zone lookup CSV, taking column names from its header row.
# No schema is supplied, so every column is read as a string.
zones_reader = spark.read.option("header", True)
df_zones = zones_reader.csv('taxi_zone_lookup.csv')

In [49]:
df_zones.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [50]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [52]:
# Register the zone lookup DataFrame as the temp view 'zones' so it can be queried
# by name from spark.sql(); an existing view with that name is replaced.

df_zones.createOrReplaceTempView('zones')

In [53]:
# Register the FHV DataFrame as the temp view 'fhv_2019_10' for use in spark.sql().
# createOrReplaceTempView replaces the deprecated registerTempTable and matches how the
# 'zones' view is registered.

df.createOrReplaceTempView('fhv_2019_10')

In [54]:
# Least frequent pickup zones: left-join each FHV trip to the zone lookup on the pickup
# location ID, count trips per zone name, and show the five zones with the smallest counts.

least_frequent_pickup_sql = """
SELECT
    pul.Zone,
    COUNT(1)
FROM 
    fhv_2019_10 fhv LEFT JOIN zones pul ON fhv.PUlocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
"""

least_frequent_pickups = spark.sql(least_frequent_pickup_sql)
least_frequent_pickups.show()




+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
+--------------------+--------+



                                                                                