In [1]:
#Author: Pranav Thaenraj
#Since: 06/25/19
#Yellow Cabs in NYC

#Yellow Cab and Zone Data From https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
#Instantiating the Path of both datasets
zoneLookup_path = "dbfs:/FileStore/tables/taxi__zone_lookup.csv"
yellowCab_path = "dbfs:/FileStore/tables/yellow_tripdata_2020_01_csv.001"

#Determination of if the csv file has built in schema, the first row is the header, and the delimiter for the file
infer_schema = "false"
first_row_is_harder = "true"
delimiter = ","

#reading the zone Lookup data into a dataframe
zone_df = spark.read.format("csv").option("inferSchema",infer_schema) \
  .option("header",first_row_is_harder) \
  .option("sep",delimiter).load(zoneLookup_path)


#reading the yellow cab data into dataframe
cab_df = spark.read.format("csv").option("inferSchema",infer_schema) \
  .option("header",first_row_is_harder) \
  .option("sep",delimiter).load(yellowCab_path)


#Cleansing zone Data
from pyspark.sql.functions import expr
zone_df = zone_df.where(zone_df["Borough"] != "Unknown").where(zone_df["Zone"] != "NA")
#display(zone_df) 

#Finding the duration of each trip in cab_df
##Casting the pickup and dropoff columns to timestamp types
from pyspark.sql.functions import coalesce, to_date
from pyspark.sql import functions as F 

cab_df = cab_df.withColumn( 'pickup_timeStamp', F.unix_timestamp(cab_df['tpep_pickup_datetime'], 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))
cab_df = cab_df.withColumn('drop_timeStamp',F.unix_timestamp(cab_df['tpep_dropoff_datetime'], 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))

# Subtracting the timestamp values to produce the duration of each trip
cab_df = cab_df.withColumn('diff_timeStamp',(F.col("drop_timeStamp").cast("long") - F.col("pickup_timeStamp").cast("long"))/60)

#Speed of each Taxi
cab_df = cab_df.withColumn("Speed(mph)", F.col('trip_distance')/(F.col('diff_timeStamp').cast("long")/60))

#joining the ZoneDf locatioonID with CabDf pickup and dropoff 
#Pick up join code and selection of needed columns from cab_df
cabJ_df = cab_df.join(zone_df, cab_df['PULocationID'] == zone_df['LocationID']).select('VendorID','trip_distance','tpep_pickup_datetime','tpep_dropoff_datetime', 'DOLocationID','passenger_count','total_amount','Speed(mph)','LocationID','Borough','Zone','service_zone')

#Renaming joined columns
cabJ_df = cabJ_df.withColumnRenamed("LocationID","PULocationID").withColumnRenamed("Borough","PUBorough").withColumnRenamed("Zone","PUZone").withColumnRenamed("service_zone",'PUService_zone')

#Joining Drop off coulumns
cabJ_df = cabJ_df.join(zone_df, cabJ_df['DOLocationID'] == zone_df['LocationID'])
#renaming joined columns
cabJ_df =cabJ_df.withColumnRenamed("LocationID","DOLocationID").withColumnRenamed("Borough","DOBorough").withColumnRenamed("Zone","DOZone").withColumnRenamed("service_zone",'DOService_zone')
#display(cabJ_df)

#Grouping the data by the Pickup and Dropoff Boroughs and calculating the average speed of cabs and average distance travelled in between these Boroughs
cabJ_df = cabJ_df.withColumn("trip_distance", cabJ_df["trip_distance"].cast("Double"))
Borough_speed_DF = cabJ_df.groupBy("PUBorough","DOBorough").avg("Speed(mph)").toDF("PUBorough","DOBorough","avg(Speed(mph))")
Borough_dist_DF = cabJ_df.groupBy("PUBorough","DOBorough").avg("trip_distance").toDF("PUBorough","DOBorough","avg(trip_distance)")

#Filtering cab travel that Only occurs in the same Borough
from pyspark.sql.functions import expr,col
Borough_dist_DF = Borough_dist_DF.where(expr("PUBorough == DOBorough"))
Borough_speed_DF = Borough_speed_DF.where(expr("PUBorough == DOBorough"))

#Joining the Borough_dist_DF and Borough_speed_DF to get both avg speed and avg distance in same DF
SameBorough_DF = Borough_speed_DF.join(Borough_dist_DF,Borough_speed_DF["PUBorough"] == Borough_dist_DF["PUBorough"])

SameBorough_Frequency = cabJ_df.groupBy("PUBorough","DOBorough").count().where(expr("PUBorough == DOBorough")).toDF("PUBorough","DOBorough","count")
total_Frequency = cabJ_df.groupBy("PUBorough","DOBorough").count().toDF("PUBorough","DOBorough","count")

from pyspark.sql.functions import sum
Same_count =SameBorough_Frequency.select(sum("count"))
Total_count = total_Frequency.select(sum("count"))

#proportion of yellow cab travel that is Intra_Borough = 95957/113396 ==.8462 
display(SameBorough_DF)
# given that about 85% for travel is Intra-Borough it becomes appearent that almost all of the cab travel in NYC is of short distance and ocurrs and slow speeds

#From This graph we can see that the Borough of EWR is most Enviromentally detrimental to take a cab in because the average distance that a cab travels within the Borough is .1 miles -- a distance that is easily walkable and the average speed inside of this borough is 6mph -- a speed thats not very far off from that of walking speed. in such a borough, where tracvel distance is short and the speed of travel is slow, using cabs is detrimental to the environment and the reduction of cab usage in this borough can lower the passenger's overall carbon footprint

#Moreover in most cases, inter-borough travel averages at under 3 miles and the travel speed is always under 25 miles per hour, a speed at which travel through cab is unecessacry and environmentally detrimental. Thus when travelling in between locations within the same Borough its important for people to consider the environmental effects of using a cab to travel such a menial distance and choose the greener option of walking over taking a cab, especially when the distance is easily walkable.



PUBorough,DOBorough,avg(Speed(mph)),PUBorough.1,DOBorough.1,avg(trip_distance)
Queens,Queens,24.257894822688677,Queens,Queens,4.872690389092121
EWR,EWR,5.999999999999999,EWR,EWR,0.162
Brooklyn,Brooklyn,13.63066936972011,Brooklyn,Brooklyn,2.641750285062715
Staten Island,Staten Island,22.92173913043478,Staten Island,Staten Island,4.013333333333334
Manhattan,Manhattan,13.882794734997898,Manhattan,Manhattan,2.1500358947088607
Bronx,Bronx,14.701467990579118,Bronx,Bronx,2.432716049382716
