# CSE6242 - HW3 - Q1

Pyspark Imports

In [1]:
### DO NOT MODIFY THIS CELL ###
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import hour, when, col, date_format, to_timestamp, round, coalesce

Initialize PySpark Context

In [2]:
### DO NOT MODIFY THIS CELL ###
sc = pyspark.SparkContext(appName="HW3-Q1")
sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/18 00:11:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define function for loading data

In [3]:
### DO NOT MODIFY THIS CELL ###
def load_data():
    df = sqlContext.read.option("header",True) \
     .csv("yellow_tripdata_2019-01_short.csv")
    return df

### Q1.a

Perform data casting to clean incoming dataset

In [4]:
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import IntegerType, FloatType

def clean_data(df):
    '''
    input: df a dataframe
    output: df a dataframe with all the original columns and specified data types
    '''
    
    # 1. Cast columns to the specified data types
    df = df.withColumn("passenger_count", df["passenger_count"].cast(IntegerType()))
    df = df.withColumn("total_amount", df["total_amount"].cast(FloatType()))
    df = df.withColumn("tip_amount", df["tip_amount"].cast(FloatType()))
    df = df.withColumn("trip_distance", df["trip_distance"].cast(FloatType()))
    df = df.withColumn("fare_amount", df["fare_amount"].cast(FloatType()))

    # 2. Convert datetime columns to timestamps
    df = df.withColumn("tpep_pickup_datetime", to_timestamp(df["tpep_pickup_datetime"]))
    df = df.withColumn("tpep_dropoff_datetime", to_timestamp(df["tpep_dropoff_datetime"]))
    
    return df

### Q1.b

Find rate per person for based on how many passengers travel between pickup and dropoff locations. 

In [35]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def common_pair(df):
    # Filter out trips with the same pick-up and drop-off location
    df = df.filter(F.col("PULocationID") != F.col("DOLocationID"))

    # Group by pickup and drop-off locations, sum passenger counts, and sum total amounts
    grouped_df = df.groupBy("PULocationID", "DOLocationID").agg(
        F.sum("passenger_count").alias("passenger_count"),
        F.sum("total_amount").alias("total_amount")
    )

    # Calculate per_person_rate (total_amount divided by passenger_count)
    with_per_person_rate = grouped_df.withColumn(
        "per_person_rate", F.col("total_amount") / F.col("passenger_count")
    )

    # Sort by total passengers in descending order
    sorted_df = with_per_person_rate.sort(F.col("passenger_count").desc())

    # Select the top 10 pairs with the highest total passengers and per_person_rate
    window_spec = Window.orderBy(
        F.col("passenger_count").desc(),
        F.col("per_person_rate").desc()
    )
    ranked_df = sorted_df.withColumn("rank", F.row_number().over(window_spec))
    ranked_df = ranked_df.withColumnRenamed("total_passengers", "passenger_count")

    top_10_df = ranked_df.filter(F.col("rank") <= 10).select(
        "PULocationID", "DOLocationID", "passenger_count", "per_person_rate"
    )
    

    return top_10_df


### Q1.c

Find trips which trip distances generate the highest tip percentage.

In [8]:
from pyspark.sql.functions import col, round

def distance_with_most_tip(df):
    '''
    input: df a dataframe
    output: df a dataframe with the following columns:
            - trip_distance
            - tip_percent
            
    trip_percent is the percent of tip out of fare_amount
    
    '''
    
    # Filter the data for trips with fares greater than $2.00 and positive trip distances
    df = df.filter((col("fare_amount") > 2.00) & (col("trip_distance") > 0))
    
    # Calculate the tip percent (tip_amount * 100 / fare_amount)
    df = df.withColumn("tip_percent", (col("tip_amount") * 100) / col("fare_amount"))
    
    # Round trip distances up to the closest mile
    df = df.withColumn("trip_distance", round(col("trip_distance")))
    
    # Group by trip_distance and calculate the average tip_percent
    df = df.groupBy("trip_distance").agg({"tip_percent": "avg"})
    
    # Sort the result in descending order of tip_percent
    df = df.orderBy(col("avg(tip_percent)").desc())

    # Limit the result to the top 15 trip distances
    df = df.limit(15)
    
    # Rename the columns
    df = df.withColumnRenamed("trip_distance", "trip_distance")
    df = df.withColumnRenamed("avg(tip_percent)", "tip_percent")
    
    return df


