In [1]:
!apt-get install openjdk-11-jdk -y
!pip install pyspark


Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following additional packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-11-jre x11-utils
Suggested packages:
  libxt-doc openjdk-11-demo openjdk-11-source visualvm mesa-utils
The following NEW packages will be installed:
  fonts-dejavu-core fonts-dejavu-extra libatk-wrapper-java libatk-wrapper-java-jni libfontenc1
  libxkbfile1 libxt-dev libxtst6 libxxf86dga1 openjdk-11-jdk openjdk-11-jre x11-utils
0 upgraded, 12 newly installed, 0 to remove and 29 not upgraded.
Need to get 5,453 kB of archives.
After this operation, 15.5 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-core all 2.37-2build1 [1,041 kB]
Get:2 http://archive.ubuntu.com/ubuntu jammy/main amd64 fonts-dejavu-extra all 2.37-2build1 [2,041 kB

In [3]:
!pip install kaggle




In [4]:
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json  # Set permissions


In [5]:
#!/bin/bash
!kaggle datasets download elemento/nyc-yellow-taxi-trip-data

Dataset URL: https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data
License(s): U.S. Government Works


In [6]:
!unzip nyc-yellow-taxi-trip-data.zip -d /content/


Archive:  nyc-yellow-taxi-trip-data.zip
  inflating: /content/yellow_tripdata_2015-01.csv  
  inflating: /content/yellow_tripdata_2016-01.csv  
  inflating: /content/yellow_tripdata_2016-02.csv  
  inflating: /content/yellow_tripdata_2016-03.csv  


In [7]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("NYC_Taxi_Analysis").getOrCreate()


In [8]:
df = spark.read.csv("/content/yellow_tripdata_2016-01.csv", header=True, inferSchema=True)

# Show first 5 rows
df.show(5)


+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2016-01-01 00:00:00|  2016-01-01 00:00:00|              2|          1.1|-73.99037170410156| 40.73469543457031|         1|    

In [9]:
df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



In [26]:
row_count = df.count()
print(f"Total number of trips: {row_count}")

Total number of trips: 10906858


#  Average trip distance by pickup hour

In [18]:

df_with_hour = df.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
avg_distance_by_hour = df_with_hour.groupBy("pickup_hour").avg("trip_distance").orderBy("pickup_hour")
avg_distance_by_hour.show(24)

+-----------+------------------+
|pickup_hour|avg(trip_distance)|
+-----------+------------------+
|          0| 3.328650267401285|
|          1| 3.247242986764335|
|          2|3.2484838146758577|
|          3|3.4685267545674927|
|          4| 4.109216089043877|
|          5| 4.603352816724641|
|          6| 3.526673113628757|
|          7|2.8310358234853608|
|          8| 2.529338012205217|
|          9| 2.522817470884903|
|         10|12.339673500979927|
|         11| 5.871557961930464|
|         12|2.5972457354015446|
|         13|2.7525329680351542|
|         14| 21.45645973595721|
|         15|2.8751887404654557|
|         16|2.8934940079476843|
|         17| 3.824401189309708|
|         18| 2.576705789539844|
|         19| 2.626591070816759|
|         20| 2.904406535365733|
|         21|3.0478349601585526|
|         22| 6.080801514570257|
|         23|3.3807939627779318|
+-----------+------------------+



#  Count trips by passenger count

In [19]:

trips_by_passenger = df.groupBy("passenger_count").count().orderBy("passenger_count")
trips_by_passenger.show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|    520|
|              1|7726984|
|              2|1561977|
|              3| 436431|
|              4| 210641|
|              5| 601079|
|              6| 369155|
|              7|     22|
|              8|     26|
|              9|     23|
+---------------+-------+



#  Average fare amount by payment type

In [20]:

avg_fare_by_payment = df.groupBy("payment_type").avg("fare_amount").orderBy("payment_type")
avg_fare_by_payment.show()

+------------+------------------+
|payment_type|  avg(fare_amount)|
+------------+------------------+
|           1|13.000073149307978|
|           2|11.500276313672684|
|           3|11.392890994023855|
|           4|11.101148311087911|
|           5|              10.0|
+------------+------------------+



# Revenue Impact of Extra Charges

In [22]:
from pyspark.sql.functions import sum as spark_sum
revenue_breakdown = df.agg(
    spark_sum("fare_amount").alias("total_fare"),
    spark_sum("extra").alias("total_extra"),
    spark_sum("mta_tax").alias("total_mta_tax"),
    spark_sum("tolls_amount").alias("total_tolls"),
    spark_sum("improvement_surcharge").alias("total_surcharge"),
    spark_sum("total_amount").alias("grand_total")
)
revenue_breakdown.show()

+--------------------+-----------------+-------------+------------------+-----------------+--------------------+
|          total_fare|      total_extra|total_mta_tax|       total_tolls|  total_surcharge|         grand_total|
+--------------------+-----------------+-------------+------------------+-----------------+--------------------+
|1.3619316658999982E8|3414672.090000001|   5428021.03|3199475.1200007726|3269052.129950934|1.7059847689100492E8|
+--------------------+-----------------+-------------+------------------+-----------------+--------------------+



# Tip Amount vs. Fare Amount Correlation

In [25]:
# Average tip amount by fare amount ranges (binning fares)
from pyspark.sql.functions import when, col
df_with_bins = df.withColumn("fare_bin",
                             when(col("fare_amount") < 10, "0-10")
                             .when(col("fare_amount") < 20, "10-20")
                             .when(col("fare_amount") < 30, "20-30")
                             .otherwise("30+"))
tip_by_fare = df_with_bins.groupBy("fare_bin").avg("tip_amount").orderBy("fare_bin")
tip_by_fare.show()

+--------+------------------+
|fare_bin|   avg(tip_amount)|
+--------+------------------+
|    0-10|0.9652382087608522|
|   10-20|1.8102988509274376|
|   20-30|3.3707501651749965|
|     30+| 6.249755560833871|
+--------+------------------+



# Payment Type Analysis

In [27]:
from pyspark.sql.functions import hour, count

In [28]:
df.groupBy("payment_type").agg(count("*").alias("payment_count")) \
    .orderBy(col("payment_count").desc()).show()


+------------+-------------+
|payment_type|payment_count|
+------------+-------------+
|           1|      7181476|
|           2|      3673651|
|           3|        38319|
|           4|        13411|
|           5|            1|
+------------+-------------+



#  Peak Travel Hours

In [29]:
# Extract hour from pickup time
df = df.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))

# Count trips per hour
df.groupBy("pickup_hour").agg(count("*").alias("trip_count")) \
    .orderBy(col("pickup_hour")).show()

+-----------+----------+
|pickup_hour|trip_count|
+-----------+----------+
|          0|    396034|
|          1|    300325|
|          2|    229220|
|          3|    169130|
|          4|    125601|
|          5|    111548|
|          6|    236287|
|          7|    404567|
|          8|    491429|
|          9|    491927|
|         10|    480614|
|         11|    502554|
|         12|    535279|
|         13|    531803|
|         14|    562636|
|         15|    564002|
|         16|    511344|
|         17|    591940|
|         18|    692093|
|         19|    682974|
+-----------+----------+
only showing top 20 rows

