<a href="https://colab.research.google.com/github/luluoddish/DS-Coding/blob/main/MovieRecommend.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 7.0 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=ea1b5991234e7d9b4fa270f12f4c08ffbe0f724281c28b666f3e77c2d0009b39
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
import findspark
import pyspark
from pyspark import SparkContext, SparkConf


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [6]:
from pyspark.sql import SparkSession
lines = spark.read.text("sample_movielens_ratings.txt").rdd
lines

MapPartitionsRDD[4] at javaToPython at NativeMethodAccessorImpl.java:0

In [8]:
lines.take(5)

[Row(value='0::2::3::1424380312'),
 Row(value='0::3::1::1424380312'),
 Row(value='0::5::2::1424380312'),
 Row(value='0::9::4::1424380312'),
 Row(value='0::11::1::1424380312')]

In [9]:
parts = lines.map(lambda row: row.value.split("::"))
parts.take(5)

[['0', '2', '3', '1424380312'],
 ['0', '3', '1', '1424380312'],
 ['0', '5', '2', '1424380312'],
 ['0', '9', '4', '1424380312'],
 ['0', '11', '1', '1424380312']]

In [18]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

In [19]:
ratings.printSchema()
ratings.count()
ratings.show(5)

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     0|      2|   3.0|1424380312|
|     0|      3|   1.0|1424380312|
|     0|      5|   2.0|1424380312|
|     0|      9|   4.0|1424380312|
|     0|     11|   1.0|1424380312|
+------+-------+------+----------+
only showing top 5 rows



In [20]:
#check missing values
from pyspark.sql.functions import isnan, when, count, col, translate
ratings.select([count(when(col(c).isNull(), c)).alias(c) for c in ratings.columns]).show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     0|      0|     0|        0|
+------+-------+------+---------+



In [21]:
(training, test) = ratings.randomSplit([0.7, 0.3])

# **Collaborative filtering**

In [22]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop")
model = als.fit(training)

In [23]:
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 1.774057289280048


In [24]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)



In [30]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(2)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(2)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)



In [31]:
userRecs.show()
movieRecs.show()
userSubsetRecs.show()
movieSubSetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.8757463},...|
|    10|[{22, 4.0335984},...|
|     0|[{9, 3.828909}, {...|
|     1|[{30, 5.8832297},...|
|    21|[{53, 4.9282904},...|
|    11|[{32, 5.0709496},...|
|    12|[{64, 4.99572}, {...|
|    22|[{93, 5.1425695},...|
|     2|[{93, 5.182481}, ...|
|    13|[{93, 3.8537502},...|
|     3|[{95, 4.914546}, ...|
|    23|[{46, 6.7593083},...|
|     4|[{62, 3.9653468},...|
|    24|[{47, 6.0930214},...|
|    14|[{29, 5.200471}, ...|
|     5|[{46, 7.1353745},...|
|    15|[{46, 4.847082}, ...|
|    25|[{27, 4.124669}, ...|
|    26|[{94, 6.5673876},...|
|     6|[{25, 3.9760509},...|
+------+--------------------+
only showing top 20 rows

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     20|[{17, 4.89197}, {...|
|     40|[{2, 3.774947}, {...|
|     10|[{17, 4.1062326},...|
|     50|[{23, 4.450046}, ...|
|     80|[{3, 4.1307096}, ...|
|     