In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 41 kB/s              
[?25h  Preparing metadata (setup.py) ... [?25l- done
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 3.0 MB/s            
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | / - \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=1c46a5eb156a801b21cd4464ce4e237d868d27d2ce83e0356400eeb9e2d1f0bb
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected pac

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf,col,when
from pyspark.sql.functions import to_timestamp,date_format
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import *

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [3]:
# Build (or reuse) the notebook's SparkSession. `getOrCreate()` already returns
# a SparkSession, so it must not be wrapped in `SparkSession(...)` again: that
# constructor expects a SparkContext.
spark = (
    SparkSession.builder
    .appName("Recommendations")
    .config("spark.sql.files.maxPartitionBytes", 5000000)
    .getOrCreate()
)
# Keep the `sc` name available; it now refers to the actual SparkContext.
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/23 22:56:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Directory holding the competition CSV files. The trailing slash is kept
# because file names are appended to this string directly.
input_path = '../input/h-and-m-personalized-fashion-recommendations/'

In [5]:
# Read the raw transaction log, treating the first CSV row as the header.
# No schema is supplied or inferred, so every column is loaded as a string.
transactions_csv = f"{input_path}transactions_train.csv"
transaction = spark.read.csv(transactions_csv, header=True)
transaction.printSchema()

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)



In [6]:
from pyspark.sql.functions import lit, max, min, unix_timestamp

# `min` / `max` here are the Spark column aggregates (they shadow the Python
# builtins). `t_dat` is a string column, so the comparison is lexicographic.
date_bounds = transaction.agg(min('t_dat'), max('t_dat')).first()
min_date, max_date = date_bounds
min_date, max_date

                                                                                

('2018-09-20', '2020-09-22')

In [7]:
# Restrict the data to a single day of transactions and aggregate it into
# (customer, article) purchase counts, which serve as the ALS "rating".
#
# The date is parsed once and compared as a DATE. This avoids the earlier
# round trip through unix timestamps and the reliance on an implicit
# string-to-int cast for the day-of-month comparison.
target_date = '2020-09-22'  # last day present in the data

hm = (
    transaction
    .filter(to_date(col('t_dat'), 'yyyy-MM-dd') == to_date(lit(target_date)))
    # Prepare the dataset: one row per (customer, article) with its purchase count
    .groupBy('customer_id', 'article_id')
    .count()
)
hm.show(5)

                                                                                

+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|00f7bc5c0df4c615b...|0780418013|    1|
|02094817e46f3b692...|0791587001|    1|
|0333e5dda0257e9f4...|0839332002|    2|
|07c7a1172caf8fb97...|0573085043|    1|
|081373184e601470c...|0714790020|    1|
+--------------------+----------+-----+
only showing top 5 rows



In [8]:
# Number of distinct customers present in the interaction table.
hm.select('customer_id').dropDuplicates().count()

                                                                                

10528

In [9]:
# (rows, columns) of the interaction table, pandas-`shape` style.
n_rows, n_cols = hm.count(), len(hm.columns)
print((n_rows, n_cols))



(29486, 3)


                                                                                

In [10]:
# Sparsity of the customer x article matrix: the share of all possible
# (customer, article) cells that have no observed interaction.
num_users = hm.select('customer_id').distinct().count()
num_articles = hm.select('article_id').distinct().count()

numerator = hm.select('count').count()  # observed (customer, article) pairs
denominator = num_users * num_articles  # all possible pairs

sparsity = 100 * (1.0 - numerator / denominator)
sparsity_text = '%.2f' % sparsity
print('Sparsity: ', sparsity_text + '%.')



Sparsity:  99.96%.


                                                                                

In [11]:
# Distinct articles bought per customer, most active customers first.
userId_count = (
    hm.groupBy('customer_id')
    .count()
    .orderBy(col('count').desc())
)
userId_count.show(n=5)



+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|5e8fb4d457fdffc61...|   28|
|30b6056bacc5f5c9d...|   28|
|dc1b173e541f8d3c1...|   27|
|1796e87fd2e88932b...|   25|
|6335d496ef463bc40...|   25|
+--------------------+-----+
only showing top 5 rows



                                                                                

In [12]:
# Distinct customers per article, most widely bought articles first.
per_article = hm.groupBy('article_id').count()
articleId_count = per_article.sort('count', ascending=False)
articleId_count.show(n=5)



+----------+-----+
|article_id|count|
+----------+-----+
|0924243002|   91|
|0918522001|   88|
|0866731001|   78|
|0751471001|   75|
|0448509014|   73|
+----------+-----+
only showing top 5 rows



                                                                                

In [13]:
# ALS needs numeric user/item ids, so every id column (everything except the
# `count` rating) is mapped to a numeric index with a StringIndexer.
# The columns are taken in DataFrame order rather than from a set difference,
# so the stage order and the resulting column order are the same on every run.
index_columns = [column for column in hm.columns if column != 'count']
indexer = [
    StringIndexer(inputCol=column, outputCol=column + '_index')
    for column in index_columns
]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(hm).transform(hm)
transformed.show(5)

22/03/23 23:06:52 WARN DAGScheduler: Broadcasting large task binary with size 1207.9 KiB


+--------------------+----------+-----+-----------------+----------------+
|         customer_id|article_id|count|customer_id_index|article_id_index|
+--------------------+----------+-----+-----------------+----------------+
|00f7bc5c0df4c615b...|0780418013|    1|            783.0|          2237.0|
|02094817e46f3b692...|0791587001|    1|            785.0|            35.0|
|0333e5dda0257e9f4...|0839332002|    2|           4098.0|           732.0|
|07c7a1172caf8fb97...|0573085043|    1|           1702.0|            44.0|
|081373184e601470c...|0714790020|    1|           4146.0|             5.0|
+--------------------+----------+-----+-----------------+----------------+
only showing top 5 rows



                                                                                

In [14]:
# 80/20 train/test split. The seed is fixed so that the split, and every
# metric computed from it, does not change from one run to the next.
(train, test) = transformed.randomSplit([0.8, 0.2], seed=42)

In [15]:
# Fit an explicit-feedback ALS model on the purchase counts.
# `coldStartStrategy="drop"` removes rows that would get a NaN prediction
# (users/items unseen during training) so that RMSE stays computable.
# The seed fixes the random factor initialisation for reproducible results.
# NOTE(review): `count` is purchase frequency, not an explicit rating;
# consider `implicitPrefs=True` -- to be confirmed by the model owner.
als = ALS(
    maxIter=5,
    regParam=0.09,
    rank=25,
    userCol="customer_id_index",
    itemCol="article_id_index",
    ratingCol="count",
    coldStartStrategy="drop",
    nonnegative=True,
    seed=42,
)
model = als.fit(train)

22/03/23 23:08:37 WARN DAGScheduler: Broadcasting large task binary with size 1223.0 KiB
22/03/23 23:08:38 WARN DAGScheduler: Broadcasting large task binary with size 1224.4 KiB
22/03/23 23:08:38 WARN DAGScheduler: Broadcasting large task binary with size 1226.0 KiB
22/03/23 23:08:39 WARN DAGScheduler: Broadcasting large task binary with size 1227.3 KiB
22/03/23 23:08:39 WARN DAGScheduler: Broadcasting large task binary with size 1226.2 KiB
22/03/23 23:08:40 WARN DAGScheduler: Broadcasting large task binary with size 1227.5 KiB
22/03/23 23:08:40 WARN DAGScheduler: Broadcasting large task binary with size 1228.3 KiB
22/03/23 23:08:41 WARN DAGScheduler: Broadcasting large task binary with size 1231.4 KiB
22/03/23 23:08:41 WARN DAGScheduler: Broadcasting large task binary with size 1232.8 KiB
22/03/23 23:08:42 WARN DAGScheduler: Broadcasting large task binary with size 1234.2 KiB
22/03/23 23:08:42 WARN DAGScheduler: Broadcasting large task binary with size 1235.5 KiB
22/03/23 23:08:43 WAR

In [16]:
# Score the held-out pairs and report the root-mean-square error between the
# predicted rating and the observed purchase count.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='count',
    predictionCol='prediction',
)
rmse = evaluator.evaluate(predictions)
print('RMSE=' + str(rmse))

