In [1]:
%run spark.ipynb

In [8]:
from pyspark.sql.types import *

start_date = '2018-05-01'
end_date = '2018-06-01'

ride_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("originId", IntegerType(), True),
    StructField("destinationId", IntegerType(), True), 
    StructField("price", FloatType(), True),
    StructField("status", StringType(), True),
    StructField("path", StringType(), True),
    StructField("commission", FloatType(), True),
    StructField("isPassengerRated", BooleanType(), True),
    StructField("isDriverRated", BooleanType(), True),
    StructField("code", StringType(), True), 
    StructField("passengerShare", IntegerType(), True), 
    StructField("cancelStatus", StringType(), True),
    StructField("driverId", IntegerType(), True), 
    StructField("passengerId", IntegerType(), True),
    StructField("congestionControl", IntegerType(), True),
    StructField("payment", IntegerType(), True),
    StructField("createdAt", TimestampType(), True),
    StructField("hasReturn", BooleanType(), True),
    StructField("returnPrice", IntegerType(), True),
    StructField("waitingTime", IntegerType(), True),
    StructField("waitingTimePrice", IntegerType(), True),
    StructField("dropAmount", IntegerType(), True),
    StructField("takerId", IntegerType(), True),
    StructField("creditInsufficientNotice", IntegerType(), True),
    StructField("rebatePercentage", IntegerType(), True), 
    StructField("cancellationReasonCode", StringType(), True),
    StructField("city", StringType(), True),
    StructField("carCategoryId", IntegerType(), True),
    StructField("finishedAt", TimestampType(), True),
    StructField("estimatedETA", DoubleType(), True),
    StructField("googleDistance", DoubleType(), True),
    StructField("surgeCoefficient", FloatType(), True),
    StructField("driveId", StringType(), True),
    StructField("passengerCount", IntegerType(), True),
    StructField("initiatedVia", StringType(), True), 
    StructField("driverEta", IntegerType(), True),
    StructField("passengerServiceCost", IntegerType(), True),
    StructField("matchCoefficient", FloatType(), True),
    StructField("carCategory", StringType(), True), 
    StructField("discount", IntegerType(), True),
    StructField("uniqueId", StringType(), True),
    StructField("paymentMethod", StringType(), True),
    StructField("updatedAt", TimestampType(), True)])

location_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("longitude", StringType(), True),
    StructField("latitude", StringType(), True), 
    StructField("address", StringType(), True), 
    StructField("neighbourhoodCode", StringType(), True), 
    StructField("congestionControl", IntegerType(), True),
    StructField("type", StringType(), True), 
    StructField("createdAt", TimestampType(), True), 
    StructField("updatedAt", TimestampType(), True),
    StructField("rideId", IntegerType(), True),
    StructField("seqNum", IntegerType(), True),
    StructField("shortAddress", StringType(), True)])  
    
    
ride_df_raw = spark.read.csv('./ride_2018_05.csv', schema=ride_schema, header=False)
location_df_raw = spark.read.csv('./location_2018_05.csv', schema=location_schema, header=False) 

ride_df_raw = ride_df_raw.filter(ride_df_raw["createdAt"] >= start_date).filter(ride_df_raw["createdAt"] < end_date)
location_df = location_df_raw.filter(location_df_raw["createdAt"] >= start_date).filter(location_df_raw["createdAt"] < end_date)


In [33]:
sample_ride_df = ride_df_raw.sample(fraction=1.0)

ride_df = sample_ride_df
print("len ride_df={}".format(ride_df.count()))

In [34]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# filter unique requests
INF = 1000000000
VALID_CREATED_AT_DIFF = 1200

# filter city
ride_df = ride_df.filter(ride_df["city"] == 'TEHRAN')


# todo: we did not use latlongcode
passenger_window = Window.partitionBy("passengerId").orderBy("createdAt")
ride_df = ride_df.withColumn("prevCreatedAt", F.lag(ride_df["createdAt"]).over(passenger_window))
ride_df = ride_df.withColumn("diffCreatedAt", F.when(F.isnull(ride_df["prevCreatedAt"]), INF).otherwise(F.unix_timestamp(ride_df["createdAt"]) - F.unix_timestamp(ride_df["prevCreatedAt"])))
ride_df = ride_df.filter(ride_df["diffCreatedAt"] > VALID_CREATED_AT_DIFF)


