In [1]:
pip install pyspark


Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [14]:
from pyspark.sql import SparkSession

# Initialize a SparkSession
spark = SparkSession.builder \
    .appName("NYC Yellow Taxi Data Analysis") \
    .getOrCreate()
spark

In [15]:
# Read the CSV file into a DataFrame
df = spark.read.csv("D:/YellowTaxiData/yellow_tripdata_2020-06.csv", header=True, inferSchema=True)

# Show the first few records to verify that the DataFrame is created successfully
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       1| 2020-06-01 00:31:23|  2020-06-01 00:49:58|              1|          3.6|         1|                 N|         140|          68|           1|       15.5|  3.0|    0.5|       4.0|         0.0|                  0.3

In [16]:
# Read the lookup CSV file into a DataFrame
dfl = spark.read.csv("D:/YellowTaxiData/taxi_zone_lookup.csv", header=True, inferSchema=True)

# Show the first few records to verify that the DataFrame is created successfully
dfl.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

### Count the number of taxi trips for each hour 

In [17]:
from pyspark.sql import functions as F

# Extract the hour from the pickup datetime
df_with_hour = df.withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))

# Count the number of trips for each hour
hourly_counts = df_with_hour.groupBy("pickup_hour").count().orderBy("pickup_hour")

# Show the result
hourly_counts.show()


+-----------+-----+
|pickup_hour|count|
+-----------+-----+
|          0| 8122|
|          1| 6643|
|          2| 5111|
|          3| 5124|
|          4| 7136|
|          5| 6955|
|          6|14907|
|          7|19957|
|          8|24824|
|          9|28408|
|         10|31948|
|         11|35190|
|         12|38083|
|         13|39475|
|         14|40525|
|         15|40971|
|         16|38627|
|         17|38225|
|         18|34181|
|         19|26477|
+-----------+-----+
only showing top 20 rows



### 2.2 Create a table view of the data frame created in step 1 above and write SparkSQL queries to find out the following:

In [25]:
# Create a temporary table view for SQL queries
df.createOrReplaceTempView("yellow_taxi_data")
# Create a Zone Lookup table view for SQL queries
dfl.createOrReplaceTempView("Zone_lookup_data")

### 3. Average fare amount collected by hour of the day 

In [26]:
avg_fare_by_hour = spark.sql("""
    SELECT HOUR(tpep_pickup_datetime) as hour, AVG(fare_amount) as avg_fare
    FROM yellow_taxi_data
    GROUP BY hour
    ORDER BY hour
""")
avg_fare_by_hour.show()


+----+------------------+
|hour|          avg_fare|
+----+------------------+
|   0|18.880695641467636|
|   1|27.534966129760516|
|   2| 30.12912737233405|
|   3| 35.20126073380151|
|   4|40.932777466367725|
|   5|20.005923795830356|
|   6|11.691494599852422|
|   7|11.342811043744058|
|   8|11.184160087012577|
|   9|11.276688256829068|
|  10|11.909430324276945|
|  11| 11.99153282182437|
|  12|11.864507260457398|
|  13|11.580972260924616|
|  14|12.048847378161604|
|  15| 12.75542871787359|
|  16|12.926983198280974|
|  17| 13.05241726618704|
|  18|12.484358269213883|
|  19|12.241942818295119|
+----+------------------+
only showing top 20 rows



### 4. Average fare amount compared to the average trip distance –

In [27]:
avg_fare_vs_distance = spark.sql("""
    SELECT AVG(fare_amount) as avg_fare, AVG(trip_distance) as avg_distance
    FROM yellow_taxi_data
""")
avg_fare_vs_distance.show()


+------------------+-----------------+
|          avg_fare|     avg_distance|
+------------------+-----------------+
|13.606733865686362|4.104275283760191|
+------------------+-----------------+



### 5. Average fare amount and average trip distance by day of the week.

In [28]:
avg_fare_and_distance_by_day = spark.sql("""
    SELECT DAYOFWEEK(tpep_pickup_datetime) as day_of_week,
           AVG(fare_amount) as avg_fare,
           AVG(trip_distance) as avg_distance
    FROM yellow_taxi_data
    GROUP BY day_of_week
    ORDER BY day_of_week
""")
avg_fare_and_distance_by_day.show()


