In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, DoubleType




In [2]:
spark = SparkSession \
        .builder \
        .appName('NYC-Taxi') \
        .getOrCreate()
        

23/11/28 12:09:33 WARN Utils: Your hostname, milkoutofmilk.local resolves to a loopback address: 127.0.0.1; using 192.168.1.119 instead (on interface en0)
23/11/28 12:09:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/28 12:09:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
trip_data_path = 'trip_data/trip_data_*.csv'
trip_fare_path = 'trip_fare/trip_fare_*.csv'

In [4]:
trip_data_schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("vendor_id", StringType(), True),
    StructField("rate_code", StringType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),  
    StructField("dropoff_datetime", TimestampType(), True), 
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("pickup_longitude", FloatType(), True),
    StructField("pickup_latitude", FloatType(), True),
    StructField("dropoff_longitude", FloatType(), True),
    StructField("dropoff_latitude", FloatType(), True)
])

trip_fare_schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("vendor_id", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("total_amount", DoubleType(), True)
])


In [5]:
# Example of reading a CSV file with a header
trip_data = spark.read.csv(trip_data_path, header=True, schema=trip_data_schema,ignoreLeadingWhiteSpace=True)
trip_fare = spark.read.csv(trip_fare_path, header=True, schema=trip_fare_schema,ignoreLeadingWhiteSpace=True)

# Show the DataFrames to verify
trip_data.show()
trip_fare.show()


+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|        -73.98984|        40.75117|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...| 

In [6]:
trip_data.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)



In [7]:
trip_fare.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [8]:
columns_to_join = ["medallion", "hack_license", "pickup_datetime", "vendor_id"]

joined_df = trip_data.join(trip_fare, columns_to_join, "inner")

joined_df.show()


