In [0]:
from pyspark.sql import functions as F

df = spark.table("gold_fact_ratings")

df.show(5)


+------+-------+------+---------+-------------------+-----------+
|userId|movieId|rating|timestamp|       timestamp_ts|rating_date|
+------+-------+------+---------+-------------------+-----------+
|   236|   2718|   3.0|943015211|1999-11-19 12:40:11| 1999-11-19|
|   236|   2542|   5.0|943015062|1999-11-19 12:37:42| 1999-11-19|
|   236|   2459|   4.0|943013896|1999-11-19 12:18:16| 1999-11-19|
|   236|   2580|   5.0|943015701|1999-11-19 12:48:21| 1999-11-19|
|   236|   1350|   4.0|943013896|1999-11-19 12:18:16| 1999-11-19|
+------+-------+------+---------+-------------------+-----------+
only showing top 5 rows


In [0]:
df.printSchema()


root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- timestamp_ts: timestamp (nullable = true)
 |-- rating_date: date (nullable = true)



In [0]:
train, test = df.randomSplit([0.8, 0.2], seed=42)


In [0]:
from pyspark.ml.recommendation import ALS


In [0]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    rank=10,
    maxIter=10,
    regParam=0.1
)


In [0]:
model = als.fit(train)


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test)

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)

print("RMSE:", rmse)


RMSE: 0.8123190612289932


In [0]:
users = df.select("userId").distinct()


In [0]:
user_recs = model.recommendForUserSubset(users, 10)


In [0]:
users = df.select("userId").distinct()
movies = df.select("movieId").distinct()


In [0]:
user_movie_pairs = users.crossJoin(movies)


In [0]:
predictions = model.transform(user_movie_pairs)


In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

window_spec = Window.partitionBy("userId").orderBy(F.desc("prediction"))

top_recs = predictions \
    .withColumn("rank", F.row_number().over(window_spec)) \
    .filter(F.col("rank") <= 10) \
    .select("userId", "movieId", F.col("prediction").alias("predicted_rating"))


In [0]:
df_movies = spark.table("silver_dim_movies_enriched")

recs_with_titles = top_recs.join(
    df_movies.select("movieId", "title"),
    "movieId",
    "left"
)

recs_with_titles.show(20, truncate=False)


+-------+------+----------------+---------------------------------------------------------+
|movieId|userId|predicted_rating|title                                                    |
+-------+------+----------------+---------------------------------------------------------+
|170355 |1     |5.8688707       |Mulholland Dr. (1999)                                    |
|3379   |1     |5.8688707       |On the Beach (1959)                                      |
|33649  |1     |5.821521        |Saving Face (2004)                                       |
|27523  |1     |5.6181192       |My Sassy Girl (Yeopgijeogin geunyeo) (2001)              |
|171495 |1     |5.581305        |Cosmos                                                   |
|132333 |1     |5.533605        |Seve (2014)                                              |
|72171  |1     |5.468348        |Black Dynamite (2009)                                    |
|86237  |1     |5.468348        |Connections (1978)                             

In [0]:
recs_with_titles.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold_user_recommendations")


In [0]:
tables = spark.catalog.listTables()
for t in tables:
    if t.name.startswith("gold_"):
        print(f"{t.name}  |  {t.tableType}  |  {t.isTemporary}")


gold_fact_ratings  |  MANAGED  |  False
gold_user_recommendations  |  MANAGED  |  False


In [0]:
# Fact ratings
spark.table("gold_fact_ratings").show(5)
spark.table("gold_fact_ratings").count()

# Recommendations
spark.table("gold_user_recommendations").show(5, truncate=False)
spark.table("gold_user_recommendations").count()


+------+-------+------+---------+-------------------+-----------+
|userId|movieId|rating|timestamp|       timestamp_ts|rating_date|
+------+-------+------+---------+-------------------+-----------+
|   236|   2718|   3.0|943015211|1999-11-19 12:40:11| 1999-11-19|
|   236|   2542|   5.0|943015062|1999-11-19 12:37:42| 1999-11-19|
|   236|   2459|   4.0|943013896|1999-11-19 12:18:16| 1999-11-19|
|   236|   2580|   5.0|943015701|1999-11-19 12:48:21| 1999-11-19|
|   236|   1350|   4.0|943013896|1999-11-19 12:18:16| 1999-11-19|
+------+-------+------+---------+-------------------+-----------+
only showing top 5 rows
+-------+------+----------------+-------------------------------------------+
|movieId|userId|predicted_rating|title                                      |
+-------+------+----------------+-------------------------------------------+
|170355 |1     |5.8688707       |Mulholland Dr. (1999)                      |
|3379   |1     |5.8688707       |On the Beach (1959)                  

6100

In [0]:
from pyspark.sql import functions as F

