<a href="https://colab.research.google.com/github/shafaqueahmareen/Recommendation_system/blob/main/recommendation_spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator # for the continous Data evaluation
from pyspark.ml.recommendation import ALS


In [None]:
# Start the Spark session for this notebook (an existing one is reused).
spark = (
    SparkSession.builder
    .appName('RecomALS')
    .getOrCreate()
)

# Read the ratings CSV: the first row is the header and the column types
# are inferred from the values.
path = '/content/movielens_ratings.csv'
data = spark.read.options(header=True, inferSchema=True).csv(path)

# Preview the first rows of the ratings table.
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [None]:
# Summary statistics (count, mean, stddev, min, max) for each column.
ratings_summary = data.describe()
ratings_summary.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [None]:
# Hold out 20% of the ratings for testing; the explicit seed fixes the split.
TrainData, TestData = data.randomSplit([0.8, 0.2], seed=10)

# Configure the ALS recommender.
# coldStartStrategy='drop' removes rows whose prediction is NaN. ALS emits NaN
# for a user or movie that does not appear in the training split, and a single
# NaN row would turn the RMSE computed on the predictions into NaN.
# The explicit seed fixes the initialisation of the factor matrices.
alsModel = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    maxIter=10,
    regParam=0.1,
    coldStartStrategy='drop',
    seed=10,
)

# Train on the training split only.
model = alsModel.fit(TrainData)






In [None]:

# Score the held-out ratings with the trained model. The result keeps the
# input columns and adds a `prediction` column.
perdiction = model.transform(TestData)

# Print up to 100 scored rows for a visual check.
perdiction.show(n=100)

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      3|   1.0|    28|0.78267884|
|      0|   1.0|    27|0.80784273|
|      3|   1.0|    26| 1.3120326|
|      4|   4.0|    26| 2.4216769|
|      9|   1.0|    27| 0.9630326|
|     18|   3.0|    26| 3.3258872|
|     19|   3.0|    27| 2.0690465|
|     22|   2.0|    27| 1.6874615|
|     14|   1.0|    12| 1.3610764|
|     15|   2.0|    12|0.89237535|
|     17|   5.0|    12| 2.8700898|
|     25|   1.0|    12| 2.3434594|
|      0|   1.0|    22| 0.7597292|
|      5|   2.0|    22|  2.195764|
|     29|   3.0|    22| 3.5752323|
|      2|   2.0|     1| 1.8459567|
|      9|   3.0|     1| 1.4814622|
|     13|   1.0|     1| 0.9978119|
|     21|   3.0|     1|  1.397656|
|     28|   3.0|     1| 0.6060812|
|      3|   1.0|    13|0.98289704|
|     20|   1.0|    13|0.41501307|
|     31|   1.0|    13| 1.2776219|
|      2|   1.0|    16| 2.1812754|
|      9|   1.0|    16| 1.7326615|
|     14|   1.0|    

In [None]:
# Root-mean-square error between the actual `rating` and the model's
# `prediction` on the scored test rows.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(perdiction)
print('RMSE of the ALS recommendation: ', rmse)

RMSE of the ALS recommendation:  1.0535091849219107


In [None]:
# Test-split rows for user 11, reduced to the (movieId, userId) pairs
# that are passed to the model in the next step.
is_user11 = TestData['userId'] == 11
user11Data = TestData.where(is_user11).select('movieId', 'userId')
user11Data.show()

+-------+------+
|movieId|userId|
+-------+------+
|     12|    11|
|     13|    11|
|     20|    11|
|     21|    11|
|     40|    11|
|     41|    11|
|     48|    11|
|     71|    11|
|     86|    11|
|     99|    11|
+-------+------+



In [None]:
# Predict user 11's rating for each of the selected movies.
recom11 = model.transform(user11Data)
recom11.show()

