# Spark Matrix Factorization with ALS

## load package

In [1]:
import csv

from pyspark.sql import SparkSession, Row
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkContext, SparkConf

from utils.MovieLens import MovieLens

## read MovieLens data

In [2]:
spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/24 14:17:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
lines = spark.read.option("header", "true").csv("../ml-latest-small/ratings.csv").rdd
ratingsRDD = lines.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=int(p[3])))

ratings = spark.createDataFrame(ratingsRDD)

                                                                                

## model training

In [5]:
(training, test) = ratings.randomSplit([0.8, 0.2])

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
model = als.fit(training)

21/10/24 14:30:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
21/10/24 14:30:02 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
21/10/24 14:30:03 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


## prediction

In [6]:
predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.1096010378963432


## top-N recommendation

In [None]:
userRecs = model.recommendForAllUsers(10)
    
user85Recs = userRecs.filter(userRecs['userId'] == 85).collect()
    
spark.stop()

In [8]:
## get movie name
ml = MovieLens()
ml.loadMovieLensLatestSmall()
        
for row in user85Recs:
    for rec in row.recommendations:
        print(ml.getMovieName(rec.movieId))

One Crazy Summer (1986)
Man with Two Brains, The (1983)
Go Fish (1994)
...And Justice for All (1979)
Celebrity (1998)
Match Point (2005)
Frida (2002)
Clerks II (2006)
Transformers: The Movie (1986)
Opposite of Sex, The (1998)


## try larger dataset to test scale up

In [2]:
def loadMovieNames():
    movieID_to_name = {}
    with open("../ml-20m/movies.csv", newline='', encoding='ISO-8859-1') as csvfile:
        movieReader = csv.reader(csvfile)
        next(movieReader)  #Skip header line
        for row in movieReader:
            movieID = int(row[0])
            movieName = row[1]
            movieID_to_name[movieID] = movieName
    return movieID_to_name

In [3]:
spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .config("spark.executor.cores", '4')\
        .config("spark.driver.memory", "64G")\
        .config("spark.executor.memory", "64G")\
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/24 14:30:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
lines = spark.read.option("header", "true").csv("../ml-20m/ratings.csv").rdd
ratingsRDD = lines.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

In [5]:
(training, test) = ratings.randomSplit([0.8, 0.2])

als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
model = als.fit(training)

                                                                                

In [6]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))



Root-mean-square error = 0.8163785791796362


                                                                                

In [7]:
userRecs = model.recommendForAllUsers(10)
    
user85Recs = userRecs.filter(userRecs['userId'] == 85).collect()
    
spark.stop()

                                                                                

In [8]:
movieID_to_name = loadMovieNames()
        
for row in user85Recs:
    for rec in row.recommendations:
        if rec.movieId in movieID_to_name:
            print(movieID_to_name[rec.movieId])

Violence at Noon (Hakuchu no torima) (1966)
Bo Burnham: what. (2013)
World According to Monsanto, The (monde selon Monsanto, Le) (2008)
Chhoti Si Baat (1975)
Godâs Wedding (As Bodas de Deus) (1999)
White Mane (Crin blanc: Le cheval sauvage) (1953)
On Top of the Whale (Het dak van de Walvis) (1982)
Hamoun (1990)
Class Trip, The (La classe de neige) (1998)
Wedding March, The (1928)
