In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session running locally under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/26 13:14:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# First pass over the FHV trip CSV: take column names from the header row and
# supply no explicit schema.
df = (
    spark.read
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

In [4]:
# Print a preview of the rows read from the CSV.
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [6]:
# Inspect the schema of the schema-less read; the recorded output shows every
# column typed as StringType.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [7]:
from pyspark.sql import types

In [8]:
# Explicit schema for the FHV trip file: timestamps for the pickup/drop-off
# columns, integers for the location IDs and SR flag, strings for the base
# numbers. Every field is declared nullable.
fhv_schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PUlocationID', types.IntegerType(), True)
    .add('DOlocationID', types.IntegerType(), True)
    .add('SR_Flag', types.IntegerType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [9]:
# Re-read the CSV, this time applying the explicit fhv_schema so columns are
# typed on load.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [12]:
# Redistribute the rows across 6 partitions (presumably so the Parquet write
# that follows emits 6 part files -- confirm the intended file count).
df = df.repartition(6)

In [13]:
# Persist the repartitioned trips as Parquet under fhv/2019/10/.
# mode='overwrite' replaces output left by a previous run; the default save
# mode raises when the target path already exists, which would make this cell
# fail on every re-run of the notebook.
df.write.parquet('fhv/2019/10/', mode='overwrite')

                                                                                

In [14]:
# Print the column tree to confirm the explicit schema took effect (the
# recorded output shows timestamp and integer columns).
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [15]:
from pyspark.sql import functions as F

In [16]:
# Load the typed trips again straight from the CSV; this rebinds `df`, so the
# queries below run on a fresh read rather than the repartitioned frame.
df = (
    spark.read
    .option("header", "true")
    .schema(fhv_schema)
    .csv('fhv_tripdata_2019-10.csv')
)

In [18]:
# Reduce each trip to the calendar date of its pickup.
# The result is a single-column frame ('pickup_date'). The 'dropoff_date'
# column that used to be derived here was dropped by the select right after
# it was created, so it is no longer computed.
df_pickup_date_sel = df.select(
    F.to_date(df.pickup_datetime).alias('pickup_date')
)

In [20]:
# Expose the pickup dates to Spark SQL under the name 'fhv_pickup_date'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df_pickup_date_sel.createOrReplaceTempView('fhv_pickup_date')



In [24]:
# Count the trips whose pickup date is 2019-10-15, reading from the
# 'fhv_pickup_date' temp view. The result is a one-row frame with the column
# total_count_with_pickup_date.
df_result = spark.sql("""
SELECT COUNT(*) AS total_count_with_pickup_date
FROM fhv_pickup_date AS f
WHERE f.pickup_date == '2019-10-15'
""")

In [25]:
# Execute the count query and print its single-row result.
df_result.show()

[Stage 11:>                                                         (0 + 4) / 4]

+----------------------------+
|total_count_with_pickup_date|
+----------------------------+
|                       62610|
+----------------------------+



                                                                                

In [27]:
# Trip duration in hours: the difference between the drop-off and pickup Unix
# timestamps (seconds) divided by 3600. Only that derived column is kept.
df_trips_hours = df.select(
    (
        (F.unix_timestamp('dropOff_datetime') - F.unix_timestamp('pickup_datetime'))
        / 3600
    ).alias('trip_hours')
)

In [28]:
# Expose the trip durations to Spark SQL under the name 'trips_in_hours'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df_trips_hours.createOrReplaceTempView('trips_in_hours')



In [29]:
# Find the longest trip, in hours, across the 'trips_in_hours' temp view.
# NOTE(review): the recorded result (631152.5 hours) looks like a data-quality
# outlier in the drop-off timestamps -- confirm before relying on it.
df_longest_hours = spark.sql("""
SELECT MAX(t.trip_hours) AS longest_trip_hours
FROM trips_in_hours AS t
""")

In [30]:
# Execute the MAX query and print its single-row result.
df_longest_hours.show()

[Stage 14:>                                                         (0 + 4) / 4]

+------------------+
|longest_trip_hours|
+------------------+
|          631152.5|
+------------------+



                                                                                

In [31]:
# Load the taxi zone lookup table; column names come from the header row and
# no explicit schema is supplied.
df_taxi_zone = (
    spark.read
    .option("header", "true")
    .csv('taxi+_zone_lookup.csv')
)

In [34]:
# Print a preview of the zone lookup rows (LocationID, Borough, Zone,
# service_zone per the recorded output).
df_taxi_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [35]:
# Expose the zone lookup to Spark SQL under the name 'taxi_zone'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df_taxi_zone.createOrReplaceTempView('taxi_zone')

In [36]:
# Keep only the pickup location ID of each trip; this is the join key used
# against the zone lookup.
df_pickup_loc = df.select('PUlocationID')

In [37]:
# Expose the pickup location IDs to Spark SQL under the name 'pickup_location'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable and is safe to re-run (it replaces an existing view).
df_pickup_loc.createOrReplaceTempView('pickup_location')

In [57]:
# Five least-frequent pickup zones: join each trip's pickup location ID to the
# zone lookup, count trips per zone, and keep the 5 smallest counts.
# The inner join means zones with no pickups at all do not appear.
# t.Zone is a secondary sort key so that zones with equal counts are ordered
# deterministically; without it, LIMIT 5 could return a different set of rows
# from run to run whenever counts tie at the cut-off.
df_pickup_freq = spark.sql("""
SELECT
    t.Zone,
    COUNT(t.Zone) AS Frequency
FROM pickup_location AS p
INNER JOIN taxi_zone AS t
ON p.PUlocationID = t.LocationID
GROUP BY t.Zone
ORDER BY COUNT(t.Zone) ASC, t.Zone ASC
LIMIT 5
""")

In [58]:
# Execute the zone-frequency query and print the resulting rows.
df_pickup_freq.show()



+--------------------+---------+
|                Zone|Frequency|
+--------------------+---------+
|         Jamaica Bay|        1|
|Governor's Island...|        2|
| Green-Wood Cemetery|        5|
|       Broad Channel|        8|
|     Highbridge Park|       14|
+--------------------+---------+



                                                                                