[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+--------------------+-------------------+---------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|    pickup_datetime|vendor_id|rate_code|store_and_fwd_flag|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+-------------------+---------+---------+------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|00005007A9F30E289...|15929F4E67FE2FFD1...|

                                                                                

In [9]:
from pyspark.sql import Window
from pyspark.sql.functions import avg, col

windowSpec = Window.partitionBy("hack_license")
tip_analysis = joined_df.withColumn("avg_tip_per_driver", avg("tip_amount").over(windowSpec))
tip_analysis.select("hack_license", "tip_amount", "avg_tip_per_driver").show()


23/11/28 12:18:33 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
[Stage 15:>                                                         (0 + 1) / 1]

+--------------------+----------+------------------+
|        hack_license|tip_amount|avg_tip_per_driver|
+--------------------+----------+------------------+
|001EEDEA00E57988E...|       3.7|1.6314545079430605|
|001EEDEA00E57988E...|       0.0|1.6314545079430605|
|001EEDEA00E57988E...|       1.9|1.6314545079430605|
|001EEDEA00E57988E...|       1.0|1.6314545079430605|
|001EEDEA00E57988E...|       0.0|1.6314545079430605|
|001EEDEA00E57988E...|       2.0|1.6314545079430605|
|001EEDEA00E57988E...|      4.75|1.6314545079430605|
|001EEDEA00E57988E...|       0.0|1.6314545079430605|
|001EEDEA00E57988E...|       1.3|1.6314545079430605|
|001EEDEA00E57988E...|       1.3|1.6314545079430605|
|001EEDEA00E57988E...|       1.3|1.6314545079430605|
|001EEDEA00E57988E...|       0.0|1.6314545079430605|
|001EEDEA00E57988E...|      1.75|1.6314545079430605|
|001EEDEA00E57988E...|     14.45|1.6314545079430605|
|001EEDEA00E57988E...|      4.85|1.6314545079430605|
|001EEDEA00E57988E...|       2.6|1.63145450794

                                                                                

In [10]:
from pyspark.sql.functions import sum as _sum, col

top_drivers_by_tips = joined_df.groupBy("hack_license").agg(_sum("tip_amount").alias("total_tips"))

top_20_drivers = top_drivers_by_tips.orderBy(col("total_tips").desc()).limit(10)

top_20_drivers.show()


23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:20:59 WARN RowBasedKeyValueBatch: Calling spill() on

+--------------------+------------------+
|        hack_license|        total_tips|
+--------------------+------------------+
|D85749E8852FCC66A...|           22146.2|
|51C1BE97280A80EBF...| 21452.90000000002|
|3AAB94CA53FE93A64...|21280.209999999995|
|AB6F028ECDB62E44B...|          20267.51|
|AFC75912D19C0CB0D...|20162.090000000004|
|74CC809D28AE726DD...|          19826.16|
|3D757E111C78F5CAC...|19497.399999999994|
|23DF80C977D15141F...|18958.270000000008|
|23F5E8FB4BC7E65E8...|18719.449999999997|
|6DC841A5F028073A7...| 18631.61999999999|
+--------------------+------------------+



                                                                                

In [11]:
from pyspark.sql.functions import sum as _sum

profitable_areas = joined_df.groupBy("pickup_longitude", "pickup_latitude").agg(_sum("total_amount").alias("total_revenue"))
profitable_areas.orderBy(col("total_revenue").desc()).show()


23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:25:40 WARN RowBasedKeyValueBatch: Calling spill() on

+----------------+---------------+-------------------+
|pickup_longitude|pickup_latitude|      total_revenue|
+----------------+---------------+-------------------+
|             0.0|            0.0|4.548423964999969E7|
|       -73.99338|      40.764465|           685908.1|
|       -73.96863|      40.762646|          541432.56|
|       -73.86291|      40.769062| 163262.00999999998|
|       -74.02305|       40.76599|          107411.18|
|       -73.94872|      40.744843| 102233.90999999999|
|       -74.00173|       40.71953|           82291.47|
|       -73.94904|      40.744915|  62336.26999999999|
|       -73.96771|      40.755966| 61739.979999999996|
|      -73.990944|      40.736053|           39561.78|
|      -73.937584|       40.75801| 38353.409999999996|
|        -73.9488|      40.744625|           32358.03|
|       -73.98807|      40.622555|           31362.04|
|      -73.965034|      40.759426|           30350.54|
|       -73.93752|       40.75815| 27878.170000000006|
|       -7

In [12]:
from pyspark.sql.functions import month

seasonal_usage = joined_df.withColumn("month", month("pickup_datetime")) \
    .groupBy("month").count()
seasonal_usage.show()


23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:31:33 WARN RowBasedKeyValueBatch: Calling spill() on

+-----+--------+
|month|   count|
+-----+--------+
|   12|13971118|
|    1|14776615|
|    6|14385456|
|    3|15749674|
|    5|15285049|
|    9|14107693|
|    4|15101772|
|    8|12601137|
|    7|13823840|
|   10|15004556|
|   11|14388451|
|    2|13990176|
+-----+--------+



                                                                                

traffic impact
join, groupby

In [13]:
from pyspark.sql.functions import hour



hours_df = spark.createDataFrame([(i,) for i in range(24)], ["hour"])

peak_hours = joined_df.withColumn("hour", hour("pickup_datetime")).groupBy("hour").count()

peak_hours_complete = hours_df.join(peak_hours, "hour", "left_outer")

peak_hours_complete = peak_hours_complete.na.fill({'count': 0})

peak_hours_complete.orderBy("hour").show(24)


23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:36:03 WARN RowBasedKeyValueBatch: Calling spill() on

+----+--------+
|hour|   count|
+----+--------+
|   0| 6913355|
|   1| 5106334|
|   2| 3789150|
|   3| 2748857|
|   4| 2041728|
|   5| 1736161|
|   6| 3587773|
|   7| 6286016|
|   8| 7785468|
|   9| 8061740|
|  10| 7820912|
|  11| 8071800|
|  12| 8506933|
|  13| 8419756|
|  14| 8683450|
|  15| 8256548|
|  16| 6925238|
|  17| 8436620|
|  18|10382944|
|  19|10858113|
|  20|10276327|
|  21|10068149|
|  22| 9767663|
|  23| 8654502|
+----+--------+



                                                                                

In [14]:
hours_df = spark.createDataFrame([(i,) for i in range(24)], ["hour"])

traffic_impact = joined_df.withColumn("hour", hour("pickup_datetime")) \
    .groupBy("hour").agg(avg("trip_time_in_secs").alias("avg_trip_time"))

traffic_impact_complete = hours_df.join(traffic_impact, "hour", "left_outer")

traffic_impact_complete = traffic_impact_complete.na.fill({'avg_trip_time': 0})

traffic_impact_complete.orderBy("hour").show(24)

23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:16 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/11/28 12:40:17 WARN RowBasedKeyValueBatch: Calling spill() on

+----+-----------------+
|hour|    avg_trip_time|
+----+-----------------+
|   0|786.9639183869482|
|   1|753.0351639747811|
|   2|739.9250977132075|
|   3|802.0624350411825|
|   4|803.4567586867595|
|   5|729.1247309437316|
|   6|642.9505080170902|
|   7|711.5702487871491|
|   8|801.7268577817031|
|   9|810.7737106381501|
|  10|804.3750993490273|
|  11|828.5795383929235|
|  12|846.1517151951238|
|  13|  860.07562974509|
|  14|911.7039434786865|
|  15|919.1898602176116|
|  16|906.4243445496024|
|  17|  899.08709530594|
|  18|851.2316361332586|
|  19|791.7185499911449|
|  20|758.0192109495932|
|  21| 746.320219635208|
|  22|760.8269272803535|
|  23|798.5005142988008|
+----+-----------------+



                                                                                

drivers by rank

window, group by

In [15]:
from pyspark.sql.functions import sum as _sum, rank
from pyspark.sql.window import Window

windowSpec = Window.orderBy(col("total_fares").desc())

driver_fares = joined_df.groupBy("hack_license").agg(_sum("fare_amount").alias("total_fares"))
driver_ranks = driver_fares.withColumn("fare_rank", rank().over(windowSpec))
driver_ranks.select("hack_license", "total_fares", "fare_rank").show()


23/11/28 12:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 12:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 12:42:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 12:43:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 12:43:25 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 12:44:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/11/28 1

+--------------------+------------------+---------+
|        hack_license|       total_fares|fare_rank|
+--------------------+------------------+---------+
|CFCD208495D565EF6...| 588844.4700000003|        1|
|1EDF99EE9DAC18202...|         227184.81|        2|
|D85749E8852FCC66A...|         206460.62|        3|
|3AAB94CA53FE93A64...|         192491.28|        4|
|3D757E111C78F5CAC...|         191819.58|        5|
|74CC809D28AE726DD...|         185978.72|        6|
|51C1BE97280A80EBF...|          184240.0|        7|
|AB6F028ECDB62E44B...|          181987.0|        8|
|23DF80C977D15141F...|          178128.0|        9|
|C4B62F0697BC53B98...|          171440.1|       10|
|6DC841A5F028073A7...|          170971.0|       11|
|23F5E8FB4BC7E65E8...|          168516.0|       12|
|AFC75912D19C0CB0D...|         168012.75|       13|
|03173DD93C1171DA1...|          167293.0|       14|
|C4E07B8C06FAB8E54...|          160842.8|       15|
|9D1B49F1300FE0067...|          159692.2|       16|
|CE625FD96D0

                                                                                