# CSE6242 - HW3 - Q1

PySpark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
# Create the Spark driver context for this notebook; appName labels the
# application in Spark's logs / UI.
sc = pyspark.SparkContext(appName="HW3-Q1")
# SQL entry point wrapping `sc`; load_data() reads the CSV through it.
# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of
# SparkSession; kept as-is because this cell must not be modified.
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/15 00:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    '''
    Read the trip CSV "yellow_tripdata_2019-01_short.csv" into a DataFrame.

    input: none (the path is relative to the current working directory)
    output: df a dataframe whose column names come from the CSV header row.
            No schema or inferSchema option is given, so Spark reads every
            column as a string; clean_data() performs the type casts.
    '''
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Cast columns to their proper types to clean the incoming dataset.

In [4]:
def clean_data(df):
    '''
    Cast the raw (string-typed) columns used by the later queries.

    input: df a dataframe
    output: df a dataframe with the all the original columns, where
            - passenger_count is an integer
            - total_amount, tip_amount, trip_distance, fare_amount are floats
            - tpep_pickup_datetime, tpep_dropoff_datetime are timestamps
              parsed with the pattern "yyyy-MM-dd HH:mm:ss"
    Columns keep their names and positions; values that cannot be parsed
    become null unless Spark's ANSI mode is enabled.
    '''
    
    # START YOUR CODE HERE ---------
    from pyspark.sql.types import IntegerType, FloatType

    timestamp_pattern = "yyyy-MM-dd HH:mm:ss"

    # Column -> replacement expression, applied in insertion order.
    casts = {"passenger_count": col("passenger_count").cast(IntegerType())}
    casts.update(
        (name, col(name).cast(FloatType()))
        for name in ("total_amount", "tip_amount", "trip_distance", "fare_amount")
    )
    casts.update(
        (name, to_timestamp(col(name), timestamp_pattern))
        for name in ("tpep_pickup_datetime", "tpep_dropoff_datetime")
    )

    for name, expr in casts.items():
        # withColumn on an existing name replaces that column in place.
        df = df.withColumn(name, expr)

    # END YOUR CODE HERE -----------
    
    return df

### Q1.b

Find the per-person rate (total amount paid per passenger) for the pickup/dropoff location pairs that carry the most passengers.

