In [6]:
transaction = spark.read.option("header","true").csv("s3://hmrecomm/transactions_train.csv")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
transaction.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- t_dat: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- article_id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sales_channel_id: string (nullable = true)

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.functions import min, max
from pyspark.sql.functions import unix_timestamp, lit

min_date, max_date = transaction.select(min("t_dat"), max("t_dat")).first()
print(min_date, max_date)
datahm =  transaction.withColumn('t_dat', transaction['t_dat'].cast('string'))
datahm = datahm.withColumn('date', from_unixtime(unix_timestamp('t_dat', 'yyyy-MM-dd')))
datahm = datahm.withColumn('year', year(col('date')))
datahm = datahm.withColumn('month', month(col('date')))
datahm = datahm.withColumn('day', date_format(col('date'), "d"))

datahm = datahm[datahm['year'] == 2020]
datahm = datahm[datahm['month'] == 9]
datahm = datahm[datahm['day'] == 22]
transaction.unpersist()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2018-09-20 2020-09-22
DataFrame[t_dat: string, customer_id: string, article_id: string, price: string, sales_channel_id: string]

In [9]:
datahm = datahm.groupby('customer_id', 'article_id').count()
datahm.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-----+
|         customer_id|article_id|count|
+--------------------+----------+-----+
|0333e5dda0257e9f4...|0839332002|    2|
|1601fa3c3f39aa623...|0730683001|    1|
|164e1a251f0e3d764...|0831267001|    1|
|166546028742fe655...|0767423013|    1|
|2a5d52077398d99a5...|0768931002|    2|
|2b02d26a952ca69ad...|0695803009|    1|
|2dbee384d53ac4e8d...|0829054005|    1|
|476976af7b5a03fa8...|0754238023|    1|
|5840d651a26936daf...|0573085057|    1|
|5dc21abea76c66e34...|0759465001|    1|
|63f608a6c76ea08df...|0598515022|    1|
|63f8173b620eff63f...|0756322009|    1|
|65fc15a91310772fe...|0898918007|    1|
|6f5e8efa459c8d26d...|0888908001|    1|
|86730c52576c3f24f...|0855793001|    1|
|8827264dd69d9c1fa...|0610776105|    1|
|9bd9a0895458fa858...|0768912001|    1|
|9cc571b49c11b8f3c...|0896169002|    2|
|a1bed5192c2ff9db5...|0912095005|    1|
|b5ae6677e1f1ebb6f...|0905957007|    1|
+--------------------+----------+-----+
only showing top 20 rows

In [10]:
userId_count = datahm.groupBy("customer_id").count().orderBy('count', ascending=False)
userId_count.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|         customer_id|count|
+--------------------+-----+
|30b6056bacc5f5c9d...|   28|
|5e8fb4d457fdffc61...|   28|
|dc1b173e541f8d3c1...|   27|
|1796e87fd2e88932b...|   25|
|6335d496ef463bc40...|   25|
|f50287d9cf052d4b4...|   24|
|fd5ce8716faf00f6a...|   23|
|54e8ebd39543b5a4d...|   23|
|850ec77661a417d27...|   22|
|298523b6637340717...|   21|
|a08e284bb18add2d7...|   21|
|383e1b07e2c1fe169...|   21|
|b49647f84a99ced53...|   21|
|b606fe5786c00151a...|   21|
|32f3a6a7ce63d302c...|   21|
|fc783381f1ea2174c...|   21|
|ad3663a848dccbdda...|   21|
|3ca77aab50ae4532b...|   20|
|2a721767cd9864ed5...|   20|
|af5166e0f89b0d433...|   19|
+--------------------+-----+
only showing top 20 rows

In [11]:
articleId_count = datahm.groupBy("article_id").count().orderBy('count', ascending=False)
articleId_count.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-----+
|article_id|count|
+----------+-----+
|0924243002|   91|
|0918522001|   88|
|0866731001|   78|
|0751471001|   75|
|0448509014|   73|
|0714790020|   72|
|0762846027|   68|
|0928206001|   67|
|0893432002|   66|
|0918292001|   65|
|0915529005|   64|
|0915529003|   63|
|0788575004|   63|
|0863583001|   60|
|0573085028|   59|
|0930380001|   59|
|0919273002|   58|
|0850917001|   57|
|0573085042|   56|
|0874110016|   53|
+----------+-----+
only showing top 20 rows

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

