In [1]:
# Import statements
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS 
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 
from pyspark.sql.functions import col, explode


In [2]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("CollaborativeFilteringRecsEngine")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/28 14:23:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Bare expression: show the SparkSession object as this cell's output.
spark

In [4]:
# Load the two CSV files. The first row is used as the header and Spark
# infers each column's type from the data.
moviesDF = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("./data/movies.csv")
)
ratingsDF = (
    spark.read
    .options(header="True", inferSchema="True")
    .csv("./data/rating.csv")
)


In [5]:
# Preview the movies DataFrame (show() prints the first 20 rows by default).
moviesDF.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [6]:
# Print the column names and inferred types of the movies DataFrame.
moviesDF.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
# Preview the ratings DataFrame (show() prints the first 20 rows by default).
ratingsDF.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [8]:
# Print the column names and inferred types of the ratings DataFrame.
ratingsDF.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [9]:
# In this environment display() on a Spark DataFrame prints only its schema
# summary (see the output below), not the rows; use .show() to see data.
display(moviesDF)

DataFrame[movieId: int, title: string, genres: string]

In [10]:
# In this environment display() on a Spark DataFrame prints only its schema
# summary (see the output below), not the rows; use .show() to see data.
display(ratingsDF)

DataFrame[userId: int, movieId: int, rating: double, timestamp: int]

In [11]:
# Attach movie metadata (title, genres) to every rating.
# The two datasets share the movieId column, so we join on it. A left join
# from the ratings side keeps every rating row, and movies that nobody rated
# never appear in the result.
ratings = ratingsDF.join(moviesDF, on='movieId', how='left')

In [12]:
# Preview the joined DataFrame: rating columns plus title and genres.
ratings.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

# Data splitting (80% training / 20% testing)

In [13]:
# 80/20 train/test split. The seed is fixed so the same rows land in each
# split on every run; without it randomSplit picks a new random seed each
# time, and the counts and evaluation metrics below change between runs.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

In [14]:
# Total number of rating rows before the split.
ratings.count()

100836

In [15]:
# Size of the training split, followed by a preview of its rows.
print(train.count())
train.show()

80468
+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|     1|   4.0| 964982703|Toy Story (1995)|Adventure|Animati...|
|      1|     5|   4.0| 847434962|Toy Story (1995)|Adventure|Animati...|
|      1|     7|   4.5|1106635946|Toy Story (1995)|Adventure|Animati...|
|      1|    18|   3.5|1455209816|Toy Story (1995)|Adventure|Animati...|
|      1|    19|   4.0| 965705637|Toy Story (1995)|Adventure|Animati...|
|      1|    21|   3.5|1407618878|Toy Story (1995)|Adventure|Animati...|
|      1|    27|   3.0| 962685262|Toy Story (1995)|Adventure|Animati...|
|      1|    33|   3.0| 939647444|Toy Story (1995)|Adventure|Animati...|
|      1|    40|   5.0| 832058959|Toy Story (1995)|Adventure|Animati...|
|      1|    43|   5.0| 848993983|Toy Story (1995)|Adventure|Animati...|
|      1|    44|   3.0| 869251860|Toy Story (

In [16]:
# Size of the test split, followed by a preview of its rows.
print(test.count())
test.show()

20368
+-------+------+------+----------+----------------+--------------------+
|movieId|userId|rating| timestamp|           title|              genres|
+-------+------+------+----------+----------------+--------------------+
|      1|    15|   2.5|1510577970|Toy Story (1995)|Adventure|Animati...|
|      1|    17|   4.5|1305696483|Toy Story (1995)|Adventure|Animati...|
|      1|    31|   5.0| 850466616|Toy Story (1995)|Adventure|Animati...|
|      1|    32|   3.0| 856736119|Toy Story (1995)|Adventure|Animati...|
|      1|    73|   4.5|1464196374|Toy Story (1995)|Adventure|Animati...|
|      1|    96|   5.0| 964772990|Toy Story (1995)|Adventure|Animati...|
|      1|   103|   4.0|1431954238|Toy Story (1995)|Adventure|Animati...|
|      1|   121|   4.0| 847656180|Toy Story (1995)|Adventure|Animati...|
|      1|   132|   2.0|1157921785|Toy Story (1995)|Adventure|Animati...|
|      1|   137|   4.0|1204859907|Toy Story (1995)|Adventure|Animati...|
|      1|   160|   4.0| 971115026|Toy Story (

In [17]:
# ALS estimator for explicit ratings with non-negative factors.
# coldStartStrategy="drop" removes rows whose prediction would be NaN
# (users or items absent from the training data) so RMSE stays defined.
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
)


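This line of code creates an **ALS (Alternating Least Squares)** estimator using PySpark's MLlib, the starting point for building a recommendation system. The trained model itself is only produced later, when the estimator is fitted to data.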

### **Code Breakdown:**

```python
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)
```

#### **Parameters:**

1. **`userCol="userId"`**
   - Specifies the column name in your DataFrame that contains the **user IDs**.
   - Each unique user is identified by an ID.

2. **`itemCol="movieId"`**
   - Specifies the column name that contains the **item IDs** (e.g., movies in this case).
   - Each unique item is identified by an ID.

3. **`ratingCol="rating"`**
   - Specifies the column name that contains the **ratings** (e.g., user feedback for items such as 1–5 stars).

4. **`nonnegative=True`**
   - Ensures that all matrix factorization values (user and item factors) are non-negative.
   - Non-negative values make sense for many recommendation problems, like ratings or counts, where negative values would not be valid.

5. **`implicitPrefs=False`**
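   - `False` selects the **explicit-feedback** formulation: the values in the rating column are treated as actual ratings (e.g., numeric ratings like 1-5).
   - If `True`, ALS uses the **implicit-feedback** formulation instead, treating the values as the strength of observed interactions (e.g., clicks, views, or purchases) rather than as explicit ratings.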

6. **`coldStartStrategy="drop"`**
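   - Specifies how to handle **cold-start scenarios** (when the model encounters users or items during prediction that were not present in the training data):
     - `"drop"`: Drops the rows whose prediction would be `NaN`, so that evaluation metrics such as RMSE remain well-defined.
     - `"nan"` (the default): Keeps those rows with a `NaN` prediction, which makes the evaluation metric `NaN` as well. `"drop"` is therefore the common choice when cross-validating.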

---

### **How This Works in Practice:**

This `ALS` instance is set up to:
- Use user-item-rating data.
- Predict explicit preferences (ratings).
- Generate only valid, non-negative predictions.
- Handle cold-start users or items by skipping their predictions.

---



In [18]:
# Display the estimator; the output below is its auto-generated uid.
als

ALS_36bad81e9267

### HyperParameters setup  

In [19]:
# Candidate hyperparameters: 4 ranks x 4 regularization strengths,
# i.e. 16 combinations for the cross-validator to try.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 100, 150])
    .addGrid(als.regParam, [0.01, 0.05, 0.1, 0.15])
    .build()
)

# Scores a predictions DataFrame by RMSE between the "rating" label column
# and the "prediction" column.
evaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="rating",
    metricName="rmse",
)

In [20]:
# Inspect the grid: a list with one {Param: value} dict per (rank, regParam) pair.
param_grid

[{Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factorization'): 10,
  Param(parent='ALS_36bad81e9267', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factorization'): 10,
  Param(parent='ALS_36bad81e9267', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factorization'): 10,
  Param(parent='ALS_36bad81e9267', name='regParam', doc='regularization parameter (>= 0).'): 0.1},
 {Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factorization'): 10,
  Param(parent='ALS_36bad81e9267', name='regParam', doc='regularization parameter (>= 0).'): 0.15},
 {Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factorization'): 50,
  Param(parent='ALS_36bad81e9267', name='regParam', doc='regularization parameter (>= 0).'): 0.01},
 {Param(parent='ALS_36bad81e9267', name='rank', doc='rank of the factor

In [21]:
# Display the evaluator; the output below is its auto-generated uid.
evaluator

RegressionEvaluator_4b5cc2ba175e

In [22]:
# 5-fold cross-validation: every parameter combination in param_grid is
# fitted on each fold and scored with the RMSE evaluator.
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,
)


In [23]:
# Display the cross-validator; the output below is its auto-generated uid.
cv

CrossValidator_c5745a811dde

## Fitting the model

In [24]:
# Run the cross-validated grid search on the training split. With the 4x4 grid
# and numFolds=5 configured above, this is 16 x 5 = 80 ALS fits, so it is by
# far the most expensive cell in the notebook.
model = cv.fit(train)

25/01/28 14:23:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetw

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010aaa5150, pid=3477, tid=222979
#
# JRE version: OpenJDK Runtime Environment Homebrew (11.0.26) (build 11.0.26+0)
# Java VM: OpenJDK 64-Bit Server VM Homebrew (11.0.26+0, mixed mode, tiered, compressed oops, g1 gc, bsd-aarch64)
# Problematic frame:
# V  [libjvm.dylib+0x695150]  ObjectSynchronizer::inflate(Thread*, oopDesc*, ObjectSynchronizer::InflateCause)+0x18c
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/kempslysilencieux/Documents/bigdata-tuto/spark-code/Spark-ML/Spark-collaborative-filtering/hs_err_pid3477.log
[thread 225283 also had an error]
[thread 28167 also had an error][thread 211971 also had an error]

[thread 223747 also had an error][thread 224259 also had an error]

#
# If you would like to submit a bug rep

Py4JError: An error occurred while calling o203.fit

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/socket.py", line 720, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site

In [None]:
# Take the best-scoring model selected by cross-validation.
best_model = model.bestModel


In [None]:
# Predict ratings for the held-out test split; the result carries a "prediction" column.
test_predictions = best_model.transform(test)


In [None]:
# RMSE of the best model on the test split (lower is better).
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

In [None]:
# Top 5 recommended movies for every user known to the model.
recommendations = best_model.recommendForAllUsers(5)

In [None]:

# Alias for the recommendations DataFrame; the following cells refer to `df`.
df = recommendations

# NOTE: for the earlier display(moviesDF) cell this environment printed only
# the schema summary, so use df.show() to see rows.
display(df)


In [None]:
# Explode the "recommendations" array so each recommendation gets its own row,
# stored in the new "movieid_rating" column.
df2 = df.withColumn("movieid_rating", explode("recommendations"))

In [None]:
# Inspect the exploded DataFrame (one row per user/recommendation pair).
display(df2)

In [None]:
# Pull the movieId and rating fields out of the exploded struct so the result
# has plain userId / movieId / rating columns.
display(
    df2.select(
        "userId",
        col("movieid_rating.movieId"),
        col("movieid_rating.rating"),
    )
)

In [None]:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, explode

# End-to-end pipeline in a single cell: load, join, split, cross-validate ALS,
# evaluate on the test split, and list the top-5 recommendations per user.

# Initialize Spark session
spark = SparkSession.builder.appName("Collaborative filtering").getOrCreate()


# Load movies and ratings data (header row present, column types inferred)
moviesDF = spark.read.options(header="True", inferSchema="True").csv("./data/movies.csv")
ratingsDF = spark.read.options(header="True", inferSchema="True").csv("./data/rating.csv")

# Display the dataframes to ensure they are loaded correctly
display(moviesDF)
display(ratingsDF)


# Left join from the ratings side: every rating is kept and gains title/genres
ratings = ratingsDF.join(moviesDF, 'movieId', 'left')

# Split the data into training and testing sets. The seed is fixed so the
# split, and therefore the counts and RMSE below, is the same on every run.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)


# Total number of ratings before the split. A bare `ratings.count()` in the
# middle of a cell is computed and then discarded, so print it explicitly.
print(ratings.count())

# Size and preview of the training dataset
print(train.count())
train.show()

# Size and preview of the test dataset
print(test.count())
test.show()

# Initialize the ALS model (explicit ratings, non-negative factors; rows whose
# prediction would be NaN are dropped so RMSE stays defined)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

# Set up parameter grid for cross-validation (4 ranks x 4 regParams = 16 combinations)
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

# Initialize evaluator for RMSE
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)


# Initialize CrossValidator (5 folds per parameter combination)
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Fit the model using cross-validation (the expensive step)
model = cv.fit(train)

# Get the best model
best_model = model.bestModel

# Transform the test set using the best model
test_predictions = best_model.transform(test)

# Evaluate the RMSE on the test set
RMSE = evaluator.evaluate(test_predictions)
print(f"RMSE: {RMSE}")

# Get the top 5 recommendations for all users
recommendations = best_model.recommendForAllUsers(5)

# Display the recommendations dataframe
display(recommendations)

# Explode the recommendations array so each recommendation is its own row
df2 = recommendations.withColumn("movieid_rating", explode("recommendations"))

# Show the exploded dataframe as flat userId / movieId / rating columns
df2.select(
    "userId", 
    col("movieid_rating.movieId").alias("movieId"), 
    col("movieid_rating.rating").alias("rating")
).show()


ConnectionRefusedError: [Errno 61] Connection refused

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, explode

# End-to-end pipeline in a single cell, this time with explicit memory and
# core settings on the Spark session.

# Initialize Spark session with adjusted parameters
spark = SparkSession.builder \
    .appName("Collaborative filtering") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "4") \
    .getOrCreate()

# Load movies and ratings data (header row present, column types inferred)
moviesDF = spark.read.options(header="True", inferSchema="True").csv("./data/movies.csv")
ratingsDF = spark.read.options(header="True", inferSchema="True").csv("./data/rating.csv")

# Display the dataframes to ensure they are loaded correctly
moviesDF.show()
ratingsDF.show()

# Left join from the ratings side: every rating is kept and gains title/genres
ratings = ratingsDF.join(moviesDF, 'movieId', 'left')

# Split the data into training and testing sets. The seed is fixed so the
# split, and therefore the counts and RMSE below, is the same on every run.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Size and preview of the training dataset
print(train.count())
train.show()

# Size and preview of the test dataset
print(test.count())
test.show()

# Initialize the ALS model (explicit ratings, non-negative factors; rows whose
# prediction would be NaN are dropped so RMSE stays defined)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

# Set up parameter grid for cross-validation (4 ranks x 4 regParams = 16 combinations)
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()

# Initialize evaluator for RMSE
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)

# Initialize CrossValidator (5 folds per parameter combination)
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

# Fit the model using cross-validation
try:
    model = cv.fit(train)
    # Get the best model
    best_model = model.bestModel
    # Transform the test set using the best model
    test_predictions = best_model.transform(test)
    # Evaluate the RMSE on the test set
    RMSE = evaluator.evaluate(test_predictions)
    print(f"RMSE: {RMSE}")
    # Get the top 5 recommendations for all users
    recommendations = best_model.recommendForAllUsers(5)
    # Display the recommendations dataframe
    recommendations.show()
    # Explode the recommendations array so each recommendation is its own row
    df2 = recommendations.withColumn("movieid_rating", explode("recommendations"))
    # Show the exploded dataframe as flat userId / movieId / rating columns
    df2.select(
        "userId", 
        col("movieid_rating.movieId").alias("movieId"), 
        col("movieid_rating.rating").alias("rating")
    ).show()
except Exception as e:
    print(f"An error occurred: {e}")
    # Re-raise after logging: swallowing the exception hides the traceback and
    # lets later cells run against names (model, best_model, ...) that were
    # never assigned.
    raise


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/28 14:34:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010a2a5150, pid=3682, tid=197123
#
# JRE version: OpenJDK Runtime Environment Homebrew (11.0.26) (build 11.0.26+0)
# Java VM: OpenJDK 64-Bit Server VM Homebrew (11.0.26+0, mixed mode, tiered, compressed oops, g1 gc, bsd-aarch64)
# Problematic frame:
# V  [libjvm.dylib+0x695150]  ObjectSynchronizer::inflate(Thread*, oopDesc*, ObjectSynchronizer::InflateCause)+0x18c
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/kempslysilencieux/Documents/bigdata-tuto/spark-code/Spark-ML/Spark-collaborative-filtering/hs_err_pid3682.log
#
# If you would like to submit a bug report, please visit:
#   https://github.com/Homebrew/homebrew-core/issues
#
An error occurred: An error occurred while calling o196.fit


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kempslysilencieux/Documents/bigdata-tuto/pyspark-env/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [None]:

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col, explode

# End-to-end pipeline in a single cell, scaled down for local debugging:
# smaller memory settings, fewer partitions, 3 folds, and DEBUG logging.

# Initialize Spark session with resource optimization
spark = SparkSession.builder \
    .appName("Collaborative Filtering") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.default.parallelism", "4") \
    .getOrCreate()

# Set log level to DEBUG to capture detailed logs
# (very verbose: expect the cell output to be dominated by Spark log lines)
spark.sparkContext.setLogLevel("DEBUG")

# Load movies and ratings data (Limit the data for testing purposes)
# Only moviesDF is limited. Because the join below is a left join from the
# ratings side, all ratings are still kept; ratings for movies outside the
# 1000 retained rows simply get null title/genres.
moviesDF = spark.read.options(header="True", inferSchema="True").csv("./data/movies.csv").limit(1000)
ratingsDF = spark.read.options(header="True", inferSchema="True").csv("./data/rating.csv")

# Display the dataframes to ensure they are loaded correctly
print("Movies Dataset:")
moviesDF.show(5)
print("Ratings Dataset:")
ratingsDF.show(5)

# Join ratings with movies on movieId
ratings = ratingsDF.join(moviesDF, 'movieId', 'left')

# Split the data into training and testing sets (80/20). The seed is fixed so
# the split, and therefore the counts and RMSE below, is the same on every run.
(train, test) = ratings.randomSplit([0.8, 0.2], seed=42)

# Print the count of rows in the training and test datasets
print(f"Training Data Count: {train.count()}")
train.show(5)
print(f"Test Data Count: {test.count()}")
test.show(5)

# Initialize the ALS model with user and item column names
# (explicit ratings, non-negative factors; rows whose prediction would be NaN
# are dropped so RMSE stays defined)
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", 
          nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

# Set up parameter grid for cross-validation (4 ranks x 4 regParams = 16 combinations)
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 50, 100, 150]) \
            .addGrid(als.regParam, [.01, .05, .1, .15]) \
            .build()


# Initialize evaluator for RMSE
evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="rating", 
    predictionCol="prediction"
)

# Initialize CrossValidator (3 folds here instead of 5 to cut the number of fits)
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# COMMAND ----------
# Fit the model using cross-validation
model = cv.fit(train)

# Get the best model from cross-validation
best_model = model.bestModel

# Transform the test set using the best model
test_predictions = best_model.transform(test)

# Evaluate the RMSE on the test set
RMSE = evaluator.evaluate(test_predictions)
print(f"Test RMSE: {RMSE}")

# COMMAND ----------
# Get recommendations for all users (Limit the number of recommendations for testing)
recommendations = best_model.recommendForAllUsers(3)

# COMMAND ----------
# Display the recommendations dataframe
print("Recommendations for All Users:")
recommendations.show(5)

# COMMAND ----------
# Explode the recommendations array so each recommendation is its own row
df2 = recommendations.withColumn("movieid_rating", explode("recommendations"))

# Show the exploded dataframe as flat userId / movieId / rating columns
df2.select(
    "userId", 
    col("movieid_rating.movieId").alias("movieId"), 
    col("movieid_rating.rating").alias("rating")
).show(5)

# COMMAND ----------
# Stop the Spark session after the job is complete
spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/28 14:40:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/28 14:40:25 DEBUG FileSystem: Loading filesystems
25/01/28 14:40:25 DEBUG FileSystem: nullscan:// = class org.apache.hadoop.hive.ql.io.NullScanFileSystem from /opt/spark/jars/hive-exec-2.3.9-core.jar
25/01/28 14:40:25 DEBUG FileSystem: file:// = class org.apache.hadoop.fs.LocalFileSystem from /opt/spark/jars/hadoop-client-api-3.3.4.jar
25/01/28 14:40:25 DEBUG FileSystem: file:// = class org.apache.hadoop.hive.ql.io.ProxyLocalFileSystem from /opt/spark/jars/hive-exec-2.3.9-core.jar
25/01/28 14:40:25 DEBUG FileSystem: viewfs:// = class org.apache.hadoop.fs.viewfs.ViewFileSystem from /opt/spark/jars/hadoop-client-api-3.3.4.jar
25/01/28 14:40:25 DEBUG FileSystem: har:// = class org.apache.hadoop.fs.HarFileSystem fro

Movies Dataset:
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows

Ratings Dataset:


25/01/28 14:40:27 DEBUG BlockManager: Put block broadcast_5 locally took 0 ms
25/01/28 14:40:27 DEBUG BlockManager: Putting block broadcast_5 without replication took 0 ms
25/01/28 14:40:27 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 7.6 KiB, free 1048.0 MiB)
25/01/28 14:40:27 DEBUG BlockManagerMasterEndpoint: Updating block info on master broadcast_5_piece0 for BlockManagerId(driver, client-172-18-120-79.eduroam.universite-paris-saclay.fr, 50917, None)
25/01/28 14:40:27 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on client-172-18-120-79.eduroam.universite-paris-saclay.fr:50917 (size: 7.6 KiB, free: 1048.7 MiB)
25/01/28 14:40:27 DEBUG BlockManagerMaster: Updated info of block broadcast_5_piece0
25/01/28 14:40:27 DEBUG BlockManager: Told master about block broadcast_5_piece0
25/01/28 14:40:27 DEBUG BlockManager: Put block broadcast_5_piece0 locally took 0 ms
25/01/28 14:40:27 DEBUG BlockManager: Putting block broadcast_5_piece0 with

NameError: name 'ratingsDF' is not defined

25/01/28 14:40:44 DEBUG ExecutorMetricsPoller: removing (1, 0) from stageTCMP
25/01/28 14:40:44 DEBUG ExecutorMetricsPoller: removing (2, 0) from stageTCMP
25/01/28 14:40:44 DEBUG ExecutorMetricsPoller: removing (0, 0) from stageTCMP
