<a href="https://colab.research.google.com/github/muhammetsnts/SPARK/blob/main/projects/9.MovieLens_Ratings_with_Recommendation_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Environment

In [1]:
# Install Java 8 (Spark runs on the JVM)
!apt-get -q install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.1.1 (build for Hadoop 2.7)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

# Extract the archive
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

# Install findspark
!pip install -q findspark


import os

# Tell the notebook where Java and Spark are installed
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

# Make the Spark installation importable, then start a local session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Download and Read Dataset

In [2]:
!wget -q https://raw.githubusercontent.com/muhammetsnts/SPARK/main/data/movielens_ratings.csv

In [3]:
data = spark.read.csv("movielens_ratings.csv", header=True, inferSchema=True)

In [4]:
data.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [8]:
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [7]:
data.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



# Train-Test Split

In [9]:
train_data, test_data = data.randomSplit([0.8,0.2])

# Create Model

In [5]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator  # for evaluation

In [10]:
als = ALS(maxIter=5, regParam=0.01, userCol='userId', itemCol='movieId', ratingCol='rating')

In [11]:
model = als.fit(train_data)

In [15]:
predictions = model.transform(test_data)

In [16]:
predictions.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|     31|   1.0|    13| 0.67219985|
|     85|   1.0|    29|  0.7955177|
|     65|   2.0|     3| -1.7422171|
|     53|   5.0|     8|  1.5773742|
|     53|   1.0|    23|-0.32910877|
|     78|   1.0|    12|  1.0773535|
|     78|   1.0|    13| 0.61468405|
|     34|   1.0|    17|   -2.41009|
|     34|   1.0|     4| 0.27035052|
|     34|   1.0|     0|  2.1697016|
|     81|   5.0|    28|  1.8757992|
|     81|   1.0|    22| -0.2947706|
|     81|   1.0|    16|  1.8840457|
|     81|   2.0|     5| 0.96085626|
|     81|   4.0|    11| 0.80889714|
|     81|   3.0|    18| -1.4816947|
|     28|   1.0|    10|0.051807664|
|     76|   1.0|    20| 0.62976027|
|     76|   1.0|    19| 0.56428397|
|     26|   2.0|    25| 0.21175577|
+-------+------+------+-----------+
only showing top 20 rows



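For example, user #13 gave movie #31 a rating of 1.0, and the model predicted 0.67. The dataset is small (1,501 ratings), so the predictions are rough, and some even fall outside the 1 to 5 rating range (for instance, -1.74 for user #3 and movie #65). With a larger dataset, we would expect much better predictions.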
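Several predictions in the table above fall below 1.0, although the ratings in this dataset range from 1.0 to 5.0. One common post-processing step, which is not part of the original notebook, is to clamp each prediction to the valid range. The sketch below does this in plain Python on a few predictions copied from the table; `clamp_prediction` is a hypothetical helper name introduced only for illustration.

```python
# Rating bounds taken from the describe() output above (min 1.0, max 5.0).
MIN_RATING, MAX_RATING = 1.0, 5.0


def clamp_prediction(value, low=MIN_RATING, high=MAX_RATING):
    """Hypothetical helper: force a raw prediction into [low, high]."""
    return max(low, min(high, value))


# Raw predictions copied from the first five rows of the table above.
raw_predictions = [0.67219985, 0.7955177, -1.7422171, 1.5773742, -0.32910877]
clamped = [clamp_prediction(p) for p in raw_predictions]
print(clamped)
```

On a Spark DataFrame, the same idea could probably be expressed column-wise with `pyspark.sql.functions.greatest` and `least`. Clamping only tidies the output; it does not make the underlying model more accurate.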

In [17]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')

In [18]:
rmse = evaluator.evaluate(predictions)

In [19]:
print('RMSE :', rmse)

RMSE : 1.9482202566776712
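
RMSE is the square root of the mean squared difference between the actual and predicted ratings, so a value near 1.95 on a 1 to 5 scale indicates large errors. To make the metric concrete, the sketch below recomputes it by hand in plain Python for the first five rows of the predictions table. Because it uses only five rows, its result differs from the full test-set RMSE printed above; `root_mean_squared_error` is a hypothetical helper name.

```python
import math

# (actual rating, predicted rating) pairs copied from the first five rows
# of the predictions table above.
sample_pairs = [
    (1.0, 0.67219985),
    (1.0, 0.7955177),
    (2.0, -1.7422171),
    (5.0, 1.5773742),
    (1.0, -0.32910877),
]


def root_mean_squared_error(pairs):
    """Hypothetical helper: RMSE over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


sample_rmse = root_mean_squared_error(sample_pairs)
print(f"RMSE on the five sample rows: {sample_rmse:.3f}")
```

The two rows with the largest errors (ratings of 2.0 and 5.0 predicted as -1.74 and 1.58) dominate the result, which shows how strongly RMSE penalizes large misses.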


# Model Deployment

In [22]:
single_user = test_data.filter(test_data['userId']==11).select(['movieId','userId'])
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|     16|    11|
|     21|    11|
|     23|    11|
|     30|    11|
|     36|    11|
|     41|    11|
|     48|    11|
|     51|    11|
|     64|    11|
|     69|    11|
|     75|    11|
|     81|    11|
|     89|    11|
|     90|    11|
+-------+------+



In [24]:
recommendations = model.transform(single_user)
recommendations.orderBy('prediction', ascending=False).show()

+-------+------+-----------+
|movieId|userId| prediction|
+-------+------+-----------+
|     30|    11|   3.605505|
|     16|    11|  3.1629817|
|     64|    11|  3.0595193|
|     48|    11|  2.7807956|
|     23|    11|  2.4031336|
|     21|    11|  1.7254679|
|     89|    11|  1.6147878|
|     41|    11|  1.0666989|
|     81|    11| 0.80889714|
|     36|    11| 0.80128413|
|     75|    11| 0.15030378|
|     90|    11|-0.21517786|
|     69|    11| -0.5421032|
|     51|    11|  -4.051649|
+-------+------+-----------+



Based on these predictions, we could tell user #11: "Watch movie #30, and skip movie #51."
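
The ranking step above can be sketched in plain Python using the values from the recommendations table. Note that these candidates come from `test_data`, so they are movies user #11 has already rated; for real recommendations one would presumably score movies the user has not rated yet. `top_n` is a hypothetical helper name, not part of the notebook or of Spark.

```python
# (movieId, prediction) pairs copied from the recommendations table above.
predictions_user_11 = [
    (30, 3.605505), (16, 3.1629817), (64, 3.0595193), (48, 2.7807956),
    (23, 2.4031336), (21, 1.7254679), (89, 1.6147878), (41, 1.0666989),
    (81, 0.80889714), (36, 0.80128413), (75, 0.15030378),
    (90, -0.21517786), (69, -0.5421032), (51, -4.051649),
]


def top_n(scored_movies, n):
    """Hypothetical helper: movie IDs with the n highest predictions."""
    ranked = sorted(scored_movies, key=lambda pair: pair[1], reverse=True)
    return [movie_id for movie_id, _ in ranked[:n]]


best_three = top_n(predictions_user_11, 3)
worst_movie = min(predictions_user_11, key=lambda pair: pair[1])[0]
print(best_three)   # → [30, 16, 64]
print(worst_movie)  # → 51
```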