In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Local Spark session for this notebook; getOrCreate() hands back an already
# running session in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/02 16:41:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Record which Spark build produced the outputs below.
spark_version_line = f'Spark Version : {spark.version}'
print(spark_version_line)

Spark Version : 3.3.2


In [5]:
# First look at the raw FHV file. No schema is supplied, so every column
# comes back as a string (see the schema printed by the next cell).
df = (
    spark.read
    .option("header", "true")
    .csv('./data/fhv_tripdata_2019-10.csv')
)

                                                                                

In [6]:
# Schema of the untyped read: all columns are StringType.
raw_schema = df.schema
raw_schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
# Write a small sample of the raw CSV to head.csv: the header line plus the
# first 1,000 data rows. pandas reads this sample to eyeball column types.
# Pure Python instead of the `!head` shell escape, so the cell also runs where
# `head` is unavailable (e.g. Windows). Binary mode copies each line
# byte-for-byte, line endings included, as `head` does.
from itertools import islice

SAMPLE_SOURCE = './data/fhv_tripdata_2019-10.csv'
SAMPLE_DEST = 'head.csv'
SAMPLE_LINES = 1001  # 1 header line + 1,000 data rows

with open(SAMPLE_SOURCE, 'rb') as src, open(SAMPLE_DEST, 'wb') as dst:
    dst.writelines(islice(src, SAMPLE_LINES))

In [8]:
import pandas as pd

In [24]:
# Load the small head.csv sample into pandas to see which dtypes it guesses.
df_pandas = pd.read_csv(filepath_or_buffer='head.csv')

In [25]:
# pandas' per-column dtype guesses for the sample. float64 on the ID/flag
# columns presumably means integer-like values mixed with missing values --
# TODO confirm against the data.
sample_dtypes = df_pandas.dtypes
sample_dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [41]:
from pyspark.sql import types
from pyspark.sql.functions import *

In [47]:
# Explicit, typed schema for the FHV trip CSV: timestamps are parsed and the
# location IDs become integers. The field names intentionally differ in casing
# from the file header (dropOff_datetime, PUlocationID, DOlocationID). With a
# user-supplied schema, Spark's CSV reader applies it by position by default,
# so these names replace the header names (compare the schemas printed above
# and below). All fields are nullable.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropoff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [48]:
# Re-read the raw CSV, this time applying the typed schema defined above.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('./data/fhv_tripdata_2019-10.csv')
)

In [49]:
# Shuffle into 6 partitions so the parquet write produces one output file per
# (non-empty) partition.
df = df.repartition(numPartitions=6)

In [51]:
# Persist the typed data as parquet, replacing any previous output at this path.
df.write.parquet('./data/parquet', mode='overwrite')

                                                                                

In [52]:
# Continue from the parquet copy; its schema comes from the parquet file itself.
df = spark.read.format('parquet').load('./data/parquet')

In [53]:
# Confirm the typed schema (timestamps, integer location IDs) survived the
# parquet round-trip.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [54]:
from pyspark.sql import functions as F

In [55]:
# Preview the first rows of the parquet-backed trips (Spark's defaults spelled out).
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   null|                  null|
|              B01315|2019-10-05 15:13:04|2019-10-05 15:19:48|         264|          74|   null|                B01315|
|              B01984|2019-10-12 17:13:00|2019-10-12 17:40:00|         264|          75|   null|                B01984|
|              B00310|2019-10-15 10:55:04|2019-10-15 11:00:45|         264|         247|   null|                B03047|
|              B00932|2019-10-08 06:58:42|2019-10-08 07:11:11|         264|          37|   null|                B00932|
|              B01029|2019-10-10 14:45:0

In [70]:
# Expose the trips DataFrame to Spark SQL as the session-scoped view
# `trips_data`. `registerTempTable` has been deprecated since Spark 2.0 and
# emits a FutureWarning; `createOrReplaceTempView` is its replacement (and is
# what the zones cell uses). It replaces any existing view with the same name,
# so re-running the cell is safe.
df.createOrReplaceTempView('trips_data')

In [71]:
# Number of trips whose pickup falls on 2019-10-15.
# A half-open interval [2019-10-15, 2019-10-16) covers every instant of the day.
# A closed BETWEEN ... '23:59:59' range would silently drop any timestamp with
# a fractional second after 23:59:59.
spark.sql("""
SELECT
    count(*) AS total_trips
FROM
    trips_data
WHERE pickup_datetime >= TIMESTAMP '2019-10-15 00:00:00'
  AND pickup_datetime <  TIMESTAMP '2019-10-16 00:00:00'
""").show()

