In [None]:
# WARNING: destructive. With recurse=True this deletes everything under /FileStore/tables,
# including the movies.csv / ratings.csv files this notebook reads. Keep it commented out
# unless you really intend to wipe the uploaded data.
#dbutils.fs.rm("/FileStore/tables", True)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,explode
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [None]:
# Reuse the active Spark session if one exists (e.g. on Databricks), otherwise create one.
spark = (
    SparkSession.builder
    .appName("Collaborative filtering")
    .getOrCreate()
)

In [None]:
# Both inputs are headered CSVs; configure one reader and load each file with it.
# inferSchema lets Spark type userId/movieId/rating numerically, as ALS needs.
csv_reader = spark.read.options(header="True", inferSchema="True")
moviesDF = csv_reader.csv("/FileStore/tables/movies.csv")
ratingsDF = csv_reader.csv("/FileStore/tables/ratings.csv")

In [None]:
# Preview the movies table loaded from movies.csv.
display(moviesDF)

In [None]:
# Preview the ratings table loaded from ratings.csv.
display(ratingsDF)

In [None]:
# Attach movie metadata to every rating row; a left join keeps ratings whose movieId has no match.
ratings = ratingsDF.join(moviesDF, on="movieId", how="left")

In [None]:
# 80/20 hold-out split. Without a seed, PySpark picks a random one on every run, so the
# split and the reported test RMSE change after each Restart & Run All. Fixing the seed
# makes the split reproducible.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Total rows in the joined ratings frame (the frame that was split into train/test).
ratings.count()

In [None]:
# Size of the training split, followed by a preview of its first rows.
print(train.count())
train.show(n=20, truncate=True)

In [None]:
# Size of the held-out test split, followed by a preview of its first rows.
print(test.count())
test.show(n=20, truncate=True)

In [None]:
# Explicit-feedback ALS estimator over (userId, movieId, rating).
# - nonnegative=True constrains the learned latent factors to be >= 0.
# - coldStartStrategy="drop" removes rows whose prediction is NaN (users/items unseen
#   during fitting), so the RMSE evaluation does not become NaN.
# - seed fixes the random factor initialization; without it, CV scores and the chosen
#   hyperparameters can differ from run to run.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)

In [None]:
# Hyperparameter search space: 4 ranks x 4 regularization strengths = 16 parameter maps,
# each of which is fit once per cross-validation fold (this gets expensive at high rank).
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

In [None]:
# Scores models by root-mean-squared error between observed `rating` and model `prediction`.
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

In [None]:
# 5-fold cross-validation over the parameter grid, scored with the RMSE evaluator.
# The seed fixes how rows are assigned to folds; without it, fold membership (and
# possibly the selected hyperparameters) changes on every run.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
    seed=42,
)

In [None]:
# Run the cross-validated grid search on the training split only.
model = cv.fit(train)
# The winning ALS model, used for every downstream prediction.
best_model = model.bestModel
# Score the winner on the untouched test split.
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(dataset=test_predictions)
print(RMSE)

In [None]:
# Report the held-out test RMSE with a label and fixed precision, so the number is
# self-explanatory when someone skims the notebook, instead of re-printing a bare float.
print(f"Test RMSE: {RMSE:.4f}")

In [None]:
# Top-5 movie recommendations for every user the best model knows about.
recommendations = best_model.recommendForAllUsers(numItems=5)

In [None]:
# Short alias for the per-user recommendations frame; the following cells refer to it as `df`.
df = recommendations

In [None]:
# One row per userId, each with an array column `recommendations` holding its top-5 picks.
display(df)

In [None]:
# Flatten the recommendations array: one output row per (user, recommended movie) element,
# with the array element stored in the struct column `movieid_rating`.
df2 = df.withColumn("movieid_rating", explode(col("recommendations")))

In [None]:
# Exploded view: one row per (user, recommendation) pair.
display(df2)

In [None]:
# Final flat table: userId, recommended movieId, and predicted rating pulled out of the struct.
display(
    df2.select(
        "userId",
        "movieid_rating.movieId",
        "movieid_rating.rating",
    )
)