In [64]:
import pyspark
import warnings
warnings.filterwarnings('ignore')

In [65]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommendation').getOrCreate()

In [66]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [67]:
data = spark.read.csv('ml-latest-small/ratings.csv',inferSchema=True,header=True)
# data = spark.read.csv('ml-25m/ratings.csv',inferSchema=True,header=True)

In [68]:
data.head()

Row(userId=1, movieId=1, rating=4.0, timestamp=964982703)

In [69]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [70]:
data.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [71]:
# Smaller dataset so we will use 0.8 / 0.2
(train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

In [72]:
# Build the recommendation model using ALS on the training data
from pyspark.ml.evaluation import RegressionEvaluator

rank=[5,10,15,20]
regParam=[0.01,0.02,0.03,0.04,0.05]
minr=None
minp=None
minrmse=None
model=None

for r in rank:
    for p in regParam:
        print(f"rank {r} param {p} ========> done")
        als = ALS(maxIter=3, regParam=p,rank=r, userCol="userId", itemCol="movieId", ratingCol="rating")
        model = als.fit(train_data)
        predictions = model.transform(test_data)
        evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
        cleanPred = predictions.dropna(how="any", subset=["prediction"])
        rmse = evaluator.evaluate(cleanPred)
        if (minp == None and minr==None) :
            minp=p
            minr=r
            minrmse=rmse
        
        if(rmse<minrmse):
            minp=p
            minr=r
            minrmse=rmse

predictions.show()
print ("Minmum Root Mean Square Error (RMSE) is :", rmse)
print(f"For rank {minr} and regParam {minp}")


+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|      6|   4.0|964982224| 4.3425045|
|     1|    101|   5.0|964980868|   3.56074|
|     1|    151|   5.0|964984041| 4.1308126|
|     1|    231|   5.0|964981179| 3.7429395|
|     1|    349|   4.0|964982563|   3.96025|
|     1|    423|   3.0|964982363| 3.2417707|
|     1|    543|   4.0|964981179|  4.330907|
|     1|    596|   5.0|964982838| 3.9245913|
|     1|    923|   5.0|964981529|  4.555496|
|     1|    940|   5.0|964982176|   4.17733|
|     1|    943|   4.0|964983614| 3.8304183|
|     1|   1009|   3.0|964981775| 3.8385594|
|     1|   1024|   5.0|964982876|  3.765919|
|     1|   1031|   5.0|964982653|  5.064827|
|     1|   1089|   5.0|964982951| 5.0626817|
|     1|   1197|   5.0|964981872| 4.9457917|
|     1|   1220|   5.0|964981909|   4.56793|
|     1|   1573|   5.0|964982290| 3.9188936|
|     1|   1805|   4.0|964983056|  4.333993|
|     1|  

In [73]:
single_user = test_data.filter(test_data['userId']==12).select(['movieId','userId'])

In [74]:
# User had 7 ratings in the test data set 
# Realistically this should be some sort of hold out set!
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|    357|    12|
|    543|    12|
|    830|    12|
|   2072|    12|
|   2717|    12|
|   4018|    12|
|  40629|    12|
+-------+------+



In [75]:
reccomendations = model.transform(single_user)

In [76]:
reccomendations.orderBy('prediction',ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|  40629|    12| 4.9405403|
|   4018|    12|  4.877439|
|    543|    12|  3.724832|
|    830|    12| 3.6777914|
|   2717|    12|  3.622339|
|    357|    12|  3.359665|
|   2072|    12| 2.7380674|
+-------+------+----------+



We can recommend movie with id 40629 to user. The user might like those based on previous history. However, we say don't watch movies with id 830. You might don't like it.

What if a user has never watched any movie or a new user, what we can recommend, it's called a cold start in the recommendation system. Well, in that case, we can ask the user to take a survey and get an idea of his interest in movies. Or we can give other users recommendations. Cold start is a problem for the recommendation system problem in general.