In [1]:
import pandas as pd
import numpy as np
import datetime 
import time

import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext()
spark = SparkSession(sc)

from pyspark.ml.recommendation import ALS
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt 

In [50]:
events=spark.read.csv('data/events.csv',  header= True, inferSchema = True)
events = events.withColumn('datetime', f.from_unixtime((events.timestamp.cast('bigint')/1000)).cast('timestamp'))

In [6]:
events.where(f.col("visitorid")==102019).show()

+-------------+---------+-----------+------+-------------+-------------------+
|    timestamp|visitorid|      event|itemid|transactionid|           datetime|
+-------------+---------+-----------+------+-------------+-------------------+
|1433176736375|   102019|transaction|150318|        13556|2015-06-01 12:38:56|
|1433175894837|   102019|       view| 49521|         null|2015-06-01 12:24:54|
|1433176042269|   102019|       view| 49521|         null|2015-06-01 12:27:22|
|1433175812596|   102019|       view|150318|         null|2015-06-01 12:23:32|
|1433175801314|   102019|  addtocart| 49521|         null|2015-06-01 12:23:21|
|1433175871497|   102019|       view| 49521|         null|2015-06-01 12:24:31|
|1433175714335|   102019|       view| 49521|         null|2015-06-01 12:21:54|
|1433175945872|   102019|       view|150318|         null|2015-06-01 12:25:45|
|1433176736422|   102019|transaction| 49521|        13556|2015-06-01 12:38:56|
+-------------+---------+-----------+------+--------

In [51]:
#preprocessing data
    #count number of items that visitors view
df_nbview = events.groupby("visitorid").agg(f.count("event")).withColumnRenamed("count(DISTINCT itemid)", "nb_views")
    #drop visitors that view one item
visitors_to_drop = df_nbview.where(f.col("count(event)")==1).select("visitorid")
events = events.join(visitors_to_drop, [events.visitorid == visitors_to_drop.visitorid], how='left_anti')

    #convert events to values
def convert_event_value(event):
    if event == 'transaction':
        return 30
    if event == 'addtocart':
        return 10
    if event == 'view':
        return 1
    return 0

udf_convert_event = f.udf(convert_event_value, IntegerType())
events = events.withColumn("event", udf_convert_event(f.col("event")))

In [13]:
df_nbview.show()

+---------+------------+
|visitorid|count(event)|
+---------+------------+
|   361387|          15|
|  1282360|           5|
|  1354794|           1|
|   126191|           1|
|   957827|           1|
|    95994|           3|
|  1105097|         117|
|  1048223|           5|
|  1211143|           1|
|  1117990|           1|
|   743709|           1|
|   592542|          27|
|  1253460|           1|
|  1282975|           1|
|  1038881|           3|
|   160820|           4|
|   173691|           2|
|   580848|           4|
|  1145624|           2|
|  1310867|           1|
+---------+------------+
only showing top 20 rows



In [15]:
df_nbview.where(f.col("count(event)")==1).show()

+---------+------------+
|visitorid|count(event)|
+---------+------------+
|  1354794|           1|
|   126191|           1|
|   957827|           1|
|  1211143|           1|
|  1117990|           1|
|   743709|           1|
|  1253460|           1|
|  1282975|           1|
|  1310867|           1|
|     2366|           1|
|  1359999|           1|
|   101094|           1|
|   861999|           1|
|   720813|           1|
|   929463|           1|
|   188644|           1|
|   886150|           1|
|    29719|           1|
|  1276602|           1|
|  1080187|           1|
+---------+------------+
only showing top 20 rows



In [16]:
events.show()

+-------------------+---------+------+-----+
|           datetime|visitorid|itemid|event|
+-------------------+---------+------+-----+
|2015-07-03 11:12:12|      266|340315|    1|
|2015-08-01 00:42:41|      474|415781|    1|
|2015-07-04 23:08:19|      648|321984|    1|
|2015-05-06 12:46:39|      916|101288|    1|
|2015-05-29 12:54:29|      925|328025|    1|
|2015-05-29 13:13:01|      925|328025|    1|
|2015-05-29 11:43:06|      925|328025|    1|
|2015-08-06 12:50:32|     1370|104613|    1|
|2015-07-01 00:04:07|     1972|252780|    1|
|2015-05-18 19:04:14|     2133|137697|   30|
|2015-05-26 23:07:40|     2180| 11893|    1|
|2015-05-26 23:13:46|     2180| 11893|    1|
|2015-05-27 00:08:21|     2180| 11893|    1|
|2015-05-26 23:39:24|     2180| 11893|    1|
|2015-09-14 18:35:11|     2197|454331|    1|
|2015-06-23 17:56:32|     2320| 58161|    1|
|2015-05-11 11:30:23|     2733| 29298|    1|
|2015-08-24 12:01:56|     2849|128039|    1|
|2015-08-25 17:45:43|     3013| 49257|    1|
|2015-06-2

