In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
spark = SparkSession.builder.appName('rec').getOrCreate()
data = spark.read.options(inferSchema=True,header=True).csv("/FileStore/tables/movielens_ratings.csv")
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [0]:
train,test = data.randomSplit([0.8,0.2])

In [0]:
als = ALS(maxIter=5,regParam=0.01,userCol='userId',itemCol='movieId',ratingCol='rating')

In [0]:
model = als.fit(train)
predictions = model.transform(test)
predictions.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|      0|   3.0|    28|  -0.240143|
|      1|   1.0|    28|   3.998542|
|      2|   4.0|    28| -0.6411375|
|      0|   1.0|    27| -1.5952483|
|      1|   1.0|    26|  0.8329944|
|      5|   2.0|    26|   2.056287|
|      2|   1.0|    12|  1.8004713|
|      3|   2.0|    22|-0.97574925|
|      6|   1.0|     1|   2.772672|
|      3|   1.0|    13|  1.4257386|
|      0|   1.0|     6| 0.72302777|
|      6|   1.0|     6| 0.54042184|
|      0|   1.0|    20| 0.40272674|
|      3|   1.0|    17| -2.2445576|
|      4|   1.0|     9|   2.771164|
|      5|   1.0|     9|  0.6686266|
|      6|   1.0|     9|  0.6441575|
|      1|   3.0|    25| 0.21801847|
|      2|   1.0|    25|  1.0165447|
|      4|   1.0|    24|  1.7850165|
+-------+------+------+-----------+
only showing top 20 rows



In [0]:
evaluator = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
rmse

Out[23]: 1.8829215265306698

In [0]:
single_user = test.filter(test.userId == 11).select(['movieId','userId'])
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      9|    11|
|     23|    11|
|     35|    11|
|     36|    11|
|     39|    11|
|     50|    11|
|     62|    11|
|     72|    11|
|     79|    11|
|     82|    11|
|     94|    11|
|     97|    11|
+-------+------+



In [0]:
recommendations = model.transform(single_user)
recommendations.orderBy('prediction',ascending=False).show()

+-------+------+-----------+
|movieId|userId| prediction|
+-------+------+-----------+
|      9|    11|   3.350803|
|     39|    11|  2.1177056|
|     97|    11|  1.3256621|
|     36|    11| 0.88194096|
|     72|    11|  0.6656874|
|     94|    11|  0.5158758|
|     23|    11| 0.30383065|
|     50|    11|0.104614615|
|     79|    11|-0.11627808|
|     82|    11| -0.2762805|
|     35|    11| -0.4837041|
|     62|    11| -1.5668689|
+-------+------+-----------+