# The same predictions ranked from the highest to the lowest score.
recom11.orderBy(recom11['prediction'].desc()).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     12|    11| 1.7662998|
|     13|    11| 2.4518628|
|     86|    11| 1.1530622|
|     20|    11| 1.6518066|
|     40|    11| 1.2363977|
|     48|    11| 1.6764425|
|     41|    11|  1.386582|
|     21|    11| 1.9389596|
|     71|    11| 1.7403847|
|     99|    11| 1.2709212|
+-------+------+----------+

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     13|    11| 2.4518628|
|     21|    11| 1.9389596|
|     12|    11| 1.7662998|
|     71|    11| 1.7403847|
|     48|    11| 1.6764425|
|     20|    11| 1.6518066|
|     41|    11|  1.386582|
|     99|    11| 1.2709212|
|     40|    11| 1.2363977|
|     86|    11| 1.1530622|
+-------+------+----------+



In [None]:
# Scored test rows for user 11, showing the actual rating next to the prediction.
# The column is referenced as 'userId', exactly as it is spelled in the schema;
# the previous 'userID' spelling only resolved while Spark's column lookup is
# case-insensitive (its default setting).
d = perdiction.filter(perdiction['userId'] == 11)
d.show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     12|   1.0|    11| 1.7662998|
|     13|   4.0|    11| 2.4518628|
|     86|   1.0|    11| 1.1530622|
|     20|   1.0|    11| 1.6518066|
|     40|   1.0|    11| 1.2363977|
|     48|   5.0|    11| 1.6764425|
|     41|   1.0|    11|  1.386582|
|     21|   1.0|    11| 1.9389596|
|     71|   3.0|    11| 1.7403847|
|     99|   1.0|    11| 1.2709212|
+-------+------+------+----------+



In [None]:
# --- User 22 ---

# Test-split rows for user 22 as (movieId, userId) pairs.
user22Data = TestData.where(TestData['userId'] == 22).select('movieId', 'userId')
user22Data.show()

# Predicted ratings of user 22 for those movies.
recom22 = model.transform(user22Data)
recom22.show()

# Predictions ranked from the highest to the lowest score.
recom22.orderBy(recom22['prediction'].desc()).show()

# Actual rating next to the prediction for the same user.
d1 = perdiction.where(perdiction['userId'] == 22)
d1.show()

+-------+------+
|movieId|userId|
+-------+------+
|      0|    22|
|      5|    22|
|     29|    22|
|     33|    22|
|     37|    22|
|     63|    22|
|     65|    22|
|     69|    22|
|     74|    22|
|     75|    22|
|     80|    22|
|     90|    22|
+-------+------+

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     65|    22| 1.7930562|
|      5|    22|  2.195764|
|     37|    22| 1.3643543|
|     69|    22|  2.657474|
|     63|    22| 2.0023277|
|     80|    22| 2.6209564|
|     29|    22| 3.5752323|
|     90|    22|  3.761522|
|     75|    22| 3.9040923|
|     33|    22| 1.2242988|
|      0|    22| 0.7597292|
|     74|    22| 3.1396744|
+-------+------+----------+

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     75|    22| 3.9040923|
|     90|    22|  3.761522|
|     29|    22| 3.5752323|
|     74|    22| 3.1396744|
|     69|    22|  2.657474|
|     80|    22| 2.6209564|
|      5|    22|  2.195764

In [None]:
# --- User 25 ---

# Test-split rows for user 25 as (movieId, userId) pairs.
user25Data = TestData.where(TestData['userId'] == 25).select('movieId', 'userId')
user25Data.show()

# Predicted ratings of user 25 for those movies.
recom25 = model.transform(user25Data)
recom25.show()

# Predictions ranked from the highest to the lowest score.
recom25.orderBy(recom25['prediction'].desc()).show()

# Actual rating next to the prediction for the same user.
d2 = perdiction.where(perdiction['userId'] == 25)
d2.show()