+-----------+------------------+------------------+
|day_of_week|          avg_fare|      avg_distance|
+-----------+------------------+------------------+
|          1|14.686292601998968|3.9104436342782214|
|          2|13.409943486081072| 4.262469750235038|
|          3|13.449879403674744|3.2025723626895357|
|          4|13.279588696533745|  6.04762380811827|
|          5| 13.40498273892465| 3.999548450269279|
|          6|13.702733958318653| 3.778188952327255|
|          7|13.880926298071115| 3.602206763882438|
+-----------+------------------+------------------+



### In the month of June 2020, find the zone which had maximum number of pick ups.

In [30]:
max_pickup_zone = spark.sql("""
    SELECT Zone_lookup_data.Zone AS pickup_zone, COUNT(*) as num_pickups
    FROM yellow_taxi_data
    LEFT OUTER JOIN Zone_lookup_data ON Zone_lookup_data.LocationID=yellow_taxi_data.PULocationID
    where month(tpep_pickup_datetime)=6 and year(tpep_pickup_datetime)=2020
    GROUP BY pickup_zone
    ORDER BY num_pickups DESC
    LIMIT 1
""")
max_pickup_zone.show()


+--------------------+-----------+
|         pickup_zone|num_pickups|
+--------------------+-----------+
|Upper East Side N...|      23097|
+--------------------+-----------+



### In the month of June 2020, find the zone which had maximum number of drops.

In [31]:
max_dropoff_zone = spark.sql("""
    SELECT Zone_lookup_data.Zone AS  dropoff_zone, COUNT(*) as num_dropoffs
    FROM yellow_taxi_data
    LEFT OUTER JOIN Zone_lookup_data ON Zone_lookup_data.LocationID=yellow_taxi_data.DOLocationID
    where month(tpep_pickup_datetime)=6 and year(tpep_pickup_datetime)=2020
    GROUP BY dropoff_zone
    ORDER BY num_dropoffs DESC
    LIMIT 1
""")
max_dropoff_zone.show()


+--------------------+------------+
|        dropoff_zone|num_dropoffs|
+--------------------+------------+
|Upper East Side N...|       22254|
+--------------------+------------+



#### 8.Average no of passengers by hour of the day. –

In [32]:
avg_passengers_by_hour = spark.sql("""
    SELECT HOUR(tpep_pickup_datetime) as hour, AVG(passenger_count) as avg_passengers
    FROM yellow_taxi_data
    GROUP BY hour
    ORDER BY hour
""")
avg_passengers_by_hour.show()


+----+------------------+
|hour|    avg_passengers|
+----+------------------+
|   0|1.3300081766148815|
|   1|1.2963541666666667|
|   2| 1.308466051969824|
|   3|1.3093270365997638|
|   4|1.2861205915813425|
|   5|1.3718697829716193|
|   6|1.3303137428192664|
|   7|1.3431017976810977|
|   8| 1.359296915388592|
|   9|1.3521955975550306|
|  10| 1.361447777998543|
|  11|1.3555843529624496|
|  12|1.3567879870492352|
|  13|1.3521634939012896|
|  14|1.3638007863695938|
|  15|1.3572915863345416|
|  16|  1.35842077865147|
|  17|1.3650200560470356|
|  18| 1.376865328634901|
|  19|1.3628078030060762|
+----+------------------+
only showing top 20 rows



### Total number of payments made by different type for the month.

In [33]:
payment_types = spark.sql("""
    SELECT year(tpep_pickup_datetime) AS Year, month(tpep_pickup_datetime) AS Month ,payment_type, COUNT(*) as num_payments
    FROM yellow_taxi_data
    GROUP BY year(tpep_pickup_datetime), month(tpep_pickup_datetime),payment_type
""")
payment_types.show()


+----+-----+------------+------------+
|Year|Month|payment_type|num_payments|
+----+-----+------------+------------+
|2020|    6|           4|        2275|
|2020|    5|           1|           1|
|2020|    6|           1|      322565|
|2020|    6|           3|        5245|
|2020|    5|           2|           3|
|2020|    6|           2|      168937|
|2020|    6|           5|          12|
|2009|    1|           2|           3|
|2020|    6|        null|       50717|
|2020|    7|           1|           2|
+----+-----+------------+------------+

