# CSE6242 - HW3 - Q1

<div class="alert alert-block alert-danger">
    WARNING: Do <strong>NOT</strong> remove any comment that says "#export" because that will crash the autograder in Gradescope. We use this comment to export your code in these cells for grading.
</div>

PySpark Imports

In [None]:
#export
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, ceil, coalesce

Initialize PySpark Context

In [None]:
### DO NOT MODIFY THIS CELL ###
# Spark entry points for the notebook; load_data() reads the CSV through sqlContext.
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Define function for loading data

In [None]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    """Read the trip records CSV into a Spark DataFrame.

    Reads yellow_tripdata_2019-01_short.csv (path relative to the current
    working directory) using the first row as the header. No schema is
    supplied and inferSchema is not set, so columns presumably arrive as
    strings -- clean_data() casts the ones the questions need.
    """
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.1

Cast columns to their proper data types to clean the incoming dataset.

In [None]:
#export
def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with all the original columns, where the
            passenger count, money/distance and pickup/dropoff datetime
            columns have been cast to integer, float and timestamp types
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import col

    # Target Spark type for each column; load_data() reads the CSV without a
    # schema, so these need an explicit cast before any arithmetic.
    # Applied in insertion order, one withColumn per entry.
    column_types = {
        "passenger_count": "integer",
        "total_amount": "float",
        "tip_amount": "float",
        "trip_distance": "float",
        "fare_amount": "float",
        "tpep_pickup_datetime": "timestamp",
        "tpep_dropoff_datetime": "timestamp",
    }
    for column_name, spark_type in column_types.items():
        # withColumn on an existing name replaces that column in place,
        # so the original column order is preserved.
        df = df.withColumn(column_name, col(column_name).cast(spark_type))

    # END YOUR CODE HERE -----------

    return df

### Q1.2

Find the rate per person based on how many passengers travel between pickup and dropoff locations.

In [None]:
#export
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - total_passenger_count
            - per_person_rate

    per_person_rate is the total_amount per person for a given pair, i.e.
    sum(total_amount) / sum(passenger_count) over the pair's trips (a ratio
    of sums, not an average of per-trip ratios).

    Trips whose pickup and dropoff locations are equal are ignored. The 10
    pairs with the largest total_passenger_count are returned, ties broken
    by the larger per_person_rate.
    '''

    # START YOUR CODE HERE ---------
    # Import the functions module under a namespace instead of importing
    # `sum` directly, which would shadow Python's builtin `sum` inside this
    # function.
    from pyspark.sql import functions as F

    # Only consider trips that actually move between two different zones.
    trips = df.filter(F.col("PULocationID") != F.col("DOLocationID"))

    # One row per (pickup, dropoff) pair with its passenger total and the
    # amount paid per passenger.
    per_pair = trips.groupBy("PULocationID", "DOLocationID").agg(
        F.sum("passenger_count").alias("total_passenger_count"),
        (F.sum("total_amount") / F.sum("passenger_count")).alias("per_person_rate"),
    )

    # Busiest pairs first; per_person_rate breaks ties. Keep the top 10.
    df = per_pair.orderBy(
        F.col("total_passenger_count").desc(),
        F.col("per_person_rate").desc(),
    ).limit(10)

    # END YOUR CODE HERE -----------

    return df

### Q1.3

Find which trip distances generate the highest tip percentage.

In [None]:
#export
def distance_with_most_tip(df):
    '''
    input: df a dataframe (PySpark DataFrame)
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent

    tip_percent is the percent of tip out of fare_amount
    (tip_amount * 100 / fare_amount), averaged over all trips whose
    trip_distance rounds up (ceil) to the same whole number; that rounded
    value is reported as trip_distance.

    Only trips with fare_amount > 2.00 and trip_distance > 0 are used. The
    15 rounded distances with the highest average tip_percent are returned,
    highest first.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import avg, ceil, col

    # Drop very small fares and zero/negative distances.
    eligible = df.where((col("fare_amount") > 2.00) & (col("trip_distance") > 0))

    # Per-trip tip percentage plus the distance rounded up to a whole unit.
    per_trip = (
        eligible
        .withColumn("tip_percent", (col("tip_amount") * 100) / col("fare_amount"))
        .withColumn("trip_distance_rounded", ceil("trip_distance"))
    )

    # Average tip percentage per rounded distance, exposed as trip_distance.
    per_distance = (
        per_trip
        .groupBy("trip_distance_rounded")
        .agg(avg("tip_percent").alias("tip_percent"))
        .withColumnRenamed("trip_distance_rounded", "trip_distance")
    )

    most_generous = per_distance.orderBy(col("tip_percent").desc()).limit(15)

    # END YOUR CODE HERE -----------

    return most_generous

### Q1.4

Determine the average speed at different times of day.

In [8]:
#export
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed

    am_avg_speed and pm_avg_speed are the average trip distance / average trip time calculated for each hour

    The hour is taken from tpep_pickup_datetime and trip time is measured in
    hours (seconds / 3600). time_of_day runs 0-11 (0 = 12 o'clock): pickup
    hours 0-11 supply am_avg_speed and hours 12-23 supply pm_avg_speed.
    Rows are sorted by time_of_day ascending.
    '''

    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import col, hour, avg, unix_timestamp, when, to_timestamp

    # Normalise both datetime columns to timestamps (no-op when clean_data
    # has already cast them).
    for ts_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        df = df.withColumn(ts_column, to_timestamp(ts_column))

    elapsed_seconds = (
        unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")
    )

    # One row per pickup hour: mean distance, mean duration, and their ratio.
    hourly = (
        df.withColumn("trip_duration_hours", elapsed_seconds / 3600)
          .withColumn("hour", hour("tpep_pickup_datetime"))
          .groupBy("hour")
          .agg(
              avg("trip_distance").alias("avg_distance"),
              avg("trip_duration_hours").alias("avg_duration"),
          )
          .withColumn("avg_speed", col("avg_distance") / col("avg_duration"))
    )

    # Fold 24 hours onto a 12-hour clock. Each time_of_day group contains at
    # most one AM hour (h) and one PM hour (h + 12), so avg() over the
    # conditional column simply picks that hour's speed (or null if absent).
    df = (
        hourly
        .withColumn("time_of_day", col("hour") % 12)
        .groupBy("time_of_day")
        .agg(
            avg(when(col("hour") < 12, col("avg_speed"))).alias("am_avg_speed"),
            avg(when(col("hour") >= 12, col("avg_speed"))).alias("pm_avg_speed"),
        )
        .orderBy("time_of_day")
    )

    # END YOUR CODE HERE -----------

    return df

## The cells below are for investigating your solutions; they will not be graded

In [None]:
# Load the raw CSV and apply the Q1.1 type casts in one step.
df = clean_data(load_data())

In [None]:
# Preview the Q1.2 result: top pickup/dropoff pairs by total passengers.
common_pair(df).show()

In [None]:
# Preview the Q1.3 result: rounded trip distances with the highest average tip percent.
distance_with_most_tip(df).show()

In [None]:
# Preview the Q1.4 result: AM/PM average speed per 12-hour time_of_day.
time_with_most_traffic(df).show()