In [1]:
# start the Spark Context
import findspark
findspark.init()

In [2]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

Collaborative Filtering

In [3]:
# Collaborative Filtering is a mean of recommendation 
# based on users’ past behavior.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [4]:
lines = spark.read.text("sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2])))

In [5]:
ratings = spark.createDataFrame(ratingsRDD)

In [6]:
ratings.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [7]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [8]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

In [9]:
model = als.fit(training)

In [10]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [11]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.8903952689149899


In [12]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)

In [13]:
userRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[32, 6.716896], ...|
|    26|[[22, 5.3148794],...|
|    27|[[79, 3.7492013],...|
|    12|[[17, 4.8160768],...|
|    22|[[51, 4.99123], [...|
|     1|[[7, 4.0598125], ...|
|    13|[[8, 2.815679], [...|
|     6|[[62, 3.8843865],...|
|    16|[[85, 5.206078], ...|
|     3|[[46, 6.1292014],...|
|    20|[[22, 4.2018137],...|
|     5|[[55, 4.415578], ...|
|    19|[[51, 4.8350086],...|
|    15|[[53, 6.171276], ...|
|    17|[[17, 5.613444], ...|
|     9|[[17, 5.3470526],...|
|     4|[[2, 4.3587255], ...|
|     8|[[29, 5.0400243],...|
|    23|[[49, 5.116645], ...|
|     7|[[25, 4.853827], ...|
+------+--------------------+
only showing top 20 rows



In [14]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

In [15]:
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[[9, 4.2971845], ...|
|     85|[[16, 5.206078], ...|
|     65|[[23, 4.9456534],...|
|     53|[[15, 6.171276], ...|
|     78|[[26, 1.436444], ...|
|     34|[[26, 3.1095715],...|
|     81|[[11, 3.940272], ...|
|     28|[[9, 3.3060458], ...|
|     76|[[14, 4.711523], ...|
|     26|[[14, 3.1876962],...|
|     27|[[11, 5.1851377],...|
|     44|[[18, 3.9012063],...|
|     12|[[28, 4.785045], ...|
|     91|[[7, 4.014581], [...|
|     22|[[26, 5.3148794],...|
|     93|[[2, 5.125813], [...|
|     47|[[22, 4.844349], ...|
|      1|[[15, 3.829822], ...|
|     52|[[14, 5.1703763],...|
|     13|[[11, 3.7557821],...|
+-------+--------------------+
only showing top 20 rows



In [16]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

In [17]:
userSubsetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[[22, 5.3148794],...|
|    19|[[51, 4.8350086],...|
|    29|[[46, 5.0938053],...|
+------+--------------------+



In [18]:
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [19]:
movieSubSetRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     65|[[23, 4.9456534],...|
|     26|[[14, 3.1876962],...|
|     29|[[8, 5.0400243], ...|
+-------+--------------------+



In [20]:
spark.stop()