In [1]:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.0.172:4040
SparkContext available as 'sc' (version = 2.4.3, master = local[*], app id = local-1562514739602)
SparkSession available as 'spark'


import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS


## Project 5 - Implementing a Recommender System on Spark

#### The goal of this project is to give you practice beginning to work with a distributed recommender system.  It is sufficient for this assignment to build out your application on a single node.

I chose to challenge myself by building the recommender system in Spark with Scala. Since Spark itself is written in Scala, I figured it would be good to understand how it works at the base level before learning the Python and R Spark APIs.

Conveniently, you can run a Scala kernel in a Jupyter notebook, which is what I did here. A link in the references below explains how to set it up. I also included the .scala source file in my git repo.

First we create a SparkSession on the local machine to get things started. With the master set to `local[*]`, Spark runs jobs only on your own computer but "distributes" the work across as many cores as are available.

In [2]:
val spark = SparkSession
    .builder
    .appName("MovieLens Recommendation")
    .master("local[*]")
    .getOrCreate()

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@240223d0


In [3]:
val ratingsFile = "/Users/deborahgemellaro/Programming/612/project_5/ratings.csv"
val moviesFile = "/Users/deborahgemellaro/Programming/612/project_5/movies.csv"


ratingsFile: String = /Users/deborahgemellaro/Programming/612/project_5/ratings.csv
moviesFile: String = /Users/deborahgemellaro/Programming/612/project_5/movies.csv


Next we create DataFrames from our .csv files. At the base level Spark has Resilient Distributed Datasets, or RDDs. RDDs were originally the most widely used data structure, but the Spark SQL library introduced the Dataset and DataFrame structures, which are rapidly becoming the structures of choice in the industry. They behave much more like data frames in R and Python and allow you to perform SQL-like transformations on them.

At first I pulled the CSV in as an RDD and converted it to a DataFrame. But Spark has a DataFrame reader, so we use that here to create `df1` and `df2` for ratings and movies, respectively.

In [4]:
val df1 = spark.read.format("csv")
    .option("header", true)
    .load(ratingsFile)

df1.printSchema

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



df1: org.apache.spark.sql.DataFrame = [userId: string, movieId: string ... 2 more fields]
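
As the schema printed above shows, the reader loads every column as a string unless told otherwise. One way to get typed columns at read time is to pass an explicit schema. This is only a sketch: the object name `TypedRatingsReader`, the helper `readRatings`, and the choice of `IntegerType`, `DoubleType`, and `LongType` are my own assumptions based on the sample rows, not part of the original notebook.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}

object TypedRatingsReader {
  // Column names and order follow the header of ratings.csv shown above.
  // The types are assumptions inferred from the sample rows.
  val ratingsSchema: StructType = StructType(Seq(
    StructField("userId", IntegerType, nullable = true),
    StructField("movieId", IntegerType, nullable = true),
    StructField("rating", DoubleType, nullable = true),
    StructField("timestamp", LongType, nullable = true)
  ))

  // Hypothetical helper: reads the CSV with the schema applied up front,
  // so no string-to-number conversion is needed later.
  def readRatings(spark: SparkSession, path: String): DataFrame =
    spark.read
      .format("csv")
      .option("header", "true")
      .schema(ratingsSchema)
      .load(path)
}
```

With this approach, `TypedRatingsReader.readRatings(spark, ratingsFile).printSchema` should report numeric types rather than `string` for each column.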


In [5]:
val ratingsDF = df1.select(df1.col("userId"), 
                           df1.col("movieId"), 
                           df1.col("rating"), 
                           df1.col("timestamp"))
ratingsDF.show()


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



ratingsDF: org.apache.spark.sql.DataFrame = [userId: string, movieId: string ... 2 more fields]


In [6]:
val df2 = spark.read.format("csv")
    .option("header", true)
    .load(moviesFile)

df2.printSchema

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



df2: org.apache.spark.sql.DataFrame = [movieId: string, title: string ... 1 more field]


In [7]:
val moviesDF = df2.select(df2.col("movieId"), 
                          df2.col("title"),
                          df2.col("genres"))
moviesDF.show

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

moviesDF: org.apache.spark.sql.DataFrame = [movieId: string, title: string ... 1 more field]


`createOrReplaceTempView` is one of the powerful features of Spark: it registers a temporary view of a DataFrame so that you can run SQL queries against it. A few `dplyr`- or SQL-like methods are available directly on the DataFrame, as shown below, but if you need to run a full SQL query you do it against a temp view, as shown in `val newTable`.

In [8]:
// Register both DataFrames as temp tables to make querying easier
ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")

In [9]:
// Count total number of ratings
ratingsDF.count()

res5: Long = 100836


In [10]:
// Count number of distinct users
ratingsDF.select(ratingsDF.col("userId")).distinct().count()

res6: Long = 610


In [11]:
// Count number of distinct movies
ratingsDF.select(ratingsDF.col("movieId")).distinct().count()

res7: Long = 9724


In [12]:
val newTable = spark.sql("SELECT ratings.userId, ratings.movieID, ratings.rating, movies.title FROM ratings JOIN movies ON movies.movieId=ratings.movieId WHERE ratings.userId=345 and ratings.rating > 4")
                        
newTable.show()

+------+-------+------+--------------------+
|userId|movieID|rating|               title|
+------+-------+------+--------------------+
|   345|    779|   5.0|'Til There Was Yo...|
|   345|    838|   5.0|         Emma (1996)|
|   345|    926|   5.0|All About Eve (1950)|
|   345|    932|   5.0|Affair to Remembe...|
|   345|   1203|   5.0| 12 Angry Men (1957)|
|   345|   1214|   5.0|        Alien (1979)|
|   345|   2359|   5.0|Waking Ned Devine...|
|   345|   2971|   5.0|All That Jazz (1979)|
|   345|   7121|   5.0|   Adam's Rib (1949)|
|   345|  27751|   5.0| 'Salem's Lot (2004)|
+------+-------+------+--------------------+



newTable: org.apache.spark.sql.DataFrame = [userId: string, movieID: string ... 2 more fields]
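
For comparison, roughly the same query can be written with DataFrame methods instead of a temp view. This is a sketch rather than the notebook's method: `TopRatedQuery` and `topRatedFor` are hypothetical names, and the explicit casts are my own choice so that the comparisons do not depend on how Spark coerces string columns.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object TopRatedQuery {
  // Hypothetical helper: movies that one user rated above a threshold.
  // The columns arrive as strings, so they are cast before comparing.
  def topRatedFor(ratings: DataFrame,
                  movies: DataFrame,
                  userId: Int,
                  minRating: Double): DataFrame =
    ratings
      .filter(col("userId").cast("int") === userId &&
              col("rating").cast("double") > minRating)
      .join(movies, Seq("movieId")) // inner join on the shared movieId column
      .select("userId", "movieId", "rating", "title")
}
```

Calling `TopRatedQuery.topRatedFor(ratingsDF, moviesDF, 345, 4.0).show()` is intended to mirror the SQL above.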


#### Create the model

Time to split the data into training and test sets. First we set the split ratio in the `splits` value; then, since the DataFrame columns are all still strings, we convert them to the correct data types in `trainingSet` and `testSet`.

In [13]:
val splits = ratingsDF.randomSplit(Array(0.75, 0.25), seed = 123L)
val (trainingData, testData) = (splits(0), splits(1))

println("Counts for -- Training Data: " + trainingData.count() + ", Test Data: " + testData.count())

Counts for -- Training Data: 75834, Test Data: 25002


splits: Array[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = Array([userId: string, movieId: string ... 2 more fields], [userId: string, movieId: string ... 2 more fields])
trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: string, movieId: string ... 2 more fields]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [userId: string, movieId: string ... 2 more fields]


In [14]:
val trainingSet = trainingData.map(row => {
    val userId = row.getString(0)
    val movieId = row.getString(1)
    val ratings = row.getString(2)
    Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})

trainingSet: org.apache.spark.sql.Dataset[org.apache.spark.mllib.recommendation.Rating] = [user: int, product: int ... 1 more field]


In [15]:
val testSet = testData.map(row => {
    val userId = row.getString(0)
    val movieId = row.getString(1)
    val ratings = row.getString(2)
    Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})

testSet: org.apache.spark.sql.Dataset[org.apache.spark.mllib.recommendation.Rating] = [user: int, product: int ... 1 more field]


#### Build the model

We are choosing the Alternating Least Squares (ALS) model because it parallelizes well across a distributed cluster. Spark provides the `ALS()` API as part of its machine learning libraries. Once we set up the model we fit it to our `trainingSet`.

In [16]:
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("user")
  .setItemCol("product")
  .setRatingCol("rating")

val model = als.fit(trainingSet)

als: org.apache.spark.ml.recommendation.ALS = als_36b96ee0fd0a
model: org.apache.spark.ml.recommendation.ALSModel = als_36b96ee0fd0a
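
As a rough sketch of what the fit above is optimizing, the textbook form of ALS for explicit ratings looks for a factor vector $x_u$ per user and $y_i$ per movie that minimize the regularized squared error over the observed ratings $\Omega$. The symbols here are my own notation, and Spark's documentation describes an additional scaling of the regularization term that is omitted from this simplified version.

$$
\min_{X,\,Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - x_u^{\top} y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

With the movie factors held fixed, each user vector has a closed-form least-squares solution, where $\Omega_u$ is the set of movies rated by user $u$:

$$
x_u = \left( \sum_{i \in \Omega_u} y_i y_i^{\top} + \lambda I \right)^{-1} \sum_{i \in \Omega_u} r_{ui}\, y_i
$$

The update for each $y_i$ with the user factors fixed is symmetric. In the cell above, `setRegParam(0.01)` plays the role of $\lambda$ and `setMaxIter(5)` sets the number of alternations. Because each $x_u$ (or $y_i$) can be solved independently of the others, the work splits naturally across cores or machines.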


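From here we run the `model` on the `testSet` using the `transform()` method of the fitted `ALSModel`. Some predictions come back as NaN, which happens when a user or movie in the test set did not appear in the training set (see the last reference below), so we drop those rows in order to obtain a numerical result.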

In [17]:
val predictions = model.transform(testSet).na.drop()

predictions: org.apache.spark.sql.DataFrame = [user: int, product: int ... 2 more fields]
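
An alternative to dropping rows after the fact is to ask ALS itself to discard predictions for users or movies it never saw during training. The sketch below assumes Spark 2.2 or later, where `setColdStartStrategy` is available; the object and value names are hypothetical, and the other parameters simply repeat the ones used above.

```scala
import org.apache.spark.ml.recommendation.ALS

object ColdStartAls {
  // Same settings as the model above, plus a cold-start strategy.
  // "drop" removes rows whose prediction would be NaN from the output
  // of transform(); the default strategy is "nan".
  val alsDroppingColdStart: ALS = new ALS()
    .setMaxIter(5)
    .setRegParam(0.01)
    .setUserCol("user")
    .setItemCol("product")
    .setRatingCol("rating")
    .setColdStartStrategy("drop")
}
```

A model fitted from `ColdStartAls.alsDroppingColdStart` should make the explicit `.na.drop()` unnecessary for the purpose of evaluation.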


#### Evaluate the model

The Spark ML library has a `RegressionEvaluator()` API that allows you to evaluate your models across a number of different metrics. Here we score the model using RMSE, MSE, and MAE just for comparison.

In [18]:
val evaluatorRMSE = new RegressionEvaluator()
    .setMetricName("rmse")
    .setLabelCol("rating")
    .setPredictionCol("prediction")

val evaluatorMSE = new RegressionEvaluator()
    .setMetricName("mse")
    .setLabelCol("rating")
    .setPredictionCol("prediction")

val evaluatorMAE = new RegressionEvaluator()
    .setMetricName("mae")
    .setLabelCol("rating")
    .setPredictionCol("prediction")

val rmse = evaluatorRMSE.evaluate(predictions)
val mse = evaluatorMSE.evaluate(predictions)
val mae = evaluatorMAE.evaluate(predictions)

println(f"RMSE = $rmse")
println(f"MSE = $mse")
println(f"MAE = $mae")

RMSE = 1.0986410985069273
MSE = 1.2070122633285083
MAE = 0.8289660242729547


evaluatorRMSE: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_5e8b152c1c9e
evaluatorMSE: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_c3088d9c93fc
evaluatorMAE: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_93b79dbca67d
rmse: Double = 1.0986410985069273
mse: Double = 1.2070122633285083
mae: Double = 0.8289660242729547
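
To make the three metrics concrete, here is a small plain-Scala sketch that computes them by hand on a few made-up rating/prediction pairs. `Scored` and `HandMetrics` are hypothetical names and the sample values are invented for illustration; this is not how `RegressionEvaluator` is implemented.

```scala
// One (actual rating, predicted rating) pair.
final case class Scored(rating: Double, prediction: Double)

object HandMetrics {
  // Mean squared error: average of the squared residuals.
  def mse(xs: Seq[Scored]): Double =
    xs.map(s => math.pow(s.rating - s.prediction, 2)).sum / xs.size

  // Root mean squared error: square root of the MSE.
  def rmse(xs: Seq[Scored]): Double = math.sqrt(mse(xs))

  // Mean absolute error: average of the absolute residuals.
  def mae(xs: Seq[Scored]): Double =
    xs.map(s => math.abs(s.rating - s.prediction)).sum / xs.size
}

object HandMetricsDemo {
  // Invented sample with residuals 1, -2 and 0.
  val sample: Seq[Scored] = Seq(Scored(4.0, 3.0), Scored(2.0, 4.0), Scored(5.0, 5.0))

  def main(args: Array[String]): Unit = {
    println(s"MSE  = ${HandMetrics.mse(sample)}")
    println(s"RMSE = ${HandMetrics.rmse(sample)}")
    println(s"MAE  = ${HandMetrics.mae(sample)}")
  }
}
```

Since RMSE is the square root of MSE, the two values reported above carry the same information; MAE is less sensitive to a few large misses because the errors are not squared.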


In [19]:
// Always stop your Spark Session when finished 
spark.stop()

## Conclusion
When comparing the MSE to that of the baseline matrix factorization model used in Project 3, the ALS model chosen here appears to have a higher error, which is very surprising to me. I would have to go back and take more control over how the data is shaped to see why the results came out this way. This is a case where I believe user error could be a factor.

Overall, Scala is an interesting language, and I can see how the functional paradigm is applied. Performance is reported to be a lot better than with PySpark or R, but converting teams over to Scala may be time-consuming and cost some productivity at first.

For a dataset like the one I've used here, with 100k rows of movie ratings, Spark is not actually necessary, but as things get orders of magnitude larger, one CPU will not do the job. That is where Spark comes in. It's really an excellent framework that "magically" manages the distributed job behind the scenes when connected to a cluster of machines, and at much less cost than its Hadoop predecessor.


#### References:

How to run Scala and Spark in the Jupyter notebook

https://medium.com/@bogdan.cojocar/how-to-run-scala-and-spark-in-the-jupyter-notebook-328a80090b3b


Collaborative Filtering

https://spark.apache.org/docs/2.0.2/ml-collaborative-filtering.html


Scala Machine Learning Projects: Recommendation Systems

https://medium.com/@navdeepsingh_2336/scala-machine-learning-projects-recommendation-systems-d41d9eebbb06


Why Spark ML ALS algorithm print RMSE = NaN?

https://stackoverflow.com/questions/43544815/why-spark-ml-als-algorithm-print-rmse-nan/47236423