In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import col,avg,count,desc

In [3]:
spark = SparkSession.builder \
    .appName("NYC Taxi") \
    .getOrCreate()

In [4]:
df = spark.read.csv("w3.csv.csv",header =True,inferSchema=True)

In [5]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [6]:
df.show(7)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [42]:
df_clean = df.filter(
    (col("fare_amount")>0)&
    (col("passenger_count")>0)&
    (col("trip_distance")>0)
)

In [43]:
df_clean.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       1| 2025-01-01 00:18:38|  2025-01-01 00:26:59|              1|          1.6|         1|                 N|         229|    

In [44]:
df_avg = df_clean.groupBy("passenger_count") \
    .agg(avg("fare_amount").alias("average_fare")) \
    .orderBy("passenger_count")

In [45]:
df_avg.show()

+---------------+------------------+
|passenger_count|      average_fare|
+---------------+------------------+
|              1|17.617009353033016|
|              2|20.944938963178263|
|              3|21.247605566109012|
|              4| 23.76566653571659|
|              5| 17.25056575308021|
|              6| 17.93215799614644|
|              7|              79.0|
|              8|              80.0|
|              9|              90.0|
+---------------+------------------+



In [46]:
count_pickup = df_clean.groupBy("PULocationID") \
    .agg(count("*").alias("trip_count")) \
    .orderBy(desc("trip_count"))

In [47]:
count_pickup.show()

+------------+----------+
|PULocationID|trip_count|
+------------+----------+
|         132|     65565|
|         237|     60423|
|         161|     60125|
|         236|     58225|
|         230|     49238|
|         186|     46902|
|         162|     42377|
|         142|     42063|
|         138|     38316|
|         239|     36059|
|         163|     35591|
|         170|     32677|
|          68|     32315|
|         234|     31607|
|          48|     31591|
|         141|     30243|
|         140|     26663|
|         164|     25962|
|          79|     25082|
|         249|     24728|
+------------+----------+
only showing top 20 rows



In [48]:
df_avg.coalesce(1).write.csv("output/avg_fare_by_passenger", header=True)
count_pickup.coalesce(1).write.csv("output/top_pickups", header = True)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/C:/Users/Santosh/project1/output/avg_fare_by_passenger already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
spark.stop()