##### Download the MovieLens 25M dataset from here
! wget http://files.grouplens.org/datasets/movielens/ml-25m.zip

In [1]:
# List the extracted MovieLens files to confirm the dataset directory read below exists.
! ls ../datasets/movie-ml-25/ml-25m

genome-scores.csv  links.csv   ratings.csv  tags.csv
genome-tags.csv    movies.csv  README.txt


In [2]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
# Build (or reuse) the session through the builder so that re-running this cell
# in a live kernel does not fail with "Cannot run multiple SparkContexts at once".
spark = (
    SparkSession.builder
    .master('local')
    .appName('movie recommendation system')
    .getOrCreate()
)
# Keep `sc` available for any code that wants the underlying SparkContext.
sc = spark.sparkContext

In [3]:
# Record the Spark version this notebook was executed with.
print(spark.version)

2.4.5


In [4]:
# Declare the schema instead of using inferSchema=True: inference needs an extra
# full pass over the ratings file, and the column types are already known
# (they match what printSchema() reports for this data).
ratings_schema = "userId INT, movieId INT, rating DOUBLE, timestamp INT"
df = spark.read.csv(
    "../datasets/movie-ml-25/ml-25m/ratings.csv",
    header=True,
    schema=ratings_schema,
)

In [5]:
# Check the column names and types of the loaded ratings data.
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [6]:
# Preview the first 20 rating rows.
df.show(n=20)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [7]:
# Seed handed to randomSplit so the train/test partition can be reproduced.
random_seed = 1

In [8]:
# 70/30 train/test split of the ratings, seeded with random_seed.
training_data, test_data = df.randomSplit(weights=[0.7, 0.3], seed=random_seed)

In [9]:
from pyspark.ml.recommendation import ALS

In [10]:
# ALS estimator over the userId / movieId / rating columns.
# coldStartStrategy is not set here; the evaluation further down prints an RMSE
# of nan until rows with NaN predictions are dropped.
als = ALS(
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    rank=10,
    maxIter=15,
    seed=1234,
)

In [11]:
# Sanity check: each part returned by randomSplit is a Spark DataFrame.
type(training_data)

pyspark.sql.dataframe.DataFrame

In [12]:
# Number of ratings that landed in the training split.
training_data.count()

17502939

In [13]:
# Number of ratings that landed in the test split.
test_data.count()

7497156

In [14]:
# Fit the ALS estimator on the training split.
model = als.fit(dataset=training_data)

In [15]:
# Score every (userId, movieId) pair in the test split with the fitted model.
predictions = model.transform(dataset=test_data)

In [16]:
# Confirm transform() added a float "prediction" column next to the test columns.
predictions.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- prediction: float (nullable = false)



In [17]:
from pyspark.sql.functions import rand

In [18]:
# Show 10 predictions in random order (rand() is unseeded, so rows differ per run).
predictions.sort(rand()).show(n=10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 13403|   2772|   2.0| 993737923| 3.5786314|
|157541|  71535|   4.0|1443476967| 3.6844072|
|162296|   3248|   3.5|1443129982| 3.0449328|
|154050|  43560|   3.5|1419614232|  3.370797|
|  8408|  74228|   4.0|1532394392| 3.6625733|
| 72315| 185723|   2.5|1551180065| 1.4301612|
| 59145|   1580|   5.0|1026178441| 3.6775548|
| 68341|   6936|   4.5|1564364156| 3.5418048|
| 59756|   2021|   2.0|1281384322| 3.1852303|
|139907|    454|   3.0| 846515944|  2.993246|
+------+-------+------+----------+----------+
only showing top 10 rows



In [19]:
# Preview the first 20 prediction rows.
predictions.show(n=20)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|114572|    148|   2.0| 838460783| 2.4473324|
|159730|    148|   3.0| 842162037| 2.7568665|
| 47989|    148|   2.0| 833173771| 3.1970022|
| 72337|    148|   2.0| 944246202| 2.8859456|
|108767|    148|   3.0|1276969740|  2.621679|
| 21531|    148|   3.0| 834035555|  3.017579|
| 99684|    148|   3.0|1027645782| 2.9729304|
| 35969|    148|   2.0| 835094487| 2.9234517|
| 29943|    148|   3.0|1049216998| 2.9995558|
|117168|    148|   4.0| 835820190| 2.9894955|
|  3411|    148|   3.0| 835966104| 2.7953959|
| 28229|    148|   1.0| 833850593| 2.6176789|
|148197|    148|   2.5|1207008368| 2.8564281|
|  6491|    148|   4.0|1500217059| 2.6247742|
|147301|    148|   3.0| 951070210| 2.7431169|
|111567|    148|   3.0| 945399307| 2.9379678|
| 98520|    148|   4.0|1034547175| 2.8708107|
| 73827|    148|   4.0|1490671894| 2.8109019|
| 66440|    148|   2.5|1099143605|

## EVALUATION AND PREDICTIONS ON TEST DATA 

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

In [21]:
# RMSE between the held-out ratings and the model's predictions.
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse',
)
rmse = evaluator.evaluate(dataset=predictions)
print(rmse)

nan


## Recommend top movies that the active user might like

Our predictions contain NaN values, which is why the RMSE above is `nan`. For now we drop those rows; later we can set ALS's `coldStartStrategy` parameter to handle the issue.

