In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import pandas as pd

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 14:51:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.3.2'

In [4]:
schema = types.StructType([ 
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
]
)

In [5]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("fhvhv_tripdata_2021-06.csv")

In [6]:
df = df.repartition(12)

In [8]:
df.write.parquet('fhvhv/2021/06/', mode = 'overwrite')



23/03/03 14:52:13 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [35]:
df \
    .withColumn("travel_time", df["dropoff_datetime"] - df["pickup_datetime"]) \
    .select(["pickup_datetime", "dropoff_datetime", "travel_time"]).show(truncate=False)



+-------------------+-------------------+-----------------------------------+
|pickup_datetime    |dropoff_datetime   |travel_time                        |
+-------------------+-------------------+-----------------------------------+
|2021-06-04 02:34:00|2021-06-04 02:40:36|INTERVAL '0 00:06:36' DAY TO SECOND|
|2021-06-01 17:43:52|2021-06-01 17:51:20|INTERVAL '0 00:07:28' DAY TO SECOND|
|2021-06-04 15:55:56|2021-06-04 16:18:28|INTERVAL '0 00:22:32' DAY TO SECOND|
|2021-06-03 22:54:55|2021-06-03 23:02:51|INTERVAL '0 00:07:56' DAY TO SECOND|
|2021-06-04 16:28:35|2021-06-04 17:00:47|INTERVAL '0 00:32:12' DAY TO SECOND|
|2021-06-01 18:25:31|2021-06-01 18:45:22|INTERVAL '0 00:19:51' DAY TO SECOND|
|2021-06-03 16:51:43|2021-06-03 16:58:59|INTERVAL '0 00:07:16' DAY TO SECOND|
|2021-06-02 15:52:18|2021-06-02 16:59:36|INTERVAL '0 01:07:18' DAY TO SECOND|
|2021-06-04 07:39:11|2021-06-04 07:50:57|INTERVAL '0 00:11:46' DAY TO SECOND|
|2021-06-04 18:58:42|2021-06-04 19:05:48|INTERVAL '0 00:07:06' D

                                                                                

In [37]:
df = df \
    .withColumn("travel_time", df["dropoff_datetime"] - df["pickup_datetime"])

In [50]:
df_zones = spark.read.parquet('zones/')

# SQL editor

In [39]:
df.createOrReplaceTempView('fhvhv_data')

In [51]:
df_zones.createOrReplaceTempView('zone_data')

In [14]:
spark.sql("""
SELECT
    COUNT(*)
FROM 
    fhvhv_data
WHERE
    DATE_TRUNC('day', pickup_datetime) = '2021-06-15'
""").show()



+--------+
|count(1)|
+--------+
|  452470|
+--------+



                                                                                

In [45]:
spark.sql("""
WITH temp_table AS
(
    SELECT 
        pickup_datetime, dropoff_datetime, DATEDIFF(second, pickup_datetime, dropoff_datetime) / 3600.0 AS travel_time_hours
    FROM
        fhvhv_data
)
SELECT
    MAX(travel_time_hours)
FROM
    temp_table
""").show()



+----------------------+
|max(travel_time_hours)|
+----------------------+
|             66.878889|
+----------------------+



                                                                                

In [49]:
spark.sql("""
WITH temp_table AS
(
    SELECT 
        pickup_datetime, 
        dropoff_datetime, 
        (DATE_PART('HOUR', travel_time) + (DATE_PART('MINUTE', travel_time) / 60) + (DATE_PART('SECOND', travel_time) / 3600)) AS travel_time_hours
    FROM
        fhvhv_data
)
SELECT
    MAX(travel_time_hours)
FROM
    temp_table
""").show()



+----------------------+
|max(travel_time_hours)|
+----------------------+
|    19.980833333336665|
+----------------------+



                                                                                

In [52]:
df_zones.columns

['locationid', 'borough', 'zone', 'service_zone']

In [53]:
df.columns

['dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag',
 'Affiliated_base_number',
 'travel_time']

In [63]:
spark.sql("""
SELECT 
    zone_data.*
FROM
    zone_data
WHERE 
    zone_data.locationid = 
        (
        SELECT
            locationid
        FROM
            fhvhv_data
        JOIN
            zone_data
        ON 
            fhvhv_data.PULocationID = zone_data.locationid
        GROUP BY
            locationid
        ORDER BY 
            COUNT(locationid) DESC
        LIMIT 1
        )
""").show()



+----------+--------+-------------------+------------+
|locationid| borough|               zone|service_zone|
+----------+--------+-------------------+------------+
|        61|Brooklyn|Crown Heights North|   Boro Zone|
+----------+--------+-------------------+------------+



                                                                                

In [66]:
spark.sql("""
SELECT
    locationid, COUNT(locationid)
FROM
    fhvhv_data
JOIN
    zone_data
ON 
    fhvhv_data.PULocationID = zone_data.locationid
GROUP BY
    locationid
ORDER BY 
    COUNT(locationid) DESC
""").show()

                                                                                

+----------+-----------------+
|locationid|count(locationid)|
+----------+-----------------+
|        61|           231279|
|        79|           221244|
|       132|           188867|
|        37|           187929|
|        76|           186780|
|       231|           164344|
|       138|           161596|
|       234|           158937|
|       249|           154698|
|         7|           152493|
|       148|           151020|
|        68|           147673|
|        42|           146402|
|       255|           143683|
|       181|           143594|
|       225|           141427|
|        48|           139611|
|       246|           139431|
|        17|           138428|
|       170|           137879|
+----------+-----------------+
only showing top 20 rows



In [68]:
spark.sql("""
SELECT
    PULocationID, COUNT(PULocationID)
FROM
    fhvhv_data
JOIN
    zone_data
ON 
    fhvhv_data.PULocationID = zone_data.locationid
GROUP BY
    PULocationID
ORDER BY 
    COUNT(PULocationID) DESC
""").show()

                                                                                

+------------+-------------------+
|PULocationID|count(PULocationID)|
+------------+-------------------+
|          61|             231279|
|          79|             221244|
|         132|             188867|
|          37|             187929|
|          76|             186780|
|         231|             164344|
|         138|             161596|
|         234|             158937|
|         249|             154698|
|           7|             152493|
|         148|             151020|
|          68|             147673|
|          42|             146402|
|         255|             143683|
|         181|             143594|
|         225|             141427|
|          48|             139611|
|         246|             139431|
|          17|             138428|
|         170|             137879|
+------------+-------------------+
only showing top 20 rows

