### Recommendation Engine with Pyspark

#### We are going to build a recommendation system using Alternating Least Squares method

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import col,explode

##### Initiate Spark session

In [2]:
spark = SparkSession.builder.appName("Recommendation").getOrCreate()

#### Loading data

In [8]:
movies = spark.read.csv("data/movies.csv",inferSchema=True,header=True)
ratings = spark.read.csv("data/ratings.csv",inferSchema=True,header=True)

In [10]:
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [11]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [16]:
moviesCount = ratings.select('movieID').distinct().count()
moviesCount

9724

In [17]:
users = ratings.select('userId').distinct().count()
users

610

In [23]:
userRated = ratings.groupby('userId').count().orderBy('count',ascending=False)
userRated.toPandas()

Unnamed: 0,userId,count
0,414,2698
1,599,2478
2,474,2108
3,448,1864
4,274,1346
...,...,...
605,569,20
606,320,20
607,576,20
608,595,20


In [25]:
moviesrated = ratings.groupby('movieId').count().orderBy('count',ascending=False)
moviesrated.toPandas()

Unnamed: 0,movieId,count
0,356,329
1,318,317
2,296,307
3,593,279
4,2571,278
...,...,...
9719,5836,1
9720,6477,1
9721,73501,1
9722,2965,1


#### Building model

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator

In [46]:
train ,test = ratings.randomSplit([0.8,0.2],seed=133)

als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating' ,nonnegative=True,)

In [47]:
train.count()

80631

In [48]:
type(als)

pyspark.ml.recommendation.ALS

#### Hyperparameters tuning

In [52]:
params = ParamGridBuilder()\
            .addGrid(als.rank,[ 10,20 ])\
            .addGrid(als.regParam,[0.01, 0.05])\
            .build()

evl = RegressionEvaluator(metricName='rmse',labelCol='rating',predictionCol='prediction')
print(len(params))

20


#### Building CV pipeline

In [53]:
cv = CrossValidator(estimator=als,estimatorParamMaps=params,evaluator=evl,numFolds=5)
print(cv)

CrossValidator_c0307d5d8112


#### Best model and best params

In [54]:
model = cv.fit(train)

In [56]:
best_model = model.bestModel

In [57]:
# Print best_model
print(type(best_model))

# Complete the code below to extract the ALS model parameters
print("**Best Model**")

# # Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

<class 'pyspark.ml.recommendation.ALSModel'>
**Best Model**
  Rank: 10
  MaxIter: 10
  RegParam: 0.01


In [78]:
test_pred = best_model.transform(test)
rmse = evl.evaluate(test_pred.select('rating','prediction'))
print(rmse)

nan


In [77]:
test_pred.show()

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|      1|   4.0|964982703|  5.017557|
|     1|      6|   4.0|964982224| 4.4696703|
|     1|     70|   3.0|964982400|  5.399613|
|     1|    151|   5.0|964984041| 2.6546545|
|     1|    216|   5.0|964981208|   4.25392|
|     1|    235|   4.0|964980908| 3.9837303|
|     1|    260|   5.0|964981680| 5.0018134|
|     1|    480|   4.0|964982346|  4.077233|
|     1|    608|   5.0|964982931| 4.9090195|
|     1|    648|   3.0|964982563|  3.744081|
|     1|    673|   3.0|964981775| 3.0497093|
|     1|   1023|   5.0|964982681|  4.171769|
|     1|   1030|   3.0|964982903| 4.1609726|
|     1|   1049|   5.0|964982400|  2.773957|
|     1|   1060|   4.0|964980924| 4.0169554|
|     1|   1073|   5.0|964981680|  4.407525|
|     1|   1127|   4.0|964982513|  3.690847|
|     1|   1197|   5.0|964981872| 4.6823425|
|     1|   1206|   5.0|964983737| 4.6268873|
|     1|  

In [85]:
import pyspark.sql.functions as psf

def compute_RMSE(df,expected_col, actual_col):

  rmse = df.withColumn("squarederror",
                           psf.pow(psf.col(actual_col) - psf.col(expected_col),
                                   psf.lit(2)
                           ))\
  .agg(psf.avg(psf.col("squarederror")).alias("mse"))\
  .withColumn("rmse", psf.sqrt(psf.col("mse")))

  return(rmse)


res = compute_RMSE(test_pred,"rating", "prediction")

In [86]:
res.show()

+---+----+
|mse|rmse|
+---+----+
|NaN| NaN|
+---+----+