22/03/23 23:08:48 WARN DAGScheduler: Broadcasting large task binary with size 1252.0 KiB
22/03/23 23:08:49 WARN DAGScheduler: Broadcasting large task binary with size 1250.6 KiB
22/03/23 23:09:40 WARN DAGScheduler: Broadcasting large task binary with size 1217.7 KiB
22/03/23 23:09:40 WARN DAGScheduler: Broadcasting large task binary with size 1296.7 KiB


RMSE=0.5049604528496827


In [17]:
# Peek at a few scored test rows (observed `count` next to `prediction`).
predictions.show(n=5)

22/03/23 23:09:42 WARN DAGScheduler: Broadcasting large task binary with size 1252.0 KiB
22/03/23 23:09:42 WARN DAGScheduler: Broadcasting large task binary with size 1250.6 KiB
                                                                                

+--------------------+----------+-----+-----------------+----------------+----------+
|         customer_id|article_id|count|customer_id_index|article_id_index|prediction|
+--------------------+----------+-----+-----------------+----------------+----------+
|0026ebdd70715d8fa...|0785018003|    1|           1671.0|            79.0| 0.7620836|
|0066eb74327937182...|0924645001|    1|            540.0|           405.0| 0.7741257|
|006908fb1f581e644...|0640021012|    1|           1673.0|          2132.0| 0.8349908|
|009b11e116c9e8c99...|0856440001|    1|            541.0|           432.0| 1.1419799|
|009b11e116c9e8c99...|0867969003|    1|            541.0|            65.0| 1.1227843|
+--------------------+----------+-----+-----------------+----------------+----------+
only showing top 5 rows



