<a href="https://colab.research.google.com/github/ydesquitado/DSC1113/blob/main/DSC1113_DESQUITADO_FA4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [51]:
!pip install pyspark py4j



In [52]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, avg, sum, to_date, datediff, expr

spark = SparkSession.builder.appName("PySparkColab").getOrCreate()

In [53]:
path = '/content/yellow_tripdata_2025-01.parquet'

df = spark.read.parquet(path,
                        header = True)
df.printSchema()
df.show(10)

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+------

In [54]:
dfCleaned = df.dropna(subset = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'fare_amount'])
dfCleaned.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [55]:
dfFiltered = dfCleaned.filter((dfCleaned.trip_distance > 0) & (dfCleaned.fare_amount > 0))
dfFiltered.show(10)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [56]:
dfRenamed = dfFiltered.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
dfRenamed = dfRenamed.withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
dfRenamed = dfRenamed.withColumnRenamed('PULocationID', 'pickup_locID')
dfRenamed = dfRenamed.withColumnRenamed('DOLocationID', 'dropoff_locID')
dfRenamed.show(10)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|pickup_locID|dropoff_locID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1|2025-01-01 00:18:38|2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|          237

In [57]:
totalFair = dfRenamed.agg({'fare_amount': 'sum'})
totalPassenger = dfRenamed.agg({'passenger_count': 'sum'})
averageFair = totalFair.collect()[0][0] / totalPassenger.collect()[0][0]

print(f"Total fair: {totalFair.collect()[0][0]}")
print(f"Total passenger: {totalPassenger.collect()[0][0]}")
print(f"Average fair: {averageFair}")

Total fair: 59290060.92001146
Total passenger: 3680902
Average fair: 16.107481514045052


In [58]:
countPickUp = dfRenamed.groupBy('pickup_locID').count().sort('count', ascending = False)
countPickUp.show(10)

print(f'The top 1 pick-up location is: {countPickUp.collect()[0][0]}, with {countPickUp.collect()[0][1]} counts.')
print(f'The top 2 pick-up location is: {countPickUp.collect()[1][0]}, with {countPickUp.collect()[1][1]} counts.')
print(f'The top 3 pick-up location is: {countPickUp.collect()[2][0]}, with {countPickUp.collect()[2][1]} counts.')
print(f'The top 4 pick-up location is: {countPickUp.collect()[3][0]}, with {countPickUp.collect()[3][1]} counts.')
print(f'The top 5 pick-up location is: {countPickUp.collect()[4][0]}, with {countPickUp.collect()[4][1]} counts.')

+------------+------+
|pickup_locID| count|
+------------+------+
|         161|161414|
|         237|158098|
|         236|149839|
|         132|134836|
|         230|118268|
|         186|114142|
|         162|112640|
|         142|106057|
|         239| 91914|
|         163| 91567|
+------------+------+
only showing top 10 rows

The top 1 pick-up location is: 161, with 161414 counts.
The top 2 pick-up location is: 237, with 158098 counts.
The top 3 pick-up location is: 236, with 149839 counts.
The top 4 pick-up location is: 132, with 134836 counts.
The top 5 pick-up location is: 230, with 118268 counts.


In [59]:
dfRenamed = dfRenamed.withColumn("pickup_datetime", unix_timestamp(col("pickup_datetime")).cast("timestamp")) \
                       .withColumn("dropoff_datetime", unix_timestamp(col("dropoff_datetime")).cast("timestamp"))

In [60]:
dfRenamed = dfRenamed.withColumn("trip_duration", (col("dropoff_datetime").cast("long") - col("pickup_datetime").cast("long")) / 60)
dfRenamed.show(10)

+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+
|VendorID|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|pickup_locID|dropoff_locID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|     trip_duration|
+--------+-------------------+-------------------+---------------+-------------+----------+------------------+------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+------------------+
|       1|2025-01-01 00:18:38|2025-01-01 00:26:59|              1|          1.6

In [61]:
dfDaily = dfRenamed.withColumn("trip_date", to_date(col("pickup_datetime"))) \
    .groupBy("trip_date") \
    .agg(sum("trip_distance").alias("total_trip_distance"),
         avg("trip_duration").alias("avg_trip_duration"))

dfDaily.show(10)

+----------+-------------------+------------------+
| trip_date|total_trip_distance| avg_trip_duration|
+----------+-------------------+------------------+
|2025-01-09|  608176.5600000052|15.464664529227802|
|2025-01-14|  788489.4100000055|15.063421423722941|
|2025-01-18| 424121.83000000275| 14.38218315375618|
|2025-01-17|  848059.6699999978|16.066730181787868|
|2025-01-13|  545132.5200000003|15.001899916797006|
|2024-12-31|              76.81|16.805555555555557|
|2025-01-10|  544009.3200000017|15.169033019917526|
|2025-01-08| 318620.56000000116|14.771554037873857|
|2025-01-19|  637236.9100000036|13.023908918695026|
|2025-01-12|  802457.2899999972|13.956667871278771|
+----------+-------------------+------------------+
only showing top 10 rows



In [62]:
dfDaily.coalesce(1).write.csv("/content/drive/MyDrive/dfDaily.csv",
                              header = True,
                              mode = "overwrite")