In [22]:
# Discard rows that contain null/NaN values so the RMSE can be computed.
predictions2 = predictions.dropna()

In [23]:
# Preview the first 20 rows of the cleaned predictions.
predictions2.show(n=20)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|114572|    148|   2.0| 838460783| 2.4473324|
|159730|    148|   3.0| 842162037| 2.7568665|
| 47989|    148|   2.0| 833173771| 3.1970022|
| 72337|    148|   2.0| 944246202| 2.8859456|
|108767|    148|   3.0|1276969740|  2.621679|
| 21531|    148|   3.0| 834035555|  3.017579|
| 99684|    148|   3.0|1027645782| 2.9729304|
| 35969|    148|   2.0| 835094487| 2.9234517|
| 29943|    148|   3.0|1049216998| 2.9995558|
|117168|    148|   4.0| 835820190| 2.9894955|
|  3411|    148|   3.0| 835966104| 2.7953959|
| 28229|    148|   1.0| 833850593| 2.6176789|
|148197|    148|   2.5|1207008368| 2.8564281|
|  6491|    148|   4.0|1500217059| 2.6247742|
|147301|    148|   3.0| 951070210| 2.7431169|
|111567|    148|   3.0| 945399307| 2.9379678|
| 98520|    148|   4.0|1034547175| 2.8708107|
| 73827|    148|   4.0|1490671894| 2.8109019|
| 66440|    148|   2.5|1099143605|

In [24]:
# Re-run the RMSE evaluation, this time on the predictions with NaN rows removed.
rmse_settings = dict(metricName='rmse', predictionCol='prediction', labelCol='rating')
evaluator = RegressionEvaluator(**rmse_settings)
rmse = evaluator.evaluate(dataset=predictions2)
print(rmse)

0.8048781346451978


#### generating recommendations 

In [25]:
# Top-3 movie recommendations per user; display 3 users without truncating the arrays.
model.recommendForAllUsers(numItems=3).show(n=3, truncate=False)

+------+--------------------------------------------------------------+
|userId|recommendations                                               |
+------+--------------------------------------------------------------+
|148   |[[183947, 5.505346], [203086, 5.443754], [184299, 5.287776]]  |
|463   |[[185959, 6.594113], [176597, 6.4512553], [184299, 6.3820105]]|
|471   |[[176597, 5.7674417], [204302, 5.541669], [173153, 5.3972287]]|
+------+--------------------------------------------------------------+
only showing top 3 rows



In [26]:
# Top-3 user recommendations per movie; display 3 movies without truncating the arrays.
model.recommendForAllItems(numUsers=3).show(n=3, truncate=False)

+-------+-------------------------------------------------------------+
|movieId|recommendations                                              |
+-------+-------------------------------------------------------------+
|148    |[[59134, 4.2925167], [7349, 4.2520046], [142811, 4.176842]]  |
|463    |[[87426, 5.5767035], [10417, 4.989494], [149507, 4.893813]]  |
|471    |[[138914, 5.1985846], [86599, 5.107135], [142811, 5.0901127]]|
+-------+-------------------------------------------------------------+
only showing top 3 rows



In [33]:
# Import only the type classes this cell uses rather than `import *`, so the
# notebook namespace is not flooded with every name in pyspark.sql.types.
from pyspark.sql.types import IntegerType, StructField, StructType

# Single-column schema for the movie ids we want recommendations for.
# NOTE(review): the column is spelled "movieID" while the model's item column is
# "movieId"; the recorded run resolved it anyway — confirm before renaming.
cSchema = StructType([StructField("movieID", IntegerType())])

# One inner list per row, each holding a single movie id.
test_list = [[111], [202], [225], [347], [488]]

selected_df = spark.createDataFrame(test_list, schema=cSchema)

In [34]:
# Display the hand-picked movie ids.
selected_df.show(n=20)

+-------+
|movieID|
+-------+
|    111|
|    202|
|    225|
|    347|
|    488|
+-------+



In [37]:
# Top-3 user recommendations for just the selected movies, shown untruncated.
model.recommendForItemSubset(selected_df, numUsers=3).show(n=10, truncate=False)

+-------+-------------------------------------------------------------+
|movieId|recommendations                                              |
+-------+-------------------------------------------------------------+
|225    |[[87426, 5.2278924], [10417, 4.9314265], [79224, 4.9270334]] |
|111    |[[25160, 5.4074335], [142811, 5.396805], [108880, 5.3852315]]|
|347    |[[142811, 5.450516], [67467, 5.2219796], [66426, 5.1274385]] |
|202    |[[59134, 5.0535088], [142811, 5.01551], [148347, 4.864934]]  |
|488    |[[142811, 5.040885], [59134, 4.8364477], [138914, 4.812961]] |
+-------+-------------------------------------------------------------+



Collaborative filtering can be very effective in providing highly relevant recommendations. It scales well and can handle extremely large datasets. For collaborative filtering to operate optimally, it needs access to a large amount of data. The more data, the better. As time progresses and ratings start to accumulate, recommendations become more and more accurate. Access to large datasets is often a problem during the early stages of implementation. One solution is to use content-based filtering in conjunction with collaborative filtering. Since content-based filtering doesn’t rely on user activity, it can immediately start providing recommendations, gradually increasing your dataset over time.