# Using ALS with Spark 

This basic Recommender System will use the Spark ALS and explore the effects of outcome of a cross-validation on the dataset. 
This Notebook requires the Movielens dataset to be present in Google Drive

In [0]:
# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

!pip install -q findspark
#!pip install pyspark
# Set up required environment variables

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

!java -version
!python --version

openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
Python 3.6.9


In [0]:
import findspark
findspark.init("/content/spark-2.4.4-bin-hadoop2.7")

In [0]:
  #install spark
  #!pip install pyspark
  import pyspark
  # get a spark context
  sc = pyspark.SparkContext.getOrCreate()
  print(sc)
  # and a spark session
  spark = pyspark.sql.SparkSession.builder.getOrCreate()
  print(spark)
  spark.version

<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f584b0febe0>


'2.4.4'

In [0]:
from google.colab import files
!ls -l
#!rm spark-2.4.4-bin-hadoop2.7.tgz.1

!echo $JAVA_HOME/bin
!export PATH=$PATH:$JAVA_HOME/bin
!echo $PATH

total 449412
drwx------  4 root root      4096 Dec 27 12:17 drive
drwxr-xr-x  1 root root      4096 Dec 18 16:52 sample_data
drwxr-xr-x 13 1000 1000      4096 Aug 27 21:30 spark-2.4.4-bin-hadoop2.7
-rw-r--r--  1 root root 230091034 Aug 27 22:01 spark-2.4.4-bin-hadoop2.7.tgz
-rw-r--r--  1 root root 230091034 Aug 27 22:01 spark-2.4.4-bin-hadoop2.7.tgz.1
/usr/lib/jvm/java-8-openjdk-amd64/bin
/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/opt/bin


check versions and dependencies, this may change with changes to Colab and Spark

In [0]:
!java -version
!python --version
!ls /usr/lib/jvm/
!echo $JAVA_HOME
spark.version
!ls /content
!echo $PATH

openjdk version "1.8.0_232"
OpenJDK Runtime Environment (build 1.8.0_232-8u232-b09-0ubuntu1~18.04.1-b09)
OpenJDK 64-Bit Server VM (build 25.232-b09, mixed mode)
Python 3.6.9
default-java		   java-11-openjdk-amd64     java-8-openjdk-amd64
java-1.11.0-openjdk-amd64  java-1.8.0-openjdk-amd64
/usr/lib/jvm/java-8-openjdk-amd64
drive	     spark-2.4.4-bin-hadoop2.7	    spark-2.4.4-bin-hadoop2.7.tgz.1
sample_data  spark-2.4.4-bin-hadoop2.7.tgz
/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin:/opt/bin


## Step 1 - Load the Data
Read the data, split into tokens and create a structured DataFrame

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType
# the imports are used creating the data frame

spark = SparkSession.builder.getOrCreate() # create a SparkSession 
# this gets us an RDD. (could also be done with RDD.textFile in this case)
lines = spark.read.text("/content/drive/My Drive/Colab Notebooks/data/movielens-small/sample_movielens_ratings.txt").rdd
# now split the lines at the '::'
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
ratings.createOrReplaceTempView('ratings') # register the DataFrame so that we can use it with Spark SQL.
(training, test) = ratings.randomSplit([0.8, 0.2]) # split into test and training set
training.printSchema() # just for testing, should show the four columns
print(training.count()) # just for testing, should be around 1200

root
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- userId: long (nullable = true)

1213


# Step 2 - Create a baseline

Now take a very simple estimate as the baseline: calculate the mean of all ratings.    

The average can be calculated with the SQL `AVG` command, within an SQL `SELECT` statement. 

Then calculate the MSE with respect to the average (as predictor)

Calculate the RMSE as a naive baseline to compare the trained model against



In [0]:
SQL1 = 'SELECT AVG(rating) FROM ratings'
row = spark.sql(SQL1).collect()[0] # get the single row with the result

meanRating = row['avg(rating)'] # access Row as a map 
print('meanRating',meanRating)

se_rdd = test.rdd.map(lambda row: Row(se = pow(row['rating']-meanRating,2)) ) 
se_df = spark.createDataFrame(se_rdd) 
se_df.createOrReplaceTempView('se')
print('se_df',se_df)
SQL2 = 'SELECT AVG(se) FROM se'
row = spark.sql(SQL2).collect()[0]
meanSE = row['avg(se)'] # access Row as a map 
print('RMSE',pow(meanSE,0.5))

meanRating 1.7741505662891406
se_df DataFrame[se: double]
RMSE 1.142816731411422


## Step 3 - Train an ALS Estimator and perform CV 

Now create an ALS estimator and a parameter grid to explore different values for the `rank` and `regParam` parameter of the ALS. Then build a cross-validator to train the model.

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=3, rank=10, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating")

paramGrid = ParamGridBuilder() \
  .addGrid(als.regParam, [0.03,0.1,0.3]) \
  .addGrid(als.rank, [3,10,30]).build()

regEval = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

crossVal = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=regEval)
print('starting cross-validation')
cvModel = crossVal.fit(training)
print('finished cross-validation')

starting cross-validation
finished cross-validation


## Step 4 - Evaluate The Model

Take the trained cvModel and extract the best parameter values by inspecting the estimatorParameterMaps. 
Compare the RMSE value to that of the mean for different parameter settings.

In [0]:
print(cvModel.avgMetrics) # the metrics form the CrossValidation
print(cvModel.getEstimatorParamMaps()) # gives you the parameter combinations
paramMap = list(zip(cvModel.getEstimatorParamMaps(),cvModel.avgMetrics))
paramMin = min(paramMap, key=lambda x: x[1])
print(paramMin)

# Evaluate the model by computing the RMSE on the test data
predictions = cvModel.transform(test)
rmse = regEval.evaluate(predictions)
print("RMSE = " + str(rmse))

[1.5254898760653246, 2.0261790089927163, 1.57165590851438, 1.3294585606453688, 1.4275271366035416, 1.2792006504743827, 1.2335536001799277, 1.2394600343653657, 1.226236628398472]
[{Param(parent='ALS_0217f748ceb9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_0217f748ceb9', name='rank', doc='rank of the factorization'): 3}, {Param(parent='ALS_0217f748ceb9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_0217f748ceb9', name='rank', doc='rank of the factorization'): 10}, {Param(parent='ALS_0217f748ceb9', name='regParam', doc='regularization parameter (>= 0).'): 0.03, Param(parent='ALS_0217f748ceb9', name='rank', doc='rank of the factorization'): 30}, {Param(parent='ALS_0217f748ceb9', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='ALS_0217f748ceb9', name='rank', doc='rank of the factorization'): 3}, {Param(parent='ALS_0217f748ceb9', name='regParam', doc='regularization parameter (>= 0).'): 