In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.dataframe import *

In [2]:
spark = SparkSession.builder \
    .appName("taxi") \
    .getOrCreate() 

In [3]:
ride_df = spark.read.csv('data.csv', inferSchema=True, header=True)

In [4]:
ride_df.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: string (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)



In [5]:
ride_df.count()

8381556

In [6]:
ride_df.createOrReplaceTempView("rider")

In [7]:
print("Number of taxis", spark.sql('SELECT distinct number from rider where number >=0').count())

Number of taxis 94274


In [8]:
ride_df = ride_df.withColumn("number", ride_df["number"].cast(IntegerType()))
ride_df.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- number: integer (nullable = true)
 |-- pick_lat: double (nullable = true)
 |-- pick_lng: double (nullable = true)
 |-- drop_lat: double (nullable = true)
 |-- drop_lng: double (nullable = true)



In [9]:
# cleaned the dataset
ride_df = spark.sql('SELECT * from rider where number >=0')
ride_df.count()

8381183

In [10]:
ride_df.createOrReplaceTempView('rider')

In [11]:
ride_df.describe().show()

+-------+------------------+------------------+------------------+------------------+------------------+
|summary|            number|          pick_lat|          pick_lng|          drop_lat|          drop_lng|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|           8381183|           8381183|           8381183|           8381183|           8381183|
|   mean| 49836.32284571283|13.010247505849685| 77.62874600026429|13.010846611327048| 77.60708484316284|
| stddev|28903.490367448314|0.9032978033556502|0.4490997661942238|0.9516671807115192|1.4753016402249166|
|    min|             00000|         -48.77217|         -93.95859|        -57.946835|        -127.81789|
|    max|             99999|          67.69665|          92.81412|          61.52401|         174.88597|
+-------+------------------+------------------+------------------+------------------+------------------+



In [12]:
# averages of aggregated counts per hour
spark.sql("select per_hour, avg(rides) from(select date_trunc('hour',ts) per_hour, count(number) rides from rider group by 1)group by 1 order by 1").show()

+-------------------+----------+
|           per_hour|avg(rides)|
+-------------------+----------+
|2018-04-07 01:00:00|      20.0|
|2018-04-07 02:00:00|       5.0|
|2018-04-07 03:00:00|       1.0|
|2018-04-07 05:00:00|       6.0|
|2018-04-07 06:00:00|      66.0|
|2018-04-07 07:00:00|     178.0|
|2018-04-07 08:00:00|     319.0|
|2018-04-07 09:00:00|     698.0|
|2018-04-07 10:00:00|     506.0|
|2018-04-07 11:00:00|     611.0|
|2018-04-07 12:00:00|     677.0|
|2018-04-07 13:00:00|     551.0|
|2018-04-07 14:00:00|     549.0|
|2018-04-07 15:00:00|     472.0|
|2018-04-07 16:00:00|     544.0|
|2018-04-07 17:00:00|     607.0|
|2018-04-07 18:00:00|     696.0|
|2018-04-07 19:00:00|     945.0|
|2018-04-07 20:00:00|     936.0|
|2018-04-07 21:00:00|     412.0|
+-------------------+----------+
only showing top 20 rows



In [13]:
# averages of aggregated counts per day
spark.sql("select per_day, avg(rides) from(select date_trunc('day',ts) per_day, count(number) rides from rider group by 1)group by 1 order by 1").show()

+-------------------+----------+
|            per_day|avg(rides)|
+-------------------+----------+
|2018-04-07 00:00:00|    9374.0|
|2018-04-08 00:00:00|    5716.0|
|2018-04-09 00:00:00|   11552.0|
|2018-04-10 00:00:00|   11186.0|
|2018-04-11 00:00:00|   12126.0|
|2018-04-12 00:00:00|   10359.0|
|2018-04-13 00:00:00|   13468.0|
|2018-04-14 00:00:00|    6588.0|
|2018-04-15 00:00:00|    4192.0|
|2018-04-16 00:00:00|   10534.0|
|2018-04-17 00:00:00|    9454.0|
|2018-04-18 00:00:00|    9644.0|
|2018-04-19 00:00:00|   12261.0|
|2018-04-20 00:00:00|   10061.0|
|2018-04-21 00:00:00|    6916.0|
|2018-04-22 00:00:00|    3225.0|
|2018-04-23 00:00:00|    9049.0|
|2018-04-24 00:00:00|    8205.0|
|2018-04-25 00:00:00|    9159.0|
|2018-04-26 00:00:00|    9009.0|
+-------------------+----------+
only showing top 20 rows



