### Import libraries

In [1]:
import pandas as pd
from pyspark.sql import functions as fn

### Initiate spark session

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

# 1. Load data

In [14]:
df = spark.read.csv("ratings_beauty.csv",header=True)#.limit(600000)
print(df.count())

2023070


# 2. String index features

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [5]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in ['UserId', 'ProductId'] ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df_r.show()

+--------------+----------+------+----------+------------+---------------+
|        UserId| ProductId|Rating| Timestamp|UserId_index|ProductId_index|
+--------------+----------+------+----------+------------+---------------+
|A39HTATAQ9V7YF|0205616461|   5.0|1369699200|    310478.0|        34788.0|
|A3JM6GV9MNOF9X|0558925278|   3.0|1355443200|    339571.0|        26412.0|
|A1Z513UWSAAO0F|0558925278|   5.0|1404691200|    177109.0|        26412.0|
|A1WMRR494NWEWV|0733001998|   4.0|1382572800|    169869.0|        34789.0|
|A3IAAVS479H7M7|0737104473|   1.0|1274227200|      9458.0|        34790.0|
| AKJHHD5VEH7VG|0762451459|   5.0|1404518400|       291.0|        34791.0|
|A1BG8QW55XHN6U|1304139212|   5.0|1371945600|       186.0|        34792.0|
|A22VW0P4VZHDE3|1304139220|   5.0|1373068800|       742.0|        34793.0|
|A3V3RE4132GKRO|130414089X|   5.0|1401840000|    372382.0|        34794.0|
|A327B0I7CYTEJC|130414643X|   4.0|1389052800|    289307.0|        21564.0|
|A1BG8QW55XHN6U|130414643

In [6]:
df_r.printSchema()

root
 |-- UserId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- UserId_index: double (nullable = false)
 |-- ProductId_index: double (nullable = false)



In [7]:
model_cols = ['UserId_index', 'ProductId_index', 'Rating']
df_r = df_r.select(*(fn.col(c).cast("float").alias(c) for c in model_cols))

## Build Out An ALS Model

In [8]:
from pyspark.ml.recommendation import ALS

In [9]:
# Split the ratings dataframe into training and test data
(train, test) = df_r.randomSplit([0.8, 0.2], seed=42)

# Set the ALS hyperparameters
als = ALS(
    userCol="UserId_index", 
    itemCol="ProductId_index", 
    ratingCol="Rating", 
    rank =10, 
    maxIter =10, 
    regParam =.1,
    coldStartStrategy="drop", 
    nonnegative =True, 
    implicitPrefs = False
)

# Fit the model to the training_data
als_model = als.fit(train)

# Generate predictions on the test_data
test_predictions = als_model.transform(test)
test_predictions.show()

+------------+---------------+------+----------+
|UserId_index|ProductId_index|Rating|prediction|
+------------+---------------+------+----------+
|      4663.0|          148.0|   2.0| 2.5258408|
|     74206.0|          148.0|   5.0| 2.5545416|
|     23226.0|          148.0|   4.0|  1.102417|
|      9432.0|          148.0|   5.0| 1.9763458|
|     31223.0|          148.0|   5.0| 3.7067816|
|     10483.0|          148.0|   2.0| 2.8975804|
|      4497.0|          148.0|   4.0|  1.089476|
|     13943.0|          148.0|   4.0| 2.7961543|
|     42925.0|          148.0|   5.0| 2.0089037|
|       113.0|          148.0|   4.0| 3.4126592|
|     37610.0|          148.0|   5.0| 1.8207655|
|     18127.0|          148.0|   3.0| 2.9809241|
|     52810.0|          148.0|   4.0| 1.6117176|
|      5107.0|          148.0|   5.0| 3.5777962|
|     14333.0|          463.0|   5.0|   3.52846|
|      5572.0|          463.0|   5.0| 2.0656528|
|     73891.0|          463.0|   5.0| 3.4963653|
|     14568.0|      

## Build RMSE Evaluator

In [12]:
# Import RegressionEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

# Complete the evaluator code
evaluator = RegressionEvaluator(metricName="rmse", labelCol="UserId_index", predictionCol="ProductId_index")

# Extract the 3 parameters
print(evaluator.getMetricName())
print(evaluator.getLabelCol())
print(evaluator.getPredictionCol())


# Evaluate the "test_predictions" dataframe
RMSE = evaluator.evaluate(test_predictions)

# Print the RMSE
print (RMSE)

rmse
UserId_index
ProductId_index
32703.39096689963


In [13]:
test_predictions.count()

35775