In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=c2a6bea893767ca445781e637017ac47f92f031badb4a2c0be077a4dec355b54
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
import pyspark

In [9]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (reuses an existing one if the
# kernel already has it, otherwise creates it).
ss = (
    SparkSession.builder
    .appName('Use Collaborative Filtering System for movie recommendations')
    .getOrCreate()
)

# Read the ratings CSV using its header row for column names. No schema is
# inferred here, so every column is loaded as a string.
ratings = (
    ss.read
    .format('csv')
    .option('header', 'true')
    .load('ratings.csv')
)

In [13]:
# Load the movie metadata, using the header row for column names and letting
# Spark infer the column types from the data.
movies = (
    ss.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('movies.csv')
)

In [12]:
# Peek at the first few raw rating rows.
ratings.head(n=5)

[Row(userId='1', movieId='1', rating='4.0', timestamp='964982703'),
 Row(userId='1', movieId='3', rating='4.0', timestamp='964981247'),
 Row(userId='1', movieId='6', rating='4.0', timestamp='964982224'),
 Row(userId='1', movieId='47', rating='5.0', timestamp='964983815'),
 Row(userId='1', movieId='50', rating='5.0', timestamp='964982931')]

In [15]:
from pyspark.sql.functions import col

# Build the modelling frame from the loaded `ratings` DataFrame. The CSV was
# read without schema inference, so the id and rating columns are strings and
# must be cast to numeric types; `timestamp` is dropped by not selecting it.
dataset = ratings.select(col('userId').cast('int'),
                         col('movieId').cast('int'),
                         col('rating').cast('float'))

dataset.head(5)

[Row(userId=1, movieId=1, rating=4.0),
 Row(userId=1, movieId=3, rating=4.0),
 Row(userId=1, movieId=6, rating=4.0),
 Row(userId=1, movieId=47, rating=5.0),
 Row(userId=1, movieId=50, rating=5.0)]

In [17]:
# 80/20 train/test split. A fixed seed keeps the split (and therefore the
# reported metrics) stable across notebook re-runs.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=42)

In [18]:
from pyspark.ml.recommendation import ALS

# Alternating Least Squares recommender over (userId, movieId, rating).
# - coldStartStrategy='drop' discards predictions for users/items that were
#   not seen during training instead of emitting NaN for them.
# - seed fixes the random initialisation so the fit is repeatable for a given
#   training set.
als = ALS(maxIter=5,
          regParam=0.1,
          userCol='userId',
          itemCol='movieId',
          ratingCol='rating',
          coldStartStrategy='drop',
          seed=42)

In [19]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=trainingData)

In [20]:
# Score the held-out split; the result carries a `prediction` column next to
# the true `rating`.
predictions = model.transform(dataset=testData)
predictions.head(n=5)

[Row(userId=148, movieId=1197, rating=3.0, prediction=3.941575527191162),
 Row(userId=148, movieId=4896, rating=4.0, prediction=3.5489368438720703),
 Row(userId=148, movieId=4993, rating=3.0, prediction=3.26409649848938),
 Row(userId=148, movieId=5618, rating=3.0, prediction=3.5257482528686523),
 Row(userId=148, movieId=7153, rating=3.0, prediction=3.659503698348999)]

In [22]:
# Summary statistics of true ratings versus model predictions.
(
    predictions
    .select('rating', 'prediction')
    .describe()
    .show()
)

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|             19347|             19347|
|   mean|3.5097948002274255| 3.355771693411388|
| stddev|1.0450266667787786|0.7367727719130752|
|    min|               0.5|        -0.2699326|
|    max|               5.0|         5.5720677|
+-------+------------------+------------------+



In [24]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root-mean-square error between the true `rating` and the model `prediction`.
# The evaluator is named `evaluator` rather than `eval` so the Python builtin
# `eval` is not shadowed for the rest of the kernel session.
evaluator = RegressionEvaluator(metricName='rmse',
                                labelCol='rating',
                                predictionCol='prediction')

rmse = evaluator.evaluate(predictions)
print('RMSE =', rmse)

RMSE = 0.8951079531697683
