In [2]:
!apt-get install openjdk-11-jdk -y
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.27+6~us1-0ubuntu1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 35 not upgraded.


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("NYC Taxi").getOrCreate()


In [4]:
from google.colab import files
uploaded = files.upload()


Saving yellow_tripdata_2020-01.parquet to yellow_tripdata_2020-01.parquet


In [5]:
df = spark.read.parquet("yellow_tripdata_2020-01.parquet")
df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|            1.0|          1.2|       1.0|                 N|         238|         239|           1|        6.0|  3.0|    0.5|      1.4

In [6]:
df.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)



In [7]:
df.select("vendorID", "passenger_count", "trip_distance", "total_amount").show(10)
df.describe(["trip_distance", "total_amount"]).show()


+--------+---------------+-------------+------------+
|vendorID|passenger_count|trip_distance|total_amount|
+--------+---------------+-------------+------------+
|       1|            1.0|          1.2|       11.27|
|       1|            1.0|          1.2|        12.3|
|       1|            1.0|          0.6|        10.8|
|       1|            1.0|          0.8|        8.16|
|       2|            1.0|          0.0|         4.8|
|       2|            1.0|         0.03|         3.8|
|       2|            1.0|          0.0|        3.81|
|       2|            1.0|          0.0|        2.81|
|       2|            4.0|          0.0|         6.3|
|       1|            2.0|          0.7|       14.15|
+--------+---------------+-------------+------------+
only showing top 10 rows

+-------+-----------------+------------------+
|summary|    trip_distance|      total_amount|
+-------+-----------------+------------------+
|  count|          6405008|           6405008|
|   mean| 2.92964393330939| 18

In [8]:
from pyspark.sql.functions import to_timestamp

df = df.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime")) \
       .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))


In [9]:
df.filter((df["trip_distance"] > 10) & (df["total_amount"] > 50)).show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2020-01-01 00:42:15|  2020-01-01 01:16:20|            1.0|        14.52|       1.0|                 N|         185|         226|           1|       41.0|  0.5|    0.5|      8.4

In [10]:
df.groupBy("payment_type").avg("total_amount").show()


+------------+------------------+
|payment_type| avg(total_amount)|
+------------+------------------+
|           0| 37.21709142586299|
|           5|               0.0|
|           1|19.602178445639286|
|           3| 9.933256637164838|
|           2| 15.51622246778336|
|           4|0.8906255189593136|
+------------+------------------+



In [11]:
from pyspark.sql.functions import col

df = df.withColumn("Revenue",
    col("fare_amount") + col("extra") + col("mta_tax") +
    col("improvement_surcharge") + col("tip_amount") +
    col("tolls_amount") + col("total_amount")
)
df.select("Revenue").show(5)


+-------+
|Revenue|
+-------+
|  22.54|
|   24.6|
|   21.6|
|  16.32|
|    9.6|
+-------+
only showing top 5 rows



In [12]:
df.groupBy("PULocationID") \
  .sum("passenger_count") \
  .withColumnRenamed("sum(passenger_count)", "total_passengers") \
  .orderBy("total_passengers", ascending=False) \
  .show()


+------------+----------------+
|PULocationID|total_passengers|
+------------+----------------+
|         237|        433243.0|
|         161|        425986.0|
|         236|        403347.0|
|         230|        360096.0|
|         162|        351011.0|
|         186|        338952.0|
|         132|        326402.0|
|          48|        297148.0|
|         142|        294502.0|
|         170|        289593.0|
|         234|        284965.0|
|         163|        267047.0|
|         239|        263583.0|
|          79|        244515.0|
|         141|        237341.0|
|          68|        227635.0|
|         164|        218138.0|
|         107|        215684.0|
|         238|        199367.0|
|         138|        197352.0|
+------------+----------------+
only showing top 20 rows



