# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

Pyspark Imports

In [1]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
# Start the SparkContext for this notebook; the application is registered as "HW3-Q1".
sc = pyspark.SparkContext(appName="HW3-Q1")
# SQL entry point built on that context; load_data() reads the CSV through it.
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/05 21:50:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read "yellow_tripdata_2019-01_short.csv" through the module-level sqlContext.

    The first line of the file is treated as the header row. No schema or
    schema-inference option is set here.

    output: df a dataframe holding the contents of the CSV file
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
#export
def clean_data(df):
    '''
    Cast the columns used by the later questions to their working types.

    input: df a dataframe
    output: df a dataframe with all the original columns, where
            - passenger_count is an integer
            - total_amount, tip_amount, trip_distance and fare_amount are floats
            - tpep_pickup_datetime and tpep_dropoff_datetime are timestamps
    '''

    # START YOUR CODE HERE ---------
    # (column name, Spark type name) pairs, applied in this order.
    target_types = (
        ("passenger_count", "integer"),
        ("total_amount", "float"),
        ("tip_amount", "float"),
        ("trip_distance", "float"),
        ("fare_amount", "float"),
        ("tpep_pickup_datetime", "timestamp"),
        ("tpep_dropoff_datetime", "timestamp"),
    )

    # withColumn with an existing name replaces that column in place, so the
    # column set and column order of the input are preserved.
    for column_name, spark_type in target_types:
        df = df.withColumn(column_name, df[column_name].cast(spark_type))

    # END YOUR CODE HERE -----------

    return df

### Q1.b

Find the rate per person based on how many passengers travel between pickup and dropoff locations.

In [5]:
#export
def common_pair(df):
    '''
    Rank pickup/dropoff location pairs by the number of passengers carried.

    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the total_amount per person for a given pair, i.e. the
    summed total_amount of the pair divided by its summed passenger_count.
    Trips that start and end in the same location are ignored. Only the 10
    highest-ranked pairs are returned, ordered by total_passenger_count and
    then per_person_rate, both descending.

    '''

    # START YOUR CODE HERE ---------
    # Keep only trips whose pickup and dropoff locations differ, then total
    # the passengers and the amount paid for every (pickup, dropoff) pair.
    pair_totals = (
        df.where(df["PULocationID"] != df["DOLocationID"])
        .groupBy("PULocationID", "DOLocationID")
        .agg({"passenger_count": "sum", "total_amount": "sum"})
    )

    # The dict form of agg() names its outputs "sum(<column>)".
    passengers = pair_totals["sum(passenger_count)"]
    amount = pair_totals["sum(total_amount)"]

    # Project straight to the required output columns.
    summary = pair_totals.select(
        "PULocationID",
        "DOLocationID",
        passengers.alias("total_passenger_count"),
        (amount / passengers).alias("per_person_rate"),
    )

    # Busiest pairs first; ties are broken by the higher per-person rate.
    ranked = summary.orderBy(
        summary["total_passenger_count"].desc(),
        summary["per_person_rate"].desc(),
    )

    df = ranked.limit(10)

    # END YOUR CODE HERE -----------

    return df

### Q1.c

Find which trip distances generate the highest tip percentage.

In [10]:
#export
def distance_with_most_tip(df):
    '''
    Find the trip distances (in whole miles) with the highest average tip.

    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount, averaged over all
    trips that share the same trip_distance after rounding the distance up to
    the next whole mile. Only trips with fare_amount > 2 and trip_distance > 0
    are considered, and only the 15 distances with the highest tip_percent are
    returned, in descending order of tip_percent.

    '''

    # START YOUR CODE HERE ---------
    fare = df["fare_amount"]
    distance = df["trip_distance"]

    # Drop trips with a fare of $2 or less, or with no distance travelled.
    eligible = df.where((fare > 2) & (distance > 0))

    # One row per trip: distance rounded up to a whole mile, and the tip as a
    # percentage of the fare.
    per_trip = eligible.select(
        ceil(distance).alias("trip_distance"),
        (df["tip_amount"] * 100 / fare).alias("tip_percent"),
    )

    # Average the tip percentage within each rounded distance. The dict form
    # of agg() names its output "avg(tip_percent)".
    averaged = per_trip.groupBy("trip_distance").agg({"tip_percent": "avg"})
    summary = averaged.select(
        "trip_distance",
        averaged["avg(tip_percent)"].alias("tip_percent"),
    )

    # Highest average tip first, top 15 only.
    df = summary.orderBy(summary["tip_percent"].desc()).limit(15)

    # END YOUR CODE HERE -----------

    return df

### Q1.d

Determine the average speed at different times of day.

In [8]:
#export
from pyspark.sql.functions import (
    hour, when, col, to_timestamp, sum, count, first, lit
)

def time_with_most_traffic(df):
    '''
    Compute the average trip speed for every hour of the day, split into AM and PM.

    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    time_of_day is the pickup hour on a 12-hour clock (0-11).
    am_avg_speed / pm_avg_speed is the total trip_distance divided by the total
    trip duration in hours, over all trips picked up in that hour before noon /
    from noon onwards. A speed is null when there are no trips for that hour or
    their total duration is not positive. Rows are sorted by time_of_day.

    '''

    # START YOUR CODE HERE ---------
    # NOTE: `sum` below is pyspark.sql.functions.sum (imported just above this
    # function), not the Python builtin.

    # Parse pickup and dropoff times as timestamps.
    pickup_ts = to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss")
    dropoff_ts = to_timestamp("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss")

    # Per trip: pickup hour (0-23), distance, and duration in hours
    # (difference of the epoch seconds divided by 3600).
    trips = df.select(
        hour(pickup_ts).alias("hour_of_day"),
        col("trip_distance"),
        ((dropoff_ts.cast("long") - pickup_ts.cast("long")) / 3600).alias("trip_duration"),
    )

    # Total distance and total duration per pickup hour.
    hourly = trips.groupBy("hour_of_day").agg(
        sum("trip_distance").alias("total_trip_distance"),
        sum("trip_duration").alias("total_trip_duration_hours"),
    )

    # Map the 24-hour clock onto (time_of_day, AM/PM) and compute the speed.
    # A when() without otherwise() yields null, which guards against dividing
    # by a zero (or negative) total duration.
    hourly = hourly.select(
        when(col("hour_of_day") < 12, col("hour_of_day"))
        .otherwise(col("hour_of_day") - 12)
        .alias("time_of_day"),
        when(col("hour_of_day") < 12, "AM").otherwise("PM").alias("am_pm"),
        when(
            col("total_trip_duration_hours") > 0,
            col("total_trip_distance") / col("total_trip_duration_hours"),
        ).alias("average_speed"),
    )

    # Pivot AM/PM into separate columns. Listing the pivot values explicitly
    # guarantees that both columns exist even if the data contains only
    # morning or only afternoon pickups; missing combinations are left as null.
    pivoted = (
        hourly.groupBy("time_of_day")
        .pivot("am_pm", ["AM", "PM"])
        .agg(first("average_speed"))
    )

    # Rename to the required output columns and return the hours in order.
    df = (
        pivoted.withColumnRenamed("AM", "am_avg_speed")
        .withColumnRenamed("PM", "pm_avg_speed")
        .orderBy("time_of_day")
    )

    # END YOUR CODE HERE -----------

    return df