In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import TimestampType

# Obtain the notebook's Spark session under the application name "EDAtaxi".
# getOrCreate() hands back an already-running session when one exists and
# starts a new one otherwise.
spark = (
    SparkSession.builder
    .appName("EDAtaxi")
    .getOrCreate()
)


In [None]:
# Load the raw input as a Spark DataFrame from the Parquet data under the
# project prefix of the bucket.
data = spark.read.parquet(
    "s3a://psbigdata777/project/"
)

In [None]:
# Keep only rows with trip_distance >= 10 and a non-zero trip_time_in_secs.
# The predicate is built with F.col(...) instead of `df.<column>`: `df` does not
# exist until this assignment completes, so referencing it inside the filter
# raises NameError on a fresh kernel (Restart & Run All).
df = data.filter(
    (F.col("trip_distance") >= 10) & (F.col("trip_time_in_secs") != 0)
)

# Trip duration in minutes: difference of the dropoff and pickup timestamps
# (as unix seconds) divided by 60.
df = df.withColumn(
    "trip_duration",
    (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) / 60,
)

# Surcharge per unit of distance. The filter above guarantees
# trip_distance >= 10, so the divisor is never zero.
df = df.withColumn("profit_ratio", F.col("surcharge") / F.col("trip_distance"))

# Mark the frame for caching; it is reused by several aggregations below.
# cache() is lazy — data is materialized by the first action that runs on `df`.
df.cache()

Aggregates for visualization

In [None]:
# Kept as a bare-name import: other cells in this notebook may rely on `when`
# being available in the kernel namespace.
from pyspark.sql.functions import when

# 1/0 indicators per row, summed below to count rides by payment type.
cash_flag = when(F.col("payment_type") == "CSH", 1).otherwise(0)
card_flag = when(F.col("payment_type") == "CRD", 1).otherwise(0)

# One row per `pickup_month` with the ride count, the mean of each monetary /
# distance / duration column, and the number of cash and card rides.
# NOTE(review): `pickup_month` is not created in the cells shown; presumably it
# already exists in the input data — confirm.
monthly_totals = df.groupBy("pickup_month").agg(
    F.count("*").alias("total_rides"),
    F.avg("fare_amount").alias("avg_fare"),
    F.avg("tip_amount").alias("avg_tip"),
    F.avg("trip_duration").alias("avg_trip_duration"),
    F.avg("trip_distance").alias("avg_trip_distance"),
    F.avg("total_amount").alias("avg_total_amount"),
    F.avg("profit_ratio").alias("avg_monthly_profit_ratio"),
    F.avg("tolls_amount").alias("avg_monthly_toll_amount"),
    F.sum(cash_flag).alias("cash_payments"),
    F.sum(card_flag).alias("card_payments"),
)

# Add each payment type's share of all rides in the month (in percent) and sort
# by month. Rides with any other payment type count toward the denominator only,
# so the two percentages need not add up to 100.
monthly_aggregates = (
    monthly_totals
    .withColumn("cash_percentage", F.col("cash_payments") / F.col("total_rides") * 100)
    .withColumn("card_percentage", F.col("card_payments") / F.col("total_rides") * 100)
    .orderBy("pickup_month")
)

In [None]:
# Per-`pickup_day` ride statistics: ride count, the mean of each monetary /
# distance / duration column, and the cash vs. card split.
# F.when is reached through the `F` alias imported at the top of the notebook
# rather than the bare name `when`, so this cell no longer depends on an import
# performed inside a different cell.
# NOTE(review): `pickup_day` is not created in the cells shown; presumably it
# already exists in the input data — confirm.
daily_aggregates = (
    df.groupBy("pickup_day")
    .agg(
        F.count("*").alias("daily_rides"),
        F.avg("fare_amount").alias("daily_avg_fare"),
        F.avg("tip_amount").alias("daily_avg_tip"),
        F.avg("trip_duration").alias("daily_trip_duration"),
        F.avg("trip_distance").alias("daily_trip_distance"),
        F.avg("total_amount").alias("daily_avg_total_amount"),
        F.avg("profit_ratio").alias("avg_daily_profit_ratio"),
        F.avg("tolls_amount").alias("avg_daily_toll_amount"),
        # Count rides per payment type by summing a 1/0 indicator.
        F.sum(F.when(F.col("payment_type") == "CSH", 1).otherwise(0)).alias("cash_payments"),
        F.sum(F.when(F.col("payment_type") == "CRD", 1).otherwise(0)).alias("card_payments"),
    )
    # Share of all rides in the group, in percent. Rides with any other payment
    # type count toward the denominator only, so the two need not sum to 100.
    .withColumn("cash_percentage", F.col("cash_payments") / F.col("daily_rides") * 100)
    .withColumn("card_percentage", F.col("card_payments") / F.col("daily_rides") * 100)
    .orderBy("pickup_day")
)

