In [0]:
%pyspark

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, concat, lit, to_timestamp, year, avg, month, count, weekofyear, rand, when
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, BooleanType, FloatType,DoubleType

# Obtain (or reuse) the Spark session for this notebook.
# NOTE(review): 'local' runs Spark with a single worker thread; if an interpreter
# session already exists (as in Zeppelin), getOrCreate returns it and these builder
# settings may not take effect — confirm the intended master.
spark = (
    SparkSession.builder
    .appName('yellow_taxi_filter')
    .master('local')
    .getOrCreate()
)

# Column names of the yellow-taxi CSV files, in file order. Every field is read as a
# nullable string; real types are applied by explicit casts afterwards, so malformed
# values become nulls instead of failing the read.
_raw_column_names = [
    "vendor_name",
    "Trip_Pickup_DateTime",
    "Trip_Dropoff_DateTime",
    "Passenger_Count",
    "Trip_Distance",
    "Start_Lon",
    "Start_Lat",
    "Rate_Code",
    "store_and_forward",
    "End_Lon",
    "End_Lat",
    "Payment_Type",
    "Fare_Amt",
    "surcharge",
    "mta_tax",
    "Tip_Amt",
    "Tolls_Amt",
    "Total_Amt",
]
schema = StructType([StructField(column_name, StringType(), True) for column_name in _raw_column_names])

YELLOW_CSV_PATH = "hdfs://nodemastertah:9000/NYCtaxi/dataset/yellow_taxi/yellow_csv/*.csv"
df = spark.read.csv(YELLOW_CSV_PATH, header=True, schema=schema)


# Target Spark SQL type for each string column read above. Columns not listed here
# (vendor_name, store_and_forward, Payment_Type) stay strings. Values that cannot be
# cast become null.
_column_casts = {
    "Trip_Pickup_DateTime": "timestamp",
    "Trip_Dropoff_DateTime": "timestamp",
    "Passenger_Count": "integer",
    "Trip_Distance": "double",
    "Start_Lon": "double",
    "Start_Lat": "double",
    "Rate_Code": "integer",
    "End_Lon": "double",
    "End_Lat": "double",
    "Fare_Amt": "double",
    "surcharge": "double",
    "mta_tax": "double",
    "Tip_Amt": "double",
    "Tolls_Amt": "double",
    "Total_Amt": "double",
}
for _cast_column, _cast_type in _column_casts.items():
    # withColumn with an existing name replaces that column in place (same position).
    df = df.withColumn(_cast_column, col(_cast_column).cast(_cast_type))

df.printSchema()


# Inclusive range of Rate_Code values used to impute missing codes.
min_value = 1
max_value = 6
# Fixed seed so the random imputation below is reproducible across runs.
RATE_CODE_SEED = 42

# NOTE(review): missing Rate_Code values are replaced by a uniformly random code in
# [min_value, max_value]; this injects noise into any Rate_Code analysis — confirm intended.
df_filled = (
    df
    .withColumn(
        "Rate_Code",
        when(
            col("Rate_Code").isNull(),
            # rand() is in [0, 1), so this yields integers min_value..max_value inclusive.
            (rand(seed=RATE_CODE_SEED) * (max_value - min_value + 1) + min_value).cast("integer"),
        ).otherwise(col("Rate_Code")),
    )
    # Normalise the flag: exactly 'Y' stays 'Y'; null (comparison is null, not true)
    # and any other value become 'N'.
    .withColumn(
        "store_and_forward",
        when(col("store_and_forward") == 'Y', 'Y').otherwise('N'),
    )
    # NOTE(review): this overwrites *every* mta_tax value with 0.5, not only missing
    # ones; use coalesce(col("mta_tax"), lit(0.5)) if only nulls should be filled.
    .withColumn("mta_tax", lit(0.5))
)

