<a href="https://colab.research.google.com/github/martinpius/Practical_1/blob/main/BigDataAnalytics_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Big Data Processing Assignment: NYC Taxi Data with PySpark

# ------------------------------------------------------------
# Setup Section
# ------------------------------------------------------------
# Import required libraries
import os, requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create (or reuse) the SparkSession that the rest of the notebook runs on.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Analysis")
    .getOrCreate()
)

# ------------------------------------------------------------
# 1. Load and Explore Dataset
# ------------------------------------------------------------
# Q1: Load the dataset (CSV or Parquet) and show the schema.
# ------------------------------------------------------------
# Step 1: Download Data
# ------------------------------------------------------------

# Source URL of the January 2023 yellow-taxi trip file and the local name it is saved under.
file_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
file_name = "yellow_tripdata_2023-01.parquet"

# Download only when the file is not already present in the working directory.
if not os.path.exists(file_name):
    print("Downloading dataset...")
    # Write to a temporary name first: a failed or interrupted transfer must not
    # leave a file under `file_name`, otherwise the exists-check above would
    # accept the broken file on every later run.
    tmp_name = file_name + ".part"
    # stream=True avoids holding the whole file in memory; the timeout keeps a
    # stalled connection from hanging the notebook indefinitely.
    with requests.get(file_url, stream=True, timeout=60) as r:
        # Fail loudly on 4xx/5xx instead of saving the error page as Parquet.
        r.raise_for_status()
        with open(tmp_name, "wb") as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):
                f.write(chunk)
    # Move the fully written file into place in a single step.
    os.replace(tmp_name, file_name)
    print("Download complete!")
else:
    print("File already exists.")

In [None]:
# ------------------------------------------------------------
# Step 2: Spark Setup
# ------------------------------------------------------------

# NOTE(review): a session with the same app name is already created in the
# first cell; getOrCreate() should simply hand back that running session, so
# this cell mainly matters when it is run on its own.
spark = (
    SparkSession.builder
    .appName("NYC Taxi Analysis")
    .getOrCreate()
)




In [None]:
# ------------------------------------------------------------
# Step 3: Load Data
# ------------------------------------------------------------
# Read the downloaded Parquet file (file_name) into a Spark DataFrame.
df = spark.read.parquet(file_name)

# Quick preview of the first 5 rows.
df.show(5)

# Q1: Print out the schema (column names and types).
df.printSchema()

In [None]:

# Q2: Show 5 sample rows.
# Prints the first 5 rows of the raw (uncleaned) DataFrame.
df.show(5)

In [None]:

# Q3: Count total number of rows in the dataset.
# The count is the cell's trailing expression, so the notebook shows it as the
# cell output; it is not stored in a variable.
df.count()

In [None]:

# ------------------------------------------------------------
# 2. Data Cleaning
# ------------------------------------------------------------
# Q4: Remove rows with a null in any of the pickup/dropoff timestamps,
# trip_distance, or fare_amount. The timestamp columns are referenced by
# their "tpep_"-prefixed names.
df_clean = df.dropna(
    how="any",
    subset=[
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "trip_distance",
        "fare_amount",
    ],
)

In [None]:

# Q5: Keep only rows whose trip_distance and fare_amount are strictly positive.
# NOTE(review): the task wording says "negative", but the > 0 tests also drop
# zero-valued rows — confirm that excluding zeros is intended.
df_clean = (
    df_clean
    .filter(col("trip_distance") > 0)
    .filter(col("fare_amount") > 0)
)

In [None]:
# Q6: Count how many rows remain after cleaning.
# Counts df_clean, i.e. the data after the null-drop and the > 0 filters; the
# value is shown as the cell output and not stored.
df_clean.count()

In [None]:
# ------------------------------------------------------------
# 3. Feature Engineering
# ------------------------------------------------------------
# Q7: Derive two columns from the pickup timestamp:
#   pickup_hour - hour of day extracted with hour()
#   pickup_day  - day-of-week text produced by the "E" date pattern
#                 (presumably an abbreviated name such as "Mon" — confirm)
df_features = (
    df_clean
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
    .withColumn("pickup_day", date_format(col("tpep_pickup_datetime"), "E"))
)

In [None]:
# Preview 5 rows to check the newly added pickup_hour and pickup_day columns.
df_features.show(5)

In [None]:

# Q8: Trip duration in minutes = (dropoff - pickup) in epoch seconds, divided by 60.
# NOTE(review): nothing here guards against dropoff preceding pickup, so such
# rows would get a negative duration.
df_features = df_features.withColumn(
    "trip_duration",
    (
        unix_timestamp(col("tpep_dropoff_datetime"))
        - unix_timestamp(col("tpep_pickup_datetime"))
    ) / 60,
)

In [None]:
# Preview 5 rows to check the newly added trip_duration column.
df_features.show(5)

In [None]:

# Q9: Add a "rush_hour" flag: 1 when pickup_hour falls in 7..9 or 16..19, else 0.
# NOTE(review): between() includes both bounds, so pickups during the 9 o'clock
# and 19 o'clock hours are flagged too — confirm this matches the intended
# "7–9 AM or 4–7 PM" windows.
df_features = df_features.withColumn(
    "rush_hour",
    when(
        col("pickup_hour").between(7, 9) | col("pickup_hour").between(16, 19),
        1,
    ).otherwise(0),
)

In [None]:
# Preview 4 rows to check the newly added rush_hour column.
df_features.show(4)

In [None]:

# ------------------------------------------------------------
# 4. Aggregation Tasks
# ------------------------------------------------------------
# Q10: Average trip distance and average fare for each pickup_day value.
(
    df_features
    .groupBy("pickup_day")
    .agg(
        avg(col("trip_distance")).alias("avg_distance"),
        avg(col("fare_amount")).alias("avg_fare"),
    )
    .show()
)

In [None]:
# Q11: Total revenue per weekday, computed as the sum of fare_amount per pickup_day.
# `sum` here is the PySpark aggregate brought in by the wildcard import of
# pyspark.sql.functions, not Python's builtin sum().
# NOTE(review): only fare_amount is summed; if "revenue" should include other
# charges, confirm which column to aggregate.
(
    df_features
    .groupBy("pickup_day")
    .agg(sum(col("fare_amount")).alias("total_revenue"))
    .show()
)

In [None]:
# Q12: Top 5 pickup locations by number of trips.
# The cell is a silent no-op when the PULocationID column is not present.
if "PULocationID" in df_features.columns:
    (
        df_features
        .groupBy("PULocationID")
        .count()
        .orderBy(desc("count"))
        .show(5)
    )

In [None]:
# ------------------------------------------------------------
# 5. Task 1
# ------------------------------------------------------------
# Q13: Join with taxi zone lookup data (if available)
# Example: Read zone data and join


In [None]:
# ------------------------------------------------------------
# 5. Task 2
# ------------------------------------------------------------
# Q13: Join with taxi zone lookup data (if available)
# Example: Read zone data and join


# ------------------------------------------------------------
# 6. Task 2: Save for Visualization
# ------------------------------------------------------------
# Q14: Convert to Pandas for plotting (if desired)
# pdf = df_features.select("pickup_hour", "fare_amount").sample(False, 0.01).toPandas()

# ------------------------------------------------------------
# End of Notebook
# ------------------------------------------------------------
# Stop the Spark session held in `spark`.
# NOTE(review): cells that use `spark` or the DataFrames above presumably need
# the session re-created (re-run the setup cell) after this has executed.
spark.stop()