In [1]:
# --- Imports: standard library first, then third-party ---------------------
import statistics

import pandas as pd
import plotly.express as px
import plotly.graph_objs as go
import plotly.offline as pyo
from plotly.subplots import make_subplots
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# --- Spark session (reuses an existing session if one is already running) --
spark = SparkSession.builder.appName("teste_aula").getOrCreate()

# --- pandas display options -------------------------------------------------
# No truncation of cell contents or columns, at most 100 rows,
# floats rendered with four decimals.
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', '{:.4f}'.format)

# --- plotly: enable notebook rendering --------------------------------------
pyo.init_notebook_mode(connected=True)

In [2]:
# Load the raw demand and supply CSV extracts; the first line of each file
# is treated as the header. No schema is supplied here -- the columns are
# cast explicitly in the next step.
demand_data = spark.read.csv('gs://athena-tmp/rafa/demand_data.csv', header=True)
supply_data = spark.read.csv('gs://athena-tmp/rafa/supply_data.csv', header=True)

In [3]:
# Cast every numeric column to an explicit type. 'date' is passed through
# unchanged and is used to sort both frames.

# column name -> Spark type, in the order the columns should appear
demand_casts = {
    'people_saw_0_cars': 'int',
    'people_saw_1_cars': 'int',
    'coverage_ratio': 'float',
}
supply_casts = {
    'active_drivers': 'int',
    'online_h': 'int',
    'has_booking_h': 'int',
    'waiting_for_booking_h': 'int',
    'hours_per_active_driver': 'float',
    'rides_per_online_hours': 'float',
    'finished_rides': 'int',
}

demand_data = (
    demand_data
    .select(*[F.col(name).cast(dtype).alias(name) for name, dtype in demand_casts.items()], 'date')
    .orderBy('date')
)

supply_data = (
    supply_data
    .select(*[F.col(name).cast(dtype).alias(name) for name, dtype in supply_casts.items()], 'date')
    .orderBy('date')
)

In [4]:
# Demand over time: one line for each of the two demand columns.
demand_history_pdf = demand_data.toPandas()
fig = px.line(demand_history_pdf, x="date", y=['people_saw_0_cars', 'people_saw_1_cars'])
fig.show()

In [5]:
# Join demand with supply on 'date' (left join: every demand row is kept)
# and derive the columns used by the rest of the analysis.
total_demand_expr = F.col('people_saw_0_cars') + F.col('people_saw_1_cars')
demand_with_total = demand_data.withColumn('total_demand', total_demand_expr)

complete_table = (
    demand_with_total
    .join(supply_data, on='date', how='left')
    # demand per active driver
    .withColumn('demand_supply_ratio', F.col('total_demand') / F.col('active_drivers'))
    # 'date' is a string such as '2016-11-14 08': characters 12-13 are the hour
    .withColumn('hour', F.col('date').substr(12, 2))
    # ...and the first ten characters are the calendar date
    .withColumn('real_date', F.col('date').substr(1, 10).cast('date'))
)

In [6]:
# Summary statistics for demand, supply and coverage over all rows.
(
    complete_table
    .select(['total_demand', 'active_drivers', 'coverage_ratio'])
    .toPandas()
    .describe()
)

Unnamed: 0,total_demand,active_drivers,coverage_ratio
count,839.0,839.0,839.0
mean,48.2825,61.6484,75.6615
std,36.1908,25.9282,18.3424
min,1.0,9.0,0.0
25%,25.0,41.0,66.0
50%,40.0,63.0,80.0
75%,64.0,80.5,89.0
max,270.0,129.0,100.0


In [7]:
# Per-hour averages of the main metrics.
# output column -> source column, in output order
hourly_mean_columns = {
    'avg_demand': 'total_demand',
    'avg_supply': 'active_drivers',
    'avg_ds_ratio': 'demand_supply_ratio',
    'avg_coverage_ratio': 'coverage_ratio',
    'avg_people_saw_0_cars': 'people_saw_0_cars',
    'avg_finished_rides': 'finished_rides',
}

complete_table_agg = (
    complete_table
    .groupBy('hour')
    .agg(*[F.mean(F.col(source)).alias(alias) for alias, source in hourly_mean_columns.items()])
)

In [8]:
# Distribution of coverage_ratio for each hour of the day.
# Rows are sorted by 'hour' before being handed to pandas/plotly.
coverage_by_hour_pdf = complete_table.orderBy('hour').toPandas()
fig = px.box(coverage_by_hour_pdf, x="hour", y='coverage_ratio')
fig.show()

In [9]:
# Distribution of demand_supply_ratio, restricted to hours 08, 09 and 18.
ds_ratio_selected_hours_pdf = (
    complete_table
    .where(F.col('hour').isin('08', '09', '18'))
    .orderBy('hour')
    .toPandas()
)
fig = px.box(ds_ratio_selected_hours_pdf, x="hour", y='demand_supply_ratio')
fig.show()

