In [1]:
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.master("local[*]").appName("Test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/21 20:41:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Executing ALS on MovieLens with PySpark
The collaborative filtering algorithm, Alternating Least Squares (ALS), is widely recognized for matrix factorization.

This notebook demonstrates the application and assessment of the ALS PySpark ML implementation, which is based on the DataFrame API and designed for handling large-scale distributed datasets. To ensure efficient execution on multiple cores of a Data Science Virtual Machine, we utilize a smaller dataset in this example for running ALS.

## Read Data:
Reading the data in df. 

u.data     --   The full u data set, 100000 ratings by 943 users on 1682 items. Each user has rated at least 20 movies.  Users and items are numbered consecutively from 1.  The data is randomly ordered. This is a tab separated list of `user id | item id | rating | timestamp`. The time stamps are unix seconds since 1/1/1970 UTC 

In [2]:
schema = StructType([
    StructField("userId", IntegerType()),
    StructField("itemId", IntegerType()),
    StructField("rating", DoubleType()),
    StructField("timestamp", IntegerType())
])

df_data = spark.read.csv("./ml-100k/u.data", sep="\t", schema=schema, header=False)
df_data.show(5)

+------+------+------+---------+
|userId|itemId|rating|timestamp|
+------+------+------+---------+
|   196|   242|   3.0|881250949|
|   186|   302|   3.0|891717742|
|    22|   377|   1.0|878887116|
|   244|    51|   2.0|880606923|
|   166|   346|   1.0|886397596|
+------+------+------+---------+
only showing top 5 rows



## Splitting the data
Spliting the data by 80/20 for train and test purposes using randomSplit feature on spark dataframes.

In [3]:
df_train, df_test = df_data.randomSplit([0.80, 0.20], seed=45)
# show the size of each df
print("Train size: ", df_train.count())
print("Test size: ", df_test.count())

Train size:  79991
Test size:  20009


## Train
taking recommendation for hyper params from [here](http://mymedialite.net/examples/datasets.html)

In [4]:
RANK = 10
MAX_ITER = 15
REG_PARAM = 0.05

COL_USER = "userId"
COL_ITEM = "itemId"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

als = ALS(
    maxIter=MAX_ITER, 
    rank=RANK,
    regParam=REG_PARAM, 
    userCol=COL_USER, 
    itemCol=COL_ITEM, 
    ratingCol=COL_RATING, 
    coldStartStrategy="drop"
)

model = als.fit(df_train)

24/01/21 20:42:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/01/21 20:42:01 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


## Evaluating the Model

Evaluating the model performance with metrics like RMSE, MAE, R2 and Explained Variance.

In [5]:
df_prediction = model.transform(df_test)
df_prediction.show()

+------+------+------+---------+----------+
|userId|itemId|rating|timestamp|prediction|
+------+------+------+---------+----------+
|   148|    98|   3.0|877017714|  4.345466|
|   148|   116|   5.0|877398648| 3.5346677|
|   148|   140|   1.0|877019882| 2.6252813|
|   148|   163|   4.0|877021402| 3.7345974|
|   148|   177|   2.0|877020715| 4.8391724|
|   148|   181|   5.0|877399135| 4.3997464|
|   148|   185|   1.0|877398385|  4.905327|
|   148|   222|   4.0|877398901| 4.4192543|
|   148|   418|   3.0|877019251| 3.6862793|
|   148|   521|   1.0|877398836|  3.476709|
|   148|   549|   3.0|877398385|  3.962182|
|   463|    14|   1.0|890453075|   3.60675|
|   463|    16|   4.0|877385830| 1.9752834|
|   463|   125|   4.0|877385590| 3.4353237|
|   463|   151|   4.0|877385341|  3.058948|
|   463|   249|   2.0|889936035| 1.4408547|
|   463|   253|   5.0|877387935|  2.502262|
|   463|   301|   5.0|889936512| 2.3057375|
|   463|   362|   1.0|889943741| 2.8863873|
|   463|   473|   4.0|877385731|

In [6]:
# Evaluate the model by computing the RMSE on the test data
evaluator = RegressionEvaluator(metricName="rmse", labelCol=COL_RATING, predictionCol=COL_PREDICTION)
rmse = evaluator.evaluate(df_prediction)
print("Root-mean-square error = " + str(rmse))

# MAE
evaluator = RegressionEvaluator(metricName="mae", labelCol=COL_RATING, predictionCol=COL_PREDICTION)
mae = evaluator.evaluate(df_prediction)
print("Mean absolute error = " + str(mae))

# R2 score
evaluator = RegressionEvaluator(metricName="r2", labelCol=COL_RATING, predictionCol=COL_PREDICTION)
r2 = evaluator.evaluate(df_prediction)
print("R2 score = " + str(r2))

# Explained variance
evaluator = RegressionEvaluator(metricName="var", labelCol=COL_RATING, predictionCol=COL_PREDICTION)
var = evaluator.evaluate(df_prediction)
print("Explained variance = " + str(var))


Root-mean-square error = 0.9567427744947872
Mean absolute error = 0.7423525855668276
R2 score = 0.2709882370597655
Explained variance = 0.672904484834605


for fine tuning lets try with different cominations of these parameters and chose the best performing params for metric.

In [7]:
# SOME more trainsing with different parameters
best_rmse = None
params = {
    "rank": None,
    "maxIter": None,
    "regParam": None
}
for rank in range(4, 21):
    for maxIter in [10, 15, 20]:
        for regParam in [0.05, 0.1, 0.15]:
            als = ALS(
                maxIter=maxIter, 
                rank=rank,
                regParam=regParam, 
                userCol=COL_USER, 
                itemCol=COL_ITEM, 
                ratingCol=COL_RATING, 
                coldStartStrategy="drop"
            )
            model = als.fit(df_train)
            df_prediction = model.transform(df_test)
            evaluator = RegressionEvaluator(metricName="rmse", labelCol=COL_RATING, predictionCol=COL_PREDICTION)
            rmse = evaluator.evaluate(df_prediction)
            print("rank: {}, maxIter: {}, regParam: {}, RMSE: {}".format(rank, maxIter, regParam, rmse))
            if best_rmse is None or best_rmse > rmse:
                best_rmse = rmse
                params = {
                    "rank": rank,
                    "maxIter": maxIter,
                    "regParam": regParam
                }

print(f"best rmse was: {best_rmse}, with params: {params}")
                

rank: 4, maxIter: 10, regParam: 0.05, RMSE: 0.9316641679705179
rank: 4, maxIter: 10, regParam: 0.1, RMSE: 0.9219903819291951
rank: 4, maxIter: 10, regParam: 0.15, RMSE: 0.9263350517142951
rank: 4, maxIter: 15, regParam: 0.05, RMSE: 0.9295670982885488
rank: 4, maxIter: 15, regParam: 0.1, RMSE: 0.9193331016818193
rank: 4, maxIter: 15, regParam: 0.15, RMSE: 0.9217740259492229
rank: 4, maxIter: 20, regParam: 0.05, RMSE: 0.929506961872426
rank: 4, maxIter: 20, regParam: 0.1, RMSE: 0.9180518738412239
rank: 4, maxIter: 20, regParam: 0.15, RMSE: 0.9200244481859389
rank: 5, maxIter: 10, regParam: 0.05, RMSE: 0.9326016133416611
rank: 5, maxIter: 10, regParam: 0.1, RMSE: 0.9178282290325158
rank: 5, maxIter: 10, regParam: 0.15, RMSE: 0.9227998511901279
rank: 5, maxIter: 15, regParam: 0.05, RMSE: 0.93109669777094
rank: 5, maxIter: 15, regParam: 0.1, RMSE: 0.9147788407289761
rank: 5, maxIter: 15, regParam: 0.15, RMSE: 0.9187885729125281
rank: 5, maxIter: 20, regParam: 0.05, RMSE: 0.9308216849804807


In [None]:
# cleanup spark instance
spark.stop()