In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession 

from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.recommendation import ALS

from pyspark.ml.tuning import TrainValidationSplit,CrossValidator, ParamGridBuilder

In [2]:
spark = SparkSession.builder \
    .master('local') \
    .appName('BRS-new') \
    .getOrCreate()

In [3]:
books_df = spark.read.csv("./datasets/book/books.csv",header=True,inferSchema=True)
book_ratings = spark.read.csv("./datasets/book/new_ratings.csv",header=True,inferSchema=True)

In [4]:
book_ratings.show(2)

+-------+-----------+-------+
|User-ID|Book-Rating|book_id|
+-------+-----------+-------+
|      0|          9|      9|
|      0|          9|    287|
+-------+-----------+-------+
only showing top 2 rows



In [5]:
(training, test) = book_ratings.randomSplit([0.8,0.2], seed=42)

als = ALS(userCol="User-ID",itemCol="book_id", ratingCol="Book-Rating",
                coldStartStrategy="drop",nonnegative=True,implicitPrefs=False)

In [6]:
# param_grid -> for fine-tuning of the model -> hyperparams
param_grid = ParamGridBuilder()\
             .addGrid(als.rank,[12,13,14])\
             .addGrid(als.maxIter,[18,19,20])\
             .addGrid(als.regParam, [.07,.09,.12])\
             .build()
# hyperparam "rank" - rank of U and P 
# hyperparam "maxIter" - how many times to alternate
# hyperparam "regParam" - regularization param to prevent overfitting

In [7]:
# Evaluator = RMSE -> sqroot of the avg squared differences between 
#                                   predicted and observed outcomes
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Book-Rating",
                                predictionCol="prediction")

In [9]:
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5) 


In [6]:
model = als.fit(training)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Raul\anaconda3\envs\BRS-spark\Lib\site-packages\IPython\core\interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "C:\Users\Raul\AppData\Local\Temp\ipykernel_24912\344368942.py", line 1, in <module>
    model = als.fit(training)
            ^^^^^^^^^^^^^^^^^
  File "C:\Spark\spark\python\pyspark\ml\base.py", line 205, in fit
    return self._fit(dataset)
           ^^^^^^^^^^^^^^^^^^
  File "C:\Spark\spark\python\pyspark\ml\wrapper.py", line 381, in _fit
    java_model = self._fit_java(dataset)
                 ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Spark\spark\python\pyspark\ml\wrapper.py", line 378, in _fit_java
    return self._java_obj.fit(dataset._jdf)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Spark\spark\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
      

ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it

In [None]:
bestModel = model.bestModel

In [None]:
pred = bestModel.transform(test)
pred.show(4)

In [None]:
rmse = evaluator.evaluate(pred)

In [None]:
rmse

In [None]:
print("rank :",bestModel.rank," maxiter :",bestModel._java_obj.parent().getMaxIter(), " regparam :",bestModel._java_obj.parent().getRegParam())