# Tutorial 12

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("Chapter4-4").getOrCreate()

24/05/04 00:30:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
ratings = (
    spark.read.csv(
        path="file:///home/hduser/Downloads/ratings.csv",
        sep=",",
        header=True,
        quote='"',
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    # .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
    .select("userId", "movieId", "rating")
    .cache()
)

The ALS class has this signature:

```python
class pyspark.ml.recommendation.ALS(
    rank=10,
    maxIter=10,
    regParam=0.1,
    numUserBlocks=10,
    numItemBlocks=10,
    implicitPrefs=False,
    alpha=1.0,
    userCol="user",
    itemCol="item",
    seed=None,
    ratingCol="rating",
    nonnegative=False,
    checkpointInterval=10,
    intermediateStorageLevel="MEMORY_AND_DISK",
    finalStorageLevel="MEMORY_AND_DISK",
    coldStartStrategy="nan",
)
```

In [6]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator



In [7]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
)

(training_data, validation_data) = ratings.randomSplit([8.0, 2.0])

evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)

model = als.fit(training_data)
predictions = model.transform(validation_data)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [8]:
predictions.show(10, False)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|1     |1127   |4.0   |3.9200945 |
|1     |804    |4.0   |3.5716279 |
|1     |593    |4.0   |4.9135103 |
|1     |500    |3.0   |3.9667256 |
|1     |1032   |5.0   |4.1951494 |
|1     |423    |3.0   |3.3850315 |
|1     |1060   |4.0   |4.5238147 |
|1     |648    |3.0   |4.303916  |
|1     |216    |5.0   |4.297819  |
|1     |480    |4.0   |4.521273  |
+------+-------+------+----------+
only showing top 10 rows



In [9]:
rmse = evaluator.evaluate(predictions.na.drop())

In [10]:
print(rmse)

0.8810046017927589


In [11]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [1, 5, 10])
    .addGrid(als.maxIter, [20])
    .addGrid(als.regParam, [0.05, 0.1])
    .build()
)

In [12]:
type(parameter_grid)

list

In [13]:
from pprint import pprint

pprint(parameter_grid)

[{Param(parent='ALS_8cbdbaa32692', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
  Param(parent='ALS_8cbdbaa32692', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_8cbdbaa32692', name='maxIter', doc='max number of iterations (>= 0).'): 20},
 {Param(parent='ALS_8cbdbaa32692', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent='ALS_8cbdbaa32692', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_8cbdbaa32692', name='maxIter', doc='max number of iterations (>= 0).'): 20},
 {Param(parent='ALS_8cbdbaa32692', name='regParam', doc='regularization parameter (>= 0).'): 0.05,
  Param(parent='ALS_8cbdbaa32692', name='rank', doc='rank of the factorization'): 5,
  Param(parent='ALS_8cbdbaa32692', name='maxIter', doc='max number of iterations (>= 0).'): 20},
 {Param(parent='ALS_8cbdbaa32692', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
  Param(parent='ALS_8cbdbaa32692', name='rank', doc='rank of

In [14]:
crossvalidator = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=evaluator,
    numFolds=2,
)

crossval_model = crossvalidator.fit(training_data)
predictions = crossval_model.transform(validation_data)


In [15]:
rmse = evaluator.evaluate(predictions.na.drop())
print(rmse)

0.8802792436590022


In [16]:
model = crossval_model.bestModel

In [17]:
model

ALSModel: uid=ALS_8cbdbaa32692, rank=1

## References
* Mastering Big Data Analytics with PySpark, By Danny Meijer, Publisher:Packt Publishing, June 2020