In [13]:
df.groupBy("VendorID") \
  .agg({"total_amount": "avg"}) \
  .withColumnRenamed("avg(total_amount)", "average_earning") \
  .show()


+--------+-----------------+
|VendorID|  average_earning|
+--------+-----------------+
|       5|           63.754|
|       1|18.11342940530567|
|       2|18.93019697824438|
+--------+-----------------+



In [14]:
from pyspark.sql.functions import window

df.groupBy(
    window("tpep_pickup_datetime", "1 day"),  # Change window as needed
    "payment_type"
).count().orderBy("window").show(truncate=False)


+------------------------------------------+------------+------+
|window                                    |payment_type|count |
+------------------------------------------+------------+------+
|{2003-01-01 00:00:00, 2003-01-02 00:00:00}|2           |1     |
|{2008-12-31 00:00:00, 2009-01-01 00:00:00}|1           |5     |
|{2008-12-31 00:00:00, 2009-01-01 00:00:00}|2           |5     |
|{2009-01-01 00:00:00, 2009-01-02 00:00:00}|1           |1     |
|{2009-01-01 00:00:00, 2009-01-02 00:00:00}|2           |18    |
|{2019-12-18 00:00:00, 2019-12-19 00:00:00}|1           |2     |
|{2019-12-31 00:00:00, 2020-01-01 00:00:00}|1           |84    |
|{2019-12-31 00:00:00, 2020-01-01 00:00:00}|3           |2     |
|{2019-12-31 00:00:00, 2020-01-01 00:00:00}|2           |43    |
|{2020-01-01 00:00:00, 2020-01-02 00:00:00}|4           |665   |
|{2020-01-01 00:00:00, 2020-01-02 00:00:00}|0           |859   |
|{2020-01-01 00:00:00, 2020-01-02 00:00:00}|3           |1041  |
|{2020-01-01 00:00:00, 20

In [15]:
from pyspark.sql.functions import to_date

df_with_date = df.withColumn("date", to_date("tpep_pickup_datetime"))

df_with_date.filter(df_with_date["date"] == "2020-01-15") \
    .groupBy("VendorID") \
    .agg({"total_amount": "sum", "passenger_count": "sum", "trip_distance": "sum"}) \
    .orderBy("sum(total_amount)", ascending=False) \
    .show(2)


+--------+------------------+--------------------+------------------+
|VendorID| sum(total_amount)|sum(passenger_count)|sum(trip_distance)|
+--------+------------------+--------------------+------------------+
|       2| 2781421.719999112|            233339.0|426187.19000000175|
|       1|1319816.5300006857|             82508.0|190960.49999999945|
+--------+------------------+--------------------+------------------+



In [16]:
df.groupBy("PULocationID", "DOLocationID") \
  .sum("passenger_count") \
  .withColumnRenamed("sum(passenger_count)", "total_passengers") \
  .orderBy("total_passengers", ascending=False) \
  .show(5)


+------------+------------+----------------+
|PULocationID|DOLocationID|total_passengers|
+------------+------------+----------------+
|         237|         236|         67885.0|
|         236|         236|         57662.0|
|         236|         237|         56488.0|
|         237|         237|         49757.0|
|         264|         264|         44789.0|
+------------+------------+----------------+
only showing top 5 rows



In [17]:
latest_time = df.agg({"tpep_pickup_datetime": "max"}).collect()[0][0]

from pyspark.sql.functions import expr

df.filter(
    (col("tpep_pickup_datetime") >= expr(f"timestamp('{latest_time}') - interval 10 seconds"))
).groupBy("PULocationID") \
 .sum("passenger_count") \
 .orderBy("sum(passenger_count)", ascending=False) \
 .show()


+------------+--------------------+
|PULocationID|sum(passenger_count)|
+------------+--------------------+
|          90|                 1.0|
+------------+--------------------+



In [18]:
df.write.mode("overwrite").parquet("yellow_tripdata_with_revenue.parquet")