# Keep only rows whose pickup and dropoff coordinates are present and within the valid
# latitude [-90, 90] / longitude [-180, 180] ranges (bounds inclusive). Null coordinates
# are dropped too: a comparison against null is not true, so filter() rejects the row.
# NOTE(review): (0, 0) placeholder coordinates pass this check — confirm whether they
# should also be excluded.
df_valid_coords = df_filled.filter(
    col("Start_Lat").between(-90, 90) &
    col("End_Lat").between(-90, 90) &
    col("Start_Lon").between(-180, 180) &
    col("End_Lon").between(-180, 180)
)

print(f"Number of rows after removing invalid lat/lon: {df_valid_coords.count()}")
df_valid_coords.show(5)


# Remove zero-distance trips. Build on the coordinate-filtered frame: filtering
# df_filled here would silently discard the lat/lon filter above.
df_cleaned = df_valid_coords.filter(col("Trip_Distance") > 0)

print(f"Number of rows after removing zero-distance trips: {df_cleaned.count()}")
df_cleaned.show(5)


In [1]:
%pyspark
from pyspark.sql.functions import count

# Trip counts for each exact (pickup, dropoff) coordinate pair, busiest pairs first.
_route_keys = ["Start_Lon", "Start_Lat", "End_Lon", "End_Lat"]
location_heatmap = (
    df_cleaned
    .groupBy(*_route_keys)
    .agg(count("*").alias("trip_count"))
    .orderBy("trip_count", ascending=False)
)

z.show(location_heatmap)


In [2]:
%pyspark
# Import the Spark aggregate under an alias: importing it as plain `sum` would shadow
# Python's built-in sum() for every later paragraph in this interpreter session.
from pyspark.sql.functions import sum as spark_sum

# Total revenue (sum of Total_Amt) per payment type, largest first.
# NOTE(review): Payment_Type is grouped on the raw string; if the source mixes
# spellings/casing of the same type they will show up as separate rows — verify.
payment_revenue = (
    df_cleaned
    .groupBy("Payment_Type")
    .agg(spark_sum("Total_Amt").alias("total_revenue"))
    .orderBy("total_revenue", ascending=False)
)

z.show(payment_revenue)


In [3]:
%pyspark
from pyspark.sql.functions import hour

# Number of trips per pickup hour of day, in hour order. Rows whose pickup timestamp
# is null end up in a null "hour" group.
_pickup_hour = hour("Trip_Pickup_DateTime")
hourly_demand = (
    df_cleaned
    .withColumn("hour", _pickup_hour)
    .groupBy("hour")
    .agg(count("*").alias("trip_count"))
    .orderBy("hour")
)

z.show(hourly_demand)


In [4]:
%pyspark
# Raw (Trip_Distance, Total_Amt) pairs for display in Zeppelin.
distance_vs_total = df_cleaned.select(["Trip_Distance", "Total_Amt"])
z.show(distance_vs_total)

In [5]:
%pyspark
# Flag very short trips with an unusually high total as candidates for review.
MAX_SHORT_TRIP_DISTANCE = 0.5
MIN_SUSPICIOUS_TOTAL = 50

_is_short_trip = df_cleaned["Trip_Distance"] < MAX_SHORT_TRIP_DISTANCE
_is_expensive = df_cleaned["Total_Amt"] > MIN_SUSPICIOUS_TOTAL
fraud_trips = (
    df_cleaned
    .where(_is_short_trip & _is_expensive)
    .select("Trip_Distance", "Total_Amt", "Start_Lon", "Start_Lat", "End_Lon", "End_Lat")
)

z.show(fraud_trips)


In [6]:
%pyspark
from pyspark.sql.functions import avg

# Average tip for each exact pickup coordinate, highest average first.
_pickup_keys = ["Start_Lon", "Start_Lat"]
tip_heatmap = (
    df_cleaned
    .groupBy(*_pickup_keys)
    .agg(avg("Tip_Amt").alias("avg_tip"))
    .orderBy("avg_tip", ascending=False)
)

z.show(tip_heatmap)


In [7]:
%pyspark
from pyspark.sql.functions import hour

