In [1]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import SQLContext

In [2]:
# Read the data from the vtc-cab repos
raw_data = pd.read_csv('./activities_201802011009.csv')

In [3]:
raw_data['accountid'].replace('', np.nan, inplace=True)
raw_data['key'].replace('', np.nan, inplace=True)
raw_data.dropna(subset=['accountid'], inplace=True)
raw_data.dropna(subset=['key'], inplace=True)

In [4]:
# Let's see how many items and customers there are in the dataset
num_cust = len(raw_data.accountid.unique())
num_items = len(raw_data.key.unique())
print('Number of customers: ' + str(num_cust))
print('Number of items bought: ' + str(num_items))

Number of customers: 32729
Number of items bought: 13575


In [5]:
# add one quantity column to dataframe, for simple we just add 1 to everywhere
raw_data['quantity'] = 1
# add more quantity for the completed video to denote the stonger preference 
raw_data.quantity.loc[raw_data.value == 'complete'] = 2

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  self._setitem_with_indexer(indexer, value)


In [6]:
#clean up the raw data 
retail_data = raw_data.loc[pd.isnull(raw_data.accountid) == False]

In [7]:
retail_data.head()

Unnamed: 0,name,accountid,userid,id,deviceid,key,metadata,tstamp,value,quantity
0,watch,7041046,,99ef7d20-f289-11e7-824b-fda2ff9f7794,Android,LYS005228795,"[Synopsis=Từ 04/04/2017, Title=ON FOOTBALL, bo...",2018-01-06 09:30:46,watching,1
1,watch,7041046,,16836d10-f28a-11e7-a231-9114f613577e,Android,tapchiclbvidaibayernmunich_1p,"[Synopsis=, Title=Tạp chí CLB vĩ đại - Bayern ...",2018-01-06 09:34:15,watching,1
2,watch,7041046,,b280e9d0-f4cf-11e7-b167-f75a20dec89d,Android,tapchiderbyrealmadridvsbarcelona_lep,"[Synopsis=, Title=Tạp chí Derby - Real Madrid ...",2018-01-09 06:57:34,watching,1
3,watch,7041046,,6dd1cb50-f4d0-11e7-a231-9114f613577e,Android,aquayoga4tuthechienbinhp,"[Synopsis=Khoe va Dep, Title=Aqua Yoga 4 Tư th...",2018-01-09 07:02:49,watching,1
4,watch,7041046,,199f1e11-f4d1-11e7-824b-fda2ff9f7794,Android,LYS013573731,"[Synopsis=Nhịp đập 360° thể thao, Title=Nhịp đ...",2018-01-09 07:07:37,watching,1


## because pyspark will need user_id and item_id as integer type so we need to encode it to categorical data

In [8]:
# Get list of unique customers
cust_list = list(np.sort(retail_data.accountid.unique()))
# Get list of unique items bought
item_list = list(np.sort(retail_data.key.unique()))
# Get list of all the purchase quantities
quantity_list = list(retail_data.quantity)