# filter short cancellations
# TODO: why don't we remove all cancelled rides
MINIMUM_VALID_CANCELLATION_DELAY = 10

ride_df = ride_df.filter((ride_df["status"] != 'CANCELED') | (F.unix_timestamp(ride_df["updatedAt"]) - F.unix_timestamp(ride_df["createdAt"]) > MINIMUM_VALID_CANCELLATION_DELAY))


# filter invalid distances
MINIMUM_VALID_DISTANCE = 0
MAXIMUM_VALID_DISTANCE = 65000

ride_df = ride_df.filter(ride_df["googleDistance"] > MINIMUM_VALID_DISTANCE).filter(ride_df["googleDistance"] < MAXIMUM_VALID_DISTANCE)


# filter invalid etas
MINIMUM_VALID_ETA = 180
MAXIMUM_VALID_ETA = 10800

ride_df = ride_df.filter(ride_df["estimatedETA"] > MINIMUM_VALID_ETA).filter(ride_df["estimatedETA"] < MAXIMUM_VALID_ETA)

# filter invalid speeds
# todo: definition of speed
MAXIMUM_VALID_SPEED = 28

ride_df = ride_df.withColumn("speed", ride_df["googleDistance"]/ride_df["estimatedETA"])



In [35]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta

# find region of rides
def get_grid_id(latitude, longitude):
    return "{}_{}".format(
        int(latitude/0.016486436999999996),
        int(longitude/0.030950938)
    )
udf_get_grid_id = F.udf(get_grid_id, StringType())

surge_df = ride_df

surge_df = surge_df.join(location_df, surge_df["originId"] == location_df["id"]).drop(location_df["createdAt"]).drop(location_df["id"])
surge_df = surge_df.withColumn("originLatitude", surge_df[ "latitude"].cast(DoubleType())).drop(surge_df["latitude"])
surge_df = surge_df.withColumn("originLongitude", surge_df["longitude"].cast(DoubleType())).drop(surge_df["longitude"])

surge_df = surge_df.withColumn("gridId", udf_get_grid_id(surge_df["originLatitude"], surge_df["originLongitude"]))

# truncate the datetime
def get_datetime_id(dt):
    td = timedelta(
        minutes=dt.minute % 5,
        seconds=dt.second,
        microseconds=dt.microsecond)

    return dt - td
udf_get_datetime_id = F.udf(get_datetime_id, TimestampType())

surge_df = surge_df.withColumn("datetimeId", udf_get_datetime_id(surge_df["createdAt"]))

speed_df = surge_df.groupBy('datetimeId').agg(F.avg('speed').alias('speed'))
region_df = surge_df.groupBy(surge_df["datetimeId"], surge_df["gridId"]).agg(F.count(surge_df["id"]).alias("count"), F.avg(surge_df["surgeCoefficient"]).alias("surge"))

speed_df.cache()
region_df.cache()

DataFrame[datetimeId: timestamp, gridId: string, count: bigint, surge: double]

In [36]:
print(region_df.count())
region_df.show()


634352
+-------------------+---------+-----+------------------+
|         datetimeId|   gridId|count|             surge|
+-------------------+---------+-----+------------------+
|2018-05-01 06:20:00|2169_1665|    3|               1.0|
|2018-05-01 08:20:00|2164_1658|    9|               1.0|
|2018-05-02 11:10:00|2169_1663|    2| 0.949999988079071|
|2018-05-02 15:50:00|2167_1660|    6| 0.800000011920929|
|2018-05-02 23:15:00|2170_1662|    5|1.2000000476837158|
|2018-05-05 06:00:00|2170_1662|    2| 1.100000023841858|
|2018-05-06 11:05:00|2166_1662|    4| 0.949999988079071|
|2018-05-06 18:55:00|2168_1659|    3| 0.800000011920929|
|2018-05-06 19:25:00|2170_1662|    6|0.8666666547457377|
|2018-05-07 20:30:00|2166_1662|    7|1.3285714047295707|
|2018-05-08 21:05:00|2166_1664|    2|0.8999999761581421|
|2018-05-09 08:30:00|2169_1659|   18|1.0666666825612385|
|2018-05-09 09:35:00|2172_1664|    2|               1.0|
|2018-05-09 10:50:00|2167_1664|   10| 0.949999988079071|
|2018-05-10 12:35:00|216

