In [0]:
#  Import relevant Modules
import math
from pyspark import SQLContext
from pyspark.sql import functions as func
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [0]:
# Where the source dataset lives and which reader format to use
file_location = "/FileStore/tables/new_york_city_taxi_trip_duration.csv"
file_type = "csv"

# Reader settings; these only matter for CSV input
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Configure the reader with the CSV settings above and load the file into a DataFrame.
df = (
  spark.read.format(file_type)
  .option("inferSchema", infer_schema)
  .option("header", first_row_is_header)
  .option("sep", delimiter)
  .load(file_location)
)

In [0]:
# Display the loaded data
display(df) # display() is used because it shows the data in a well-organised format

# Show dataframe schema 
"""
  root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 
 
 This schema is already in the correct format, so we don't need to reassign it.
 
 # Data definition:
    id - a unique identifier for each trip
    vendor_id - a code indicating the provider associated with the trip record
    pickup_datetime - date and time when the meter was engaged
    dropoff_datetime - date and time when the meter was disengaged
    passenger_count - the number of passengers in the vehicle (driver entered value)
    pickup_longitude - the longitude where the meter was engaged
    pickup_latitude - the latitude where the meter was engaged
    dropoff_longitude - the longitude where the meter was disengaged
    dropoff_latitude - the latitude where the meter was disengaged
    store_and_fwd_flag - This flag indicates whether the trip record was held in vehicle memory before sending to the vendor because the vehicle did not have a connection to the server - Y=store and forward; N=not a store and forward trip
    trip_duration - duration of the trip in seconds
    Disclaimer: The decision was made to not remove dropoff coordinates from the dataset in order to provide an expanded set of variables to use in Kernels.
"""
# Print the column names, types and nullability of the loaded DataFrame.
df.printSchema()

In [0]:
#  Summary statistics of the raw data
"""
  Total Number of data === 1458644
  Total number of columns === 11(all nullable = true)
"""
summary_stats_df = df.describe()
summary_stats_df.show()

#  Task-related modifications

In [0]:
def func1(pickup_lat, pickup_long, drop_lat, drop_long):
  """Return the haversine (great-circle) distance in kilometres between two points.

  Args:
    pickup_lat: latitude of the pickup point, in degrees.
    pickup_long: longitude of the pickup point, in degrees.
    drop_lat: latitude of the drop-off point, in degrees.
    drop_long: longitude of the drop-off point, in degrees.

  Returns:
    The distance in kilometres as a float, or None when any coordinate is
    missing, so that a null input row yields a null distance instead of an
    exception that would abort the whole Spark job.
  """
  if pickup_lat is None or pickup_long is None or drop_lat is None or drop_long is None:
    return None

  R = 6373.0  # approximate radius of the Earth in km (the mean radius is closer to 6371 km)

  # Coordinates converted from degrees to radians
  lat1 = math.radians(pickup_lat)
  lon1 = math.radians(pickup_long)
  lat2 = math.radians(drop_lat)
  lon2 = math.radians(drop_long)

  # Change in coordinates
  dlon = lon2 - lon1
  dlat = lat2 - lat1

  # Haversine formula
  a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
  # Floating-point rounding can push `a` marginally outside [0, 1] (e.g. for
  # near-antipodal points), which would make sqrt(1 - a) raise ValueError.
  a = min(1.0, max(0.0, a))
  c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
  return R * c
  
# Wrap func1 as a Spark UDF whose result type is double.
pickup_location = func.udf(func1, DoubleType())

# Add the pickup-to-dropoff distance as a new column on the raw DataFrame.
updated_df = df.withColumn(
  "distance(KM)",
  pickup_location(
    df.pickup_latitude,
    df.pickup_longitude,
    df.dropoff_latitude,
    df.dropoff_longitude,
  ),
)

In [0]:
# Express the trip duration in hours: trip_duration divided by 3600.
seconds_per_hour = 3600
updated_df = updated_df.withColumn("trip_duration(hr)", df["trip_duration"] / seconds_per_hour)
display(updated_df)

In [0]:
# Add a column formatted with the "EEEE" pattern for the pickup and the drop-off datetime.
weekday_columns = (
  ("pickup_datetime", "pickup_week"),
  ("dropoff_datetime", "dropoff_week"),
)
for source_col, weekday_col in weekday_columns:
  updated_df = updated_df.withColumn(weekday_col, func.date_format(func.col(source_col), "EEEE"))

