In [1]:
import pyspark
import numpy as np
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
df1 = spark.read.csv('F:\\Siddhesh\\Study\\datasets\\2019-Nov_RecommendationSystem.csv', header=True, inferSchema= True)

In [3]:
df1.show()

+--------------------+----------+----------+-----------+--------------------+--------+------+---------+--------------------+------+
|          event_time|event_type|product_id|category_id|       category_code|   brand| price|  user_id|        user_session|Rating|
+--------------------+----------+----------+-----------+--------------------+--------+------+---------+--------------------+------+
|2019-11-01 00:00:...|      view|   1003461| 2.05301E18|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|     5|
|2019-11-01 00:00:...|      view|   5000088| 2.05301E18|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|     3|
|2019-11-01 00:00:...|      view|  17302664| 2.05301E18|                null|   creed| 28.31|561587266|755422e7-9040-477...|     5|
|2019-11-01 00:00:...|      view|   3601530| 2.05301E18|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|     4|
|2019-11-01 00:00:...|      view|   1004775| 2.05301E18|electronics.smart...

In [4]:
#To get null values in the dataframe
df1.select([count(when(col(c).isNull(), c)).alias(c) for c in df1.columns]).show()

+----------+----------+----------+-----------+-------------+------+-----+-------+------------+------+
|event_time|event_type|product_id|category_id|category_code| brand|price|user_id|user_session|Rating|
+----------+----------+----------+-----------+-------------+------+-----+-------+------------+------+
|         0|         0|         0|          0|       332077|154155|    0|      0|           0|     0|
+----------+----------+----------+-----------+-------------+------+-----+-------+------------+------+



In [5]:
df1 = df1['user_id','product_id','Rating']

In [6]:
df1.show()

+---------+----------+------+
|  user_id|product_id|Rating|
+---------+----------+------+
|520088904|   1003461|     5|
|530496790|   5000088|     3|
|561587266|  17302664|     5|
|518085591|   3601530|     4|
|558856683|   1004775|     1|
|520772685|   1306894|     5|
|514028527|   1306421|     5|
|518574284|  15900065|     5|
|532364121|  12708937|     5|
|532647354|   1004258|     4|
|518780843|  17200570|     5|
|518427361|   2701517|     4|
|566255262|  16700260|     5|
|512416379|  34600011|     5|
|526595547|   4600658|     5|
|512651494|  24900193|     5|
|551061950|  27400066|     5|
|520037415|   5100503|     5|
|566265908|   1004566|     5|
|514028527|   1307115|     5|
+---------+----------+------+
only showing top 20 rows



In [7]:
df1.count()

1048575

In [8]:
df1.groupBy('user_id').count().sort('count',ascending=False).show()

+---------+-----+
|  user_id|count|
+---------+-----+
|539585530|  526|
|537972582|  504|
|559249905|  311|
|529413254|  294|
|531537488|  246|
|518449281|  211|
|556444587|  197|
|561609717|  192|
|563525937|  192|
|512792872|  191|
|514127132|  187|
|512475445|  183|
|514547286|  180|
|515789329|  179|
|563459593|  178|
|513998518|  168|
|512365995|  167|
|532599748|  167|
|512542757|  162|
|541036379|  161|
+---------+-----+
only showing top 20 rows



In [9]:
df1.distinct().count()

817421

In [10]:
#To get null values in the dataframe
df1.select([count(when(col(c).isNull(), c)).alias(c) for c in df1.columns]).show()

+-------+----------+------+
|user_id|product_id|Rating|
+-------+----------+------+
|      0|         0|     0|
+-------+----------+------+



In [11]:
#To get nan values in the dataframe
df1.select([count(when(isnan(c), c)).alias(c) for c in df1.columns]).show()

+-------+----------+------+
|user_id|product_id|Rating|
+-------+----------+------+
|      0|         0|     0|
+-------+----------+------+



In [12]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
df1.describe().show()

+-------+--------------------+--------------------+------------------+
|summary|             user_id|          product_id|            Rating|
+-------+--------------------+--------------------+------------------+
|  count|             1048575|             1048575|           1048575|
|   mean| 5.352223267584131E8|1.0623232003500942E7| 4.176863839019622|
| stddev|2.0151963447148193E7|1.2022362511917762E7|1.3065428668686472|
|    min|           274969076|             1000978|                 1|
|    max|           566513909|            61700012|                 5|
+-------+--------------------+--------------------+------------------+



