In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Recommendation_System').getOrCreate()

In [None]:
df1 = spark.read.format("csv").option("header", "true").load("data/movielens_ratings.csv")

In [None]:
df1.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [None]:
df1.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                 9|
+-------+------------------+------------------+------------------+



In [None]:
df1.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
from pyspark.sql.functions import col

# Convert columns to integers
df1 = df1.withColumn("movieId", col("movieId").cast("int"))
df1 = df1.withColumn("rating", col("rating").cast("int"))
df1 = df1.withColumn("userId", col("userId").cast("int"))

# Print the updated schema
df1.printSchema()


root
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- userId: integer (nullable = true)



In [None]:
train_set, test_set = df1.randomSplit([0.8, 0.2])

In [None]:
train_set.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- userId: integer (nullable = true)



In [None]:

from pyspark.ml.recommendation import ALS
recommender = ALS(userCol='userId', ratingCol='rating', itemCol='movieId')
recommender = recommender.fit(train_set)

In [None]:
preds = recommender.transform(test_set)


In [None]:
preds.show()


+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      0|     3|    28| 0.7167872|
|      1|     1|    28| 1.6560076|
|      0|     1|    26| 1.0797006|
|      2|     1|    26| 2.7047203|
|      3|     1|    26| 1.3936241|
|      4|     1|    12|0.67713547|
|      2|     2|     1| 1.7815928|
|      3|     1|     1| 1.0668828|
|      2|     1|     3| 1.5187106|
|      0|     1|     5| 0.8582602|
|      0|     1|    15|0.71633375|
|      5|     1|     9| 0.5918421|
|      6|     1|    17|  1.531064|
|      1|     1|     4|0.65627825|
|      6|     1|     4| 0.8324476|
|      4|     1|    23| 1.0354474|
|      3|     1|     7| 1.3130604|
|      0|     3|    10| 0.5870611|
|      5|     1|    29| 1.7051051|
|      6|     2|    11| 2.0928206|
+-------+------+------+----------+
only showing top 20 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='rating')
evaluator.evaluate(preds)

Out[26]: 1.029632763266958

In [None]:
single_user = test_set.filter(test_set['userId'] == 23).select(['userId', 'movieId'])


In [None]:
single_user.show()


+------+-------+
|userId|movieId|
+------+-------+
|    23|      4|
|    23|     10|
|    23|     24|
|    23|     36|
|    23|     39|
|    23|     53|
|    23|     55|
|    23|     61|
|    23|     66|
|    23|     69|
|    23|     82|
|    23|     84|
|    23|     95|
+------+-------+



In [None]:
recommendations = recommender.transform(single_user)


In [None]:
recommendations.orderBy('prediction', ascending=False).show()


+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|    23|     55| 3.9089527|
|    23|     69| 2.6745977|
|    23|     36| 2.3484592|
|    23|     66|  2.287672|
|    23|     95| 1.8769937|
|    23|     24| 1.7108818|
|    23|     10| 1.5748067|
|    23|     82| 1.3795393|
|    23|      4| 1.0354474|
|    23|     84|0.89641464|
|    23|     53| 0.7822006|
|    23|     61|0.51314914|
|    23|     39|0.32163793|
+------+-------+----------+