In [46]:
import math
import pyspark.sql.functions as F
from pyspark.sql.types import *


#region_df = surge_df.groupBy(surge_df["datetimeId"], surge_df["gridId"]).agg(F.count(surge_df["id"]).alias("count"), F.avg(surge_df["surgeCoefficient"]).alias("surge"))
#region_df = region_df.withColumn("date", region_df["datetimeId"].cast(DateType()))
#top_region_df = region_df.groupBy(region_df["gridId"], region_df["date"]).agg(F.sum(region_df["count"]).alias("sum"))

top_region_df = region_df.withColumn("date", F.to_date('datetimeId')).groupBy("gridId", "date").agg(F.sum("count").alias("sum"))
top_region_df = top_region_df.groupBy("gridId").agg(F.mean("sum").alias("mean"))
top_region_df = top_region_df.filter(F.col("mean") > 20)

region_filtered_df = region_df.join(top_region_df, 'gridId').drop('mean')

datetime_id_complete_df = region_filtered_df.select("datetimeId").distinct()
region_complete_df = datetime_id_complete_df.crossJoin(top_region_df.select('gridId'))

#region_complete_df.printSchema()
#region_df_filtered.printSchema()

#region_complete_df.show()
#region_df_filtered.show()

#region_complete_df = region_complete_df.join(region_df_filtered, (region_df_filtered["gridId"] == region_complete_df["gridId"]), how='left')
region_complete_df = region_complete_df.join(region_filtered_df.toDF(*region_filtered_df.columns),
                    ["gridId", "datetimeId"], how='left').fillna({'count': 0, 'surge': 1.0}).persist()
#region_complete_df.count()

#region_complete_df.printSchema()


region_complete_df.sort(['gridId', 'datetimeId']).show()



+---------+-------------------+-----+-----+
|   gridId|         datetimeId|count|surge|
+---------+-------------------+-----+-----+
|2147_1652|2018-05-01 00:00:00|    0|  1.0|
|2147_1652|2018-05-01 00:05:00|    0|  1.0|
|2147_1652|2018-05-01 00:10:00|    1|  1.0|
|2147_1652|2018-05-01 00:15:00|    0|  1.0|
|2147_1652|2018-05-01 00:20:00|    1|  1.0|
|2147_1652|2018-05-01 00:25:00|    0|  1.0|
|2147_1652|2018-05-01 00:30:00|    2|  1.0|
|2147_1652|2018-05-01 00:35:00|    1|  1.0|
|2147_1652|2018-05-01 00:40:00|    1|  1.0|
|2147_1652|2018-05-01 00:45:00|    0|  1.0|
|2147_1652|2018-05-01 00:50:00|    0|  1.0|
|2147_1652|2018-05-01 00:55:00|    1|  1.0|
|2147_1652|2018-05-01 01:00:00|    0|  1.0|
|2147_1652|2018-05-01 01:05:00|    0|  1.0|
|2147_1652|2018-05-01 01:10:00|    0|  1.0|
|2147_1652|2018-05-01 01:15:00|    0|  1.0|
|2147_1652|2018-05-01 01:20:00|    0|  1.0|
|2147_1652|2018-05-01 01:25:00|    1|  1.0|
|2147_1652|2018-05-01 01:30:00|    0|  1.0|
|2147_1652|2018-05-01 01:35:00| 

In [47]:
region_complete_df.filter(region_complete_df['count'] == 0).count()

534302

In [48]:

