In [1]:
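# TODO: add a short summary of this notebook (NYC taxi trips are aggregated into half-hour pickup slots, and anomalous slots are flagged with an IsolationForest).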

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofweek, to_date, to_timestamp, date_format, year, hour, minute, month, when, dayofmonth, dayofweek
from pyspark.sql.functions import concat_ws, substring, concat, lpad, lit
from pyspark.sql.functions import round, sum, count, avg
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
from pyspark.sql import functions, types
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from pyspark.sql import SparkSession

In [None]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [3]:
# Raw NYC taxi trip records, stored as Parquet on HDFS.
RAW_TRIPS_PATH = "hdfs://hdfs/ny-taxi-data/raw"
input_df = spark.read.parquet(RAW_TRIPS_PATH)

In [4]:
# Number of raw trip records loaded (shown as the cell output).
raw_trip_count = input_df.count()
raw_trip_count

12106669

In [7]:
# Bucket every trip into a half-hour pickup slot and aggregate driver pay per slot.
# Trips without a pickup timestamp cannot be placed in a slot. Left in, they would all
# collapse into one bogus group (date_group NULL, minute_group '30': `NULL < 30` is not
# true, and concat_ws skips the NULL date/hour parts), so drop them before bucketing.
pickups = input_df.filter(input_df.pickup_datetime.isNotNull())

df = pickups.select(
        to_date(pickups.pickup_datetime).alias("day_date")
        , year(pickups.pickup_datetime).alias('year')
        , month(pickups.pickup_datetime).alias('month')
        , dayofmonth(pickups.pickup_datetime).alias("dayofmonth")
        # Spark numbers weekdays 1 = Sunday ... 7 = Saturday.
        , dayofweek(pickups.pickup_datetime).alias("dayofweek")
        , hour(pickups.pickup_datetime).alias("hour")
        , minute(pickups.pickup_datetime).alias("minute")
        , pickups.driver_pay
    )

# '00' for minutes 0-29, '30' for minutes 30-59.
df = df.withColumn("minute_group", when(df.minute < 30, '00').otherwise('30'))
# Slot start time as "HH:mm:ss", e.g. "07:30:00".
df = df.withColumn("time_group", concat_ws(":", lpad(df.hour, 2, '0'), df.minute_group, lit('00')))
# Slot start as "yyyy-MM-dd HH:mm:ss", parsed into a timestamp (date_group) below.
df = df.withColumn("ts", concat_ws(" ", df.day_date, df.time_group))

dfs = df.select(
        to_timestamp(df.ts, "yyyy-MM-dd HH:mm:ss").alias("date_group")
        , df.minute_group
        , df.year
        , df.hour
        , df.month
        , df.dayofmonth
        , df.dayofweek
        , df.driver_pay
).groupby(
    "date_group", "minute_group", "hour", "year", "month", "dayofmonth", "dayofweek"
).agg(
    # count() skips NULLs, so no_rides counts trips that have a driver_pay value.
    functions.count('driver_pay').alias('no_rides')
    , functions.round(functions.sum('driver_pay'), 2).alias('total_bill')
    , functions.round(functions.avg('driver_pay'), 2).alias('avg_bill')
).orderBy("date_group")

In [8]:
# Preview the first half-hour slots (explicitly spelling out show()'s defaults).
dfs.show(n=20, truncate=True)

+-------------------+------------+----+----+-----+----------+---------+--------+----------+--------+
|         date_group|minute_group|hour|year|month|dayofmonth|dayofweek|no_rides|total_bill|avg_bill|
+-------------------+------------+----+----+-----+----------+---------+--------+----------+--------+
|2020-09-01 00:00:00|          00|   0|2020|    9|         1|        3|    4794|  63682.32|   13.28|
|2020-09-01 00:30:00|          30|   0|2020|    9|         1|        3|    3600|  47761.62|   13.27|
|2020-09-01 01:00:00|          00|   1|2020|    9|         1|        3|    3036|  40665.95|   13.39|
|2020-09-01 01:30:00|          30|   1|2020|    9|         1|        3|    2511|  32701.71|   13.02|
|2020-09-01 02:00:00|          00|   2|2020|    9|         1|        3|    1860|  24725.59|   13.29|
|2020-09-01 02:30:00|          30|   2|2020|    9|         1|        3|    1567|  21246.81|   13.56|
|2020-09-01 03:00:00|          00|   3|2020|    9|         1|        3|    1380|  19993.27|

In [9]:

# Previous-day demand feature: `lag` is the ride count of the same half-hour slot on the
# previous day present in the data. Partitioning by (hour, minute_group) and looking back
# one row stays correct even when one of an hour's two slots is missing on some day.
windowSpec = Window.partitionBy("hour", "minute_group").orderBy("date_group")

# Guard so the cell is safe to re-run: otherwise a second run would recompute the lag
# over the already-filtered frame and drop another day of rows.
if "lag" not in dfs.columns:
    dfs = dfs.withColumn("lag", lag("no_rides", 1).over(windowSpec))
    # The earliest day of each slot has no previous day and cannot be scored.
    dfs = dfs.filter("lag IS NOT NULL")

scaler = StandardScaler()
classifier = IsolationForest(contamination=0.005, n_estimators=200, max_samples=0.7, random_state=42, n_jobs=-1)

# Training features, in the same order predict_using_broadcasts builds them.
# minute_group is a '00'/'30' string: cast it to int so scikit-learn gets purely numeric
# input (recent scikit-learn rejects string arrays instead of converting them).
# Sorting by date_group fixes the row order, which IsolationForest's index-based
# subsampling (max_samples + random_state) depends on.
df_model = dfs.orderBy("date_group").select(
    dfs.minute_group.cast("int").alias("minute_group")
    , dfs.hour, dfs.year, dfs.month, dfs.dayofmonth, dfs.dayofweek
    , dfs.no_rides, dfs.total_bill, dfs.avg_bill, dfs.lag
)

x_train = scaler.fit_transform(df_model.collect())
clf = classifier.fit(x_train)

# Make the fitted scaler and model available on the executors for the UDF below.
SCL = spark.sparkContext.broadcast(scaler)
CLF = spark.sparkContext.broadcast(clf)

def predict_using_broadcasts(minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag):
    """Score one half-hour slot with the broadcast scaler and IsolationForest.

    Arguments are the feature values of a single row, in training-feature order.
    ``minute_group`` may be the '00'/'30' string or an int; it is converted with int()
    to match the cast applied for training.

    Returns IsolationForest's label (1 = inlier, -1 = outlier), or None when the row
    cannot be scored (e.g. a NULL feature), so failures surface as NULL in the output
    instead of as a made-up label.
    """
    x_test = [[minute_group, hour, year, month, dayofmonth, dayofweek, no_rides, total_bill, avg_bill, lag]]
    try:
        x_test[0][0] = int(minute_group)
        scaled = SCL.value.transform(x_test)
        return int(CLF.value.predict(scaled)[0])
    except (TypeError, ValueError):
        # int(None) raises TypeError; NULL/NaN features make scikit-learn raise ValueError.
        import traceback
        traceback.print_exc()
        print('Cannot predict:', x_test)
        return None

udf_predict_using_broadcasts = functions.udf(predict_using_broadcasts, types.IntegerType())

df_pred = dfs.withColumn(
    'prediction',
    udf_predict_using_broadcasts('minute_group', 'hour', 'year', 'month', 'dayofmonth', 'dayofweek', 'no_rides', 'total_bill', 'avg_bill', 'lag')
)

# map to table columns
df_out = df_pred.select(
    df_pred.date_group.alias("pickup_ts")
    , df_pred.minute_group.alias("pickup_minute_group")
    , df_pred.hour.alias("pickup_hour")
    , df_pred.year.alias("pickup_year")
    , df_pred.month.alias("pickup_month")
    , df_pred.dayofmonth.alias("pickup_dayofmonth")
    , df_pred.dayofweek.alias("pickup_dayofweek")
    , df_pred.no_rides.alias("norides")
    , df_pred.total_bill.alias("total_bill")
    , df_pred.avg_bill.alias("avg_bill")
    , df_pred.lag.alias("norides_lag")
    , df_pred.prediction.alias("pred")
)

In [10]:
# Preview the scored slots (explicitly spelling out show()'s defaults).
df_out.show(n=20, truncate=True)

+-------------------+-------------------+-----------+-----------+------------+-----------------+----------------+-------+----------+--------+-----------+----+
|          pickup_ts|pickup_minute_group|pickup_hour|pickup_year|pickup_month|pickup_dayofmonth|pickup_dayofweek|norides|total_bill|avg_bill|norides_lag|pred|
+-------------------+-------------------+-----------+-----------+------------+-----------------+----------------+-------+----------+--------+-----------+----+
|2020-09-02 00:00:00|                 00|          0|       2020|           9|                2|               4|   5196|  69838.45|   13.44|       4794|   1|
|2020-09-02 00:30:00|                 30|          0|       2020|           9|                2|               4|   3818|  51395.77|   13.46|       3600|   1|
|2020-09-03 00:00:00|                 00|          0|       2020|           9|                3|               5|   6113|  79580.45|   13.02|       5196|   1|
|2020-09-03 00:30:00|                 30|     

In [14]:
# Mark the scored frame for caching so later actions reuse the computed predictions;
# persist() returns the same DataFrame, which is displayed as the cell output.
df_out_cached = df_out.persist()
df_out_cached

DataFrame[pickup_ts: timestamp, pickup_minute_group: string, pickup_hour: int, pickup_year: int, pickup_month: int, pickup_dayofmonth: int, pickup_dayofweek: int, norides: bigint, total_bill: double, avg_bill: double, norides_lag: bigint, pred: int]

In [18]:
import pandas as pd

In [None]:
# Bring just the total_bill column to the driver as a pandas DataFrame.
total_bill = df_out.select(df_out.total_bill).toPandas()