indexer = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in list(set(datahm.columns)-set(['count'])) ]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(datahm).transform(datahm)
transformed.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+-----+----------------+-----------------+
|         customer_id|article_id|count|article_id_index|customer_id_index|
+--------------------+----------+-----+----------------+-----------------+
|0333e5dda0257e9f4...|0839332002|    2|           732.0|           6082.0|
|1601fa3c3f39aa623...|0730683001|    1|          3565.0|           4483.0|
|164e1a251f0e3d764...|0831267001|    1|          7941.0|           4206.0|
|166546028742fe655...|0767423013|    1|          1017.0|            982.0|
|2a5d52077398d99a5...|0768931002|    2|           454.0|           2825.0|
|2b02d26a952ca69ad...|0695803009|    1|          7407.0|            886.0|
|2dbee384d53ac4e8d...|0829054005|    1|          1677.0|            208.0|
|476976af7b5a03fa8...|0754238023|    1|           699.0|            138.0|
|5840d651a26936daf...|0573085057|    1|            66.0|           1250.0|
|5dc21abea76c66e34...|0759465001|    1|           318.0|           1600.0|
|63f608a6c76ea08df...|059

In [13]:
(training,test)=transformed.randomSplit([0.8, 0.2])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#create ALS model
als=ALS(userCol="customer_id_index",itemCol="article_id_index",ratingCol="count",coldStartStrategy="drop",nonnegative=True)
#define evaluator as RMSE
evaluator = RegressionEvaluator(metricName = "rmse",labelCol = 'count', predictionCol = 'prediction')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
#tune model using ParamGridBuilder
param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [15,20,25])\
            .addGrid(als.maxIter,[5,10,15])\
            .addGrid(als.regParam,[0.05])\
            .build()
#Build cross validation using CrossValidator
crossvalidate = CrossValidator(estimator=als,estimatorParamMaps=param_grid, evaluator=evaluator,numFolds=5)

#load the crovalidator into the model
tuned_model = crossvalidate.fit(training)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-15:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 2114



In [16]:
model = tuned_model.bestModel

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#print evaluation metrics and model parameters
print("RMSE =" + str(rmse))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RMSE =0.43664054214548154

In [19]:
recommendation_customer = model.recommendForAllUsers(20)
recommendation_customer.show(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+--------------------+
|customer_id_index|     recommendations|
+-----------------+--------------------+
|               12|[[1867, 2.726563]...|
|               65|[[3503, 2.6587913...|
|               76|[[1867, 2.628014]...|
|               81|[[1867, 2.9240685...|
|              122|[[1867, 2.275898]...|
|              126|[[1867, 2.404537]...|
|              133|[[1867, 2.2347238...|
|              140|[[6599, 2.5554457...|
|              148|[[1938, 3.5521169...|
|              177|[[1938, 2.3279598...|
|              192|[[1867, 4.88021],...|
|              209|[[1867, 2.1306722...|
|              243|[[1867, 2.241063]...|
|              300|[[1867, 2.3830864...|
|              333|[[1867, 2.3220134...|
|              406|[[1867, 2.249863]...|
|              417|[[1867, 2.7465048...|
|              444|[[1867, 2.4268873...|
|              481|[[1938, 2.206344]...|
|              496|[[1867, 2.2705157...|
+-----------------+--------------------+
only showing top

In [20]:
recommendation_customer = recommendation_customer.select("customer_id_index","recommendations.article_id_index")
recommendation_customer.show(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+--------------------+
|customer_id_index|    article_id_index|
+-----------------+--------------------+
|               12|[1867, 7955, 5605...|
|               65|[3503, 5605, 1867...|
|               76|[1867, 5620, 2891...|
|               81|[1867, 5605, 2891...|
|              122|[1867, 5605, 5662...|
|              126|[1867, 1938, 5782...|
|              133|[1867, 5605, 1938...|
|              140|[6599, 4817, 2891...|
|              148|[1938, 1867, 5620...|
|              177|[1938, 1867, 5605...|
|              192|[1867, 5605, 1938...|
|              209|[1867, 5782, 6289...|
|              243|[1867, 5605, 1938...|
|              300|[1867, 2891, 5782...|
|              333|[1867, 5605, 1938...|
|              406|[1867, 5605, 3503...|
|              417|[1867, 7955, 6036...|
|              444|[1867, 5605, 1938...|
|              481|[1938, 1867, 5605...|
|              496|[1867, 5605, 5620...|
+-----------------+--------------------+
only showing top

In [22]:
recommendation_customer.write.parquet("s3://hmrecomm/transactions_train2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-20:
Traceback (most recent call last):
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/mnt/notebook-env/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/notebook-env/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 4465