In [None]:
# Per-`pickup_hour` ride statistics: ride count, the mean of each monetary /
# distance / duration column, and the cash vs. card split.
# F.when is reached through the `F` alias imported at the top of the notebook
# rather than the bare name `when`, so this cell no longer depends on an import
# performed inside a different cell.
# NOTE(review): `pickup_hour` is not created in the cells shown; presumably it
# already exists in the input data — confirm.
hourly_aggregates = (
    df.groupBy("pickup_hour")
    .agg(
        F.count("*").alias("hourly_rides"),
        F.avg("fare_amount").alias("hourly_avg_fare"),
        F.avg("tip_amount").alias("hourly_avg_tip"),
        F.avg("trip_duration").alias("hourly_trip_duration"),
        F.avg("trip_distance").alias("hourly_trip_distance"),
        F.avg("total_amount").alias("hourly_avg_total_amount"),
        F.avg("profit_ratio").alias("avg_hourly_profit_ratio"),
        F.avg("tolls_amount").alias("avg_hourly_toll_amount"),
        # Count rides per payment type by summing a 1/0 indicator.
        F.sum(F.when(F.col("payment_type") == "CSH", 1).otherwise(0)).alias("cash_payments"),
        F.sum(F.when(F.col("payment_type") == "CRD", 1).otherwise(0)).alias("card_payments"),
    )
    # Share of all rides in the group, in percent. Rides with any other payment
    # type count toward the denominator only, so the two need not sum to 100.
    .withColumn("cash_percentage", F.col("cash_payments") / F.col("hourly_rides") * 100)
    .withColumn("card_percentage", F.col("card_payments") / F.col("hourly_rides") * 100)
    .orderBy("pickup_hour")
)

Saving the data as parquet

In [None]:
# Persist the monthly aggregates as Parquet, replacing whatever an earlier run
# left at this path.
(
    monthly_aggregates.write
    .mode("overwrite")
    .parquet("s3://psbigdata777/CS777 Term Project/Monthly Data/monthly_agg.parquet")
)

In [None]:
# Persist the daily aggregates as Parquet, replacing whatever an earlier run
# left at this path.
(
    daily_aggregates.write
    .mode("overwrite")
    .parquet("s3://psbigdata777/CS777 Term Project/Daily Data/daily_agg.parquet")
)

In [None]:
# Persist the hourly aggregates as Parquet, replacing whatever an earlier run
# left at this path.
(
    hourly_aggregates.write
    .mode("overwrite")
    .parquet("s3://psbigdata777/CS777 Term Project/Hourly Data/hourly_agg.parquet")
)

Geospatial Visualization Data


In [None]:
# Draw a ~0.1% sample of the filtered trips, without replacement.
# A fixed seed is supplied so that re-running the notebook selects the same rows
# instead of a different random subset each time.
# NOTE(review): the seed only pins the sample for the same input data and
# partitioning; the value 42 is arbitrary.
sample_df = df.sample(withReplacement=False, fraction=0.001, seed=42)

In [None]:
# Only the pickup and dropoff coordinates are kept from the sample; every other
# column is dropped.
columns_to_select = [
    "pickup_latitude",
    "pickup_longitude",
    "dropoff_latitude",
    "dropoff_longitude",
]
# select() accepts the list of column names directly.
sample_df = sample_df.select(columns_to_select)


In [None]:
# Persist the sampled coordinates as Parquet.
# mode('overwrite') is set explicitly: without it the writer uses its default
# save mode, which raises an error when the target path already exists, so the
# notebook could not be re-run end to end. This also matches the other Parquet
# writes in this notebook.
sample_df.write.mode('overwrite').parquet("s3://psbigdata777/CS777 Term Project/Coordinate Data/coordinate.parquet")
