# T501-SP24 - Lab Assignment 4.2 
## *Collaborative Filtering with Spark MLlib and ALS*

*This code is based on an example from https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html; a modified version is part of the tutorial in CloudxLab.*

<div style="background-color: rgb(230, 230, 230);">

---
## Assignment Details

### Objectives
The learning objectives of this assignment are:
1.  to gain experience running a Spark MLlib notebook in Jupyter & CloudxLab
2.  to experiment with the impact of various ML model parameters with the goal of finding a practical balance of speed and accuracy
3.  to work successfully in a team on a collaborative assignment

### General Instructions
* Read and run the code in **Sections 1-3**. This code is based on an example from https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html; a modified version is part of the tutorial in CloudxLab.
* In **Section 4**, you are asked to repeatedly adjust model parameters and train and test the model to determine the effects of various parameter settings. You are then asked to answer some basic questions about the results.
* As with our other open-ended questions and assignments, do not try to find a single right answer. Instead, pay attention to the results of your work and make intelligent adjustments and analyses based on the results you get.
* Upload this Jupyter notebook to your Jupyter directory on CloudxLab. Enter all of your code into this notebook. You should not need to download any datasets. The given notebook uses an existing dataset installed in CloudxLab.
* Your modified Jupyter notebook will serve as your group’s submission for the assignment. You should **also submit a PDF version** of the notebook (through "print" to a PDF on your system, or download as PDF from CloudxLab).

---
 


---
## 1. Prep Environment
### 1.1 Load Libraries

In [1]:
import org.apache.spark.ml.recommendation.ALS 
import org.apache.spark.ml.evaluation.RegressionEvaluator
import scala.math._

### 1.2 Define new data class and function to work with Ratings

In [2]:
// define a new class called "Rating"
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

// define a function that takes a single string and splits it on the "::" separator 
//   into the 4 elements of a Rating (userID, movieID, rating, timestamp)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

//Test the new function
parseRating("1::1193::5::978300760")


defined class Rating


parseRating: (str: String)Rating


Rating(1,1193,5.0,978300760)
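
The `assert` in `parseRating` stops the whole job on the first malformed line. A minimal alternative sketch, assuming malformed records should simply be skipped; `parseRatingOpt` is a hypothetical helper name and is not part of the original notebook:

```scala
import scala.util.Try

// Same shape as the Rating class defined above
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

// Hypothetical helper: returns None instead of throwing on a malformed line
def parseRatingOpt(str: String): Option[Rating] =
  str.split("::") match {
    case Array(u, m, r, t) =>
      // Try catches number-format errors in any of the four fields
      Try(Rating(u.toInt, m.toInt, r.toFloat, t.toLong)).toOption
    case _ => None // wrong number of fields
  }

println(parseRatingOpt("1::1193::5::978300760")) // Some(Rating(1,1193,5.0,978300760))
println(parseRatingOpt("not-a-rating"))          // None
```

On an RDD, something like `rawRatingsRDD.flatMap(line => parseRatingOpt(line))` should then keep only the lines that parse.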

---
## 2. Read and Modify Data
### 2.1 Read text file with ratings data

In [3]:
var rawRatingsRDD = sc.textFile("/data/ml-1m/ratings.dat")

// Check the first few records
//  - the first one should be 1::1193::5::978300760
// If this fails, the location of the file is wrong.
rawRatingsRDD.take(5).foreach(println)
println("========")
println("Number of ratings: " + rawRatingsRDD.count())

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
Number of ratings: 1000209


rawRatingsRDD = /data/ml-1m/ratings.dat MapPartitionsRDD[1] at textFile at <console>:32


/data/ml-1m/ratings.dat MapPartitionsRDD[1] at textFile at <console>:32

### 2.2 Parse the RDD, convert to DataFrame, and display a random sample

In [4]:
val ratingsDF = rawRatingsRDD.map(parseRating).toDF()
//check if everything is ok
ratingsDF.sample(false,0.1).show(10)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|   1193|   5.0|978300760|
|     1|    594|   4.0|978302268|
|     1|   3186|   4.0|978300019|
|     1|    588|   4.0|978824268|
|     1|   1836|   5.0|978300172|
|     1|   1022|   5.0|978300055|
|     1|   1246|   4.0|978302091|
|     2|    647|   3.0|978299351|
|     2|   2628|   3.0|978300051|
|     2|   3107|   2.0|978300002|
+------+-------+------+---------+
only showing top 10 rows



ratingsDF = [userId: int, movieId: int ... 2 more fields]


[userId: int, movieId: int ... 2 more fields]

### 2.3 Divide the data into training and testing sets

In [5]:
// split the data into 80% for training and 20% for testing
val Array(trainingData, testData) = ratingsDF.randomSplit(Array(0.8, 0.2))
// show 10 randomly sampled rows from each
val trainingDataCount = trainingData.count()
println("\nRows of training data: " + trainingDataCount)
trainingData.sample(false,0.1).show(10)

val testDataCount = testData.count()
println("Rows of test data: " + testDataCount)
testData.sample(false,0.1).show(10)

Rows of training data: 799915
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|    595|   5.0|978824268|
|     2|    356|   5.0|978299686|
|     2|   1198|   4.0|978298124|
|     2|   1245|   2.0|978299200|
|     2|   1259|   5.0|978298841|
|     2|   1945|   5.0|978298458|
|     2|   1962|   5.0|978298813|
|     2|   2006|   3.0|978299861|
|     2|   3699|   2.0|978299173|
|     2|   3893|   1.0|978299535|
+------+-------+------+---------+
only showing top 10 rows

Rows of test data: 200294                                                       
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|     48|   5.0|978824351|
|     2|    589|   4.0|978299773|
|     2|   1917|   3.0|978300174|
|     2|   2571|   4.0|978299773|
|     5|     36|   3.0|978244808|
|     5|    357|   2.0|978245829|


trainingData = [userId: int, movieId: int ... 2 more fields]
testData = [userId: int, movieId: int ... 2 more fields]
trainingDataCount = 799915
testDataCount = 200294


200294

---
## 3. Alternating Least Squares (ALS) Machine Learning Model
### 3.1 Create ALS

In [6]:
// create ALS with default parameter values

val ratingsALS = new ALS()
    .setMaxIter(10)
    .setRank(10)
    .setRegParam(0.1)
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating")

ratingsALS = als_73effdc5b8e0


als_73effdc5b8e0

### 3.2 Train ALS with Training portion of data set

In [7]:
val myALSmodel = ratingsALS.fit(trainingData)
//you can also save the model for future replication
//myALSmodel.save("myALSmodel")


myALSmodel = als_73effdc5b8e0


als_73effdc5b8e0

### 3.3 Run model on the Test portion of the data set

In [8]:
//Prepare the predicted ratings 
val predictedRatings = myALSmodel.transform(testData)

// show a sampling of results; note the new prediction column
predictedRatings.sample(false,0.1).show(10)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|    26|    463|   3.0|978271588| 2.9657395|
|  3841|    471|   4.0|965996668| 3.3257422|
|  4227|    471|   3.0|965319625| 2.7798407|
|   157|    471|   3.0|977249978| 3.6272252|
|  1939|    471|   4.0|974695248|  3.629592|
|  2941|    471|   5.0|971348751| 3.5521705|
|  2967|    471|   1.0|971133756| 3.2005475|
|  4647|    471|   2.0|963978127|  3.490961|
|  4480|    471|   3.0|965026114|  3.411218|
|  1377|    471|   1.0|974770883|  2.449363|
+------+-------+------+---------+----------+
only showing top 10 rows



predictedRatings = [userId: int, movieId: int ... 3 more fields]


[userId: int, movieId: int ... 3 more fields]

### 3.4 Compute the difference between the actual and predicted

In [9]:
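// Roll our own RMSE calculation, as the RegressionEvaluator often returns NaN
// (ALS predicts NaN for users or movies that do not appear in the training data)

// compute difference between actual (column 2) and predicted (column 4)
// note: despite its name, sumMSE holds the sum of squared errors
var sumMSE = predictedRatings
    .map(r => r(2).asInstanceOf[Float] - r(4).asInstanceOf[Float])
    .map(x => x*x)
    .filter(!_.isNaN)   // drop rows whose prediction is NaN
    .reduce(_ + _)

// note: this count still includes the rows with NaN predictions filtered out above,
// so the mean is slightly underestimated when such rows exist
var numVals = predictedRatings.count()
var MSE = sumMSE/numVals
var RMSE = sqrt(MSE)
println("RMSE (root mean squared error) = " + RMSE)

// below is the prescribed way to compute RMSE, but it often returns NaN because of empty values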
/*
val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val RMSE2 = evaluator.evaluate(predictedRatings)
println(s"Root-mean-square error = $RMSE2")
*/

RMSE (root mean squared error) = 0.8709834075972182                             


sumMSE = 151945.45
numVals = 200294
MSE = 0.7586121
RMSE = 0.8709834075972182


0.8709834075972182
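
The calculation above drops NaN predictions from the sum but still divides by the full row count. A minimal pure-Scala sketch of the stricter variant, which divides by the number of valid pairs only; `rmseIgnoringNaN` and the sample values are illustrative assumptions, not part of the notebook:

```scala
// Pure-Scala sketch: RMSE over (actual, predicted) pairs, skipping NaN predictions
def rmseIgnoringNaN(pairs: Seq[(Float, Float)]): Double = {
  // keep only the pairs whose prediction is a real number
  val valid = pairs.filter { case (_, predicted) => !predicted.isNaN }
  val sumSquaredError = valid.map { case (actual, predicted) =>
    val diff = (actual - predicted).toDouble
    diff * diff
  }.sum
  // divide by the number of valid pairs, not by the original count
  math.sqrt(sumSquaredError / valid.size)
}

// Invented sample: errors of 1, 0, and -1, plus one NaN prediction that is ignored
val sample = Seq((5.0f, 4.0f), (3.0f, 3.0f), (4.0f, Float.NaN), (1.0f, 2.0f))
println(rmseIgnoringNaN(sample)) // sqrt(2/3)
```

If every prediction is NaN, the function returns NaN, which is arguably the honest answer in that case.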

### 3.5 Show difference between actual and predicted for a sampling of rows

In [10]:
predictedRatings
    .select("userId", "movieId", "rating", "prediction")
    .withColumn("error",$"rating"-$"prediction")
    .sample(false,0.1)
    .show(30)

+------+-------+------+----------+-----------+
|userId|movieId|rating|prediction|      error|
+------+-------+------+----------+-----------+
|  5367|    471|   3.0| 3.1691663|-0.16916633|
|  1475|    471|   5.0|  3.668871|  1.3311291|
|  2174|    471|   3.0| 3.4339888| -0.4339888|
|  3777|    471|   3.0| 3.2297559|-0.22975588|
|  1790|    471|   4.0|  3.516504| 0.48349595|
|  5718|    471|   4.0| 3.4710252|  0.5289748|
|  1980|    471|   4.0| 3.5605521| 0.43944788|
|  4728|    471|   2.0| 3.5355308| -1.5355308|
|  2411|    471|   3.0| 3.5146306|-0.51463056|
|   294|    471|   3.0| 3.6393967|-0.63939667|
|  5530|    471|   5.0| 3.5013292|  1.4986708|
|   928|    496|   4.0|   3.55404| 0.44596004|
|  2024|    833|   1.0| 2.2018604| -1.2018604|
|  3512|   1088|   3.0|  3.230005|-0.23000503|
|  2724|   1088|   3.0| 3.0706909|-0.07069087|
|  4882|   1088|   4.0| 2.9115334|  1.0884666|
|  4786|   1088|   3.0| 3.4856484| -0.4856484|
|   531|   1088|   4.0|  3.359896| 0.64010406|
|  5840|   1088|   5.0| 3.5951297|  1.4048703|
|  2793|   10

### 3.6 Use model to make recommendations for Users or for Items

(This requires Spark 2.2 or later, so it does not work in CloudxLab (Spark 1.6.3).)

In [11]:
/*
// Generate top 10 movie recommendations for each user
val userRecs = myALSmodel.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = myALSmodel.recommendForAllItems(10)
*/

Name: Syntax Error.
Message: 
StackTrace: 

---
---

## 4. Your Assignment: Run a "Parameter Sweep" of ALS Models
Your assignment is to run the following code block multiple times with different parameters to determine a reasonable tradeoff between accuracy and time. 
Note the following:
* Obviously, it is not necessary to read the data each time.
* We **do not** want to repeat the splitting process each time as that will result in different training and test data sets.
* We can use the "currentTimeMillis()" function to get an *approximate* timing for the computation. (The shared nature and varying load of CloudxLab resources is a confounding factor to timing.)
* We include timing windows covering the creation, training, prediction, and error measurement stages.
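
The timing windows described above can be wrapped in a small helper. This is only a sketch: `timed` is a hypothetical function, and the block being timed here is a stand-in computation rather than an ALS call.

```scala
// Hypothetical helper: runs a block and returns its result with the elapsed seconds
def timed[A](block: => A): (A, Double) = {
  val start = System.currentTimeMillis()
  val result = block // the by-name argument is evaluated here
  val elapsedSeconds = (System.currentTimeMillis() - start) / 1000.0
  (result, elapsedSeconds)
}

// Usage with a stand-in computation; in the notebook the block could be
// something like ratingsALS.fit(trainingData)
val (checksum, seconds) = timed { (1 to 1000000).map(_.toLong).sum }
println(s"checksum = $checksum, elapsed = $seconds s")
```

As with `currentTimeMillis()` used directly, the result is only approximate on a shared cluster.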

Prediction performance of a Spark ALS model is affected by the following parameters:

|Parameter|Description|Default value|Notes |
|:-----|:-----|:-----:|:----- |
|rank|Number of latent factors|10|The larger the value, the more intrinsic factors considered in the factorization modeling.|
|regParam|Regularization parameter|0.1|The value needs to be selected empirically to avoid overfitting.|
|maxIter|Maximum number of iterations|10|The more iterations, the closer the model converges to the optimal point.|
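
To make `rank` concrete: a matrix-factorization model such as ALS represents each user and each movie as a vector of `rank` numbers and predicts a rating as the dot product of the two. A toy sketch with invented factor values (the vectors below are assumptions for illustration, not output from the model above):

```scala
// Toy latent-factor vectors with rank = 3 (values are invented for illustration)
val userFactors: Array[Float] = Array(1.0f, 0.5f, 2.0f)
val movieFactors: Array[Float] = Array(2.0f, 1.0f, 0.5f)

// Predicted rating = dot product of the user and movie factor vectors
def predict(user: Array[Float], movie: Array[Float]): Float = {
  require(user.length == movie.length, "both vectors must have the same rank")
  user.zip(movie).map { case (u, m) => u * m }.sum
}

println(predict(userFactors, movieFactors)) // 1*2 + 0.5*1 + 2*0.5 = 3.5
```

A larger rank gives the model more numbers per user and movie to fit, which is why it raises both expressiveness and fitting cost.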

### 4.1 Collect timings
Run the following code block **at least 12 times** with a variety of parameters. Copy and paste the output from each run into the markdown table below. 
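
One way to plan the runs in advance is to enumerate a small grid of parameter combinations. The sketch below is an assumption about how that could be organized; `SweepConfig` and the candidate values are hypothetical and only happen to produce 12 combinations.

```scala
// Hypothetical container for one parameter combination
case class SweepConfig(maxIter: Int, rank: Int, regParam: Double)

// Candidate values; these particular choices are only an example
val maxIterValues = Seq(5, 10, 15)
val rankValues = Seq(5, 10)
val regParamValues = Seq(0.05, 0.1)

// Cartesian product of the three lists: 3 * 2 * 2 = 12 combinations
val grid: Seq[SweepConfig] = for {
  m <- maxIterValues
  r <- rankValues
  l <- regParamValues
} yield SweepConfig(m, r, l)

// Print one line per planned run
grid.zipWithIndex.foreach { case (cfg, i) =>
  println(s"planned run ${i + 1}: maxIter=${cfg.maxIter}, rank=${cfg.rank}, regParam=${cfg.regParam}")
}
```

Each planned combination would then be typed into the parameter cell by hand, or the cell could be wrapped in a function that takes a `SweepConfig`.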

#### Collaborative Filtering ALS Parameter Sweep 

| Run | MaxIter | Rank | RegParam | RMSE | time_create | time_fit | time_predict | time_RMSE |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1.1 | 1 | 2 | 1.0|   3.7382 | 0.007 | 5.678 | 0.048 | 22.014 |
| 1.2 | 5 | 5 | 0.1|   0.8863 | 0.004 | 8.996 | 0.049 | 18.612 |
| 1.3 | 5 | 10 | 0.1|   0.8916 | 0.003 | 5.038 | 0.037 | 18.765 |
| 1.4 | 5 | 20 | 0.1|   0.8920 | 0.006 | 8.229 | 0.043 | 20.878 |
| 1.5 | 10 | 5 | 0.5|   1.0410 | 0.002 | 7.477 | 0.033 | 18.211 |
| 1.6 | 10 | 10 | 0.5|   1.0408 | 0.002 | 9.624 | 0.036 | 13.98 |
| 1.7 | 10 | 20 | 0.5|   1.0384 | 0.004 | 11.586 | 0.045 | 18.28 |
| 1.8 | 20 | 5 | 1|   1.3526 | 0.004 | 11.434 | 0.043 | 17.846 |
| 1.9 | 20 | 10 | 1|   1.3526 | 0.003 | 12.155 | 0.04 | 16.002 |
| 1.10 | 20 | 20 | 1|   1.3526 | 0.002 | 15.016 | 0.033 | 19.728 |
| 1.11 | 3 | 10 | 0.05|   0.8849 | 0.004 | 8.121 | 0.046 | 21.112 |
| 1.12 | 15 | 15 | 0.2|   0.9139 | 0.004 | 11.999 | 0.047 | 20.573 |
| 1.13 | 10 | 15 | 0.8|   1.2130 | 0.003 | 9.124 | 0.036 | 17.912 |
| 1.14 | 15 | 5 | 0.05|   0.8643 | 0.003 | 10.035 | 0.037 | 19.373 |
| 1.15 | 15 | 10 | 0.05|   0.8558 | 0.003 | 10.778 | 0.044 | 18.145 |
| 1.16 | 15 | 20 | 0.05|   0.8577 | 0.004 | 12.219 | 0.027 | 15.792 |
| 1.17 | 10 | 5 | 0.1|   0.8771 | 0.003 | 6.863 | 0.053 | 13.873 |
| 1.18 | 20 | 5 | 0.1|   0.8722 | 0.004 | 12.634 | 0.035 | 18.854 |
| 1.19 | 1 | 5 | 0.1|   3.3544 | 0.003 | 7.05 | 0.038 | 18.162 |

#### Optimal / Recommended settings for given dataset
_From your table above, copy the parameter combination that minimizes the error in the most reasonable amount of time_

| Run | MaxIter | Rank | RegParam | RMSE | time_create | time_fit | time_predict | time_RMSE |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1.15 | 15 | 10 | 0.05|   0.8558 | 0.003 | 10.778 | 0.044 | 18.145 |
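
The choice of a recommended run can also be checked programmatically. The sketch below copies a subset of rows from the sweep table (RMSE and time_fit only); the `Run` class and the 8-second fitting budget are assumptions made for illustration.

```scala
// One row of the results table (only the columns needed here)
case class Run(name: String, maxIter: Int, rank: Int, regParam: Double,
               rmse: Double, timeFit: Double)

// A subset of rows copied from the sweep table above
val runs = Seq(
  Run("1.2", 5, 5, 0.1, 0.8863, 8.996),
  Run("1.14", 15, 5, 0.05, 0.8643, 10.035),
  Run("1.15", 15, 10, 0.05, 0.8558, 10.778),
  Run("1.16", 15, 20, 0.05, 0.8577, 12.219),
  Run("1.17", 10, 5, 0.1, 0.8771, 6.863),
  Run("1.18", 20, 5, 0.1, 0.8722, 12.634)
)

// Lowest RMSE overall
val best = runs.minBy(_.rmse)

// Lowest RMSE among runs whose fitting time stays within a budget (assumed value)
val fitBudgetSeconds = 8.0
val bestWithinBudget = runs.filter(_.timeFit <= fitBudgetSeconds).minBy(_.rmse)

println(s"best overall: ${best.name}")                   // 1.15
println(s"best within budget: ${bestWithinBudget.name}") // 1.17
```

Which of the two is "optimal" depends on how much fitting time is considered reasonable, which is a judgment call rather than something the code decides.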

In [16]:
// change the following four values
var runName = "1.20"  //any string that makes sense to you
// the parameters for the ALS system
// see https://spark.apache.org/docs/latest/ml-collaborative-filtering.html for details
var maxIter = 25 // maximum number of iterations of the alternating algorithm
var rank = 5 // also known as number of latent factors; default is 10
var regParam = .1 // regularization parameter; default is 0.1

var t0 = System.currentTimeMillis()

val ratingsALS = new ALS()
    .setMaxIter(maxIter)
    .setRank(rank)
    .setRegParam(regParam)
    .setUserCol("userId")
    .setItemCol("movieId")
    .setRatingCol("rating")

val t1 = System.currentTimeMillis()

val myALSmodel = ratingsALS.fit(trainingData)

val t2 = System.currentTimeMillis()

val predictedRatings = myALSmodel.transform(testData)

val t3 = System.currentTimeMillis()

var sumMSE = predictedRatings
    .map(r => r(2).asInstanceOf[Float] - r(4).asInstanceOf[Float])
    .map(x => x*x)
    .filter(!_.isNaN)
    .reduce(_ + _)
var numVals = testDataCount // should really divide by the number of non-NaN predictions, but close enough
var MSE = sumMSE/numVals
var RMSE = sqrt(MSE)
println("RMSE (root mean squared error) = " + RMSE)

val t4 = System.currentTimeMillis()

println("| Run | MaxIter | Rank | RegParam | RMSE | time_create | time_fit | time_predict | time_RMSE |")
print("| " + runName + " | " + maxIter + " | " + rank + " | " + regParam)
print(f"| $RMSE%8.4f")
print(" | " + ((t1-t0)/1000.0) )
print(" | " + ((t2-t1)/1000.0) )
print(" | " + ((t3-t2)/1000.0) )
print(" | " + ((t4-t3)/1000.0) )
println(" |")

RMSE (root mean squared error) = 0.8714798554867962                             
| Run | MaxIter | Rank | RegParam | RMSE | time_create | time_fit | time_predict | time_RMSE |
| 1.20 | 25 | 5 | 0.1|   0.8715 | 0.003 | 12.528 | 0.029 | 22.058 |


runName = 1.20
maxIter = 25
rank = 5
regParam = 0.1
t0 = 1713415068387
ratingsALS = als_305314dc6f0a
t1 = 1713415068390
myALSmodel = als_305314dc6f0a
t2 = 1713415080918
predictedRatings = [userId: int, movieId: int ... 3 more fields]
t3 = 1713415080947
sumMSE = 152118.72
numVals = 200294
MSE = 0.75947714
RMSE = 0.8714798554867962
t4 = 1713415103005


lastException: Throwable = null


1713415103005

---
### 4.2 Analyze Results

*Answer each of the following in several complete sentences. Please refer to specific values or results in your table when justifying your answers.*




#### 4.2.1 General Trends
What general trends did you notice? Do these trends seem reasonable to you? Why or why not?

Examining the data reveals several trends in how the ALS parameters (`MaxIter`, `Rank`, `RegParam`) affect RMSE. Increasing `MaxIter` sharply reduces RMSE between 1 and 5 iterations and only slightly after that. Holding `Rank` at 5 and `RegParam` at 0.1, RMSE falls from 3.3544 at 1 iteration (run 1.19) to 0.8863 at 5 (run 1.2), then to 0.8771 at 10 (run 1.17) and 0.8722 at 20 (run 1.18). More iterations let the model converge further, but the improvement plateaus, indicating diminishing returns beyond roughly 10 iterations.

The `Rank` parameter, which determines the complexity of the model, has a smaller effect. With `MaxIter` 15 and `RegParam` 0.05 (runs 1.14-1.16), RMSE is 0.8643 at rank 5, 0.8558 at rank 10, and 0.8577 at rank 20, so a middle value of about 10 offers the best balance between underfitting and overfitting. With only 5 iterations (runs 1.2-1.4), rank 5 is marginally best, which may mean that larger ranks need more iterations to pay off.

Regarding `RegParam`, lower values (around 0.05-0.1) correspond to lower RMSEs (0.8558-0.8920 for runs with at least 3 iterations), while heavier regularization hurts: RMSE rises to about 1.04 at 0.5 (runs 1.5-1.7) and to 1.3526 at 1.0 (runs 1.8-1.10), regardless of rank. Too much regularization prevents the model from fitting the data well.

These trends appear reasonable as they align with typical expectations of how these parameters influence model performance in collaborative filtering tasks. Adjusting these parameters allows for fine-tuning the model's ability to generalize without overfitting.

#### 4.2.2 Most time-consuming task
Which part or parts of the workflow took the *most* time? What do you think accounts for that?

The most time-consuming task in the workflow is the calculation of RMSE: the 'time_RMSE' column ranges from 13.873 seconds (run 1.17) to 22.014 seconds (run 1.1), longer than model fitting in every run. The arithmetic itself (subtracting, squaring, filtering NaN values, and summing) is simple, so it is unlikely to account for this on its own. A more likely explanation is Spark's lazy evaluation: `transform` only defines the prediction step, which is why 'time_predict' is under 0.06 seconds, and the predictions for all 200,294 test rows are actually computed when `reduce` triggers the job. The 'time_RMSE' window therefore includes the prediction work as well as the error computation, and its duration depends mainly on the size of the test set rather than on the ALS parameters.
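
One factor that may contribute to the long 'time_RMSE' window is lazy evaluation: Spark transformations such as `transform` and `map` only describe work, and the work runs when an action such as `reduce` is called. The pure-Scala analogy below uses a lazy collection view (not Spark itself) to show the same effect; the counter and values are invented for illustration.

```scala
// Pure-Scala analogy: a lazy view defers work until a result is demanded,
// loosely similar to how Spark defers transformations until an action runs
var evaluations = 0

val doubled = (1 to 5).view.map { x =>
  evaluations += 1 // count how many elements were actually computed
  x * 2
}
val afterMap = evaluations // still 0: nothing has been computed yet

val viewTotal = doubled.sum // forces the computation, like reduce() in the notebook
val afterSum = evaluations  // now 5: all the work happened during the sum

println(s"after map: $afterMap, after sum: $afterSum, total: $viewTotal")
```

A timer placed around the `map` line would record almost nothing, and a timer around the `sum` line would absorb the cost of both steps.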

#### 4.2.3 Least time-consuming task
Which part or parts of the workflow took the *least* time? What do you think accounts for that?

The least time-consuming part of the workflow is consistently the model creation step, as indicated by the 'time_create' column, which ranges from 0.002 to 0.007 seconds. For instance, in run 1.16, this step took only 0.004 seconds. This step only sets up the ALS estimator with the specified parameters (`MaxIter`, `Rank`, and `RegParam`); it initializes settings and touches no data. The recorded 'time_predict' values are also small (0.027-0.053 seconds), but model creation is the shortest step in every run. This quick initialization is typical in machine learning workflows, where the bulk of the computational effort is spent on training and evaluating the model rather than on the setup phase.

#### 4.2.4 Most variable task
Which part or parts of the workflow had the greatest variation in time? What do you think accounts for that?

The 'time_fit' part of the workflow showed the greatest variation across runs, ranging from 5.038 seconds (run 1.3) to 15.016 seconds (run 1.10). Most of this variability follows from the `MaxIter` and `Rank` settings. `MaxIter` controls how many times the algorithm alternates between updating the user factors and the movie factors, so more iterations mean longer fitting times: the runs with 20 iterations (1.8-1.10 and 1.18) took 11.4-15.0 seconds, while all runs with 5 or fewer iterations took under 9 seconds. `Rank` sets the number of latent factors to compute, and fitting time rises with rank in runs 1.5-1.7 (7.477, 9.624, and 11.586 seconds for ranks 5, 10, and 20) and in runs 1.8-1.10 (11.434, 12.155, and 15.016 seconds). `RegParam` changes the objective but not the amount of work per iteration, so it should have little effect on timing. Some of the remaining variation, such as run 1.3 fitting faster than run 1.2 despite a higher rank, probably reflects the shared and varying load on CloudxLab.
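
The spread of each timing column can be checked directly from the table. The sketch below copies the time_fit and time_RMSE columns for runs 1.1-1.19; `spread` and `relativeSpread` are hypothetical helper names, and max minus min is only one of several reasonable measures of variation.

```scala
// time_fit and time_RMSE columns for runs 1.1-1.19, copied from the sweep table
val timeFit = Seq(5.678, 8.996, 5.038, 8.229, 7.477, 9.624, 11.586, 11.434, 12.155, 15.016,
                  8.121, 11.999, 9.124, 10.035, 10.778, 12.219, 6.863, 12.634, 7.05)
val timeRmse = Seq(22.014, 18.612, 18.765, 20.878, 18.211, 13.98, 18.28, 17.846, 16.002, 19.728,
                   21.112, 20.573, 17.912, 19.373, 18.145, 15.792, 13.873, 18.854, 18.162)

// Spread of a column: max minus min, and that spread relative to the column mean
def spread(xs: Seq[Double]): Double = xs.max - xs.min
def relativeSpread(xs: Seq[Double]): Double = spread(xs) / (xs.sum / xs.size)

println(f"time_fit:  spread = ${spread(timeFit)}%.3f s, relative = ${relativeSpread(timeFit)}%.2f")
println(f"time_RMSE: spread = ${spread(timeRmse)}%.3f s, relative = ${relativeSpread(timeRmse)}%.2f")
```

By both measures the fitting time varies more than the RMSE time, even though the RMSE step is longer in absolute terms.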

#### 4.2.5 Recommended set of parameters
For the given dataset, what do you think is the optimal parameter combination that minimizes the error in the most reasonable amount of time?

Based on the observed results and underlying logic of parameter influence, an optimal combination of parameters that balances error minimization and computational efficiency appears to be around a `MaxIter` of 15, a `Rank` of 10, and a `RegParam` of 0.05, as evidenced by run 1.15 which delivered an RMSE of 0.8558. This configuration presents a robust model with good generalization capabilities and efficient computation for the given dataset.

However, for potential further optimization, I would consider the following slight adjustments:

1. Slightly adjust `Rank`: Testing values slightly higher than 10, such as 12 or 15, might capture more complex patterns in the data without significant overfitting, potentially improving accuracy if the increase in computational time is justified.
   
2. Fine-tune `RegParam`: Experimenting within a tighter range around 0.05, such as 0.03 or 0.07, could refine the balance between model complexity and regularization strength, potentially squeezing additional performance out of the model.

3. Experiment with `MaxIter`: Modifying iterations to either 18 or down to 12 could optimize the trade-off between convergence time and accuracy, based on the algorithm’s convergence behavior observed in more detailed diagnostics.

These adjustments encourage iterative refinement in tuning the machine learning model, aiming for even better predictive accuracy while maintaining reasonable computational demands.
