In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("CollaborativeFiltering") \
    .getOrCreate()

# Load data
file_path = "ratings.csv"
raw_df = spark.read.csv(file_path, header=True, inferSchema=True)

# Drop incomplete rows BEFORE projecting, so that `ratings` is built from the
# cleaned frame. (Previously `ratings` was selected first and only `df` was
# cleaned, leaving any null rows in `ratings`.)
# `df` stays bound to the null-free frame because later cells split `df`.
df = raw_df.na.drop()

# Keep only needed columns
ratings = df.select("userId", "movieId", "rating")

print("=== Ratings Schema ===")
ratings.printSchema()
print("=== Sample Ratings ===")
ratings.show(5)



=== Ratings Schema ===
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)

=== Sample Ratings ===
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows


In [None]:
# Small hand-written (user, item, rating) sample for quick experiments.
# Note the item column is called "itemId" here, not "movieId" as in the CSV.
columns = ["userId", "itemId", "rating"]
data = [
    (1, 101, 4.0), (1, 102, 3.5), (1, 103, 5.0),  # user 1
    (2, 101, 4.5), (2, 104, 4.0),                 # user 2
    (3, 101, 2.0), (3, 103, 3.0), (3, 104, 4.5),  # user 3
    (4, 102, 4.0), (4, 103, 3.0),                 # user 4
]
ratings_df = spark.createDataFrame(data, schema=columns)


In [14]:
# Fixed seed so the train/test split and the ALS factor initialisation -- and
# therefore the RMSE reported further down -- are reproducible across runs.
SEED = 42

# 80/20 split of the cleaned ratings frame.
train, test = df.randomSplit([0.8, 0.2], seed=SEED)

# Explicit-feedback ALS on (userId, movieId, rating).
# coldStartStrategy="drop" removes rows whose prediction would be NaN (users or
# movies absent from `train`), so the RMSE is computed only on scorable rows.
# NOTE(review): maxIter=2 is far below the constructor default of 10 -- confirm
# this is intentional rather than a leftover quick-run setting.
als = ALS(
    maxIter=2,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy="drop",
    seed=SEED,
)

In [19]:
# Exploratory IPython help lookup for the ALS constructor signature; no other
# visible cell depends on it. NOTE(review): candidate for removal before sharing.
ALS?