22/03/23 23:10:34 WARN DAGScheduler: Broadcasting large task binary with size 1300.4 KiB


In [18]:
# Top-10 customers for every article. Keep the DataFrame itself in
# `user_recs`; `.show()` only prints and returns None, so it is called
# separately instead of being assigned.
user_recs = model.recommendForAllItems(10)
user_recs.show(5)

22/03/23 23:10:37 WARN DAGScheduler: Broadcasting large task binary with size 1296.8 KiB
22/03/23 23:10:44 WARN DAGScheduler: Broadcasting large task binary with size 1274.2 KiB


+----------------+--------------------+
|article_id_index|     recommendations|
+----------------+--------------------+
|               1|[{4907, 4.2080398...|
|              12|[{4907, 4.735945}...|
|              13|[{4907, 4.742931}...|
|              22|[{4907, 5.528489}...|
|              26|[{4907, 4.8742094...|
+----------------+--------------------+
only showing top 5 rows



                                                                                

In [19]:
# Top-10 articles for every customer. Keep the DataFrame itself in
# `item_recs`; `.show()` only prints and returns None, so it is called
# separately instead of being assigned.
item_recs = model.recommendForAllUsers(10)
item_recs.show(5)

22/03/23 23:10:46 WARN DAGScheduler: Broadcasting large task binary with size 1296.8 KiB

+-----------------+--------------------+
|customer_id_index|     recommendations|
+-----------------+--------------------+
|                1|[{6383, 2.6694212...|
|               12|[{6383, 3.306033}...|
|               13|[{6383, 3.4023504...|
|               22|[{4146, 4.6609235...|
|               26|[{4146, 2.455671}...|
+-----------------+--------------------+
only showing top 5 rows



22/03/23 23:10:52 WARN DAGScheduler: Broadcasting large task binary with size 1274.2 KiB
                                                                                

In [20]:
%%time
userRecsDf = model.recommendForAllUsers(10).cache()
userRecsDf.count()

22/03/23 23:10:54 WARN DAGScheduler: Broadcasting large task binary with size 1296.8 KiB
22/03/23 23:10:59 WARN DAGScheduler: Broadcasting large task binary with size 1281.3 KiB

CPU times: user 29.4 ms, sys: 6.44 ms, total: 35.8 ms
Wall time: 9.01 s


                                                                                

9618

In [21]:
# Inspect the structure of the recommendation frame before flattening it.
userRecsDf.printSchema()

root
 |-- customer_id_index: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- article_id_index: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [22]:
# Show, per customer index, only the recommended article indices
# (the nested struct field), without truncating the arrays.
recommended_indices = userRecsDf.select(
    'customer_id_index',
    'recommendations.article_id_index',
)
recommended_indices.show(n=10, truncate=False)

+-----------------+------------------------------------------------------------+
|customer_id_index|article_id_index                                            |
+-----------------+------------------------------------------------------------+
|1580             |[6383, 4146, 1661, 1891, 6874, 5111, 4910, 368, 2511, 3869] |
|4900             |[4146, 1661, 4910, 3869, 6383, 2511, 368, 1891, 5891, 5111] |
|5300             |[4146, 6383, 1891, 6874, 2511, 1661, 4894, 368, 3663, 6046] |
|6620             |[6383, 4146, 1661, 1891, 2511, 4910, 5891, 3869, 5111, 4894]|
|7240             |[6874, 6383, 6848, 1891, 1661, 5111, 2573, 2511, 3031, 1035]|
|7340             |[6383, 4146, 5111, 5891, 1661, 2511, 2455, 1891, 6975, 368] |
|7880             |[6383, 4146, 3018, 7348, 7390, 5111, 3633, 7146, 7574, 7398]|
|9900             |[7092, 7093, 6874, 3258, 1661, 6848, 4910, 3869, 7074, 2874]|
|471              |[1661, 4146, 5111, 6383, 6944, 4170, 4780, 4894, 7574, 1891]|
|1591             |[6383, 25

22/03/23 23:11:07 WARN DAGScheduler: Broadcasting large task binary with size 1282.2 KiB


In [23]:
# Run a full Python garbage-collection pass on the driver; the value shown is
# the number of unreachable objects the collector found.
import gc
gc.collect()

201

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pandas as pd

In [25]:
# Flatten the per-customer recommendation arrays into one row per
# (article, rating, customer) triple.
#
# The explode is done in Spark on the already-cached `userRecsDf`, instead of
# recomputing the recommendations and expanding them row by row in pandas with
# `apply(pd.Series)`. Column order is (article index, rating, customer index),
# which is what the positional rename that follows expects.
# Rows are ordered by customer and then by descending rating, so each
# customer's recommendations keep their rank order.
nrecs = (
    userRecsDf
    .select('customer_id_index', explode('recommendations').alias('rec'))
    .select(col('rec.article_id_index'), col('rec.rating'), col('customer_id_index'))
    .toPandas()
    .sort_values(['customer_id_index', 'rating'], ascending=[True, False])
)

22/03/23 23:11:14 WARN DAGScheduler: Broadcasting large task binary with size 1296.8 KiB
22/03/23 23:11:19 WARN DAGScheduler: Broadcasting large task binary with size 1274.3 KiB
                                                                                

In [26]:
# Positional rename of the flattened recommendation columns
# (article index, predicted rating, customer index).
nrecs.columns = ['ArticleID_index', 'count', 'UserID_index']

# Bring the original-id <-> index pairs to the driver for the reverse mapping.
id_columns = ['article_id', 'article_id_index', 'customer_id', 'customer_id_index']
md = transformed.select(*id_columns).toPandas()

22/03/23 23:12:39 WARN DAGScheduler: Broadcasting large task binary with size 1205.2 KiB
                                                                                

In [27]:
# Reverse lookups: numeric index -> original string id.
dict1 = {index: article for index, article in zip(md['article_id_index'], md['article_id'])}
dict2 = {index: customer for index, customer in zip(md['customer_id_index'], md['customer_id'])}

# Attach the original ids to every recommendation row.
nrecs['customer_id'] = nrecs['UserID_index'].map(dict2)
nrecs['article_id'] = nrecs['ArticleID_index'].map(dict1)

In [28]:
import warnings
# Suppress every Python warning from this point on.
# NOTE(review): a blanket 'ignore' also hides genuine problems (e.g. pandas
# SettingWithCopyWarning); consider narrowing it to a specific category.
warnings.filterwarnings('ignore')

In [29]:
# Collapse the recommendation rows into one list of article ids per customer.
#
# Rows are sorted by customer and then by descending predicted rating (held in
# the `count` column of `nrecs`). A plain sort on `customer_id` alone is not
# stable, so it could scramble each customer's ranking; the order of the ids
# inside `prediction` is meant to be best-first.
nrecs = (
    nrecs
    .sort_values(['customer_id', 'count'], ascending=[True, False])
    .reset_index(drop=True)
)

# Explicit copy: a column is added below, and writing to a slice of `nrecs`
# would raise SettingWithCopyWarning.
new = nrecs[['customer_id', 'article_id', 'count']].copy()
new['prediction'] = new['article_id']

recs = new[['customer_id', 'prediction']]
# groupby keeps the row order within each group, so the lists stay ranked.
recs_new = recs.groupby('customer_id')['prediction'].apply(list).reset_index()
recs_new.head()

Unnamed: 0,customer_id,prediction
0,0003e867a930d0d6842f923d6ba7c9b77aba33fe2a0fbf...,"[0866261001, 0904076003, 0297078008, 088531500..."
1,0010e8eb18f131e724d6997909af0808adbba057529edb...,"[0871638002, 0653275007, 0297078008, 072199000..."
2,001436e2c83cda28548dd668cfc7d621d70d2baf6f6cf0...,"[0857347002, 0297078008, 0750330003, 090496100..."
3,0026ebdd70715d8fa2befa14dfed317a1ffe5451aba839...,"[0904961001, 0857347002, 0885315003, 087726100..."
4,002faf80a68267264102e08eb4f1f21a59236773e4ab90...,"[0825109005, 0757971006, 0316441001, 029707800..."


In [30]:
# Full customer list for the submission file. Only the id column is needed.
# The path is built from `input_path`, like the transactions file, so the
# dataset location is defined in a single place.
customers = pd.read_csv(f"{input_path}customers.csv", usecols=['customer_id'])
customers.shape

(1371980, 1)

In [31]:
# Turn each customer's list of article ids into one space-separated string.
# This cell is not idempotent: re-running it on already-joined strings would
# insert spaces between individual characters.
recs_new['prediction'] = recs_new['prediction'].map(' '.join)

In [32]:
# Left-join so that every customer appears in the submission; customers with
# no recommendation row end up with a missing `prediction`.
# NOTE(review): consider a fallback (e.g. most popular articles) for them.
submission = pd.merge(customers, recs_new, how='left', on='customer_id')
submission.head()

Unnamed: 0,customer_id,prediction
0,00000dbacae5abe5e23885899a1fa44253a17956c6d1c3...,
1,0000423b00ade91418cceaf3b26c6af3dd342b51fd051e...,
2,000058a12d5b43e67d225668fa1f8d618c13dc232df0ca...,
3,00005ca1c9ed5f5146b52ac8639a40ca9d57aeff4d1bd2...,
4,00006413d8573cd20ed7128e53b7b13819fe5cfc2d801f...,


In [33]:
# Write the submission file to the working directory, without the index column.
submission.to_csv(path_or_buf='submission_als.csv', index=False)