# Collaborative filtering

In [1]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under this application name.
spark = (
    SparkSession.builder
    .appName("Collaborative filtering")
    .getOrCreate()
)


In [2]:
# Both CSV files have a header row; column types are inferred by Spark.
csv_options = {"inferSchema": True, "header": True}

moviesDF = (
    spark.read
    .options(**csv_options)
    .csv("data/collaborative_filtering/movies.csv")
)
ratingsDF = (
    spark.read
    .options(**csv_options)
    .csv("data/collaborative_filtering/ratings.csv")
)


In [3]:
# Print the inferred schema of each table: movies first, then ratings.
for frame in (moviesDF, ratingsDF):
    frame.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [4]:
# Attach the movie columns (title, genres) to every rating. The left join
# keeps ratings whose movieId has no match in the movies table.
ratings = ratingsDF.join(moviesDF, on="movieId", how="left")

# 80/20 train/test split. The explicit seed makes the split repeatable across
# runs instead of drawing a new random seed on every execution.
train, test = ratings.randomSplit([0.8, 0.2], seed=42)


In [5]:
# Row counts of the two splits, displayed as a (train, test) tuple.
tuple(split.count() for split in (train, test))


(80770, 20066)

In [6]:
from pyspark.ml.recommendation import ALS
# Explicit-feedback ALS on the rating column, constrained to non-negative
# factors.
# coldStartStrategy="drop" removes rows whose prediction is NaN (users or
# movies that were not present in the training data), so that evaluation
# metrics such as RMSE stay defined.
# The fixed seed makes the random factor initialisation repeatable.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)


## Hyperparameter tuning

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


In [8]:
# Search grid for cross-validation: every combination of rank and regParam.
# The regParam list previously contained 0.1 twice, which produced duplicate
# parameter maps that were refitted in every fold for no benefit; the
# duplicate is removed while keeping the same set of candidate values.
# TODO(review): confirm whether smaller values (e.g. 0.01, 0.05) were intended.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.1, 0.5, 0.15])
    .build()
)


In [9]:
# RMSE between the "rating" label column and the model's "prediction" column.
evaluator = RegressionEvaluator(
    metricName="rmse", labelCol="rating", predictionCol="prediction"
)


In [10]:
# 5-fold cross-validation of the ALS estimator over the parameter grid,
# scored with the evaluator. The fixed seed makes the fold assignment
# repeatable across runs.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [11]:
# Run the cross-validated grid search on the training split.
model = cv.fit(train)

# Keep the model fitted with the best-scoring parameter combination and use
# it to score the held-out test split.
best_model = model.bestModel
test_predictions = model.bestModel.transform(test)


TypeError: 'RegressionEvaluator' object is not callable

In [12]:
# Score the held-out predictions with the evaluator (configured for RMSE)
# and print the value.
RMSE = evaluator.evaluate(dataset=test_predictions)
print(RMSE)

0.8696997246369751


In [13]:
# Top 5 recommended movies for every user known to the best model.
recommendations = best_model.recommendForAllUsers(numItems=5)

In [14]:
# Preview the first 20 users; long recommendation arrays are truncated.
recommendations.show(n=20, truncate=True)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{89904, 4.514392...|
|   463|[{60943, 4.834961...|
|   496|[{8477, 4.5131974...|
|   148|[{89904, 4.517269...|
|   540|[{60943, 5.131281...|
|   392|[{8477, 5.032757}...|
|   243|[{8477, 5.4483294...|
|    31|[{134796, 4.94217...|
|   516|[{6201, 4.6313033...|
|   580|[{60943, 4.756531...|
|   251|[{132333, 5.30952...|
|   451|[{132333, 5.12794...|
|    85|[{1140, 4.8483987...|
|   137|[{6650, 4.800531}...|
|    65|[{8477, 4.690697}...|
|   458|[{42730, 5.280007...|
|   481|[{3451, 4.0450783...|
|    53|[{78836, 6.262577...|
|   255|[{1194, 4.087688}...|
|   588|[{132333, 4.33673...|
+------+--------------------+
only showing top 20 rows