In [9]:
user_id_cat = retail_data.accountid.astype('category', categories = cust_list).cat.codes
key_id_cat = retail_data.key.astype('category', categories = item_list).cat.codes

  """Entry point for launching an IPython kernel.
  


In [10]:
retail_data['key_id_cat'] = key_id_cat.astype(int)
retail_data['user_id_cat'] = user_id_cat.astype(int)

In [11]:
retail_data.head()

Unnamed: 0,name,accountid,userid,id,deviceid,key,metadata,tstamp,value,quantity,key_id_cat,user_id_cat
0,watch,7041046,,99ef7d20-f289-11e7-824b-fda2ff9f7794,Android,LYS005228795,"[Synopsis=Từ 04/04/2017, Title=ON FOOTBALL, bo...",2018-01-06 09:30:46,watching,1,230,25542
1,watch,7041046,,16836d10-f28a-11e7-a231-9114f613577e,Android,tapchiclbvidaibayernmunich_1p,"[Synopsis=, Title=Tạp chí CLB vĩ đại - Bayern ...",2018-01-06 09:34:15,watching,1,12221,25542
2,watch,7041046,,b280e9d0-f4cf-11e7-b167-f75a20dec89d,Android,tapchiderbyrealmadridvsbarcelona_lep,"[Synopsis=, Title=Tạp chí Derby - Real Madrid ...",2018-01-09 06:57:34,watching,1,12316,25542
3,watch,7041046,,6dd1cb50-f4d0-11e7-a231-9114f613577e,Android,aquayoga4tuthechienbinhp,"[Synopsis=Khoe va Dep, Title=Aqua Yoga 4 Tư th...",2018-01-09 07:02:49,watching,1,8267,25542
4,watch,7041046,,199f1e11-f4d1-11e7-824b-fda2ff9f7794,Android,LYS013573731,"[Synopsis=Nhịp đập 360° thể thao, Title=Nhịp đ...",2018-01-09 07:07:37,watching,1,4997,25542


In [12]:
# Let's group purchase quantities by Stock Code and CustomerID
retail_data = retail_data[['key_id_cat', 'quantity', 'user_id_cat']]
retail_grouped = retail_data.groupby(['user_id_cat', 'key_id_cat']).sum().reset_index()

In [13]:
retail_grouped.head()

Unnamed: 0,user_id_cat,key_id_cat,quantity
0,0,9632,1
1,0,9945,2
2,0,10805,1
3,0,12101,1
4,0,13101,1


In [14]:
# If the quantity sum is 0, replace with 1 to indicate that there was a purchase of that item atleast
retail_grouped.quantity.loc[retail_grouped.quantity == 0] = 1
# Filter out all negative quantities so that we can focus the recommendation of items that the customer purchased and liked
retail_grouped_final = retail_grouped[retail_grouped.quantity > 0]
print ('\nFinal Matrix of grouped purchases')
print (retail_grouped_final.head())


Final Matrix of grouped purchases
   user_id_cat  key_id_cat  quantity
0            0        9632         1
1            0        9945         2
2            0       10805         1
3            0       12101         1
4            0       13101         1


### export the data in pyspark format to csv, to be latter read by pyspark, we can improve by convert from pandas df to pyspark df, but not try yet ==> point to improve

In [15]:
retail_grouped_final.to_csv('spark_data.csv',index=False)

In [16]:
spark = SparkSession.builder.appName('rec').getOrCreate()

In [17]:
data = spark.read.csv('spark_data.csv',inferSchema=True,header=True)

In [18]:
data.show()

+-----------+----------+--------+
|user_id_cat|key_id_cat|quantity|
+-----------+----------+--------+
|          0|      9632|       1|
|          0|      9945|       2|
|          0|     10805|       1|
|          0|     12101|       1|
|          0|     13101|       1|
|          1|      9448|       1|
|          1|      9771|       1|
|          1|      9949|       1|
|          1|     10028|       1|
|          1|     10789|       1|
|          1|     13267|       1|
|          2|      9691|       1|
|          3|       207|       2|
|          3|      7106|       1|
|          3|      8218|       1|
|          3|      8275|       1|
|          3|      8751|       2|
|          3|      8753|       3|
|          3|      9512|       1|
|          3|     10265|       1|
+-----------+----------+--------+
only showing top 20 rows



### split data for training and testing 

In [19]:
(training, test) = data.randomSplit([0.9, 0.1])

In [20]:
# Building the recommendation model using ALS on the training data
als = ALS(maxIter=10, regParam=0.01, userCol="user_id_cat", itemCol="key_id_cat", ratingCol="quantity",nonnegative=True)
model = als.fit(training)

In [21]:
# Evaluating the model by computing the RMSE on the test data
predictions = model.transform(training)

In [22]:
predictions.show()

+-----------+----------+--------+----------+
|user_id_cat|key_id_cat|quantity|prediction|
+-----------+----------+--------+----------+
|      11932|       148|       2|  1.968441|
|        713|       463|       2| 2.2392673|
|       7689|       471|       2| 1.8814936|
|       8272|       496|       4| 3.9805136|
|       8020|       496|       2| 2.1459835|
|       6436|       833|       8| 7.9492006|
|       2568|       833|       4| 4.0522027|
|      10140|      1088|       2| 1.9249035|
|      11362|      1238|       2|  1.925275|
|      12698|      1342|       2| 1.9243176|
|       5642|      1580|       2| 2.0501993|
|       1930|      1591|       2| 1.9061768|
|       1674|      1645|       2|  1.922136|
|      16327|      1829|       2| 1.9896543|
|       3732|      1959|       2| 1.9864552|
|        617|      1959|       2| 1.9731699|
|      13557|      2122|       2| 1.8814496|
|       3693|      2142|       2| 1.9362671|
|      18529|      2366|       2| 1.9982082|
|      196

In [23]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="quantity",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.3840680992160791


In [24]:
single_user = data.filter(data['user_id_cat']==0).select(['key_id_cat','user_id_cat'])

In [25]:
single_user.show()

+----------+-----------+
|key_id_cat|user_id_cat|
+----------+-----------+
|      9632|          0|
|      9945|          0|
|     10805|          0|
|     12101|          0|
|     13101|          0|
+----------+-----------+



In [26]:
reccomendations = model.transform(single_user)

In [27]:
reccomendations.show()

+----------+-----------+----------+
|key_id_cat|user_id_cat|prediction|
+----------+-----------+----------+
|     13101|          0| 1.0011083|
|      9945|          0| 1.9544966|
|     12101|          0| 1.0003455|
|      9632|          0|0.99991435|
|     10805|          0|  1.000295|
+----------+-----------+----------+



### make recommendation for all users by 10 items and extract to csv

In [28]:
all_users_recomendation = model.recommendForAllUsers(10)

In [29]:
df = all_users_recomendation.toPandas()

In [30]:
df.to_csv('all_users_recomendation.csv',index=False)