# <p style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;font-size:150%;text-align:center;border-radius:10px 10px;border-style:solid;border-color:#d90b1c;">Recommendation system for H and M Fashion</p>

**For H and M Fashion EDA please check out my notebook** https://www.kaggle.com/nadianizam/h-m-fashion-eda

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">Terminologies</h1>

A few terms need to be understood before moving forward.

**Apache Spark:** Apache Spark is an open-source, distributed, general-purpose cluster-computing framework. It can also be used with Hadoop.

**Collaborative filtering:** Collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users. For example, if person A likes items 1, 2, 3 and person B likes items 2, 3, 4, then they have similar interests, so A is likely to like item 4 and B is likely to like item 1.
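The A/B example can be sketched in a few lines of plain Python. This is only a toy illustration of the idea (user-based filtering with Jaccard similarity, which is my choice for the sketch), not the ALS method used later in this notebook; the function names `jaccard` and `recommend` are made up for the example.

```python
# Toy user-based collaborative filtering on the A/B example.
likes = {
    "A": {1, 2, 3},
    "B": {2, 3, 4},
}

def jaccard(a, b):
    """Overlap between two sets of liked items (0 = disjoint, 1 = identical)."""
    return len(a & b) / len(a | b)

def recommend(user, likes):
    """Score each unseen item by the summed similarity of the users who like it."""
    scores = {}
    for other, items in likes.items():
        if other == user:
            continue
        sim = jaccard(likes[user], items)
        for item in items - likes[user]:
            scores[item] = scores.get(item, 0.0) + sim
    return sorted(scores, key=scores.get, reverse=True)

print(jaccard(likes["A"], likes["B"]))  # 0.5
print(recommend("A", likes))            # [4]
print(recommend("B", likes))            # [1]
```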

**Alternating least squares (ALS) matrix factorization:** The idea is to take a large, sparse user-item matrix and factor it into two much smaller matrices (one of user factors, one of item factors) by alternating least squares: hold one factor matrix fixed, solve a least-squares problem for the other, and repeat. The product of the two low-dimensional matrices approximates the original matrix, and its entries serve as the predicted ratings. ALS comes built into Apache Spark.
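To make the alternating step concrete, here is a minimal NumPy sketch of ALS on a tiny dense matrix. It is an illustration under my own assumptions (a plain ridge penalty `reg`, zeros treated as missing, a hypothetical helper named `als`), not Spark's implementation, which is distributed and differs in details.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, n_iter=20, seed=0):
    """Factor R ~ U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(n_iter):
        # Fix V and solve a small ridge regression for each user row.
        for u in range(n_users):
            obs = mask[u]
            if obs.any():
                Vo = V[obs]
                U[u] = np.linalg.solve(Vo.T @ Vo + ridge, Vo.T @ R[u, obs])
        # Fix U and solve the same kind of problem for each item row.
        for i in range(n_items):
            obs = mask[:, i]
            if obs.any():
                Uo = U[obs]
                V[i] = np.linalg.solve(Uo.T @ Uo + ridge, Uo.T @ R[obs, i])
    return U, V

# Toy ratings; 0 marks a missing entry (an assumption of this sketch).
R = np.array([[5.0, 4.0, 0.0, 1.0],
              [4.0, 0.0, 1.0, 1.0],
              [1.0, 1.0, 5.0, 0.0],
              [0.0, 1.0, 4.0, 5.0]])
mask = R > 0
U, V = als(R, mask)
approx = U @ V.T
rmse = float(np.sqrt(np.mean((approx[mask] - R[mask]) ** 2)))
print(np.round(approx, 2))
print("RMSE on observed entries:", round(rmse, 3))
```

The entries of `approx` at the masked-out positions are the model's guesses for the missing ratings, which is what a recommender ranks.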

**PySpark:** PySpark is the Python API for Apache Spark; it lets you drive Spark from Python.

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">1-Install PySpark</h1>

In [None]:
!pip install pyspark




<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">2-Load libraries and initialize Spark session</h1>

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, date_format, from_unixtime, month, unix_timestamp, year,
)
from pyspark.ml.recommendation import ALS

# Build (or reuse) the Spark session; small input partitions (about 5 MB) keep each task light.
spark = (
    SparkSession.builder
    .appName("Recommendations")
    .config("spark.sql.files.maxPartitionBytes", 5000000)
    .getOrCreate()
)






<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">3-Load Dataset in Apache Spark</h1>

In [None]:
transaction = spark.read.option("header",True).csv("../input/h-and-m-personalized-fashion-recommendations/transactions_train.csv")
transaction.printSchema()

                                                                                

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [None]:
from pyspark.sql import functions as F

# Earliest and latest transaction dates.
# t_dat is a 'yyyy-MM-dd' string, so string order matches date order.
min_date, max_date = transaction.select(F.min("t_dat"), F.max("t_dat")).first()
min_date, max_date

                                                                                

('2018-09-20', '2020-09-22')

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">4-Select data for recommendation</h1>

The transaction dataset has 31,788,324 rows and 5 columns. To capture the most recently bought articles, I use only the transactions from 2020-09-22, the last transaction date, for the recommendations.

In [None]:

# Keep only the transactions from the last day in the data (2020-09-22).
# t_dat is a 'yyyy-MM-dd' string, so a direct comparison selects the same rows
# as parsing the date and filtering on year, month and day.
hm = transaction.filter(col('t_dat') == '2020-09-22')

# Prepare the dataset: how many times each customer bought each article that day
hm = hm.groupby('customer_id', 'article_id').count()
hm.show(5)



+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|00f7bc5c0df4c615b...|0780418013|    1|
|02094817e46f3b692...|0791587001|    1|
|0333e5dda0257e9f4...|0839332002|    2|
|07c7a1172caf8fb97...|0573085043|    1|
|081373184e601470c...|0714790020|    1|
+--------------------+----------+-----+
only showing top 5 rows



                                                                                

In [None]:
print((hm.count(), len(hm.columns)))



(29486, 3)


                                                                                

In [None]:
# Number of observed (customer, article) pairs in the dataset
numerator = hm.select("count").count()

# Number of distinct customers and distinct articles
num_users = hm.select("customer_id").distinct().count()
num_articles = hm.select("article_id").distinct().count()

# Size of the full customer x article grid
denominator = num_users * num_articles

# Share of the grid that has no purchase, as a percentage
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("Sparsity: ", "%.2f" % sparsity + "%.")



Sparsity:  99.96%.
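The sparsity figure is simply the share of empty cells in the customer x article grid. A tiny worked example with made-up numbers (the helper name `sparsity_pct` is hypothetical) shows the arithmetic:

```python
def sparsity_pct(n_observed, n_users, n_items):
    """Percentage of the user x item grid that has no interaction."""
    return (1.0 - n_observed / (n_users * n_items)) * 100

# Toy grid: 2 customers x 4 articles with 3 observed purchases.
print("%.2f%%" % sparsity_pct(3, 2, 4))  # 62.50%
```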


                                                                                

In [None]:
userId_count = hm.groupBy("customer_id").count().orderBy('count', ascending=False)
userId_count.show()



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|30b6056bacc5f5c9d...|   28|
|5e8fb4d457fdffc61...|   28|
|dc1b173e541f8d3c1...|   27|
|6335d496ef463bc40...|   25|
|1796e87fd2e88932b...|   25|
|f50287d9cf052d4b4...|   24|
|54e8ebd39543b5a4d...|   23|
|fd5ce8716faf00f6a...|   23|
|850ec77661a417d27...|   22|
|ad3663a848dccbdda...|   21|
|32f3a6a7ce63d302c...|   21|
|b606fe5786c00151a...|   21|
|298523b6637340717...|   21|
|b49647f84a99ced53...|   21|
|fc783381f1ea2174c...|   21|
|a08e284bb18add2d7...|   21|
|383e1b07e2c1fe169...|   21|
|3ca77aab50ae4532b...|   20|
|2a721767cd9864ed5...|   20|
|af5166e0f89b0d433...|   19|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [None]:
articleId_count = hm.groupBy("article_id").count().orderBy('count', ascending=False)
articleId_count.show()



+----------+-----+
|article_id|count|
+----------+-----+
|0924243002|   91|
|0918522001|   88|
|0866731001|   78|
|0751471001|   75|
|0448509014|   73|
|0714790020|   72|
|0762846027|   68|
|0928206001|   67|
|0893432002|   66|
|0918292001|   65|
|0915529005|   64|
|0788575004|   63|
|0915529003|   63|
|0863583001|   60|
|0930380001|   59|
|0573085028|   59|
|0919273002|   58|
|0850917001|   57|
|0573085042|   56|
|0874110016|   53|
+----------+-----+
only showing top 20 rows



                                                                                

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">5-Importing important modules</h1>

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">6-Converting String to index</h1>

ALS requires the user and item columns to hold numeric, integer-valued ids, but customer_id and article_id are strings. Hence we need to convert the customer_id and article_id columns to numeric indices with StringIndexer before building the ALS model.
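As a rough mental model of what the indexer does: with its default ordering, StringIndexer gives index 0.0 to the most frequent label, 1.0 to the next, and so on. The sketch below emulates that in plain Python; it is not Spark's code, the helper name `frequency_desc_index` is made up, the purchase list is toy data, and the alphabetical tie-break reflects my reading of the Spark 3 documentation.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Toy purchase log (article ids borrowed from the tables in this notebook).
article_ids = ["0924243002", "0918522001", "0924243002",
               "0866731001", "0924243002", "0918522001"]
mapping = frequency_desc_index(article_ids)
print(mapping)
# {'0924243002': 0.0, '0918522001': 1.0, '0866731001': 2.0}
```

This is why the indices are floats such as 5.0 in the transformed table, and why popular articles get small index values.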

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(hm.columns)-set(['count'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show()



+--------------------+----------+-----+----------------+-----------------+
|         customer_id|article_id|count|article_id_index|customer_id_index|
+--------------------+----------+-----+----------------+-----------------+
|00f7bc5c0df4c615b...|0780418013|    1|          2237.0|            783.0|
|02094817e46f3b692...|0791587001|    1|            35.0|            785.0|
|0333e5dda0257e9f4...|0839332002|    2|           732.0|           4098.0|
|07c7a1172caf8fb97...|0573085043|    1|            44.0|           1702.0|
|081373184e601470c...|0714790020|    1|             5.0|           4146.0|
|09bec2a61046ccbea...|0860336002|    1|          2368.0|           6792.0|
|0be4f1ecce204ee32...|0573085028|    1|            14.0|            799.0|
|0c4b30343292b5101...|0918522001|    1|             1.0|           6825.0|
|0e10e02358875468b...|0579541001|    1|            53.0|           2689.0|
|0fc371e67e61a31d7...|0907170001|    1|          1978.0|           1737.0|
|10817b19177f6a53e...|071

                                                                                


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">7-Creating training and test data</h1>

In [None]:
(training,test)=transformed.randomSplit([0.8, 0.2])


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">8-Creating ALS model and fitting data</h1>

To build the model, specify the user, item, and rating columns explicitly. Set nonnegative to ‘True’, since the ratings here are purchase counts and are always greater than 0. The model can also treat the ratings as implicit feedback. Since we are treating the counts as explicit ratings, leave that option at ‘False’, which is also the default.
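For context on the implicit option: with `implicitPrefs=True`, ALS does not try to reproduce the count itself. As I understand the formulation that Spark's documentation references, each count becomes a binary preference plus a confidence weight controlled by the `alpha` parameter. The sketch below shows that transformation in plain Python; the helper name `implicit_view` is hypothetical and the formula is stated as an assumption, not taken from this notebook.

```python
def implicit_view(count, alpha=1.0):
    """Return (preference, confidence) for one purchase count."""
    preference = 1.0 if count > 0 else 0.0   # bought at all?
    confidence = 1.0 + alpha * abs(count)    # more purchases, more trust
    return preference, confidence

for count in (0, 1, 2):
    print(count, implicit_view(count))
# 0 (0.0, 1.0)
# 1 (1.0, 2.0)
# 2 (1.0, 3.0)
```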

When using simple random splits, as in Spark’s CrossValidator or TrainValidationSplit, it is very common to encounter users and/or items in the evaluation set that are not in the training set. By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model. We set the cold start strategy to ‘drop’ so that these rows are removed and we don’t get NaN evaluation metrics.
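The effect of a single cold-start row on the metric can be seen with a small stand-in model in plain Python. Everything here is toy data: `predict` is a hypothetical placeholder that returns NaN for unseen users or items, mimicking the default behaviour described above, and the constant 1.2 is arbitrary.

```python
import math

train = [("u1", "a1", 1.0), ("u1", "a2", 2.0), ("u2", "a1", 1.0)]
test = [("u1", "a1", 1.0), ("u3", "a2", 1.0)]  # u3 never appears in train

known_users = {u for u, _, _ in train}
known_items = {i for _, i, _ in train}

def predict(user, item):
    """Stand-in for a fitted model: NaN when a factor is missing."""
    if user not in known_users or item not in known_items:
        return math.nan
    return 1.2  # hypothetical constant prediction

def rmse(rows):
    """Root mean squared error over (label, prediction) pairs."""
    return math.sqrt(sum((pred - label) ** 2 for label, pred in rows) / len(rows))

scored = [(label, predict(u, i)) for u, i, label in test]
print(rmse(scored))  # nan: one cold-start row poisons the metric

# What the 'drop' strategy amounts to: discard rows with NaN predictions.
kept = [(label, pred) for label, pred in scored if not math.isnan(pred)]
print(round(rmse(kept), 3))  # 0.2
```

Note that with ‘drop’ the RMSE is computed only on the rows that survive, so it says nothing about customers or articles the model has never seen.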

In [None]:
als=ALS(maxIter=5,regParam=0.09,rank=25,userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)
model=als.fit(training)



<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">9-Evaluate rmse</h1>

In [None]:
evaluator=RegressionEvaluator(metricName="rmse",labelCol="count",predictionCol="prediction")
predictions=model.transform(test)
rmse=evaluator.evaluate(predictions)




In [None]:
print("RMSE="+str(rmse))

RMSE=0.4860669125503995



<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">10-Generate predictions</h1>

In [None]:
predictions.show()


+--------------------+----------+-----+----------------+-----------------+----------+
|         customer_id|article_id|count|article_id_index|customer_id_index|prediction|
+--------------------+----------+-----+----------------+-----------------+----------+
|1f4d4f43ace92c96f...|0902419001|    1|           280.0|            148.0| 0.9794985|
|1f4d4f43ace92c96f...|0922037001|    1|          1624.0|            148.0|0.89283216|
|279874c4d0ed6623c...|0894956008|    1|           890.0|           1829.0| 0.8487479|
|970bdc24215ac9cb6...|0372860002|    1|            61.0|            471.0| 0.8780332|
|970bdc24215ac9cb6...|0918292001|    1|             9.0|            471.0| 0.8860156|
|b4db5e5259234574e...|0837939001|    1|          3384.0|           2366.0| 0.6379638|
|cf5666882664f2fd3...|0824995001|    1|          1826.0|           3794.0| 0.6718024|
|d97e200fbf2e50ba7...|0893141002|    1|           219.0|           1088.0|0.97295254|
|d97e200fbf2e50ba7...|0909588006|    1|           514.

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">11-Providing Recommendations by Article id</h1>

In [None]:
# Top 10 customers for each article. show() only prints and returns None,
# so keep the DataFrame in its own variable.
user_recs=model.recommendForAllItems(10)
user_recs.show(10)


+----------------+--------------------+
|article_id_index|     recommendations|
+----------------+--------------------+
|               1|[{4907, 4.789394}...|
|               3|[{4907, 5.4234543...|
|               5|[{4907, 5.4816947...|
|               6|[{4907, 4.479425}...|
|               9|[{9001, 4.6863546...|
|              12|[{4907, 5.217934}...|
|              13|[{4907, 4.7051945...|
|              15|[{9001, 4.935132}...|
|              16|[{4907, 4.791131}...|
|              17|[{4907, 4.4958687...|
+----------------+--------------------+
only showing top 10 rows



                                                                                


<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">12-Providing Recommendations by Customer id</h1>

In [None]:
# Top 10 articles for each customer. show() only prints and returns None,
# so keep the DataFrame in its own variable.
item_recs=model.recommendForAllUsers(10)
item_recs.show(10)


+-----------------+--------------------+
|customer_id_index|     recommendations|
+-----------------+--------------------+
|                1|[{5040, 2.8026729...|
|                3|[{5040, 2.9924715...|
|                5|[{4146, 3.3452044...|
|                6|[{5040, 8.028572}...|
|                9|[{4146, 2.771582}...|
|               12|[{4146, 2.556902}...|
|               13|[{4146, 3.2950058...|
|               15|[{1891, 2.3136094...|
|               16|[{4146, 3.2609243...|
|               17|[{4146, 2.696253}...|
+-----------------+--------------------+
only showing top 10 rows



                                                                                

In [None]:
%%time
userRecsDf = model.recommendForAllUsers(10).cache()
userRecsDf.count()


CPU times: user 66.4 ms, sys: 16.9 ms, total: 83.3 ms
Wall time: 19.9 s


                                                                                

9633

In [None]:
userRecsDf.printSchema()

root
 |-- customer_id_index: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- article_id_index: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [None]:
userRecsDf.select("customer_id_index","recommendations.article_id_index").show(10,False)

+-----------------+------------------------------------------------------------+
|customer_id_index|article_id_index                                            |
+-----------------+------------------------------------------------------------+
|1580             |[4146, 4405, 6383, 5040, 368, 1891, 4249, 2511, 5111, 6874] |
|4900             |[4146, 6383, 5040, 368, 4249, 4405, 1891, 2511, 3869, 4910] |
|5300             |[4146, 368, 4249, 1891, 4405, 7828, 6874, 2511, 5040, 6383] |
|6620             |[4146, 3869, 4910, 1891, 3013, 4405, 5891, 2716, 368, 4249] |
|7240             |[4146, 1891, 6874, 1765, 6848, 4405, 7828, 6577, 7864, 5891]|
|7880             |[4146, 1891, 4910, 3869, 4405, 5891, 2511, 6848, 1221, 3859]|
|9900             |[6874, 4405, 2573, 4170, 5111, 7267, 6848, 5891, 1341, 4212]|
|471              |[4146, 5040, 6383, 4910, 3869, 368, 1891, 6944, 6874, 4405] |
|1591             |[5040, 6383, 4146, 368, 4405, 6874, 7828, 3869, 4910, 4249] |
|4101             |[4146, 50



In [None]:
import gc #This is to free up the memory
gc.collect()

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">13-Converting back to string form</h1>

As seen in the table above, the results are in integer index form, so we need to convert them back to the original customer and article ids. The code is a little longer because of the many conversions.
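The core of the conversion is just a lookup from index back to the original id, applied to every recommended index. A minimal sketch with toy lookup tables (the customer ids and the `recs` dictionary are hypothetical; in the notebook these values come from the `transformed` DataFrame and the model output):

```python
# Toy lookup tables from index back to the original string id.
index_to_article = {0: "0924243002", 1: "0918522001", 2: "0866731001"}
index_to_customer = {0: "customer_a", 1: "customer_b"}  # hypothetical ids

# One entry per customer index with its recommended article indices.
recs = {0: [2, 0], 1: [1, 2, 0]}

readable = {
    index_to_customer[c]: [index_to_article[a] for a in articles]
    for c, articles in recs.items()
}
print(readable)
# {'customer_a': ['0866731001', '0924243002'], 'customer_b': ['0918522001', '0866731001', '0924243002']}
```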

In [None]:
import pandas as pd

# Collect the top-10 recommendations per customer into pandas
recs=model.recommendForAllUsers(10).toPandas()
# Spread the 10 recommendations into columns, then melt to one row per (customer, recommendation)
nrecs=recs.recommendations.apply(pd.Series) \
            .merge(recs, right_index = True, left_index = True) \
            .drop(["recommendations"], axis = 1) \
            .melt(id_vars = ['customer_id_index'], value_name = "recommendations") \
            .drop("variable", axis = 1) \
            .dropna() 
nrecs=nrecs.sort_values('customer_id_index')
# Split each (article_id_index, rating) struct into two columns
nrecs=pd.concat([nrecs['recommendations'].apply(pd.Series), nrecs['customer_id_index']], axis = 1)


In [None]:
nrecs.columns = ['ArticleID_index','count','UserID_index']
md=transformed.select(transformed['article_id'],transformed['article_id_index'],transformed['customer_id'],transformed['customer_id_index'])
md=md.toPandas()


In [None]:
dict1 =dict(zip(md['article_id_index'],md['article_id']))
dict2=dict(zip(md['customer_id_index'],md['customer_id']))
nrecs['article_id']=nrecs['ArticleID_index'].map(dict1)
nrecs['customer_id']=nrecs['UserID_index'].map(dict2)

<h1 style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;">14-Final result Recommendations by Customer id</h1>

In [None]:
nrecs=nrecs.sort_values('customer_id').reset_index(drop=True)
# Collect the recommended article ids into one list per customer.
# Grouping directly avoids assigning to a slice of the DataFrame,
# which is what triggered pandas' SettingWithCopyWarning.
res_new=(
    nrecs.groupby('customer_id')['article_id']
    .apply(list)
    .reset_index(name='recommendations')
)


In [None]:
res_new[:15]

| | customer_id | recommendations |
|---|---|---|
| 0 | 0003e867a930d0d6842f923d6ba7c9b77aba33fe2a0fbf... | [0913340001, 0880815006, 0316441001, 082748700... |
| 1 | 000525e3fe01600d717da8423643a8303390a055c578ed... | [0717490070, 0316441001, 0857347002, 089448100... |
| 2 | 0010e8eb18f131e724d6997909af0808adbba057529edb... | [0111586001, 0316441001, 0571048002, 087163800... |
| 3 | 001436e2c83cda28548dd668cfc7d621d70d2baf6f6cf0... | [0857347002, 0757971006, 0316441001, 087163800... |
| 4 | 0026ebdd70715d8fa2befa14dfed317a1ffe5451aba839... | [0571048002, 0894481001, 0871638002, 071749007... |
| 5 | 002faf80a68267264102e08eb4f1f21a59236773e4ab90... | [0571048002, 0316441001, 0724905016, 071749007... |
| 6 | 00357b192b81fc83261a45be87f5f3d59112db7d117513... | [0724905016, 0904961001, 0717490070, 089448100... |
| 7 | 003bb11f4a641d26e321b80c099618b2c1cf26fee040dd... | [0877261003, 0717490070, 0871638002, 050286900... |
| 8 | 0049f94e2289eed627a49f2a5a6418bc8ce6ee7e2a902d... | [0571048002, 0757971006, 0877261003, 071749007... |
| 9 | 004d932f7a27ac3167c77db81d9cfd89392729e7f7e0d4... | [0904961001, 0316441001, 0877261003, 075048101... |


# <p style="background-color:#f7e9ec;font-family:newtimeroman;color:#d90b1c;font-size:150%;text-align:center;border-radius:10px 10px;border-style:solid;border-color:#d90b1c;">Please leave your comments and suggestions, and if you like this kernel, an UPVOTE is greatly appreciated</p>