##### source: https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

##### Example: loading a CSV file

In [None]:
# findspark.init() has to run before the pyspark import below so that the
# pyspark package can be found.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# `SparkSession.builder` is the documented entry point for configuring a
# session (rather than instantiating the Builder class by hand).
# getOrCreate() hands back the existing session when one is already running,
# so this cell is safe to execute more than once.
spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()

In [None]:
# Read the ":"-separated ratings file into a DataFrame, asking Spark to infer
# the column types, then print the first 20 rows.
# DataFrame API reference:
# https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
ratings_path = "data/movie-ratings-spark.dat"
csv_reader = spark.read.format("csv").options(sep=":", inferSchema="true")
ratings = csv_reader.load(ratings_path)
ratings.show(20)

##### Example: ALS training

In [1]:
# findspark.init() has to run before the pyspark imports below so that the
# pyspark package can be found.
import findspark
findspark.init()

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

from pyspark import SparkConf, SparkContext

# Only one SparkContext may be active per process. Constructing one directly
# raises "Cannot run multiple SparkContexts at once" when this cell is re-run
# or when a SparkSession has already been started in the same kernel.
# getOrCreate() reuses the active context if there is one; otherwise it
# creates a new one with the local master and application name given here.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Recommendation System")
)

In [2]:
def _parse_rating(line):
    """Build a Rating from one comma-separated "user,product,rating" line."""
    fields = line.split(',')
    return Rating(int(fields[0]), int(fields[1]), float(fields[2]))


def _user_product(rating):
    """Return the (user, product) pair of a rating triple."""
    user, product, _ = rating
    return (user, product)


def _keyed_score(rating):
    """Re-shape a rating triple as ((user, product), score) for joining."""
    user, product, score = rating
    return ((user, product), score)


def _squared_error(joined):
    """Squared difference between the observed and the predicted score."""
    _, (observed, predicted) = joined
    return (observed - predicted) ** 2


# Read the raw text file and turn every line into a Rating.
data = sc.textFile("/Users/kadriansyah/Projects/spark-2.4.5-bin-hadoop2.7/data/mllib/als/test.data")
ratings = data.map(_parse_rating)

# Fit the matrix-factorization model with Alternating Least Squares.
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, iterations=numIterations)

# Score the model on the very same data it was trained on: predict a rating
# for each known (user, product) pair, join the predictions with the observed
# ratings on that pair, and average the squared differences.
testdata = ratings.map(_user_product)
predictions = model.predictAll(testdata).map(_keyed_score)
ratesAndPreds = ratings.map(_keyed_score).join(predictions)
MSE = ratesAndPreds.map(_squared_error).mean()
print("Mean Squared Error = " + str(MSE))

Mean Squared Error = 5.897688366047824e-06
