# Alternating Least Squares

## NOTE
Here we use the **PySpark API** to build a recommender with the **ALS method**.

**PySpark** is the Python API for Apache Spark. Apache Spark is a distributed computing framework for big-data analysis: a computational engine that handles huge datasets by processing them in parallel, typically in batches.

In [1]:
# Mounting drive to google colab
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive



### Downloading Dependencies

Spark is written in the Scala programming language and requires the Java Virtual Machine (JVM) to run. Therefore, our first task is to install Java.

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we will download Apache Spark 2.4.7 with Hadoop 2.7.

In [3]:
!wget -q ftp://mirror.klaus-uwe.me/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz

Now, we just need to extract that archive.

In [4]:
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

There is one last thing that we need to install and that is the findspark library. It will locate Spark on the system and import it as a regular library.

In [5]:
!pip install -q findspark

In [6]:
!pip install py4j

Collecting py4j
  Downloading https://files.pythonhosted.org/packages/2b/e2/543019a6e620b759a59f134158b4595766f9bf520a1081a2ba1a1809ba32/py4j-0.10.9.2-py2.py3-none-any.whl (198kB)

Now that we have installed all the necessary dependencies in Colab, it is time to set the environment paths. This will enable us to run PySpark in the Colab environment.

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [8]:
import findspark
findspark.find() #To find folder of SPARK HOME

'/content/spark-2.4.7-bin-hadoop2.7'

In [9]:
import findspark
# Point findspark at SPARK_HOME so that pyspark becomes importable
findspark.init("/content/spark-2.4.7-bin-hadoop2.7")

We need to locate Spark in the system. For that, we import findspark and use the findspark.init() method.

## Importing libraries

In [10]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

### Starting the SPARK Session

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommendations').getOrCreate()
# Take the SparkContext from the session; `sc = SparkContext` would only bind the class
sc = spark.sparkContext
# sc.setCheckpointDir('checkpoint')

In [12]:
# path config
# /content/drive/MyDrive/ml-latest
data_path = '/content/drive/MyDrive/'

### Path configuration


### Loading the data into Colab from Drive

In [13]:
movies = spark.read.csv(data_path+'ml-latest-small/movies.csv', header=True)
ratings = spark.read.csv(data_path+'ml-latest-small/ratings.csv',  header=True)

In [14]:
ratings = ratings.drop('Timestamp')
ratings = ratings.withColumn('UserID', col('UserID').cast('integer'))
ratings = ratings.withColumn('MovieID', col('MovieID').cast('integer'))
ratings = ratings.withColumn('Rating', col('rating').cast('float'))
ratings.limit(10).show()

+------+-------+------+
|UserID|MovieID|Rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    157|   5.0|
+------+-------+------+



## Calculating sparsity

In [15]:
numerator = ratings.select("Rating").count()

# Count the number of distinct userIds and distinct movieIds
unique_users = ratings.select("UserID").distinct().count()
unique_movies = ratings.select("MovieID").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = unique_users * unique_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator)*100
print("The ratings data is ", "%.2f" % sparsity + "% empty.")

The ratings data is  98.30% empty.
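The sparsity calculation above can be checked by hand on a small example. The sketch below uses plain Python and made-up counts (3 users, 4 movies, 6 ratings), not the MovieLens data; `sparsity_percent` is a hypothetical helper name.

```python
def sparsity_percent(n_ratings, n_users, n_items):
    """Share of empty cells in the user x item matrix, in percent."""
    return (1.0 - n_ratings / (n_users * n_items)) * 100


# Toy matrix: 3 users x 4 movies = 12 cells, of which 6 hold a rating
toy_sparsity = sparsity_percent(n_ratings=6, n_users=3, n_items=4)
print("The toy ratings data is %.2f%% empty." % toy_sparsity)
```

The same formula applied to the counts from the cell above gives the 98.30% figure: almost every user-movie pair has no rating, which is the gap the recommender tries to fill.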


## Interpreting Ratings

In [16]:
# Group data by userId, count ratings
UserID_pivot = ratings.groupBy("UserID").count().orderBy('count', ascending=False)
UserID_pivot.limit(10).show()

+------+-----+
|UserID|count|
+------+-----+
|   414| 2698|
|   599| 2478|
|   474| 2108|
|   448| 1864|
|   274| 1346|
|   610| 1302|
|    68| 1260|
|   380| 1218|
|   606| 1115|
|   288| 1055|
+------+-----+



In [17]:
# Group data by movieId, count ratings
MovieID_pivot = ratings.groupBy("MovieID").count().orderBy('count', ascending=False)
MovieID_pivot.limit(10).show()

+-------+-----+
|MovieID|count|
+-------+-----+
|    356|  329|
|    318|  317|
|    296|  307|
|    593|  279|
|   2571|  278|
|    260|  251|
|    480|  238|
|    110|  237|
|    589|  224|
|    527|  220|
+-------+-----+



# Implementing the ALS (Alternating Least Squares) algorithm in Spark

The approach is generally divided into 4 basic steps.
1. Load the data
2. Splitting the data: train, validation, test
3. ALS model and evaluation
4. Test the model
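
Before walking through these steps, it may help to see what "alternating least squares" means. The sketch below is a deliberately simplified, rank-1 version in plain Python on made-up ratings: it holds the item factors fixed and solves a small ridge regression for each user, then swaps roles, and repeats. The data, the `update` helper, and the plain ridge penalty are assumptions for illustration; Spark's `ALS` supports higher ranks and distributes the work, so this is not its implementation.

```python
# Toy ratings: (user, item) -> rating; missing pairs are unobserved
ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0, (1, 2): 1.0,
           (2, 1): 1.0, (2, 2): 5.0}
n_users, n_items = 3, 3
reg = 0.1  # ridge penalty, playing the role of regParam


def loss(x, y):
    """Regularized squared error over the observed ratings."""
    err = sum((r - x[u] * y[i]) ** 2 for (u, i), r in ratings.items())
    return err + reg * (sum(v * v for v in x) + sum(v * v for v in y))


def update(fixed, n_free, axis):
    """Closed-form ridge solution for one side while the other is held fixed.

    axis=0 updates user factors (fixed = item factors);
    axis=1 updates item factors (fixed = user factors).
    """
    new = []
    for k in range(n_free):
        # (fixed factor, rating) for every observed rating that involves index k
        pairs = [(fixed[key[1 - axis]], r)
                 for key, r in ratings.items() if key[axis] == k]
        numerator = sum(f * r for f, r in pairs)
        denominator = sum(f * f for f, _ in pairs) + reg
        new.append(numerator / denominator)
    return new


x = [1.0] * n_users  # one latent factor per user
y = [1.0] * n_items  # one latent factor per item
history = [loss(x, y)]
for _ in range(10):
    x = update(y, n_users, axis=0)  # solve for users with items fixed
    y = update(x, n_items, axis=1)  # solve for items with users fixed
    history.append(loss(x, y))

print("loss before: %.3f, after: %.3f" % (history[0], history[-1]))
```

Each half-step is an exact least-squares solve, so the regularized loss cannot increase from one iteration to the next. A predicted rating is the product of a user factor and an item factor; with rank greater than 1 it becomes a dot product of two vectors.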

### 1. Load the Data

The data was already loaded above, so here we only import the classes needed for modelling.

In [18]:
# Import the required functions

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

### 2. Splitting the data

We split the data with Spark's randomSplit() function.

The data is divided into training and test sets in an 8:2 ratio.


Spark lets users set the coldStartStrategy parameter to “drop” so that any rows in the predictions DataFrame that contain NaN values are dropped. ALS cannot produce a prediction for a user or movie it never saw during training, so those rows would otherwise turn the metric into NaN. With “drop”, the evaluation metric is computed over the non-NaN rows only and remains valid.

implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
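
The effect of `coldStartStrategy="drop"` described above can be illustrated without Spark. The sketch below uses plain Python and three made-up (rating, prediction) pairs, one of which has a NaN prediction as a cold-start row would; `rmse` is a hypothetical helper, not the Spark evaluator.

```python
import math


def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))


# Made-up predictions; the second row mimics a cold-start user or movie
predictions = [(4.0, 3.5), (3.0, float("nan")), (5.0, 4.5)]

# Keeping the NaN row poisons the whole metric
rmse_with_nan = rmse(predictions)

# Dropping it first, as coldStartStrategy="drop" does, gives a usable number
kept = [(r, p) for r, p in predictions if not math.isnan(p)]
rmse_dropped = rmse(kept)

print(rmse_with_nan, rmse_dropped)
```

A single NaN is enough to make the overall RMSE NaN, which is why the rows are dropped before evaluation rather than after.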

In [19]:
# Create test and train set
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="UserID", itemCol="MovieID", ratingCol="Rating", nonnegative = True, implicitPrefs = False
          , coldStartStrategy="drop")

### 3. ALS model and evaluation

#### Tune the ALS model

In [20]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()
            #             .addGrid(als.maxIter, [5, 50, 100, 200]) \

           
# Define evaluator as RMSE and print the number of models in the grid
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  16
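
The count of 16 follows from the grid being the Cartesian product of the two value lists (4 ranks x 4 regularization values). A minimal sketch in plain Python, using `itertools.product` as a stand-in for what the grid builder produces:

```python
from itertools import product

ranks = [10, 50, 100, 150]
reg_params = [0.01, 0.05, 0.1, 0.15]

# Every combination of rank and regParam becomes one candidate model
grid = [{"rank": rank, "regParam": reg}
        for rank, reg in product(ranks, reg_params)]

print("Num models to be tested: ", len(grid))
```

Adding the commented-out `maxIter` list with 4 values would multiply the grid to 64 candidates, so grid size grows quickly with each extra hyperparameter.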


#### Build your cross validation pipeline

In [21]:
# Build cross validation using CrossValidator
# numFolds=3 means 3-fold cross-validation: every parameter combination is trained and evaluated 3 times.
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
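
To make the cost of 3-fold cross-validation concrete, the sketch below splits 9 row indices into 3 folds in plain Python and counts the resulting model fits. The round-robin fold assignment and the `k_fold_indices` helper are assumptions for illustration; Spark assigns rows to folds randomly.

```python
def k_fold_indices(n_rows, k):
    """Return k (train, validation) index splits; each fold is held out once."""
    folds = [list(range(start, n_rows, k)) for start in range(k)]
    splits = []
    for held_out in range(k):
        validation = folds[held_out]
        train = [i for f in range(k) if f != held_out for i in folds[f]]
        splits.append((train, validation))
    return splits


splits = k_fold_indices(n_rows=9, k=3)
for train_idx, val_idx in splits:
    print("train:", train_idx, "validation:", val_idx)

# 16 parameter combinations x 3 folds, plus one final refit of the
# best combination on the full training set
n_fits = 16 * 3 + 1
print("Total ALS fits:", n_fits)
```

Each row is used for validation exactly once and for training twice, and the averaged validation RMSE across the 3 folds decides which parameter combination wins.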

#### Obtain Best Model and Best Model Parameters

In [22]:
# We fit the cross validator to the 'train' dataset
model = cv.fit(train)

# We Extract best model from the cv model above
best_model = model.bestModel

In [23]:
params = [{p.name: v for p, v in m.items()} for m in cv.getEstimatorParamMaps()]

In [24]:
# Print best_model
print(type(best_model))

# Extract the tuned hyperparameters from the underlying JVM ALS estimator
print("**Best Model**")

# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 100
  MaxIter: 10
  RegParam: 0.15


### 4. Test the Model 

Evaluate the best model on the held-out test set by computing its RMSE. The grid search tuned the number of latent factors (rank) and the regularization parameter; the number of iterations stayed at its default of 10 because the maxIter grid was commented out.

In [25]:
# View the predictions
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.865499666447013


In [26]:
test_predictions.show()

+------+-------+------+----------+
|UserID|MovieID|Rating|prediction|
+------+-------+------+----------+
|   385|    471|   4.0| 3.2199137|
|   462|    471|   2.5|   2.49348|
|   387|    471|   3.0| 2.9785595|
|   171|    471|   3.0|  4.435947|
|    32|    471|   3.0| 3.7408144|
|   469|    471|   5.0|  3.454264|
|   357|    471|   3.5| 3.9528108|
|   132|   1088|   4.0| 2.7562292|
|   563|   1088|   4.0| 3.3933873|
|   594|   1088|   4.5| 4.1311927|
|   307|   1088|   3.0| 2.3777387|
|    51|   1088|   4.0|  3.604006|
|   221|   1088|   3.0| 3.0407107|
|   414|   1088|   3.0| 3.0470037|
|   200|   1088|   4.0|  3.690107|
|   104|   1088|   3.0| 3.6271515|
|    19|   1238|   3.0|  3.203387|
|   156|   1238|   4.0| 4.0440884|
|   425|   1342|   3.5|   2.22978|
|   600|   1342|   2.5| 2.0980847|
+------+-------+------+----------+
only showing top 20 rows



In [27]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

def is_high_rating(value):
    # Binarize a rating: 1 if it is at least 3 ("high"), otherwise 0 ("low")
    return 0 if value < 3 else 1

def labels_match(actual, predicted):
    # 1 when the binarized actual and predicted ratings agree, otherwise 0
    return 1 if actual == predicted else 0

# Convert to UDFs by passing in each function and its return type
udf_is_high_rating = F.udf(is_high_rating, IntegerType())
udf_labels_match = F.udf(labels_match, IntegerType())
ratings_1 = test_predictions.withColumn("Rating_binary", udf_is_high_rating("Rating"))
ratings_2 = ratings_1.withColumn("predictions_binary", udf_is_high_rating("prediction"))
ratings_with_high_low = ratings_2.withColumn("Truth", udf_labels_match("Rating_binary", "predictions_binary"))
ratings_with_high_low.show()

+------+-------+------+----------+-------------+------------------+-----+
|UserID|MovieID|Rating|prediction|Rating_binary|predictions_binary|Truth|
+------+-------+------+----------+-------------+------------------+-----+
|   385|    471|   4.0| 3.2199137|            1|                 1|    1|
|   462|    471|   2.5|   2.49348|            0|                 0|    1|
|   387|    471|   3.0| 2.9785595|            1|                 0|    0|
|   171|    471|   3.0|  4.435947|            1|                 1|    1|
|    32|    471|   3.0| 3.7408144|            1|                 1|    1|
|   469|    471|   5.0|  3.454264|            1|                 1|    1|
|   357|    471|   3.5| 3.9528108|            1|                 1|    1|
|   132|   1088|   4.0| 2.7562292|            1|                 0|    0|
|   563|   1088|   4.0| 3.3933873|            1|                 1|    1|
|   594|   1088|   4.5| 4.1311927|            1|                 1|    1|
|   307|   1088|   3.0| 2.3777387|    

In [28]:
Perf_values = ratings_with_high_low.groupBy("Truth").count().orderBy('count', ascending=False)
Perf_values.limit(6).show()

+-----+-----+
|Truth|count|
+-----+-----+
|    1|15016|
|    0| 4396|
+-----+-----+



In [29]:
a=list(Perf_values.select('Truth').toPandas()['Truth']) 
b=list(Perf_values.select('count').toPandas()['count'])

print(a)
print(b)

[1, 0]
[15016, 4396]


In [30]:
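# Look up the count of matches (Truth == 1) explicitly instead of relying on row order
counts = dict(zip(a, b))
accuracy = counts.get(1, 0) / sum(b)
print(accuracy*100)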

77.35421388831651
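
The high/low accuracy above can be reproduced by hand. The sketch below, in plain Python, applies the same threshold of 3 to the first four test rows shown earlier and then recomputes the overall figure from the two counts; `is_high` is a hypothetical helper name.

```python
def is_high(value, threshold=3.0):
    """1 if the rating is at least the threshold, otherwise 0."""
    return 1 if value >= threshold else 0


# (Rating, prediction) for the first four rows of the test predictions
rows = [(4.0, 3.2199137), (2.5, 2.49348), (3.0, 2.9785595), (3.0, 4.435947)]
matches = [int(is_high(r) == is_high(p)) for r, p in rows]
sample_accuracy = sum(matches) / len(matches)

# Overall accuracy from the Truth counts: 15016 matches, 4396 mismatches
overall_accuracy = 15016 / (15016 + 4396)

print(matches, sample_accuracy, overall_accuracy * 100)
```

The third row shows how sensitive this measure is near the cut-off: a prediction of 2.98 for an actual rating of 3.0 counts as a miss even though the numeric error is tiny, so the accuracy complements the RMSE rather than replacing it.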


# Recommending Movies

In the final part, we use the best model to predict which movies each user is most likely to rate highly and recommend those movies to the user.

In [31]:
# Generate the top 10 recommendations for every user
recommendations = best_model.recommendForAllUsers(10)
recommendations.limit(10).show()

+------+--------------------+
|UserID|     recommendations|
+------+--------------------+
|   471|[[3379, 4.6720667...|
|   463|[[3379, 4.7757664...|
|   496|[[26326, 4.422524...|
|   148|[[77846, 4.412014...|
|   540|[[3379, 5.3685517...|
|   392|[[3379, 4.6377344...|
|   243|[[86237, 5.383588...|
|    31|[[8477, 5.0337596...|
|   516|[[4429, 4.7545447...|
|   580|[[3379, 4.667716]...|
+------+--------------------+



In [32]:
recommendations = recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
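
The `explode` step above turns one row per user, holding a list of (movie, rating) pairs, into one row per user-movie pair. A minimal sketch of the same reshaping in plain Python, with made-up user IDs, movie IDs, and scores:

```python
# One entry per user, each with a nested list of (movieId, predicted rating)
nested = [
    (1, [(10, 4.7), (20, 4.4)]),
    (2, [(30, 4.9), (10, 4.1)]),
]

# Flatten to one (userId, movieId, rating) row per recommendation
flat = [(user, movie, rating)
        for user, recs in nested
        for movie, rating in recs]

for row in flat:
    print(row)
```

The flat layout is what makes the later join with the movies table possible, since each row now carries a single movie ID.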

##### User 50’s Actual Preferences:

In [33]:
ratings.join(movies, on='MovieID').filter('UserID = 50').sort('Rating', ascending=False).limit(10).show()

+-------+------+------+--------------------+--------------------+
|MovieID|UserID|Rating|               title|              genres|
+-------+------+------+--------------------+--------------------+
|   1251|    50|   4.5|   8 1/2 (8½) (1963)|       Drama|Fantasy|
|    924|    50|   4.5|2001: A Space Ody...|Adventure|Drama|S...|
|   1204|    50|   4.5|Lawrence of Arabi...| Adventure|Drama|War|
|   1208|    50|   4.5|Apocalypse Now (1...|    Action|Drama|War|
|   1136|    50|   4.0|Monty Python and ...|Adventure|Comedy|...|
|    903|    50|   4.0|      Vertigo (1958)|Drama|Mystery|Rom...|
|   1199|    50|   4.0|       Brazil (1985)|      Fantasy|Sci-Fi|
|    750|    50|   4.0|Dr. Strangelove o...|          Comedy|War|
|   1201|    50|   4.0|Good, the Bad and...|Action|Adventure|...|
|    899|    50|   4.0|Singin' in the Ra...|Comedy|Musical|Ro...|
+-------+------+------+--------------------+--------------------+



##### User 50’s ALS Recommendations:

In [34]:
recommendations.join(movies, on='MovieID').filter('UserID = 50').show()

+-------+------+---------+--------------------+-----------------+
|movieId|userId|   rating|               title|           genres|
+-------+------+---------+--------------------+-----------------+
|   3379|    50|3.8758972| On the Beach (1959)|            Drama|
|  26326|    50|3.7777364|Holy Mountain, Th...|            Drama|
|   7748|    50|3.7236607|Pierrot le fou (1...|      Crime|Drama|
|   8477|    50|3.6442552|    Jetée, La (1962)|   Romance|Sci-Fi|
|   7767|    50|3.6171787|Best of Youth, Th...|            Drama|
|   1178|    50| 3.598996|Paths of Glory (1...|        Drama|War|
|   8405|    50| 3.583364|Hour of the Wolf ...|     Drama|Horror|
|   5747|    50|3.5750237|    Gallipoli (1981)|        Drama|War|
| 132333|    50|3.5724738|         Seve (2014)|Documentary|Drama|
|   5490|    50|3.5724738|  The Big Bus (1976)|    Action|Comedy|
+-------+------+---------+--------------------+-----------------+