# For each pickup hour: trip count and mean Total_Amt (reported as "avg_fare"),
# in hour order.
_hourly_metrics = [
    count("*").alias("trip_count"),
    avg("Total_Amt").alias("avg_fare"),
]
fare_demand = (
    df_cleaned
    .withColumn("hour", hour("Trip_Pickup_DateTime"))
    .groupBy("hour")
    .agg(*_hourly_metrics)
    .orderBy("hour")
)

z.show(fare_demand)


In [8]:
%pyspark
from pyspark.sql.functions import when

# Average Total_Amt for trips with vs. without a toll charge. A null Tolls_Amt does
# not satisfy "> 0", so such rows fall into the "No Toll" bucket.
_has_toll = df_cleaned.Tolls_Amt > 0
_toll_category = when(_has_toll, "With Toll").otherwise("No Toll")
toll_impact = (
    df_cleaned
    .withColumn("Toll_Category", _toll_category)
    .groupBy("Toll_Category")
    .agg(avg("Total_Amt").alias("avg_fare"))
)

z.show(toll_impact)


In [9]:
%pyspark
from pyspark.sql.functions import avg, col, count, lit, when

# Pickup bounding boxes as (name, (lat_min, lat_max), (lon_min, lon_max)).
# Boxes are tested in this order and the first match wins.
LANDMARK_BOXES = (
    ("Times Square", (40.7550, 40.7590), (-73.9900, -73.9840)),
    ("JFK Airport", (40.6390, 40.6500), (-73.7940, -73.7780)),
    ("LaGuardia Airport", (40.7660, 40.7770), (-73.8900, -73.8700)),
    ("Central Park", (40.7640, 40.8000), (-73.9810, -73.9490)),
)


def add_landmark_column(df, boxes=LANDMARK_BOXES):
    """Return ``df`` with a string ``Landmark`` column derived from the pickup point.

    Args:
        df: DataFrame with numeric ``Start_Lat`` / ``Start_Lon`` columns.
        boxes: sequence of ``(name, (lat_min, lat_max), (lon_min, lon_max))``;
            bounds are inclusive (``Column.between``). Defaults to LANDMARK_BOXES.

    Returns:
        A new DataFrame whose ``Landmark`` is the name of the first matching box, or
        ``"Other"`` when no box matches (including null coordinates).
    """
    landmark = None
    for name, (lat_lo, lat_hi), (lon_lo, lon_hi) in boxes:
        inside = col("Start_Lat").between(lat_lo, lat_hi) & col("Start_Lon").between(lon_lo, lon_hi)
        landmark = when(inside, name) if landmark is None else landmark.when(inside, name)
    landmark = lit("Other") if landmark is None else landmark.otherwise("Other")
    return df.withColumn("Landmark", landmark)


# Number of rows shown in Zeppelin.
TOP_N = 200

df_with_landmark = add_landmark_column(df_cleaned)

# Average tip over *all* trips at each pickup point. Do not deduplicate on the grouping
# keys first: that leaves one arbitrary trip per group, turning avg_tip into a single
# nondeterministic sample. trip_count shows how many trips back each average.
tip_heatmap = (
    df_with_landmark
    .groupBy("Start_Lon", "Start_Lat", "Landmark")
    .agg(avg("Tip_Amt").alias("avg_tip"), count("*").alias("trip_count"))
    .orderBy("avg_tip", ascending=False)
    .limit(TOP_N)
)

# Show the top TOP_N pickup points in Zeppelin.
z.show(tip_heatmap)


In [10]:
%pyspark
from pyspark.sql.functions import avg

# Average tip at each pickup coordinate, tagged with its landmark, highest first.
# add_landmark_column is defined in the preceding paragraph; reuse it rather than
# redefining (a second definition silently shadows the first).
df_with_landmark = add_landmark_column(df_cleaned)

tip_heatmap = (
    df_with_landmark
    .groupBy("Start_Lon", "Start_Lat", "Landmark")
    .agg(avg("Tip_Amt").alias("avg_tip"))
    .orderBy("avg_tip", ascending=False)
)

# Display in Zeppelin (no explicit row limit is applied here).
z.show(tip_heatmap)
