# load data

In [0]:
%python
# Pull the three raw recommendation tables from the catalog in one assignment;
# `spark` is the session object the notebook environment provides.
books, users, ratings = (
    spark.table("data_duan.`recommandation-raw`.books"),
    spark.table("data_duan.`recommandation-raw`.users"),
    spark.table("data_duan.`recommandation-raw`.ratings"),
)

In [0]:
%python
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

# ALS needs numeric user/item ids, so map the string keys to numeric indices.
isbn_indexer = StringIndexer(inputCol="ISBN", outputCol="book_id")
user_indexer = StringIndexer(inputCol="User-ID", outputCol="user_id")

# Fit both indexers as one pipeline and KEEP the fitted model. This gives the
# same result as fitting them one after another. The table written below stores
# only the numeric indices, so this model's labels are the only way to map ALS
# ids back to ISBN / User-ID (and to index raw ids before calling the model).
indexer_model = Pipeline(stages=[isbn_indexer, user_indexer]).fit(ratings)
ratings_valiated = indexer_model.transform(ratings)

# Keep only the columns ALS consumes and persist the indexed ratings.
ratings_valiated = ratings_valiated.select("user_id", "book_id", "Book-Rating")
ratings_valiated.write.mode("overwrite").saveAsTable("data_duan.valiated.ratings")

# Single source of truth for the tunable ALS hyperparameters. The same dict
# configures the estimator and is logged to MLflow, so the two cannot drift.
als_params = {
    "rank": 10,
    "maxIter": 10,
    "regParam": 0.1,
    # Drop rows for users/items unseen during training instead of predicting NaN.
    "coldStartStrategy": "drop",
}

als = ALS(
    userCol="user_id",
    itemCol="book_id",
    ratingCol="Book-Rating",
    **als_params,
)

# 80/10/10 split with a fixed seed. validation_ratings is held out (unused in
# this cell).
train_ratings, test_ratings, validation_ratings = ratings_valiated.randomSplit(
    [0.8, 0.1, 0.1], seed=42
)

with mlflow.start_run():
    # Train inside the run so the run brackets the training step it records.
    model = als.fit(train_ratings)
    mlflow.log_params(als_params)
    mlflow.spark.log_model(model, "als_model")
    # Store the id mapping alongside the recommender so the two artifacts can
    # be used together on raw ISBN / User-ID values.
    mlflow.spark.log_model(indexer_model, "indexer_model")

In [0]:
from mlflow.tracking import MlflowClient
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out test split with the trained ALS model.
predictions = model.transform(test_ratings)

# The ALS estimator is configured with coldStartStrategy="drop", which already
# removes NaN predictions for unseen users/items. This call additionally drops
# any row with a null in any column (e.g. a missing rating), so the metric is
# computed only over complete rows.
predictions = predictions.na.drop()

# Root mean squared error between the true rating and the ALS prediction.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Book-Rating",
    predictionCol="prediction",
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Record the score on the most recent MLflow run started in this process (the
# training run, when cells are executed in order), so the params, model and
# metric can be reviewed together. MlflowClient logs by run id and does not
# change which run is active. Skip logging if no run exists.
last_run = mlflow.last_active_run()
if last_run is not None:
    MlflowClient().log_metric(last_run.info.run_id, "test_rmse", rmse)
else:
    print("No MLflow run found; test_rmse was not logged.")