In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
#Creating a SparkContext First 
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [4]:
rdd2 = sc.textFile("/Users/spillai/Downloads/sample_movielens_ratings.txt")\
.map(lambda line: line.split("::"))\
.map(lambda splits: (int(splits[0]), int(splits[1]), float(splits[2]),int(splits[3])))

In [5]:
#Then, take a look at the contents of rdd
for elem in rdd2.take(5):
   print(elem)

(0, 2, 3.0, 1424380312)
(0, 3, 1.0, 1424380312)
(0, 5, 2.0, 1424380312)
(0, 9, 4.0, 1424380312)
(0, 11, 1.0, 1424380312)


In [6]:
from pyspark.sql.types import *
userId_field = StructField("userId", IntegerType(), True)
movieId_field = StructField("movieId", IntegerType(), True)
rating_field = StructField("rating", FloatType(), True)
timestamp_field=StructField("timestamp", LongType(), True)
schema = StructType([userId_field, movieId_field,rating_field,timestamp_field])

In [7]:
ratings = spark.createDataFrame(rdd2, schema)

In [10]:
ratings.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
+------+-------+------+----------+
only showing top 2 rows



In [8]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [9]:
(training, test) = ratings.randomSplit([0.8, 0.2])

In [10]:
training.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
+------+-------+------+----------+
only showing top 2 rows



In [11]:
test.show(2)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
+------+-------+------+----------+
only showing top 2 rows



In [12]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

In [13]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.7967291719377918


In [14]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)


In [16]:
userRecs.show(10)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    28|[[53, 7.430837], ...|
|    26|[[75, 6.2184954],...|
|    27|[[79, 3.8349588],...|
|    12|[[51, 5.938544], ...|
|    22|[[94, 6.250917], ...|
|     1|[[75, 3.114649], ...|
|    13|[[70, 3.7557003],...|
|     6|[[25, 4.863719], ...|
|    16|[[25, 5.9628716],...|
|     3|[[32, 5.556713], ...|
+------+--------------------+
only showing top 10 rows



In [27]:
movieRecs.show(2)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     31|[[28, 4.9854684],...|
|     85|[[0, 5.441595], [...|
+-------+--------------------+
only showing top 2 rows



In [19]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

If the rating matrix is derived from another source of information (i.e. it is inferred from other signals), you can set implicitPrefs to True to get better results:

In [None]:
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")