In [1]:
import pyspark
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

In [2]:
# Build (or reuse, via getOrCreate) a Spark session running in local mode
# with the driver memory setting requested as '2g'.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .config('spark.driver.memory', '2g')
    .getOrCreate()
)

23/03/06 17:33:34 WARN Utils: Your hostname, Zambo-ROG resolves to a loopback address: 127.0.1.1; using 172.30.104.214 instead (on interface eth0)
23/03/06 17:33:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/03/06 17:33:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/06 17:33:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Bare expression: the notebook displays the session object as this cell's output.
spark

In [32]:
# Keep a handle on the session's SparkContext; it is needed later to set the
# checkpoint directory.
sc = spark.sparkContext

In [21]:
# Explicit schema for the FHVHV trip CSV, built field by field.
# Every column is declared nullable (third argument True).
schema = (
    T.StructType()
    .add('dispatching_base_num', T.StringType(), True)
    .add('pickup_datetime', T.TimestampType(), True)
    .add('dropoff_datetime', T.TimestampType(), True)
    .add('PULocationID', T.IntegerType(), True)
    .add('DOLocationID', T.IntegerType(), True)
    .add('SR_Flag', T.StringType(), True)
    .add('Affiliated_base_number', T.StringType(), True)
)

In [22]:
# Read the June 2021 FHVHV trip CSV with the explicit schema defined above
# (no type inference); the first line of the file is treated as a header.
# The chain is parenthesized rather than backslash-continued so that no
# trailing `\` can splice a following line into this statement.
df = (
    spark.read
    .schema(schema)
    .option('header', 'true')
    .csv('data/fhvhv_tripdata_2021-06.csv')
)

In [24]:
!head data/fhvhv_tripdata_2021-06.csv -n 5

dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag,Affiliated_base_number
B02764,2021-06-01 00:02:41,2021-06-01 00:07:46,174,18,N,B02764
B02764,2021-06-01 00:16:16,2021-06-01 00:21:14,32,254,N,B02764
B02764,2021-06-01 00:27:01,2021-06-01 00:42:11,240,127,N,B02764
B02764,2021-06-01 00:46:08,2021-06-01 00:53:45,127,235,N,B02764


In [23]:
# Show the first rows whose Affiliated_base_number is present (non-null).
df.where(df['Affiliated_base_number'].isNotNull()).show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02764|2021-06-01 00:48:06|2021-06-01 01:04:10|         209|          45|      N|                B02764|
|              B02875|2021-06-01 00:18:5

In [49]:
# Redistribute the trips DataFrame into 12 partitions.
df = df.repartition(numPartitions=12)

In [34]:
# Persist the trips as Parquet, replacing any previous output at this path.
# The chain is parenthesized rather than backslash-continued so that no
# trailing `\` can splice a following line into this statement.
(
    df.write
    .mode('overwrite')
    .parquet('data/fhvhv/2021/06')
)

                                                                                

In [50]:
# Tell Spark where checkpointed data should be written; required before
# DataFrame.checkpoint() can be called.
sc.setCheckpointDir(dirName='data/checkpoint')

In [51]:
# Collapse the data to a single partition and checkpoint it eagerly (the
# default), so `df` is rebound to the checkpointed DataFrame.
# NOTE(review): with one partition, downstream stages run as a single task
# (the progress bars below show "(0 + 1) / 1") — confirm this is intended.
df = df.coalesce(numPartitions=1).checkpoint(eager=True)

                                                                                

In [52]:
# Count the trips whose pickup date is 15 June 2021.
df.where(F.to_date(df['pickup_datetime']) == '2021-06-15').count()

                                                                                

452470

In [61]:
# Longest trip in hours: difference between the drop-off and pickup epoch
# seconds, divided by 3600 (seconds per hour), then the maximum over all trips.
(
    df
    .withColumn(
        'trip_duration_in_hours',
        (
            F.unix_timestamp(df['dropoff_datetime'])
            - F.unix_timestamp(df['pickup_datetime'])
        ) / 3600,
    )
    .select(F.max('trip_duration_in_hours'))
    .show()
)

[Stage 36:>                                                         (0 + 1) / 1]

+---------------------------+
|max(trip_duration_in_hours)|
+---------------------------+
|           66.8788888888889|
+---------------------------+



                                                                                

In [71]:
# Schema for the taxi zone lookup table, given as a DDL-style string
# ("column type" pairs) instead of a StructType.
lookup_schema = '''
LocationID integer,
Borough string,
Zone string,
service_zone string
'''

# Load the zone lookup CSV with that schema; the first line is a header.
zone_lookup_df = (
    spark.read
    .schema(lookup_schema)
    .option('header', True)
    .csv('data/taxi_zone_lookup.csv')
)

In [77]:
# Preview the first five zone lookup rows.
zone_lookup_df.show(n=5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [76]:
# Preview the first five trip rows.
df.show(n=5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-02 09:03:22|2021-06-02 09:32:19|          91|          17|      N|                B02764|
|              B02889|2021-06-02 22:33:27|2021-06-02 22:44:25|          90|         231|      N|                B02889|
|              B02510|2021-06-02 22:33:16|2021-06-02 22:48:12|         140|          50|      N|                  null|
|              B02510|2021-06-01 01:58:42|2021-06-01 02:06:21|          48|         229|      N|                  null|
|              B02765|2021-06-03 00:04:41|2021-06-03 02:02:34|         179|         265|      N|                B02765|
+--------------------+------------------

In [74]:
# Register both DataFrames as temporary views so they can be referenced by
# name ('zones', 'trips') from Spark SQL queries.
zone_lookup_df.createOrReplaceTempView(name='zones')
df.createOrReplaceTempView(name='trips')

In [82]:
# Most frequent pickup zone: join the 'trips' view to the 'zones' view on the
# pickup location id, count trips per zone name, and keep only the largest group.
# GROUP BY 1 / ORDER BY 2 refer to SELECT-list positions (Zone and count).
# The INNER JOIN drops trips whose PULocationID has no match in 'zones'.
spark.sql('''
SELECT
    zones.Zone
    , count(1) AS count
FROM trips
INNER JOIN zones
    ON trips.PULocationID = zones.LocationID
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1
''').show()

[Stage 56:>                                                         (0 + 1) / 1]

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+



                                                                                