In [10]:
# Average demand vs. average supply for each hour of the day.
# `y` is a list of columns (wide-form input), so plotly express calls the
# y-axis "value"; `labels` has to be a dict mapping such names to display
# text -- a bare string is not a valid `labels` value.
fig = px.line(complete_table_agg.orderBy('hour').toPandas(),
              x = "hour",
              y = ['avg_demand','avg_supply'],
              title = 'Average Supply and Demand Comparison per hour of the day',
              labels = {'value': 'average amount'}
             )
fig.show()

In [11]:
# Bar chart of the mean coverage ratio for each hour of the day.
hourly_averages_pdf = complete_table_agg.orderBy('hour').toPandas()
fig = px.bar(
    hourly_averages_pdf,
    x="hour",
    y='avg_coverage_ratio',
    title='Average coverage ratio per hour of the day',
)
fig.show()

In [12]:
# Number of 18h rows where total demand exceeded the number of active
# drivers (demand_supply_ratio > 1).
(
    complete_table
    .where(F.col('hour') == '18')
    .where(F.col('demand_supply_ratio') > 1)
    .count()
)

14

In [13]:
# Total number of rows recorded for hour 09.
complete_table.where(F.col('hour') == '09').count()

35

In [14]:
# Rows at hours 08, 09 and 18 in which at least one person saw 0 cars,
# keeping only the hour, the timestamp and that count.

coverage_ratio_us = (
    complete_table
    .where(F.col('hour').isin('08', '09', '18'))
    .where(F.col('people_saw_0_cars') > 0)
    .select('hour', 'date', 'people_saw_0_cars')
)

In [15]:
# Preview the first 20 rows (the default row count, spelled out).
coverage_ratio_us.show(n=20)

+----+-------------+-----------------+
|hour|         date|people_saw_0_cars|
+----+-------------+-----------------+
|  08|2016-11-14 08|               38|
|  09|2016-11-14 09|               43|
|  18|2016-11-14 18|               11|
|  08|2016-11-15 08|               58|
|  09|2016-11-15 09|               41|
|  18|2016-11-15 18|               13|
|  08|2016-11-16 08|               33|
|  09|2016-11-16 09|               24|
|  18|2016-11-16 18|               16|
|  08|2016-11-17 08|               23|
|  09|2016-11-17 09|               33|
|  18|2016-11-17 18|               34|
|  08|2016-11-18 08|               26|
|  09|2016-11-18 09|               25|
|  18|2016-11-18 18|               16|
|  08|2016-11-19 08|               12|
|  09|2016-11-19 09|               16|
|  18|2016-11-19 18|                6|
|  08|2016-11-20 08|                6|
|  09|2016-11-20 09|                7|
+----+-------------+-----------------+
only showing top 20 rows



In [29]:
# Rows at hours 08, 09 and 18 where demand exceeded supply
# (demand_supply_ratio > 1); 'undersupply' is the gap between total demand
# and active drivers.
ds_ratio_us = (
    complete_table
    .where(F.col('hour').isin('08', '09', '18'))
    .where(F.col('demand_supply_ratio') > 1)
    .withColumn('undersupply', F.col('total_demand') - F.col('active_drivers'))
    .select(
        'hour',
        'date',
        'total_demand',
        'active_drivers',
        'undersupply',
        'rides_per_online_hours',
        'people_saw_0_cars',
    )
)

In [30]:
# Preview the first 20 rows (the default row count, spelled out).
ds_ratio_us.show(n=20)

+----+-------------+------------+--------------+-----------+----------------------+-----------------+
|hour|         date|total_demand|active_drivers|undersupply|rides_per_online_hours|people_saw_0_cars|
+----+-------------+------------+--------------+-----------+----------------------+-----------------+
|  08|2016-11-14 08|          77|            47|         30|                  1.19|               38|
|  09|2016-11-14 09|          85|            55|         30|                  0.95|               43|
|  08|2016-11-15 08|          93|            44|         49|                  1.07|               58|
|  09|2016-11-15 09|          93|            59|         34|                  1.21|               41|
|  08|2016-11-16 08|          86|            55|         31|                   1.0|               33|
|  09|2016-11-16 09|          91|            71|         20|                  1.07|               24|
|  08|2016-11-17 08|          68|            59|          9|                  0.96

In [31]:
# Distribution of rides_per_online_hours per hour, over the rows of
# ds_ratio_us (hours 08, 09 and 18).
rides_per_hour_pdf = (
    ds_ratio_us
    .where(F.col('hour').isin('08', '09', '18'))
    .orderBy('hour')
    .toPandas()
)
fig = px.box(rides_per_hour_pdf, x="hour", y='rides_per_online_hours')
fig.show()

In [32]:
# Summary statistics for the hour-08 rows of ds_ratio_us.
(
    ds_ratio_us
    .where(F.col('hour') == '08')
    .select('undersupply', 'rides_per_online_hours', 'total_demand', 'people_saw_0_cars')
    .toPandas()
    .describe()
)