In [53]:
events.where(f.col("visitorid")==102019).orderBy('timestamp').show()

+-------------+---------+-----+------+-------------+-------------------+
|    timestamp|visitorid|event|itemid|transactionid|           datetime|
+-------------+---------+-----+------+-------------+-------------------+
|1433175714335|   102019|    1| 49521|         null|2015-06-01 12:21:54|
|1433175801314|   102019|   10| 49521|         null|2015-06-01 12:23:21|
|1433175812596|   102019|    1|150318|         null|2015-06-01 12:23:32|
|1433175871497|   102019|    1| 49521|         null|2015-06-01 12:24:31|
|1433175894837|   102019|    1| 49521|         null|2015-06-01 12:24:54|
|1433175945872|   102019|    1|150318|         null|2015-06-01 12:25:45|
|1433176042269|   102019|    1| 49521|         null|2015-06-01 12:27:22|
|1433176736375|   102019|   30|150318|        13556|2015-06-01 12:38:56|
|1433176736422|   102019|   30| 49521|        13556|2015-06-01 12:38:56|
+-------------+---------+-----+------+-------------+-------------------+



In [54]:
#keep one interaction for each pair visitor-item
w = Window.partitionBy(events["visitorid"], events["itemid"])
events = events.withColumn('maxEvent', f.max(f.col("event")).
                             over(w)).where(f.col("event")==f.col("maxEvent")).drop(f.col("maxEvent"))
events = events.select('datetime','visitorid', 'itemid', 'event')
df_final = events.\
    select(f.col('datetime'),
           f.col('visitorid').cast('int'),
           f.col('itemid').cast('int'),
           f.col('event').cast('int')
          )


In [59]:
events.agg(f.countDistinct("visitorid")).show()

+----------------+
|count(visitorid)|
+----------------+
|          406020|
+----------------+



In [60]:
events.agg(f.countDistinct("itemid")).show()

+-------------+
|count(itemid)|
+-------------+
|       166753|
+-------------+



In [61]:
# split data to training set and testing set
X_train,X_test = df_final.randomSplit([0.8, 0.2], seed = 42)

In [62]:
#train using ALS
als = ALS(maxIter=10, regParam=0.01, rank=25, userCol="visitorid", itemCol="itemid", ratingCol="event", 
          coldStartStrategy="drop", nonnegative=True,implicitPrefs=False)
model = als.fit(X_train)

In [63]:
#evaluate the model on the testing set
Z = X_test.agg({"event":"max"}).collect()[0][0] - X_test.agg({"event":"min"}).collect()[0][0]
def predict(model, toPredict):
    return model.transform(toPredict).withColumn('prediction', f.round('prediction'))