+-------+------+
|movieId|userId|
+-------+------+
|     17|    25|
|     18|    25|
|     26|    25|
|     29|    25|
|     30|    25|
|     34|    25|
|     35|    25|
|     37|    25|
|     40|    25|
|     41|    25|
|     43|    25|
|     68|    25|
|     79|    25|
+-------+------+

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     17|    25| 1.0535865|
|     18|    25|  1.509603|
|     26|    25| 1.5546829|
|     29|    25| 1.1157211|
|     30|    25| 1.1987363|
|     34|    25| 1.5552684|
|     35|    25|  1.001963|
|     37|    25|  1.881047|
|     40|    25| 1.9759222|
|     41|    25| 1.4865441|
|     43|    25| 0.5714872|
|     68|    25| 1.0670867|
|     79|    25| 1.6331068|
+-------+------+----------+

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     40|    25| 1.9759222|
|     37|    25|  1.881047|
|     79|    25| 1.6331068|
|     34|    25| 1.5552684|
|     26|    25| 1.5546829|
|     18|

In [None]:
# Hyper-parameter search for ALS using k-fold cross-validation.

from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

# Base estimator for the search.
# coldStartStrategy='drop' removes NaN predictions, which ALS emits for users
# or movies missing from a fold's training part; otherwise the fold RMSE can
# become NaN and the comparison between parameter settings is meaningless.
# The explicit seed fixes the initialisation of the factor matrices.
als_Model = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',
    seed=10,
)

# Grid of 3 regularisation strengths x 3 iteration counts = 9 candidates.
param = (
    ParamGridBuilder()
    .addGrid(als_Model.regParam, [0.01, 0.05, 0.1])
    .addGrid(als_Model.maxIter, [5, 10, 20])
    .build()
)

# RMSE is the selection metric. The variable is named `cv_evaluator` so that
# Python's builtin `eval` is not shadowed in the kernel.
cv_evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction',
)

# 5-fold cross-validation; the seed fixes how rows are assigned to folds.
cv = CrossValidator(
    estimator=als_Model,
    estimatorParamMaps=param,
    evaluator=cv_evaluator,
    numFolds=5,
    seed=10,
)

# Fit on the training split only. Fitting on the full `data` would let the
# model see the rows of TestData, so any RMSE later measured on TestData would
# be optimistically biased.
best_model = cv.fit(TrainData)

# Model refit with the best-scoring parameter combination.
model_new = best_model.bestModel

ALSModel: uid=ALS_d976cd98d673, rank=10


In [None]:
# Take the model selected by cross-validation and score the held-out
# test split with it.
model_new = best_model.bestModel
pred = model_new.transform(TestData)

# Preview the first 20 scored rows.
pred.show(n=20)

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      3|   1.0|    28| 0.8070102|
|      0|   1.0|    27|0.91564935|
|      3|   1.0|    26| 1.0680491|
|      4|   4.0|    26| 3.1241739|
|      0|   1.0|    22| 1.0804402|
|      2|   2.0|     1| 1.9123015|
|      3|   1.0|    13| 1.0370896|
|      2|   1.0|    16| 1.2052873|
|      1|   1.0|     3|0.96181226|
|      0|   1.0|     5| 1.0422553|
|      4|   1.0|    19| 1.3673761|
|      0|   1.0|    15| 1.0348456|
|      3|   1.0|    17| 0.9681004|
|      4|   1.0|     9| 1.2024503|
|      0|   1.0|     8| 1.0687263|
|      4|   2.0|     8| 1.5164093|
|      5|   1.0|     8| 1.2589011|
|      2|   1.0|    23|  1.209958|
|      2|   2.0|     7| 2.3130436|
|      4|   1.0|    24| 1.0930883|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
# Root-mean-square error of the cross-validated model's predictions
# against the actual ratings.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(pred)
print('RMSE of the ALS recommendation: ', rmse)

RMSE of the ALS recommendation:  0.41184177765984903