feature_df = region_complete_df.withColumn('date', F.to_date('datetimeId'))
feature_df = feature_df.withColumn('hour', F.hour('datetimeId'))
hour_translator = ['0-5'] * 6 + ['6-8'] * 3 + ['9-15'] * 7 + ['16-18'] * 3 + ['19-23'] * 5
feature_df = feature_df.withColumn('hourGroup', F.udf((lambda h: hour_translator[h]), StringType())('hour'))
feature_df = feature_df.withColumn('weekday', F.dayofweek('datetimeId'))
feature_df = feature_df.withColumn('yearday', F.dayofyear('datetimeId'))
feature_df = feature_df.join(speed_df, 'datetimeId', how='left').fillna({'speed': 28}) # todo: max speed
feature_df = feature_df.withColumn("speedLaged", F.lag("speed").over(Window.partitionBy("gridId").orderBy("datetimeId")))


In [51]:

def f(grid_id):
    x, y = [int(x) for x in grid_id.split('_')]
    return [('{}_{}'.format(x+i,y+j)) for i in [-1,0,1] for j in [-1,0,1] if i!=0 or j!=0]

hams_func = F.udf(f, ArrayType(StringType()))

hams = (feature_df.select('datetimeId', 'count', F.explode(hams_func('gridId')).alias('gridId')).
        groupby('datetimeId', 'gridId').agg(F.mean('count').alias('hamsCount')))

feature_df = feature_df.join(hams.toDF(*hams.columns), ['datetimeId', 'gridId'], how='left').\
    fillna({'hamsCount': 0}).withColumn("hamsCountLaged", F.lag("hamsCount").\
    over(Window.partitionBy("gridId").orderBy("datetimeId")))

feature_df.cache()
feature_df.show()

+-------------------+---------+-----+-----+----------+----+---------+-------+-------+------------------+------------------+-------------------+-------------------+
|         datetimeId|   gridId|count|surge|      date|hour|hourGroup|weekday|yearday|             speed|        speedLaged|          hamsCount|     hamsCountLaged|
+-------------------+---------+-----+-----+----------+----+---------+-------+-------+------------------+------------------+-------------------+-------------------+
|2018-05-01 00:00:00|2161_1660|    2|  1.0|2018-05-01|   0|      0-5|      3|    121| 9.923103428743413|              null|0.14285714285714285|               null|
|2018-05-01 00:05:00|2161_1660|    0|  1.0|2018-05-01|   0|      0-5|      3|    121| 9.681033710711315| 9.923103428743413| 0.2857142857142857|0.14285714285714285|
|2018-05-01 00:10:00|2161_1660|    1|  1.0|2018-05-01|   0|      0-5|      3|    121|10.218804663134165| 9.681033710711315|0.14285714285714285| 0.2857142857142857|
|2018-05-01 00:1

In [52]:

window = Window.partitionBy("gridId").orderBy("datetimeId")
feature_df = (feature_df
              .withColumn('m1', F.lag('count', count=1).over(window))
              .withColumn('m2', F.lag('count', count=2).over(window))
              .withColumn('m3', F.lag('count', count=3).over(window))
              .withColumn('d0', F.lag('count', count=24*12-1).over(window))
              .withColumn('d1', F.lag('count', count=24*12).over(window))
              .withColumn('d2', F.lag('count', count=24*12+1).over(window))
              .withColumn('w0', F.lag('count', count=7*24*12-1).over(window))
              .withColumn('w1', F.lag('count', count=7*24*12).over(window))
              .withColumn('w2', F.lag('count', count=7*24*12+1).over(window))
              )

feature_df = (feature_df
              .withColumn('unixts', F.unix_timestamp('datetimeId'))
              .withColumn('sd', F.sin(2*math.pi*F.col('unixts')/(24*3600)))
              .withColumn('cd', F.cos(2*math.pi*F.col('unixts')/(24*3600)))
              .withColumn('sw', F.sin(2*math.pi*F.col('unixts')/(7*24*3600)))
              .withColumn('cw', F.cos(2*math.pi*F.col('unixts')/(7*24*3600)))
              .drop('unixts')
              )

#TODO: add poisson regressor

feature_df.write.csv('./feature.csv', mode='overwrite', header=True)

In [53]:
feature_df.coalesce(1).write.csv('./feature.csv', mode='overwrite', header=True)