In [5]:
def common_pair(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - PULocationID
            - DOLocationID
            - passenger_count
            - per_person_rate
            
    per_person_rate is the total_amount per person for a given pair.

    Trips whose pickup and drop-off locations are the same are ignored.
    Pairs are ordered by total passenger_count (descending), with ties broken
    by per_person_rate (descending), and exactly the first 10 are returned
    (fewer only if there are fewer than 10 distinct pairs).
    '''
    from pyspark.sql.functions import col, sum as spark_sum
    
    # START YOUR CODE HERE ---------
    # A trip that starts and ends in the same zone is not a pair.
    trips = df.filter(col("PULocationID") != col("DOLocationID"))

    # Total passengers and total amount paid per (pickup, drop-off) pair.
    pair_totals = trips.groupBy("PULocationID", "DOLocationID").agg(
        spark_sum("passenger_count").alias("passenger_count"),
        spark_sum("total_amount").alias("total_amount"),
    )

    # Amount paid per passenger over all trips of the pair.
    # NOTE: a pair whose passenger_count sums to 0 gets a null rate
    # (or may raise when Spark's ANSI mode is enabled).
    pair_rates = pair_totals.withColumn(
        "per_person_rate", col("total_amount") / col("passenger_count")
    )

    # orderBy + limit (rather than rank() <= 10) yields exactly 10 rows even
    # when pairs tie at the cut-off. It also avoids an un-partitioned Window,
    # which would move every row onto a single partition.
    top_10_pairs = (
        pair_rates
        .select("PULocationID", "DOLocationID", "passenger_count", "per_person_rate")
        .orderBy(col("passenger_count").desc(), col("per_person_rate").desc())
        .limit(10)
    )

    return top_10_pairs
    # END YOUR CODE HERE -----------

### Q1.c

Find the trip distances that generate the highest tip percentage.

In [6]:
def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - trip_distance
            - tip_percent
            
    tip_percent is the percent of tip out of fare_amount
    (tip_amount * 100 / fare_amount), averaged over all trips whose distance
    rounds up to the same whole mile. Only trips with fare_amount > 2 and
    trip_distance > 0 are considered. The 15 distances with the highest
    average tip_percent are returned in descending order of tip_percent.
    '''
    
    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import col, ceil, avg
    # Keep trips with fares greater than $2.00 and positive trip distances.
    filtered_df = df.filter((col("fare_amount") > 2) & (col("trip_distance") > 0))

    # Calculate tip percent for each trip.
    filtered_df = filtered_df.withColumn("tip_percent", col("tip_amount") * 100 / col("fare_amount"))

    # Round trip distances UP to the closest whole mile (0.3 -> 1, 1.2 -> 2).
    # ceil is required here: round() would map 0.3 -> 0 and 1.2 -> 1.
    filtered_df = filtered_df.withColumn("trip_distance", ceil(col("trip_distance")).cast("integer"))

    # Average tip percent per rounded distance.
    result_df = filtered_df.groupBy("trip_distance").agg(avg("tip_percent").alias("tip_percent"))

    # Highest average tip percent first; keep the top 15 distances.
    result_df = result_df.orderBy(col("tip_percent").desc()).limit(15)
    
    return result_df
    # END YOUR CODE HERE -----------


### Q1.d

Determine the average speed at different times of day.

In [7]:
def time_with_most_traffic(df):
    '''
    input: df a dataframe
    output: df a dataframe with following columns:
            - time_of_day
            - am_avg_speed
            - pm_avg_speed
            
    time_of_day is the pickup clock hour folded onto 0-11. For each pickup
    hour (0-23), the average speed is the average trip_distance divided by
    the average trip duration in hours. am_avg_speed holds that value for
    hours 0-11 and pm_avg_speed holds it for hours 12-23; either is null
    when that hour has no trips. The result has one row per time_of_day,
    sorted ascending. Trips with trip_distance <= 0 are ignored.
    '''
    
    # START YOUR CODE HERE ---------
    from pyspark.sql.functions import col, hour, when, avg, max as spark_max

    # Ignore trips without a positive distance. Tag each remaining trip with
    # its pickup hour (0-23) and its duration in hours. A timestamp cast to
    # long gives epoch seconds, so the difference / 3600 is hours.
    trips = (
        df.filter(col("trip_distance") > 0)
        .withColumn("hour_of_day", hour(col("tpep_pickup_datetime")))
        .withColumn(
            "trip_hours",
            (col("tpep_dropoff_datetime").cast("long")
             - col("tpep_pickup_datetime").cast("long")) / 3600,
        )
    )

    # Average speed per pickup hour = avg distance / avg duration.
    # NOTE: an hour whose average duration is 0 gets a null speed
    # (or may raise when Spark's ANSI mode is enabled).
    hourly = (
        trips.groupBy("hour_of_day")
        .agg(avg("trip_distance").alias("avg_distance"),
             avg("trip_hours").alias("avg_hours"))
        .withColumn("avg_speed", col("avg_distance") / col("avg_hours"))
    )

    # Fold the hourly rows into one row per time_of_day. Each time_of_day has
    # at most one AM hour and one PM hour. max() ignores nulls, so it picks
    # that single value, or leaves null when the hour had no trips.
    is_am = col("hour_of_day") < 12
    result = (
        hourly
        .withColumn("time_of_day",
                    when(is_am, col("hour_of_day")).otherwise(col("hour_of_day") - 12))
        .groupBy("time_of_day")
        .agg(
            spark_max(when(is_am, col("avg_speed"))).alias("am_avg_speed"),
            spark_max(when(~is_am, col("avg_speed"))).alias("pm_avg_speed"),
        )
        .orderBy("time_of_day")
    )

    # END YOUR CODE HERE -----------
    
    return result

### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors

# Scratch checks for manual inspection only (not graded). Per the note
# above, comment these lines out before submitting to Gradescope.
# Load the raw CSV, then apply the Q1.a type casts.
df = load_data()
df = clean_data(df)

# Print the leading rows (show() defaults to 20) of each query's result.
common_pair(df).show()

distance_with_most_tip(df).show()

time_with_most_traffic(df).show()