spark.table("gold_fact_ratings") \
    .groupBy("movieId") \
    .agg(
        F.avg("rating").alias("avg_rating"),
        F.count("rating").alias("num_ratings")
    ) \
    .orderBy(F.desc("avg_rating"), F.desc("num_ratings")) \
    .show(10)


+-------+----------+-----------+
|movieId|avg_rating|num_ratings|
+-------+----------+-----------+
|   6442|       5.0|          2|
|   6818|       5.0|          2|
|     53|       5.0|          2|
|   1151|       5.0|          2|
|     99|       5.0|          2|
|   3473|       5.0|          2|
|  78836|       5.0|          2|
|  96608|       5.0|          1|
| 139640|       5.0|          1|
|   3303|       5.0|          1|
+-------+----------+-----------+
only showing top 10 rows


In [0]:
import mlflow

with mlflow.start_run():
    mlflow.log_param("rank", model.rank)
    mlflow.log_param("regParam", getattr(model, 'regParam', 'not available'))
    mlflow.log_metric("rmse", 0.8123190612289932)


In [0]:
from pyspark.ml.recommendation import ALSModel

# Load the trained model if not already in memory
model = ALSModel.load("dbfs:/path/to/gold_als_model")  # or use the DataFrame you trained

# Generate top 10 recommendations per user
top_recs = model.recommendForAllUsers(10)

# Optional: flatten for easier querying
from pyspark.sql.functions import explode
top_recs_flat = top_recs.select("userId", explode("recommendations").alias("rec"))
top_recs_flat = top_recs_flat.select(
    "userId",
    top_recs_flat.rec.movieId.alias("movieId"),
    top_recs_flat.rec.rating.alias("predicted_rating")
)

top_recs_flat.show(5)


{"ts": "2026-02-17 13:52:54.295", "level": "ERROR", "logger": "pyspark.sql.connect.logging", "msg": "GRPC Error received", "context": {}, "exception": {"class": "_MultiThreadedRendezvous", "msg": "<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.INTERNAL\n\tdetails = \"[PATH_NOT_FOUND] Path does not exist: dbfs:/path/to/gold_als_model/metadata. SQLSTATE: 42K03\"\n\tdebug_error_string = \"UNKNOWN:Error received from peer  {created_time:\"2026-02-17T13:52:54.295139972+00:00\", grpc_status:13, grpc_message:\"[PATH_NOT_FOUND] Path does not exist: dbfs:/path/to/gold_als_model/metadata. SQLSTATE: 42K03\"}\"\n>", "stacktrace": [{"class": null, "method": "_execute_and_fetch_as_iterator", "file": "/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", "line": "2099"}, {"class": null, "method": "__next__", "file": "<frozen _collections_abc>", "line": "356"}, {"class": null, "method": "send", "file": "/databricks/python/lib/python3.12/sit

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-5385075738654564>, line 4[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mpyspark[39;00m[38;5;21;01m.[39;00m[38;5;21;01mml[39;00m[38;5;21;01m.[39;00m[38;5;21;01mrecommendation[39;00m [38;5;28;01mimport[39;00m ALSModel
[1;32m      3[0m [38;5;66;03m# Load the trained model if not already in memory[39;00m
[0;32m----> 4[0m model [38;5;241m=[39m ALSModel[38;5;241m.[39mload([38;5;124m"[39m[38;5;124mdbfs:/path/to/gold_als_model[39m[38;5;124m"[39m)  [38;5;66;03m# or use the DataFrame you trained[39;00m
[1;32m      6[0m [38;5;66;03m# Generate top 10 recommendations per user[39;00m
[1;32m      7[0m top_recs [38;5;241m=[39m model[38;5;241m.[39mrecommendForAllUsers([38;5;241m10[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/m

In [0]:
from pyspark.ml.recommendation import ALSModel

data_path = "/FileStore/tables/movie-recommender-data-raw/gold_als_model/"

model_path = data_path + "gold_als_model"
als_model.write().overwrite().save(model_path)


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-5385075738654566>, line 6[0m
[1;32m      3[0m data_path [38;5;241m=[39m [38;5;124m"[39m[38;5;124m/FileStore/tables/movie-recommender-data-raw/gold_als_model/[39m[38;5;124m"[39m
[1;32m      5[0m model_path [38;5;241m=[39m data_path [38;5;241m+[39m [38;5;124m"[39m[38;5;124mgold_als_model[39m[38;5;124m"[39m
[0;32m----> 6[0m als_model[38;5;241m.[39mwrite()[38;5;241m.[39moverwrite()[38;5;241m.[39msave(model_path)

[0;31mNameError[0m: name 'als_model' is not defined

In [0]:
tables = spark.catalog.listTables()
for t in tables:
    if t.name.startswith("gold_"):
        print(f"{t.name}  |  {t.tableType}  |  {t.isTemporary}")


gold_fact_ratings  |  MANAGED  |  False
gold_user_recommendations  |  MANAGED  |  False