predictions = predict(model, X_test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="event",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Normalized root-mean-square error on testing set= " + str(rmse/Z))

Normalized root-mean-square error on testing set= 0.11826072813567927


In [22]:
#convert predictions to 1, 10, 30
from pyspark.sql.types import FloatType
def convert_predict_value(score):
    if score<10.0:
        return 1
    
    if score<20.0:
        return 10
    return 30

convert_score = f.udf(convert_predict_value)
predictions = predictions.withColumn("score", convert_score(f.col("prediction")))
predictions = predictions.withColumn("score",f.col("score").cast("double"))

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator1 = MulticlassClassificationEvaluator(
    labelCol="event", predictionCol="score", metricName="accuracy")
accuracy = evaluator1.evaluate(predictions)
print("Accuracy on testing set = %g" % accuracy)

Accuracy on testing set = 0.967083


In [28]:
predictions.orderBy(f.rand()).show()

+-------------------+---------+------+-----+----------+-----+
|           datetime|visitorid|itemid|event|prediction|score|
+-------------------+---------+------+-----+----------+-----+
|2015-08-20 12:05:51|    66657|380107|    1|       1.0|  1.0|
|2015-05-20 17:54:14|   385039|192953|    1|       1.0|  1.0|
|2015-06-02 15:47:35|   198281| 43485|    1|       1.0|  1.0|
|2015-05-28 20:03:10|   855235|169007|    1|       1.0|  1.0|
|2015-09-02 14:57:30|   660977|126887|    1|       0.0|  1.0|
|2015-06-08 21:26:32|  1184594|310446|    1|       1.0|  1.0|
|2015-07-02 12:11:54|   183766| 96024|    1|       1.0|  1.0|
|2015-08-24 09:51:33|   896347|459475|   30|       1.0|  1.0|
|2015-07-14 20:28:14|     2358|433905|    1|       0.0|  1.0|
|2015-06-05 17:40:16|  1277783|456456|    1|       1.0|  1.0|
|2015-07-27 21:46:19|  1048836| 16190|    1|       1.0|  1.0|
|2015-05-24 21:15:06|  1331967|462868|   10|       0.0|  1.0|
|2015-07-15 23:22:23|    85356|217068|    1|       1.0|  1.0|
|2015-07

In [31]:
def get_recs_for_visitor(recs):
    #recs should be for a specific visitor
    recs = recs.select("recommendations.itemid","recommendations.rating")
    items = recs.select("itemid").toPandas().iloc[0,0]
#     events = recs.select("rating").toPandas().iloc[0,0]
    event_matrix = pd.DataFrame(items,columns = ["itemid_recs"])
#     event_matrix["score"] = events
    event_matrix_ps = spark.createDataFrame(event_matrix)
    return event_matrix_ps

from collections import namedtuple

def visitor_rec(visitor):
    user_row = namedtuple('user_row','visitorid')
    data = [user_row(visitor)]
    return spark.createDataFrame(data)

def display_recs(visitor,model):
    visitor = visitor_rec(visitor)
    userSubsetRecs = model.recommendForUserSubset(visitor, 10)
    items_recs = get_recs_for_visitor(userSubsetRecs)
    return items_recs.show()
    

In [32]:
# Generate top 10 item recommendations for a specific visitor
display_recs(102019,model)

+-----------+
|itemid_recs|
+-----------+
|     205756|
|     398447|
|     245985|
|      96085|
|     192468|
|     167986|
|     276096|
|     215178|
|     415311|
|     175893|
+-----------+



show properties of items recommended and bought

In [33]:
items=pd.read_csv('data/item_properties_part1.csv')
items1=pd.read_csv('data/item_properties_part2.csv')
items=pd.concat([items1,items])
times=[]
for i in items['timestamp']:
    times.append(datetime.datetime.fromtimestamp(i//1000.0))
items['datatime'] = times

def item_property(datetime, item_set, items):
    temp = items[(items.itemid.isin(item_set)) & (items.timestamp<datetime)]
    temp = temp[temp.timestamp==temp.timestamp.max()]
    return temp
    

In [34]:
pref_item_true = item_property(1433176736375,[150318,49521],items)
pref_item_true

Unnamed: 0,timestamp,itemid,property,value,datatime
1965555,1433041200000,150318,available,1,2015-05-30 23:00:00
2094576,1433041200000,49521,888,222207 927133 1307549,2015-05-30 23:00:00
3198360,1433041200000,49521,365,1116693,2015-05-30 23:00:00
4861168,1433041200000,49521,792,1116693,2015-05-30 23:00:00
5912026,1433041200000,150318,917,331643,2015-05-30 23:00:00
7302564,1433041200000,49521,364,490489,2015-05-30 23:00:00
8676901,1433041200000,150318,888,1265514 1224110 734600 629205,2015-05-30 23:00:00
730641,1433041200000,150318,categoryid,1236,2015-05-30 23:00:00
936625,1433041200000,150318,790,n54120.000,2015-05-30 23:00:00
1283131,1433041200000,49521,1058,n48.000,2015-05-30 23:00:00


In [46]:
recs_item = item_property(1433176736375,[205756],items)
recs_item 

Unnamed: 0,timestamp,itemid,property,value,datatime
904383,1433041200000,205756,categoryid,490,2015-05-30 23:00:00
1869263,1433041200000,205756,available,1,2015-05-30 23:00:00
3049655,1433041200000,205756,888,1197157 1288107 n516.000,2015-05-30 23:00:00
7218153,1433041200000,205756,790,n29640.000,2015-05-30 23:00:00
9003961,1433041200000,205756,283,452949 1113285 1314085 1197157 1288107 452949 ...,2015-05-30 23:00:00
9067145,1433041200000,205756,6,452949 1113285,2015-05-30 23:00:00
10750289,1433041200000,205756,112,679677,2015-05-30 23:00:00


In [47]:
recs_item.property.unique()

array(['categoryid', 'available', '888', '790', '283', '6', '112'],
      dtype=object)

In [48]:
#common properties
set(pref_item_true.property).intersection(set(recs_item.property))

{'790', '888', 'available', 'categoryid'}

In [49]:
pref_item_true.property.unique()

array(['available', '888', '365', '792', '917', '364', 'categoryid',
       '790', '1058', '839', '202', '463'], dtype=object)