# Recommendation system

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [2]:
spark = SparkContext("local", "sqlContext")
sql = SQLContext(spark)

#### Load data

In [3]:
path = 'D:/ProgramFiles/Spark/spark-3.0.0-bin-hadoop2.7/data/mllib/'
lines = sql.read.text(path + "als/sample_movielens_ratings.txt").rdd
lines.take(10)

[Row(value='0::2::3::1424380312'),
 Row(value='0::3::1::1424380312'),
 Row(value='0::5::2::1424380312'),
 Row(value='0::9::4::1424380312'),
 Row(value='0::11::1::1424380312'),
 Row(value='0::12::2::1424380312'),
 Row(value='0::15::1::1424380312'),
 Row(value='0::17::1::1424380312'),
 Row(value='0::19::1::1424380312'),
 Row(value='0::21::1::1424380312')]

#### Data preparation

In [9]:
parts = lines.map(lambda row: row.value.split("::"))
rdd = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]), rating=float(p[2]), timestamp=float(p[3])))
df = sql.createDataFrame(rdd)
(df_train, df_test) = df.randomSplit([0.8, 0.2])

#### Build the model

In [10]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
als_fit = als.fit(df_train)

#### Validation

In [11]:
pred = als_fit.transform(df_test)
eval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = eval.evaluate(pred)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.8518172765304592


#### Generate recommendations

In [18]:
print('Top 10 movie recommendations for each user\n')
userRecs = als_fit.recommendForAllUsers(10)
userRecs.show()

print('Top 10 user recommendations for each movie\n')
movieRecs = als_fit.recommendForAllItems(10)
movieRecs.show()

print('Top 10 movie recommendations for a specified set of users\n')
users = df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = als_fit.recommendForUserSubset(users, 10)
userSubsetRecs.show()

print('Top 10 movie user recommendations for a specified set of movies\n')
movies = df.select(als.getItemCol()).distinct().limit(3)
movieSubsetRecs = als_fit.recommendForItemSubset(movies, 10)
movieSubsetRecs.show()

Top 10 movie recommendations for each user

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[46, 6.4157724],...|
|    26|[[18, 6.1649218],...|
|    27|[[19, 3.1087563],...|
|    12|[[46, 6.6188993],...|
|    22|[[22, 5.2609615],...|
|     1|[[18, 5.778796], ...|
|    13|[[93, 3.847912], ...|
|     6|[[92, 4.2404537],...|
|    16|[[32, 6.154178], ...|
|     3|[[51, 4.843397], ...|
|    20|[[22, 4.49481], [...|
|     5|[[18, 5.886023], ...|
|    19|[[90, 4.1057715],...|
|    15|[[32, 5.0378904],...|
|    17|[[18, 5.42172], [...|
|     9|[[28, 6.0178432],...|
|     4|[[74, 4.4687047],...|
|     8|[[29, 5.2050834],...|
|    23|[[90, 6.024208], ...|
|     7|[[52, 5.427095], ...|
+------+--------------------+
only showing top 20 rows

Top 10 user recommendations for each movie

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[[16, 5.0786905],...|
|     85|[[7, 4.7242293], ...|
|     65|[[

## Credits & Links

http://spark.apache.org/docs/latest/ml-collaborative-filtering.html