Unnamed: 0,undersupply,rides_per_online_hours,total_demand,people_saw_0_cars
count,21.0,21.0,21.0,21.0
mean,27.1905,1.0957,79.4762,37.5714
std,20.721,0.227,17.6086,16.1511
min,1.0,0.67,32.0,17.0
25%,9.0,0.94,71.0,26.0
50%,30.0,1.0,78.0,36.0
75%,45.0,1.26,93.0,50.0
max,72.0,1.58,112.0,80.0


In [33]:
# Summary statistics for the hour-09 rows of ds_ratio_us.
(
    ds_ratio_us
    .where(F.col('hour') == '09')
    .select('undersupply', 'rides_per_online_hours', 'total_demand', 'people_saw_0_cars')
    .toPandas()
    .describe()
)

Unnamed: 0,undersupply,rides_per_online_hours,total_demand,people_saw_0_cars
count,21.0,21.0,21.0,21.0
mean,36.4762,1.1781,100.0952,41.4762
std,25.9029,0.2406,22.443,16.3543
min,5.0,0.86,62.0,17.0
25%,20.0,1.0,86.0,32.0
50%,30.0,1.13,97.0,39.0
75%,45.0,1.37,111.0,43.0
max,116.0,1.68,163.0,90.0


In [34]:
# Summary statistics for the hour-18 rows of ds_ratio_us.
(
    ds_ratio_us
    .where(F.col('hour') == '18')
    .select('undersupply', 'rides_per_online_hours', 'total_demand', 'people_saw_0_cars')
    .toPandas()
    .describe()
)

Unnamed: 0,undersupply,rides_per_online_hours,total_demand,people_saw_0_cars
count,14.0,14.0,14.0,14.0
mean,56.3571,1.1221,140.7143,53.7143
std,40.0338,0.2064,37.698,29.741
min,17.0,0.81,91.0,22.0
25%,31.25,1.055,121.25,31.75
50%,41.0,1.115,126.5,49.0
75%,67.25,1.1725,156.5,61.0
max,139.0,1.72,231.0,124.0


In [35]:
# Distribution of people_saw_0_cars per hour, over the rows of ds_ratio_us
# (hours 08, 09 and 18).
undersupply_volume_pdf = (
    ds_ratio_us
    .where(F.col('hour').isin('08', '09', '18'))
    .orderBy('hour')
    .toPandas()
)
fig = px.box(undersupply_volume_pdf, x="hour", y='people_saw_0_cars')
fig.show()

In [47]:
# Driver earnings at hours 08, 09 and 18, using a flat value of 10 per ride.
#   total_earnings: finished rides * 10
#   ideal_earnings: (finished rides + people who saw 0 cars) * 10

earnings_per_drive = F.lit(10)

drivers_earnings_table = (
    complete_table
    .where(F.col('hour').isin('08', '09', '18'))
    .select(
        'hour',
        (F.col('finished_rides') * earnings_per_drive).alias('total_earnings'),
        ((F.col('finished_rides') + F.col('people_saw_0_cars')) * earnings_per_drive).alias('ideal_earnings'),
        'active_drivers',
    )
)

In [48]:
# Summary statistics of the earnings table for hour 08.
(
    drivers_earnings_table
    .where(F.col('hour') == '08')
    .toPandas()
    .describe()
)

Unnamed: 0,total_earnings,ideal_earnings,active_drivers
count,35.0,35.0,35.0
mean,146.8571,402.0,47.2857
std,91.1938,261.8015,13.997
min,10.0,20.0,23.0
25%,40.0,140.0,39.0
50%,180.0,450.0,47.0
75%,220.0,605.0,60.0
max,310.0,970.0,69.0


In [49]:
# Summary statistics of the earnings table for hour 09.
(
    drivers_earnings_table
    .where(F.col('hour') == '09')
    .toPandas()
    .describe()
)

Unnamed: 0,total_earnings,ideal_earnings,active_drivers
count,33.0,33.0,35.0
mean,213.9394,507.8788,57.1143
std,123.6916,311.6564,17.7942
min,10.0,70.0,22.0
25%,80.0,160.0,45.5
50%,260.0,560.0,57.0
75%,300.0,700.0,73.5
max,370.0,1250.0,84.0


In [50]:
# Summary statistics of the earnings table for hour 18.
(
    drivers_earnings_table
    .where(F.col('hour') == '18')
    .toPandas()
    .describe()
)

Unnamed: 0,total_earnings,ideal_earnings,active_drivers
count,35.0,35.0,35.0
mean,266.0,531.1429,85.5429
std,110.8576,372.9593,12.4105
min,60.0,80.0,59.0
25%,175.0,250.0,79.0
50%,290.0,450.0,83.0
75%,345.0,725.0,92.0
max,470.0,1670.0,113.0
