In [2]:
%pip install numpy

Collecting numpy
  Downloading numpy-2.2.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Downloading numpy-2.2.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.4/16.4 MB[0m [31m19.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-2.2.5
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql import SparkSession

In [19]:
#spark = SparkSession.builder \
    #.appName("Read MovieLens from HDFS") \
    #.getOrCreate()

#spark = SparkSession.builder \
    #.appName("ALS Model") \
    #.config("spark.driver.memory", "4g") \
    #.config("spark.executor.memory", "4g") \
   # .getOrCreate()

#.config("spark.hadoop.fs.defaultFS", "hdfs://bigdata-node:8088") \
spark = SparkSession.builder \
    .appName("MovieLensAnalysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.sql.shuffle.partitions", "20") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .getOrCreate()

In [20]:
ratings_df = spark.read.option("header", True).csv("hdfs:///data/rating.csv")

In [21]:
ratings_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [22]:
movies_df = spark.read.option("header", True).csv("hdfs:///data/movie.csv")

In [23]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [24]:
df_data = ratings_df.join(
    movies_df.select("movieId", "title"),
    on="movieId",
    how="left"
)

In [25]:
df_data.show(5)

+-------+------+------+-------------------+--------------------+
|movieId|userId|rating|          timestamp|               title|
+-------+------+------+-------------------+--------------------+
|      2|     1|   3.5|2005-04-02 23:53:47|      Jumanji (1995)|
|     29|     1|   3.5|2005-04-02 23:31:16|City of Lost Chil...|
|     32|     1|   3.5|2005-04-02 23:33:39|Twelve Monkeys (a...|
|     47|     1|   3.5|2005-04-02 23:32:07|Seven (a.k.a. Se7...|
|     50|     1|   3.5|2005-04-02 23:29:40|Usual Suspects, T...|
+-------+------+------+-------------------+--------------------+
only showing top 5 rows



In [26]:
df_data = df_data.withColumnsRenamed({
    'userId': 'user',
    'movieId': 'movie',
})

In [27]:
df_data.show(5)

+-----+----+------+-------------------+--------------------+
|movie|user|rating|          timestamp|               title|
+-----+----+------+-------------------+--------------------+
|    2|   1|   3.5|2005-04-02 23:53:47|      Jumanji (1995)|
|   29|   1|   3.5|2005-04-02 23:31:16|City of Lost Chil...|
|   32|   1|   3.5|2005-04-02 23:33:39|Twelve Monkeys (a...|
|   47|   1|   3.5|2005-04-02 23:32:07|Seven (a.k.a. Se7...|
|   50|   1|   3.5|2005-04-02 23:29:40|Usual Suspects, T...|
+-----+----+------+-------------------+--------------------+
only showing top 5 rows



In [28]:
from pyspark.sql.functions import col

df_data = df_data \
    .withColumn("user", col("user").cast("int")) \
    .withColumn("movie", col("movie").cast("int")) \
    .withColumn("rating", col("rating").cast("float")) \
    .withColumn("timestamp", col("timestamp").cast("int")) \
    .withColumn("title", col("title").cast("string"))

In [29]:
df_data_train, df_data_test = df_data.randomSplit([0.8, 0.2], 42)

In [30]:
# import ALS model from spark
from pyspark.ml.recommendation import ALS

# initialise the model 
als = ALS(maxIter=10, regParam=0.1, rank=10, userCol="user", itemCol="movie", 
          ratingCol="rating", coldStartStrategy="drop", nonnegative=True)

# train model 
als_model = als.fit(df_data_train)

                                                                                

In [31]:
dfs_preds = als_model.transform(df_data_test)

In [32]:
dfs_preds.show(5)



+-----+----+------+---------+--------------------+----------+
|movie|user|rating|timestamp|               title|prediction|
+-----+----+------+---------+--------------------+----------+
|   47|  95|   3.0|     NULL|Seven (a.k.a. Se7...| 3.0762284|
|  262|  95|   3.0|     NULL|Little Princess, ...| 3.4373078|
|  446|  95|   3.0|     NULL|Farewell My Concu...| 3.6016524|
|  592|  95|   3.0|     NULL|       Batman (1989)|  2.502191|
|  708|  95|   4.0|     NULL|Truth About Cats ...|  3.108661|
+-----+----+------+---------+--------------------+----------+
only showing top 5 rows



                                                                                

In [33]:
#als_model.save("hdfs:///data/als_prediction_rating")

In [34]:
from pyspark.ml.recommendation import ALSModel

als_model_loaded = ALSModel.load("hdfs:///data/als_prediction_rating")

In [None]:
dfs_preds_loaded = als_model_loaded.transform(df_data_test)

In [None]:
dfs_preds_loaded.show(5)

In [None]:
df_data_test.show()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# initiate evaluator
eval = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
# calculate rmse
rmse = eval.evaluate(dfs_preds_loaded)

# reset metric and evaluate mse, R2, mae & explained variance
r2 = eval.evaluate(dfs_preds_loaded, {eval.metricName: 'r2'})
mae = eval.evaluate(dfs_preds_loaded, {eval.metricName: 'mae'})
var = eval.evaluate(dfs_preds_loaded, {eval.metricName: 'var'})


print(
    "RMSE score = {}".format(rmse),
    "R2 score = {}".format(r2),
    "MAE score = {}".format(mae),
    "Explained variance score = {}".format(var),
    sep="\n"
)

In [47]:
from pyspark.sql.functions import col

# ID de l'utilisateur
user_id = 29

# Créer un DataFrame avec l'utilisateur
user_df = spark.createDataFrame([(user_id,)], ["user"])

# Créer un DataFrame avec tous les films
all_movies_df = movies_df.select(col("movieId").cast("int").alias("movie"))

# Faire un produit cartésien pour obtenir (user, movie)
user_movies_df = user_df.crossJoin(all_movies_df)

# Prédire les notes
predicted_ratings_df = als_model_loaded.transform(user_movies_df)

# ✅ Préparer les titres des films sans ambiguïté
movies_renamed_df = movies_df.select(
    col("movieId").cast("int").alias("movie_id_for_join"),
    "title"
)

# ✅ Join propre entre prédictions et titres
result_df = predicted_ratings_df.join(
    movies_renamed_df,
    predicted_ratings_df.movie == movies_renamed_df.movie_id_for_join,
    how="left"
)

# Afficher les 10 meilleures prédictions
result_df.select("user", "movie", "title", "prediction") \
         .orderBy(col("prediction").desc()) \
         .show(100, truncate=False)

#result_df.select("user", "movie", "title", "prediction") \
#         .orderBy(col("prediction").desc()) \
#         .show(truncate=False, n=result_df.count())


+----+------+------------------------------------------------------------------------------------------------------+----------+
|user|movie |title                                                                                                 |prediction|
+----+------+------------------------------------------------------------------------------------------------------+----------+
|29  |128091|Craig Ferguson: A Wee Bit o' Revolution (2009)                                                        |6.5495496 |
|29  |121029|No Distance Left to Run (2010)                                                                        |5.922186  |
|29  |77736 |Crazy Stone (Fengkuang de shitou) (2006)                                                              |5.7918434 |
|29  |129536|Code Name Coq Rouge (1989)                                                                            |5.3435593 |
|29  |99243 |Parasites, Les (1999)                                                                      

In [6]:
%pip install pymongo

Collecting pymongo
  Downloading pymongo-4.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Downloading pymongo-4.12.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m15.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dnspython-2.7.0-py3-none-any.whl (313 kB)
Installing collected packages: dnspython, pymongo
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [pymongo]m1/2[0m [pymongo]
[1A[2KSuccessfully installed dnspython-2.7.0 pymongo-4.12.1
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m
Note: you ma

In [8]:
from pymongo import MongoClient

# Connexion à MongoDB (dans Docker)
client = MongoClient("mongodb://mongodb:27017/")  # ou "mongodb://mongodb:27017/" si tu es DANS un conteneur

# Sélection de la base et de la collection
db = client["reco_db"]
collection = db["predictions"]

# Exemple de données à insérer
predictions = [
    {"user": 29, "movie": 1, "title": "Film 1", "prediction": 4.5},
    {"user": 29, "movie": 2, "title": "Film 2", "prediction": 4.0},
    {"user": 29, "movie": 3, "title": "Film 3", "prediction": 3.7},
]

# Insertion
collection.insert_many(predictions)

print("✅ Prédictions insérées avec succès dans MongoDB !")

✅ Prédictions insérées avec succès dans MongoDB !


In [None]:
# 1. Convertir Spark DataFrame en Pandas
# Attention : .toPandas() charge tout en mémoire
pdf = result_df.select("user", "movie", "title", "prediction") \
               .orderBy(col("prediction").desc()) \
               .limit(100) \
               .toPandas()

# 2. Se connecter à MongoDB
client = MongoClient("mongodb://localhost:27017/")  # ou "mongodb://mongodb:27017/" dans un conteneur
collection = client["reco_db"]["predictions"]

# 3. Insérer les prédictions dans Mongo
collection.insert_many(pdf.to_dict(orient="records"))

print("✅ Données insérées dans MongoDB depuis result_df !")

In [4]:
spark = SparkSession.builder \
    .appName("WriteToMongoDB") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1") \
    .config("spark.mongodb.write.connection.uri", "mongodb://mongodb:27017/reco_db.predictions") \
    .getOrCreate()

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1850713e-7cbf-439b-b86f-13badd79f9d5;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.1.1 in central
	found org.mongodb#mongodb-driver-sync;4.8.2 in central
	[4.8.2] org.mongodb#mongodb-driver-sync;[4.8.1,4.8.99)
	found org.mongodb#bson;4.8.2 in central
	found org.mongodb#mongodb-driver-core;4.8.2 in central
	found org.mongodb#bson-record-codec;4.8.2 in central
downloading https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar ...
	[SUCCESSFUL ] org.mongodb.spark#mongo-spark-connector_2.12;10.1.1!mongo-spark-connector_2.12.jar (39ms)
downloading https://repo1.maven.org/maven2/org/mongodb/mongodb-driver-sync/4.8.2/mongodb-driver-sync-4.8.2.jar ...
	[SUCCESSFUL ] org.m

In [5]:
from pyspark.sql.functions import col

# Exemple de DataFrame à insérer (remplace par result_cleaned_df)
data = [(29, 1, "Film 1", 4.5), (29, 2, "Film 2", 4.0)]
columns = ["user", "movie", "title", "prediction"]
df = spark.createDataFrame(data, columns)

# Écriture dans MongoDB (base: reco_db, collection: predictions)
df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("database", "reco_db") \
    .option("collection", "predictions") \
    .option("uri", "mongodb://mongodb:27017/") \
    .save()

25/05/05 13:04:43 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7) 12]
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
	at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:46)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.<init>(MongoDataWriter.java:78)
	at com.mongodb.spark.sql.connector.write.MongoDataWriterFactory.createWriter(MongoDataWriterFactory.java:54)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:436)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.

Py4JJavaError: An error occurred while calling o55.save.
: org.apache.spark.SparkException: Writing job failed.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobFailedError(QueryExecutionErrors.scala:902)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:411)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
	at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.writeWithV2(WriteToDataSourceV2Exec.scala:243)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
	at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
	at org.apache.spark.sql.execution.datasources.v2.OverwriteByExpressionExec.run(WriteToDataSourceV2Exec.scala:243)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:319)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11) (namenode executor driver): java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
	at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:46)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.<init>(MongoDataWriter.java:78)
	at com.mongodb.spark.sql.connector.write.MongoDataWriterFactory.createWriter(MongoDataWriterFactory.java:54)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:436)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:385)
	... 44 more
	Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: c8ec8a57-2024-4c90-9e8b-dc6430b53448. 0/12 tasks completed.
		at com.mongodb.spark.sql.connector.write.MongoBatchWrite.abort(MongoBatchWrite.java:95)
		at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:406)
		... 44 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(org.apache.spark.sql.types.StructType)'
	at com.mongodb.spark.sql.connector.schema.InternalRowToRowFunction.<init>(InternalRowToRowFunction.java:46)
	at com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter.<init>(RowToBsonDocumentConverter.java:80)
	at com.mongodb.spark.sql.connector.write.MongoDataWriter.<init>(MongoDataWriter.java:78)
	at com.mongodb.spark.sql.connector.write.MongoDataWriterFactory.createWriter(MongoDataWriterFactory.java:54)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:436)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [49]:
# Nettoyage : enlever les valeurs NaN (optionnel mais recommandé)
result_df.select("user", "movie", "title", "prediction") \
         .orderBy(col("prediction").desc()) \
         .show(30, truncate=False)

# Écriture dans MongoDB
result_df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .option("database", "reco_db") \
    .option("collection", "predictions") \
    .option("uri", "mongodb://localhost:27017/") \
    .save()

+----+------+------------------------------------------------------------------------------------------------------+----------+
|user|movie |title                                                                                                 |prediction|
+----+------+------------------------------------------------------------------------------------------------------+----------+
|29  |128091|Craig Ferguson: A Wee Bit o' Revolution (2009)                                                        |6.5495496 |
|29  |121029|No Distance Left to Run (2010)                                                                        |5.922186  |
|29  |77736 |Crazy Stone (Fengkuang de shitou) (2006)                                                              |5.7918434 |
|29  |129536|Code Name Coq Rouge (1989)                                                                            |5.3435593 |
|29  |99243 |Parasites, Les (1999)                                                                      

Py4JJavaError: An error occurred while calling o870.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongodb. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