+-----------+
|total_trips|
+-----------+
|      62610|
+-----------+



In [88]:
# Longest trips: duration = dropoff - pickup, computed from epoch seconds and
# also shown in hours, sorted longest first.
# Uses the explicitly imported `F` namespace rather than names leaked by
# `from pyspark.sql.functions import *`, so the cell does not depend on the
# star import.
# NOTE: the top rows in the output have dropoff years like 2091 and 2029. These
# look like data-entry errors in the source, not real multi-decade trips.
trip_durations = (
    df
    .withColumn(
        'DiffInSeconds',
        F.unix_timestamp('dropoff_datetime') - F.unix_timestamp('pickup_datetime'),
    )
    .withColumn('DiffInHours', F.col('DiffInSeconds') / 3600)
    .sort(F.col('DiffInSeconds').desc())
)
trip_durations.show(truncate=False)

[Stage 40:>                                                         (0 + 2) / 2]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+------------------+
|dispatching_base_num|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|DiffInSeconds|DiffInHours       |
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------+------------------+
|B02832              |2019-10-11 18:00:00|2091-10-11 18:30:00|264         |264         |null   |B02832                |2272149000   |631152.5          |
|B02832              |2019-10-28 09:00:00|2091-10-28 09:30:00|264         |264         |null   |B02832                |2272149000   |631152.5          |
|B02416              |2019-10-31 23:46:33|2029-11-01 00:13:00|null        |null        |null   |B02416                |315620787    |87672.44083333333 |
|B00746              |2019-10-01 21:43:42|2027-10-01 21:45:23|159         |264    

                                                                                

In [89]:
# Taxi zone lookup table (LocationID -> Borough / Zone / service_zone).
# The explicit schema types LocationID as an integer, so it joins against the
# integer PULocationID column of trips_data without relying on Spark's implicit
# string -> int coercion. It also avoids an extra pass over the file that
# inferSchema would need. Field order and names follow the file header.
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

df_zones = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('./data/taxi+_zone_lookup.csv')

In [90]:
# Preview the zone lookup with full, untruncated zone names.
df_zones.show(n=20, truncate=False)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |
|7         |Queens       |Astoria                |Boro Zone   |
|8         |Queens       |Astoria Park           |Boro Zone   |
|9         |Queens       |Auburndale             |Boro Zone   |
|10        |Queens       |Baisley Park           |Boro Zone   |
|11        |Brooklyn     |Bath Beach             |Boro Zone   |
|12        |Manhattan    |Battery Park           |Yellow Zone |
|13        |Manhattan    |Battery Park C

In [91]:
# Make the zone lookup queryable from Spark SQL as `zones`.
df_zones.createOrReplaceTempView(name="zones")

In [103]:
# Pickup counts per zone, least frequent first; the first row answers
# "which pickup zone is least frequent".
# The INNER JOIN drops trips whose PULocationID is null or has no match in
# `zones`. The secondary sort key makes the order of zones with equal counts
# deterministic, so the shown output is reproducible on re-run.
spark.sql("""
SELECT
    zones.LocationID,
    zones.Zone,
    COUNT(1) AS total_trips
FROM
    trips_data
INNER JOIN zones
    ON trips_data.PULocationID = zones.LocationID
GROUP BY zones.LocationID,
    zones.Zone
ORDER BY total_trips, zones.LocationID
""").show()

[Stage 51:>                                                         (0 + 2) / 2]

+----------+--------------------+-----------+
|LocationID|                Zone|total_trips|
+----------+--------------------+-----------+
|         2|         Jamaica Bay|          1|
|       105|Governor's Island...|          2|
|       111| Green-Wood Cemetery|          5|
|        30|       Broad Channel|          8|
|       120|     Highbridge Park|         14|
|        12|        Battery Park|         15|
|       207|Saint Michaels Ce...|         23|
|        27|Breezy Point/Fort...|         25|
|       154|Marine Park/Floyd...|         26|
|         8|        Astoria Park|         29|
|       128|    Inwood Hill Park|         39|
|       253|       Willets Point|         47|
|        96|Forest Park/Highl...|         53|
|        34|  Brooklyn Navy Yard|         57|
|        59|        Crotona Park|         62|
|        58|        Country Club|         77|
|        99|     Freshkills Park|         89|
|       190|       Prospect Park|         98|
|        54|     Columbia Street| 

                                                                                