In [14]:
# averages of aggregated counts per week
spark.sql("select per_week, avg(rides) from(select date_trunc('week',ts) per_week, count(number) rides from rider group by 1)group by 1 order by 1").show()

+-------------------+----------+
|           per_week|avg(rides)|
+-------------------+----------+
|2018-04-02 00:00:00|   15090.0|
|2018-04-09 00:00:00|   69471.0|
|2018-04-16 00:00:00|   62095.0|
|2018-04-23 00:00:00|   55833.0|
|2018-04-30 00:00:00|   47553.0|
|2018-05-07 00:00:00|   51371.0|
|2018-05-14 00:00:00|   56614.0|
|2018-05-21 00:00:00|   57283.0|
|2018-05-28 00:00:00|   63352.0|
|2018-06-04 00:00:00|   59812.0|
|2018-06-11 00:00:00|   60669.0|
|2018-06-18 00:00:00|   63241.0|
|2018-06-25 00:00:00|   60759.0|
|2018-07-02 00:00:00|   61134.0|
|2018-07-09 00:00:00|   64492.0|
|2018-07-16 00:00:00|   65746.0|
|2018-07-23 00:00:00|   67281.0|
|2018-07-30 00:00:00|   64243.0|
|2018-08-06 00:00:00|   74065.0|
|2018-08-13 00:00:00|   79563.0|
+-------------------+----------+
only showing top 20 rows



In [15]:
# averages of aggregated counts per month
spark.sql("select per_month, avg(rides) from(select date_trunc('month',ts) per_month, count(number) rides from rider group by 1)group by 1 order by 1").show()


+-------------------+----------+
|          per_month|avg(rides)|
+-------------------+----------+
|2018-04-01 00:00:00|  209246.0|
|2018-05-01 00:00:00|  246221.0|
|2018-06-01 00:00:00|  263026.0|
|2018-07-01 00:00:00|  282412.0|
|2018-08-01 00:00:00|  389824.0|
|2018-09-01 00:00:00|  573085.0|
|2018-10-01 00:00:00|  623563.0|
|2018-11-01 00:00:00|  610488.0|
|2018-12-01 00:00:00|  671155.0|
|2019-01-01 00:00:00|  876294.0|
|2019-02-01 00:00:00| 1293469.0|
|2019-03-01 00:00:00| 1761307.0|
|2019-04-01 00:00:00|  581093.0|
+-------------------+----------+



In [16]:
export_week = spark.sql("select ts,  date_trunc('week', ts) ts_week, number, pick_lat, pick_lng, drop_lat, drop_lng from rider")
export_week.show()

+-------------------+-------------------+------+----------+-----------------+---------+-----------------+
|                 ts|            ts_week|number|  pick_lat|         pick_lng| drop_lat|         drop_lng|
+-------------------+-------------------+------+----------+-----------------+---------+-----------------+
|2018-04-07 07:07:17|2018-04-02 00:00:00| 14626|12.3136215|76.65819499999998|12.287301|76.60228000000002|
|2018-04-07 07:32:27|2018-04-02 00:00:00| 85490| 12.943947|        77.560745|12.954014|         77.54377|
|2018-04-07 07:36:44|2018-04-02 00:00:00| 05408| 12.899603|          77.5873| 12.93478|         77.56995|
|2018-04-07 07:38:00|2018-04-02 00:00:00| 58940| 12.918229|77.60754399999998|12.968971|        77.636375|
|2018-04-07 07:39:29|2018-04-02 00:00:00| 05408|  12.89949|77.58726999999998| 12.93478|         77.56995|
|2018-04-07 07:43:08|2018-04-02 00:00:00| 05408| 12.899421|        77.587326| 12.93478|         77.56995|
|2018-04-07 07:43:55|2018-04-02 00:00:00| 5026

In [18]:
# dumping results for cohort analysis
export_week.coalesce(1).write.csv('result-dumps-week')

In [None]:
# trying out window funcs
spark.sql("select count(number), date_trunc('week',first_time) from(select distinct number, first_value(ts) over (partition by number order by ts) as first_time from rider) group by 2 order by 2").show(53)