[1;31mInit signature:[0m
[0mALS[0m[1;33m([0m[1;33m
[0m    [1;33m*[0m[1;33m,[0m[1;33m
[0m    [0mrank[0m[1;33m:[0m [0mint[0m [1;33m=[0m [1;36m10[0m[1;33m,[0m[1;33m
[0m    [0mmaxIter[0m[1;33m:[0m [0mint[0m [1;33m=[0m [1;36m10[0m[1;33m,[0m[1;33m
[0m    [0mregParam[0m[1;33m:[0m [0mfloat[0m [1;33m=[0m [1;36m0.1[0m[1;33m,[0m[1;33m
[0m    [0mnumUserBlocks[0m[1;33m:[0m [0mint[0m [1;33m=[0m [1;36m10[0m[1;33m,[0m[1;33m
[0m    [0mnumItemBlocks[0m[1;33m:[0m [0mint[0m [1;33m=[0m [1;36m10[0m[1;33m,[0m[1;33m
[0m    [0mimplicitPrefs[0m[1;33m:[0m [0mbool[0m [1;33m=[0m [1;32mFalse[0m[1;33m,[0m[1;33m
[0m    [0malpha[0m[1;33m:[0m [0mfloat[0m [1;33m=[0m [1;36m1.0[0m[1;33m,[0m[1;33m
[0m    [0muserCol[0m[1;33m:[0m [0mstr[0m [1;33m=[0m [1;34m'user'[0m[1;33m,[0m[1;33m
[0m    [0mitemCol[0m[1;33m:[0m [0mstr[0m [1;33m=[0m [1;34m'item'[0m[1;33m,[0m[1;33m
[0m    [0mseed[0m[1;

In [15]:
# Fit the configured ALS estimator `als` on the `train` split.
model = als.fit(train)

In [16]:
# Score the held-out `test` split; the result carries the input columns plus `prediction`.
pred = model.transform(test)

In [17]:
# Print a preview of the scored rows to compare `rating` with `prediction`.
pred.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   148|   4308|   4.0|1482548613| 3.7580557|
|   148|   5952|   3.0|1482548769| 3.2930183|
|   148|   8368|   4.0|1482548676|  3.595332|
|   148|  50872|   3.0|1482548504|  3.074541|
|   148|  69844|   4.0|1482548500| 4.2803464|
|   148|  72998|   4.0|1482548525| 3.3900802|
|   148|  76093|   3.0|1482548702|  4.011851|
|   148|  81834|   4.0|1482548498|  4.036421|
|   148|  88125|   4.0|1482548673| 3.8480496|
|   148| 112852|   3.5|1482548700| 3.8826246|
|   148| 134853|   4.0|1482548516| 3.3906078|
|   463|   2019|   4.0|1145460514|  4.282893|
|   463|   5010|   4.0|1145460370|  4.185996|
|   471|    527|   4.5|1496671869| 3.7489147|
|   471|   6333|   2.5|1496671903|  3.248807|
|   471|   7147|   4.0|1496669535| 3.6682727|
|   471|  60069|   4.5|1496671876|  3.695722|
|   496|   2394|   3.5|1415165480| 1.6258128|
|   496|   8865|   2.5|1415165701|

In [18]:
# Root-mean-squared error between the true `rating` and the model's `prediction`
# over the scored test rows.
evalo = RegressionEvaluator(
    predictionCol='prediction',
    labelCol='rating',
    metricName='rmse',
)
print(evalo.evaluate(pred))


1.0186191525457002


In [21]:
# Top-3 users (with predicted score) for every movie known to the model.
item_recommendations = model.recommendForAllItems(3)
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line after the table.
item_recommendations.show(truncate=False)

+-------+------------------------------------------------------+
|movieId|recommendations                                       |
+-------+------------------------------------------------------+
|1      |[{543, 5.4244423}, {276, 4.8006625}, {413, 4.73842}]  |
|12     |[{147, 5.2911587}, {295, 5.186598}, {396, 5.168371}]  |
|13     |[{543, 5.0802484}, {96, 4.877048}, {240, 4.7404103}]  |
|22     |[{543, 5.7828755}, {485, 5.6436634}, {548, 5.585657}] |
|26     |[{548, 6.5141306}, {537, 6.159847}, {485, 6.1359916}] |
|27     |[{544, 5.6243405}, {406, 5.315045}, {12, 4.7568874}]  |
|28     |[{537, 8.043259}, {548, 7.7697783}, {371, 7.649599}]  |
|31     |[{557, 5.066306}, {43, 5.0427284}, {594, 4.7312155}]  |
|34     |[{543, 5.249088}, {423, 4.647146}, {485, 4.606229}]   |
|44     |[{452, 3.6239524}, {267, 3.5729752}, {251, 3.5575836}]|
|47     |[{543, 5.101436}, {276, 4.885463}, {452, 4.835167}]   |
|52     |[{461, 6.9999223}, {207, 6.198996}, {295, 5.9389777}] |
|53     |[{258, 6.8721485

In [22]:
# Top-3 users (with predicted score) for every movie known to the model.
# NOTE(review): this repeats the previous cell's computation verbatim under a
# different name -- confirm whether it is a leftover copy or was meant to call
# something else.
item_recomend = model.recommendForAllItems(3)
# DataFrame.show() prints the table itself and returns None, so wrapping it in
# print() only appended a stray "None" line after the table.
item_recomend.show(truncate=False)

+-------+------------------------------------------------------+
|movieId|recommendations                                       |
+-------+------------------------------------------------------+
|1      |[{543, 5.4244423}, {276, 4.8006625}, {413, 4.73842}]  |
|12     |[{147, 5.2911587}, {295, 5.186598}, {396, 5.168371}]  |
|13     |[{543, 5.0802484}, {96, 4.877048}, {240, 4.7404103}]  |
|22     |[{543, 5.7828755}, {485, 5.6436634}, {548, 5.585657}] |
|26     |[{548, 6.5141306}, {537, 6.159847}, {485, 6.1359916}] |
|27     |[{544, 5.6243405}, {406, 5.315045}, {12, 4.7568874}]  |
|28     |[{537, 8.043259}, {548, 7.7697783}, {371, 7.649599}]  |
|31     |[{557, 5.066306}, {43, 5.0427284}, {594, 4.7312155}]  |
|34     |[{543, 5.249088}, {423, 4.647146}, {485, 4.606229}]   |
|44     |[{452, 3.6239524}, {267, 3.5729752}, {251, 3.5575836}]|
|47     |[{543, 5.101436}, {276, 4.885463}, {452, 4.835167}]   |
|52     |[{461, 6.9999223}, {207, 6.198996}, {295, 5.9389777}] |
|53     |[{258, 6.8721485