In [1]:
from __future__ import print_function
import os
import json
import pandas as pd
# JAVA_HOME conventionally names the Java install root, not its bin\ folder.
# Raw string: in a plain literal "\b" (from "\bin") is a backspace escape and
# "\P", "\J", "\j" are invalid escapes, so the old path was silently corrupted.
# setdefault lets an already-configured JAVA_HOME take precedence over this
# machine-specific fallback path.
os.environ.setdefault("JAVA_HOME", r"C:\Program Files\Java\jre1.8.0_181")
import sys
# Python 2/3 shim: on Python 3 expose `long` as an alias of int. Compare the
# numeric version tuple instead of lexically comparing the version string.
if sys.version_info[0] >= 3:
    long = int

from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$

# Spark entry point for the rest of the notebook; getOrCreate() returns an
# existing session if one is already running in this process.
spark = (
    SparkSession.builder
    .appName("ALSExample")
    .getOrCreate()
)

In [2]:
# Load the ratings CSV; the first line supplies the column names. No schema
# is given, so every column arrives as a string and is cast further below.
df = (
    spark.read
    .option("header", "true")
    .csv("ml-latest-small/ratings.csv")
)

In [3]:
# Sanity check: number of data rows read from the CSV (header line excluded).
df.count()

100836

In [4]:
# Drop to the underlying RDD so records can be mapped with plain Python code.
# NOTE(review): despite the name, the elements are Row objects, not text lines.
lines = df.rdd

In [5]:
# Sanity check: row count of the RDD view; expected to match df.count().
lines.count()

100836

In [8]:
def _to_rating_row(record):
    """Convert one raw CSV record into a typed Row(userId, movieId, rating).

    Only the first three positional fields are read; any later columns are
    ignored. Fields arrive as strings and are cast to int/int/float.
    """
    return Row(
        userId=int(record[0]),
        movieId=int(record[1]),
        rating=float(record[2]),
    )


ratingsRDD = lines.map(_to_rating_row)

In [9]:
# Only a cell's last expression is displayed, so a bare count() here used to
# run a full Spark job and then discard the result; print it instead.
print(ratingsRDD.count())
ratingsRDD

PythonRDD[23] at RDD at PythonRDD.scala:52

In [10]:
# Build a Spark DataFrame from the typed Row RDD and split it roughly 80/20
# into training and test sets. The fixed seed makes the split, and therefore
# the RMSE reported below, reproducible on Restart & Run All.
SPLIT_SEED = 42
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [11]:
# Confirm the row total survived the RDD -> DataFrame round trip, and show
# which DataFrame class we are working with.
row_total = ratings.count()
print(row_total)
print(type(ratings))

100836
<class 'pyspark.sql.dataframe.DataFrame'>


In [12]:
# Collaborative-filtering model over (userId, movieId, rating).
# coldStartStrategy="drop" removes rows whose user/item was unseen in
# training, so they do not yield NaN predictions during evaluation.
# seed fixes ALS's random factor initialisation for reproducible results.
ALS_SEED = 42
als = ALS(maxIter=15, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=ALS_SEED)
model = als.fit(training)

In [13]:
# Score the held-out split: compare each predicted rating with the true one
# and report root-mean-square error.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error =", rmse)

Root-mean-square error = 1.1580878954046268


In [14]:
# Ask the fitted model for 10 movie recommendations per user.
userRecs = model.recommendForAllUsers(numItems=10)

In [15]:
# Recommendations for a small sample of 5 distinct users.
# NOTE(review): userSubsetRecs is not used later in the visible notebook.
user_col = als.getUserCol()
users = ratings.select(user_col).distinct().limit(5)
userSubsetRecs = model.recommendForUserSubset(dataset=users, numItems=10)

In [16]:
# Collect all per-user recommendations to the driver as a pandas DataFrame.
# NOTE(review): this rebinds `df`, which earlier held the raw ratings Spark
# DataFrame; re-running earlier cells after this one will now fail.
df = userRecs.toPandas()

In [18]:
# Write one JSON file per user: recommendations/user_<id>.json containing
# {"<id>": [movieId, ...]}, movie ids in the order they appear in that user's
# recommendations list.
REC_DIR = 'recommendations'
if not os.path.isdir(REC_DIR):
    # open(..., 'w') does not create missing parent directories.
    os.makedirs(REC_DIR)

for user_id, recs in zip(df['userId'], df['recommendations']):
    currentId = str(user_id)
    recommendations = {currentId: [rec.movieId for rec in recs]}
    filename = os.path.join(REC_DIR, 'user_' + currentId + '.json')
    with open(filename, 'w') as fp:
        json.dump(recommendations, fp)

In [19]:
# Number of rows in userRecs, presumably one per user known to the model;
# verify against the distinct userId count if it matters.
userRecs.count()

610