display(updated_df)

# Divide datetime object into day, month, year, time(h), time(min)

In [0]:
# For pickup datetime

# Each new column is produced by applying the paired extractor to pickup_datetime.
pickup_parts = [
  ("pickup_year", func.year),
  ("pickup_month", func.month),
  ("pickup_day", func.dayofmonth),
  ("pickup_hour", func.hour),
  ("pickup_minutes", func.minute),
]
for part_name, extract_part in pickup_parts:
  updated_df = updated_df.withColumn(part_name, extract_part("pickup_datetime"))

In [0]:
# For dropoff datetime

# Each new column is produced by applying the paired extractor to dropoff_datetime.
dropoff_parts = [
  ("dropoff_year", func.year),
  ("dropoff_month", func.month),
  ("dropoff_day", func.dayofmonth),
  ("dropoff_hour", func.hour),
  ("dropoff_minutes", func.minute),
]
for part_name, extract_part in dropoff_parts:
  updated_df = updated_df.withColumn(part_name, extract_part("dropoff_datetime"))

In [0]:
# Remove the raw datetime columns now that their parts live in separate columns.
# DataFrame.drop() returns a new DataFrame rather than modifying the existing
# one, so the result has to be assigned back. The earlier version discarded it
# (leaving both columns in place) and also called .collect(), which pulled the
# entire dataset to the driver twice for no purpose.
updated_df = updated_df.drop("pickup_datetime", "dropoff_datetime")
display(updated_df)

#  Task

# Description : 
  A typical taxi company faces the common problem of efficiently assigning cabs to passengers so that the service is smooth and hassle-free. One of the main issues is determining the duration of the current trip so that the company can predict when the cab will be free for the next trip.
  
Apply different data-analysis techniques to get insights about the data and determine how the different variables relate to the target variable, trip duration.

# KPIs to calculate
1. Passenger Count for every trip
2. Distribution of Pickup and Drop off day of the week
3. Distribution of Pickup and Drop off time of the day [morning(4hrs to 10hrs), midday(10hrs to 16hrs), evening(16hrs to 22hrs), and late night(22hrs to 4hrs)]
4. Distribution of the trip duration
5. Relationship between Trip Duration and The time of the day
6. Relationship between geographical location and duration

In [0]:
# Print the schema of the DataFrame after the columns added above.
updated_df.printSchema()

In [0]:
# Distinct passenger_count values present in the raw data
passenger_count_only = df.select("passenger_count")
df_passenger_count = passenger_count_only.distinct()
# display(df_passenger_count)

In [0]:
#  Passenger Count for every trip

"""
%sql
SELECT COUNT(passenger_count) 
FROM new_york_city_taxi_trip_duration_csv
GROUP BY passenger_count
"""
# Number of rows for each passenger_count value
trips_by_passenger_count = updated_df.groupBy("passenger_count")
passenger_count_df = trips_by_passenger_count.count()
# display(passenger_count_df)

In [0]:
#  Distribution of Pickup and Drop off day of the week

"""
%sql
SELECT COUNT(pickup_week) 
FROM new_york_city_taxi_trip_duration_csv
GROUP BY pickup_week
"""
# Group by each weekday column, then count the rows in every group.
trips_by_pickup_day = updated_df.groupBy("pickup_week")
trips_by_dropoff_day = updated_df.groupBy("dropoff_week")
distribution_pickup_week = trips_by_pickup_day.count()
distribution_dropoff_week = trips_by_dropoff_day.count()

In [0]:
# Show the day-of-week counts: pickups first, then drop-offs.
for weekday_counts in (distribution_pickup_week, distribution_dropoff_week):
  display(weekday_counts)

#  For Pickup data