In [14]:
(train,test) = df1.randomSplit([0.75,0.25])

In [15]:
train.count()

787094

In [16]:
test.count()

261481

In [17]:
als = ALS(maxIter=5, regParam=0.1, userCol='user_id', itemCol='product_id', ratingCol='Rating', coldStartStrategy="drop")

In [18]:
model = als.fit(train)

In [19]:
pred = model.transform(test)

In [20]:
pred.show()

+---------+----------+------+----------+
|  user_id|product_id|Rating|prediction|
+---------+----------+------+----------+
|520434341|   1004666|     5| 4.8343425|
|551819695|   1004666|     5| 4.2666826|
|561963609|   1004666|     5| 4.1188827|
|566454610|   1004666|     5| 4.2077327|
|513068374|   1004666|     5|  3.469921|
|514438996|   1004666|     5|0.79235125|
|548161220|   1004666|     5|  4.102651|
|566135570|   1004666|     5| 1.9728268|
|566339517|   1004666|     5|  4.932067|
|517528820|   1004666|     3|  4.500553|
|566433168|   1004666|     5| 4.4898186|
|566417483|   1004666|     5|  5.389082|
|520842644|   1004666|     4|  1.604131|
|519570510|   1004666|     5|  1.454879|
|446661951|   1004739|     5| 2.8697844|
|536492686|   1004739|     4| 4.0655284|
|560041200|   1004739|     5|   4.46072|
|550104671|   1004739|     5|  2.941399|
|550104671|   1004739|     5|  2.941399|
|551991737|   1004739|     4|  4.760072|
+---------+----------+------+----------+
only showing top

In [21]:
evaluation_result = RegressionEvaluator(metricName='rmse', labelCol='Rating', predictionCol='prediction')

In [22]:
rmse = evaluation_result.evaluate(pred)

In [23]:
print(f"rmse: {rmse}")

rmse: 2.37291037713526


In [24]:
user_1 = test.filter(test['user_id'] == 537972582).select('product_id','user_id')

In [25]:
user_1.count()

118

In [26]:
user_1.show()

+----------+---------+
|product_id|  user_id|
+----------+---------+
|  26400185|537972582|
|  26400203|537972582|
|  26400250|537972582|
|  26400289|537972582|
|  26400291|537972582|
|  26400297|537972582|
|  26400380|537972582|
|  26400490|537972582|
|  26400493|537972582|
|  26400513|537972582|
|  26400527|537972582|
|  26401115|537972582|
|  26401427|537972582|
|  26401458|537972582|
|  26402185|537972582|
|  26402809|537972582|
|  26403511|537972582|
|  26400172|537972582|
|  26400175|537972582|
|  26400184|537972582|
+----------+---------+
only showing top 20 rows



In [27]:
rec = model.transform(user_1)

In [28]:
final_rec = rec.orderBy("prediction", ascending= False)

In [29]:
final_rec.show()

+----------+---------+----------+
|product_id|  user_id|prediction|
+----------+---------+----------+
|  26400297|537972582|  6.641469|
|  26203851|537972582| 6.0316143|
|  26402618|537972582| 5.5559673|
|  26402618|537972582| 5.5559673|
|  26403511|537972582|  5.370862|
|  26403511|537972582|  5.370862|
|  26401115|537972582|  5.359983|
|  26401115|537972582|  5.359983|
|  26401633|537972582| 5.3532877|
|  26402579|537972582| 5.3313856|
|  26200615|537972582| 5.2660775|
|  26400674|537972582| 5.2569637|
|  26404196|537972582|  5.184058|
|  26204790|537972582|  5.183878|
|  26402809|537972582|  5.170182|
|  26400490|537972582| 4.9492617|
|  26403506|537972582| 4.8833466|
|  26400657|537972582|  4.876865|
|  26400237|537972582|  4.824741|
|  26402938|537972582|  4.771587|
+----------+---------+----------+
only showing top 20 rows

