In [0]:
#importing pyspark libraries
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.types import *

# loading the data
uberdf = spark.read.csv("/FileStore/dataset.csv",header=True,inferSchema=True)

#replacing the nulls with previous know dates
uberdf=uberdf.withColumn("Counter",monotonically_increasing_id())
uberdf=uberdf.withColumn("Dates",when(uberdf.Date>1,uberdf.Date).otherwise(last(uberdf.Date,ignorenulls=True).over(Window.orderBy("Counter"))))
uberdf.drop("Counter")
#uberdf=uberdf.withColumn("Time (Local)",col("`Time (Local)`").cast(TimestampType()))
uberdf=uberdf.select(to_date(col("Dates"),"dd-MMM-yy").alias("Date"),hour(to_timestamp(col("Time (Local)")*3600)).alias("Time"),"`Eyeballs `","`Zeroes `","`Completed Trips `","`Requests `","`Unique Drivers`")
display(uberdf)

Date,Time,Eyeballs,Zeroes,Completed Trips,Requests,Unique Drivers
2012-09-10,7,5,0,2,2,9
2012-09-10,8,6,0,2,2,14
2012-09-10,9,8,3,0,0,14
2012-09-10,10,9,2,0,1,14
2012-09-10,11,11,1,4,4,11
2012-09-10,12,12,0,2,2,11
2012-09-10,13,9,1,0,0,9
2012-09-10,14,12,1,0,0,9
2012-09-10,15,11,2,1,2,7
2012-09-10,16,11,2,3,4,6


**1. Date had the most completed trips**

In [0]:
from pyspark.sql.functions import *

tripsByDate=uberdf.groupBy("Date").sum("Completed Trips ")
dateMostCompletedTrips = tripsByDate.orderBy("sum(Completed Trips )", ascending=False).select("Date").first()["Date"]
print(dateMostCompletedTrips)

2012-09-22


**2. The highest number of completed trips within a 24-hour period**

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import Window

#max trips in a day
highestTrips=tripsByDate.orderBy("sum(Completed Trips )", ascending=False).select(col("sum(Completed Trips )").alias("MaxTrips")).first()["MaxTrips"]
print(highestTrips)

#max trips in rolling 24 hours period

'''
tripsby24hrwindow=uberdf.groupBy(window("Time","24 hours")).agg(sum("Completed Trips ").alias("CompletedTrips")).orderBy("CompletedTrips",ascending=False)
tripsby24hrwindow.show(truncate=False)

'''

248
Out[30]: '\ntripsby24hrwindow=uberdf.groupBy(window("Time","24 hours")).agg(sum("Completed Trips ").alias("CompletedTrips")).orderBy("CompletedTrips",ascending=False)\ntripsby24hrwindow.show(truncate=False)\n\n'

**Busiest Hour of the day (had the most requests)**

In [0]:
from pyspark.sql.functions import *

maxtripsbyhours = uberdf.groupBy(col("Time").alias("Hours")).agg(sum("Requests ").alias("No of requests"))
maxhour = maxtripsbyhours.orderBy("No of requests",ascending=False).select("Hours","No of requests")
maxhour.show()

+-----+--------------+
|Hours|No of requests|
+-----+--------------+
|   23|           184|
|   22|           174|
|   19|           156|
|    0|           142|
|   18|           119|
|   21|           112|
|   20|           107|
|    2|           100|
|   17|            98|
|    1|            96|
|   16|            82|
|   15|            71|
|   14|            71|
|   13|            55|
|   12|            53|
|   11|            47|
|    3|            35|
|    8|            29|
|    6|            28|
|   10|            28|
+-----+--------------+
only showing top 20 rows



**Percentages of all zeroes occurred on weekends (Friday at 5 pm to Sunday at 3 am)**

In [0]:
from pyspark.sql.functions import *

weekendZeros = uberdf.filter((col("Time")>=17) & (dayofweek(col("Date"))==5)).agg(sum(col("Zeroes ")).alias("weekendZeros")).collect()[0]["weekendZeros"] + \
uberdf.filter((col("Time")<3) & (dayofweek(col("Date"))==7)).agg(sum(col("Zeroes ")).alias("weekendZeros")).collect()[0]["weekendZeros"]+\
uberdf.filter(dayofweek(col("Date"))==6).agg(sum(col("Zeroes ")).alias("weekendZeros")).collect()[0]["weekendZeros"]
print(weekendZeros)
weekZeros = uberdf.agg(sum(col("Zeroes ")).alias("weekZeros")).collect()[0]["weekZeros"]
print(weekZeros)


percentageWeekendZeros = weekendZeros/weekZeros * 100
print("Percentage of weeekend zeros",percentageWeekendZeros)

#weekendZeros = uberdf.where(dayofweek(col("Date"))==6 | dayofweek(col("Date"))==7).sum(col("Zeroes"))

297
1429
Percentage of weeekend zeros 20.78376487053884


**The weighted average ratio of completed trips per driver**

In [0]:
from pyspark.sql.functions import *

weightedAvg = uberdf.withColumn("CompletedTripsbyDriver",uberdf["Completed Trips "]/uberdf["Unique Drivers"]).groupBy("Date","Time").agg(avg("CompletedTripsbyDriver").alias("Avg_CompletedTripsbyDriver"),sum("CompletedTripsbyDriver").alias("total_CompletedTripsbyDriver")).withColumn("WeightedRatio", col("Avg_CompletedTripsbyDriver") * col("total_CompletedTripsbyDriver")).agg(sum("WeightedRatio")/sum("total_CompletedTripsbyDriver")).collect()[0][0]
print(weightedAvg)

0.8395043084828406


**Time when to consider a true “end day” instead of midnight (i.e when are supply and demand at both their natural minimums)**

In [0]:
from pyspark.sql.functions import *

endDayHour = uberdf.groupBy("Time").agg(avg("Requests ").alias("Requests"),avg("Unique Drivers").alias("Drivers")).orderBy("Requests","Drivers",ascending=True).first()["Time"]
print(endDayHour)

4


**Adding more drivers to any single hour of every day based on demand**

In [0]:
from pyspark.sql.functions import *

moreDriverHour = uberdf.groupBy("Time").agg((sum("requests ")/countDistinct("Unique Drivers")).alias("reqperdriver")).orderBy("reqperdriver",ascendening=True).first()["Time"]
print(moreDriverHour)

4