In [0]:
#  Distribution of Pickup time of the day [morning(4hrs to 10hrs), midday(10hrs to 16hrs), evening(16hrs to 22hrs), and late night(22hrs to 4hrs)]
#
# Each period is a half-open range [start, end). Column.between() is inclusive
# on both ends, so chaining between(4, 10), between(10, 16), ... counted the
# boundary hours 10, 16 and 22 in two periods each. Late night wraps around
# midnight and therefore needs an OR of two conditions; between(22, 24) left
# out hours 0-3 entirely. Bounds are integers rather than strings so the
# comparison does not rely on an implicit string-to-number cast.
pickup_hour_col = func.col("pickup_hour")
pickup_periods = [
  ("morning (4hrs to 10hrs)", (pickup_hour_col >= 4) & (pickup_hour_col < 10)),
  ("midday (10hrs to 16hrs)", (pickup_hour_col >= 10) & (pickup_hour_col < 16)),
  ("evening (16hrs to 22hrs)", (pickup_hour_col >= 16) & (pickup_hour_col < 22)),
  ("late night (22hrs to 4hrs)", (pickup_hour_col >= 22) | (pickup_hour_col < 4)),
]

for period_name, in_period in pickup_periods:
  print(period_name)
  dummy_df = updated_df.filter(in_period)
  # Sort by hour so the per-hour counts are shown in a stable, readable order.
  dummy_df.groupBy("pickup_hour").count().orderBy("pickup_hour").show()

# For Dropoff only

In [0]:
#  Distribution of Drop off time of the day [morning(4hrs to 10hrs), midday(10hrs to 16hrs), evening(16hrs to 22hrs), and late night(22hrs to 4hrs)]
#
# Each period is a half-open range [start, end). Column.between() is inclusive
# on both ends, so chaining between(4, 10), between(10, 16), ... counted the
# boundary hours 10, 16 and 22 in two periods each. Late night wraps around
# midnight and therefore needs an OR of two conditions; between(22, 24) left
# out hours 0-3 entirely. Bounds are integers rather than strings so the
# comparison does not rely on an implicit string-to-number cast.
dropoff_hour_col = func.col("dropoff_hour")
dropoff_periods = [
  ("morning (4hrs to 10hrs)", (dropoff_hour_col >= 4) & (dropoff_hour_col < 10)),
  ("midday (10hrs to 16hrs)", (dropoff_hour_col >= 10) & (dropoff_hour_col < 16)),
  ("evening (16hrs to 22hrs)", (dropoff_hour_col >= 16) & (dropoff_hour_col < 22)),
  ("late night (22hrs to 4hrs)", (dropoff_hour_col >= 22) | (dropoff_hour_col < 4)),
]

for period_name, in_period in dropoff_periods:
  print(period_name)
  dummy_df = updated_df.filter(in_period)
  # Sort by hour so the per-hour counts are shown in a stable, readable order.
  dummy_df.groupBy("dropoff_hour").count().orderBy("dropoff_hour").show()

In [0]:
# Distribution of the trip duration: number of rows per distinct trip_duration value
trip_duration_counts = updated_df.groupBy("trip_duration").count()
trip_duration_counts.show()

In [0]:
#  Relationship between Trip Duration and The time of the day
# Number of rows for every (pickup_hour, trip_duration) combination
duration_by_hour_counts = updated_df.groupBy("pickup_hour", "trip_duration").count()
duration_by_hour_counts.show()

In [0]:
# Number of rows for every (pickup_minutes, trip_duration) combination
duration_by_minute_counts = updated_df.groupBy("pickup_minutes", "trip_duration").count()
duration_by_minute_counts.show()

In [0]:
#  Relationship between geographical location and duration
"""
  The solution is to create a new velocity column, which shows the relationship between geographical location and duration.
"""
def velocity_calculater(distance, time):
  """Return distance divided by time.

  Args:
    distance: distance travelled.
    time: time taken, in the unit the velocity should be expressed per.

  Returns:
    distance / time, or None when either input is missing or time is zero.
    Returning None avoids the TypeError / ZeroDivisionError that would
    otherwise be raised for such a row.
  """
  if distance is None or time is None or time == 0:
    return None
  return distance / time

# Wrap velocity_calculater as a Spark UDF whose result type is double.
velocity = func.udf(velocity_calculater, DoubleType())

# Add the velocity column, computed from the distance and duration columns.
updated_df = updated_df.withColumn(
  "velocity(KM/H)",
  velocity(updated_df["distance(KM)"], updated_df["trip_duration(hr)"]),
)

# Keep only the three columns involved, then render them.
velocity_df = updated_df.select("distance(KM)", "trip_duration(hr)", "velocity(KM/H)")
display(velocity_df)