### Q1.d

Determine the average speed at different times of day.

In [9]:
from pyspark.sql.functions import hour, when, date_format

def time_with_most_traffic(df):
    # Convert the pickup time to hours
    df = df.withColumn("pickup_hour", hour(df["tpep_pickup_datetime"]))

    # Separate hours into AM and PM
    df = df.withColumn("time_of_day", 
                      when((col("pickup_hour") >= 0) & (col("pickup_hour") < 12), "AM")
                      .otherwise("PM"))

    # Calculate the average speed (trip_distance / trip_time) for each hour
    df = df.withColumn("trip_time_hours", (df["tpep_dropoff_datetime"].cast("long") - df["tpep_pickup_datetime"].cast("long")) / 3600)
    df = df.groupBy("time_of_day", "pickup_hour").agg(
        avg("trip_distance").alias("avg_distance"),
        avg("trip_time_hours").alias("avg_trip_time")
    )
    df = df.withColumn("avg_speed", col("avg_distance") / col("avg_trip_time"))

    # Convert pickup_hour to 12-hour format
    df = df.withColumn("pickup_hour_12", date_format("tpep_pickup_datetime", "h"))
    
    # Pivot the data to get AM and PM columns
    df = df.groupBy("pickup_hour_12").pivot("time_of_day").agg(avg("avg_speed")).drop("pickup_hour_12")
    df = df.withColumnRenamed("AM", "am_avg_speed")
    df = df.withColumnRenamed("PM", "pm_avg_speed")
    
    # Fill null values with 0
    df = df.fillna(0)
    
    return df


### The below cells are for you to investigate your solutions and will not be graded
## Ensure they are commented out prior to submitting to Gradescope to avoid errors

In [6]:
#df = load_data()
#df = clean_data(df)

                                                                                

In [36]:
# common_pair(df).show()

23/10/17 07:18:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 07:18:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 07:18:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 07:18:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 07:18:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 07:18:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/10/17 0

+------------+------------+---------------+------------------+
|PULocationID|DOLocationID|passenger_count|   per_person_rate|
+------------+------------+---------------+------------------+
|         239|         238|             62|  4.26274198870505|
|         237|         236|             60| 4.482500068346659|
|         263|         141|             52|3.4190384974846473|
|         161|         236|             42| 5.368571440378825|
|         148|          79|             42| 4.711904752822149|
|         142|         238|             39|  5.05487182812813|
|         141|         236|             37| 4.355675723101641|
|         239|         143|             37| 4.252162224537617|
|         239|         142|             35| 3.817714350564139|
|          79|         170|             34| 6.394705884596881|
+------------+------------+---------------+------------------+



                                                                                

In [10]:
#distance_with_most_tip(df).show()

                                                                                

+-------------+------------------+
|trip_distance|       tip_percent|
+-------------+------------------+
|          1.0|16.948128912014667|
|          0.0|15.421387863221868|
|          2.0|15.316307008502697|
|         17.0| 15.22190379132884|
|          5.0|14.791297045921036|
|          3.0|14.467840950136962|
|         21.0|14.318693182631236|
|         19.0|14.024168214294264|
|          9.0| 13.56675764510684|
|          4.0|13.548640690629924|
|          6.0|13.301329030626446|
|          8.0|11.935883845460518|
|         23.0|11.666666666666666|
|         10.0|11.469710555853517|
|         18.0|11.405847876237262|
|          7.0|11.104493870718887|
|         27.0| 9.615384615384615|
|         11.0| 9.463471596929628|
|         13.0| 9.455648543587467|
|         12.0| 8.520943433239987|
+-------------+------------------+
only showing top 20 rows



In [None]:
# time_with_most_traffic(df).show()