In [1]:
from google.cloud import storage 

# Initialize a client
client = storage.Client()

# Specify the bucket name and prefix
bucket = client.get_bucket("msca-bdp-student-gcs")
blobs = bucket.list_blobs(prefix="Group9FinalProj/movies_datahub/")

# Print out all file names
for blob in blobs:
    print(blob.name)

Group9FinalProj/movies_datahub/
Group9FinalProj/movies_datahub/boxoffice/2000-2009 Movies Box Ofice Collection.csv
Group9FinalProj/movies_datahub/boxoffice/2010-2024 Movies Box Ofice Collection.csv
Group9FinalProj/movies_datahub/boxoffice/2024 Movies Box Ofice Collection.csv
Group9FinalProj/movies_datahub/boxoffice/boxoffice2024.csv
Group9FinalProj/movies_datahub/boxoffice/thenumbers2024boxoffice.csv
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-01-06-08.xls
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-01-13-15.xls
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-01-20-22.xls
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-01-27-29.xls
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-02-03-05.xls
Group9FinalProj/movies_datahub/boxoffice/ukboxoffice/bfi-weekend-box-office-report-2023-02-

# main_file EDA

In [2]:
spark = SparkSession.builder.appName('FPMovieRecommender').getOrCreate()

# main_file2 EDA

In [3]:
movies = spark.read \
  .option("header", "true") \
  .csv("gs://msca-bdp-student-gcs/Group9FinalProj/movies_datahub/main_file2")

                                                                                

In [4]:
movies.printSchema()

root
 |-- movieTitle: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- quote: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- isVerified: string (nullable = true)
 |-- isSuperReviewer: string (nullable = true)
 |-- hasSpoilers: string (nullable = true)
 |-- hasProfanity: string (nullable = true)
 |-- score: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userRealm: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- movieYear: string (nullable = true)
 |-- movieURL: string (nullable = true)
 |-- critic_score: string (nullable = true)
 |-- critic_sentiment: string (nullable = true)
 |-- audience_score: string (nullable = true)
 |-- audience_sentiment: string (nullable = true)
 |-- release_date_theaters: string (nullable = true)
 |-- release_date_streaming: string (nullable = true)
 |-- original_language: string (nullable

In [5]:
movies.select("rating","movieId","userId","theme","movieTitle")

DataFrame[rating: string, movieId: string, userId: string, theme: string, movieTitle: string]

In [6]:
ratings = movies.select(['userId', 'movieId', 'rating'])

In [7]:
ratings.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------+
|              userId|             movieId|rating|
+--------------------+--------------------+------+
|                null|6476cd54-ceb6-372...|   4.5|
|654e0d80-81bd-4bc...|9c225277-6be0-3e7...|   5.0|
|           795863019|1ea77d0a-a98c-31c...|   5.0|
|fd7c042c-153f-4ec...|d64c1e77-b505-3cf...|   4.0|
|           963232239|455d87bc-6850-4c1...|   5.0|
|d8fd3262-376c-438...|2734371e-65e3-3a5...|   3.0|
|           978080360|1ea77d0a-a98c-31c...|   4.5|
|78fb5de0-af90-4c3...|591fdf7c-5514-3d4...|   3.0|
|           260041541|a5eb2634-eb0a-351...|   3.5|
|d8fd3262-376c-438...|a62bdfeb-7d07-383...|   3.0|
|3b8acba1-017e-47a...|2cf75de0-1443-323...|   0.5|
|           260013347|41f4f84c-d06d-3f0...|   5.0|
|           977615484|9c18a32e-7c1e-334...|   4.0|
|           974236207|a50bc50f-396f-3d6...|   5.0|
|           260169552|bfd2fd97-a33c-3aa...|   0.5|
|c4f7e235-a63d-4e2...|46cb357d-6a77-32f...|   4.0|
|           978144575|e1374d4d-

                                                                                

In [8]:
# Data Cleaning
from pyspark.sql.functions import col

# 1. Drop duplicate rows
cleaned_df = ratings.dropDuplicates()

# 2. Filter out rows with null values in any column
cleaned_df = cleaned_df.dropna()

# 3. Ensure rating is a valid numeric value between 0.5 and 5.0
cleaned_df = cleaned_df.filter((col('rating') >= 0.5) & (col('rating') <= 5.0))

# 4. Trim whitespace from string columns (if applicable)
from pyspark.sql.functions import trim
cleaned_df = cleaned_df.withColumn('movieId', trim(col('movieId')))
cleaned_df = cleaned_df.withColumn('userId', trim(col('userId')))
cleaned_df = cleaned_df.filter(col("userId").rlike("^[0-9]+$"))

# Show the cleaned data
cleaned_df.show()

[Stage 5:>                                                          (0 + 1) / 1]

+---------+--------------------+------+
|   userId|             movieId|rating|
+---------+--------------------+------+
|917708463|e866b902-d727-354...|   2.5|
|921282529|f8e72581-39ea-3c4...|   2.0|
|908655006|916c512c-6f82-3f7...|   5.0|
|804117263|70531264-23dc-3aa...|   5.0|
|913959862|b025f290-a925-344...|   4.0|
|895096331|591fdf7c-5514-3d4...|   4.0|
|975841142|6eeeadff-c77d-3b3...|   5.0|
|858517239|36745381-5769-3fc...|   2.5|
|971873539|bfd2fd97-a33c-3aa...|   4.5|
|901274300|43dd5c07-9e99-3a7...|   3.0|
|977292722|3f191729-56b1-31c...|   3.0|
|587531845|344b1318-d47f-3b4...|   4.0|
|972088730|d58ef52e-6a69-36a...|   3.5|
|818701579|7d3aec31-0e03-312...|   3.5|
|965602433|4e7870b9-7a00-3bc...|   1.0|
|791272108|edae8d3a-8db9-3c1...|   0.5|
|298280146|c38a8fa5-ca05-34b...|   3.0|
|782789856|5b986c90-af31-3d2...|   4.0|
|849569608|b8611440-e6ac-37f...|   5.0|
|962212833|92ced6d7-3257-329...|   4.5|
+---------+--------------------+------+
only showing top 20 rows



                                                                                

In [9]:
cleaned_df.describe().show

                                                                                

<bound method DataFrame.show of DataFrame[summary: string, userId: string, movieId: string, rating: string]>

In [10]:
cleaned_df.head(5)

                                                                                

[Row(userId='787826723', movieId='23e7fde3-a4d5-338c-8783-f3de6f791fd6', rating='5.0'),
 Row(userId='157121878', movieId='1dcf23ef-0e8e-396e-8e0d-79210a4d377c', rating='4.0'),
 Row(userId='885108513', movieId='286d8c12-e104-3d60-b889-b5a7cd4ee790', rating='2.5'),
 Row(userId='782233804', movieId='0f697f41-3c8c-325d-b462-9ed0e1d75c26', rating='4.0'),
 Row(userId='965033056', movieId='36745381-5769-3fcf-b6b4-1d1f7a63ed48', rating='2.0')]

In [11]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Step 1: Extract distinct movieId and assign integer IDs starting from 1
movieId_mapping = (
    cleaned_df.select("movieId")
    .distinct()
    .withColumn("movieId_int_raw", monotonically_increasing_id())
    .withColumn(
        "movieId_int",
        row_number().over(Window.orderBy("movieId_int_raw"))  # Assign sequential IDs starting from 1
    )
    .drop("movieId_int_raw")  # Clean up temporary column
)

In [12]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import SparkSession


# Step 2: Join the mapping back to the original DataFrame
df_mapped = cleaned_df.join(movieId_mapping, on="movieId", how="inner").select("rating", "movieId_int", "userId")

# Step 3: Show the results
df_mapped.show()

24/12/02 15:49:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:37 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:39 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:40 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+------+-----------+---------+
|rating|movieId_int|   userId|
+------+-----------+---------+
|   3.5|          1|178644083|
|   1.0|          1|907061450|
|   3.0|          1|792095246|
|   0.5|          2|974698819|
|   3.0|          1|892741758|
|   3.0|          1|782435102|
|   4.5|          1|834408232|
|   2.0|          1|958841443|
|   4.0|          3|976511688|
|   5.0|          3|880644209|
|   4.0|          1|878614196|
|   4.0|          2|978565554|
|   3.5|          3|978402656|
|   5.0|          1|889869448|
|   3.0|          1|788234294|
|   1.5|          1|904386321|
|   3.5|          1|904293150|
|   4.0|          3|907550627|
|   3.5|          1|860966783|
|   4.0|          3|970884536|
+------+-----------+---------+
only showing top 20 rows



In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# Step 1: Create a window specification ordered by userId
window_spec = Window.orderBy("userId")

# Step 2: Assign new user IDs starting from 1
df_ranked = (
    df_mapped
    .withColumn("userId_new", dense_rank().over(window_spec))
)

# Step 3: Drop old userId column and rename userId_new (optional)
df_final = (
    df_ranked
    .drop("userId")
    .withColumnRenamed("userId_new", "userId")
)

# Show the results
df_final.show(20)

24/12/02 15:49:41 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:41 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:41 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:49:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+------+-----------+------+
|rating|movieId_int|userId|
+------+-----------+------+
|   4.0|        820|     1|
|   3.0|        864|     1|
|   5.0|        417|     1|
|   5.0|        429|     2|
|   5.0|        488|     2|
|   4.5|       1088|     3|
|   3.5|        720|     4|
|   1.0|       1151|     4|
|   4.0|       1224|     5|
|   3.5|       1382|     6|
|   1.5|        282|     6|
|   5.0|        632|     7|
|   5.0|        658|     8|
|   5.0|       1339|     9|
|   5.0|       1309|     9|
|   5.0|        935|    10|
|   5.0|       1199|    10|
|   5.0|         57|    10|
|   5.0|       1151|    10|
|   5.0|       1332|    10|
+------+-----------+------+
only showing top 20 rows



                                                                                

In [14]:
df_final.describe().show()

24/12/02 15:50:05 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:05 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:19 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-------+------------------+------------------+------------------+
|summary|            rating|       movieId_int|            userId|
+-------+------------------+------------------+------------------+
|  count|           4086018|           4086018|           4086018|
|   mean| 3.613420082828808| 769.1483397772599| 603560.4970580159|
| stddev|1.2770091826290464|435.67092766487906|362347.54740908265|
|    min|               0.5|                 1|                 1|
|    max|               5.0|              1498|           1384746|
+-------+------------------+------------------+------------------+



                                                                                

In [15]:
from pyspark.sql.functions import col

df_final = df_final.withColumn("rating", col("rating").cast("float"))


In [16]:
#df_final.printSchema()

root
 |-- rating: float (nullable = true)
 |-- movieId_int: integer (nullable = true)
 |-- userId: integer (nullable = true)



In [17]:
df_final.select("userId", "movieId_int", "rating").summary("count").show()

24/12/02 15:50:33 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:33 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-------+-------+-----------+-------+
|summary| userId|movieId_int| rating|
+-------+-------+-----------+-------+
|  count|4086018|    4086018|4086018|
+-------+-------+-----------+-------+



                                                                                

In [18]:
df_final = df_final.dropna(subset=["userId", "movieId_int", "rating"])

In [19]:
# Count interactions per user
df_final.groupBy("userId").count().orderBy("count").show(5)

# Count interactions per movie
df_final.groupBy("movieId_int").count().orderBy("count").show(5)

24/12/02 15:50:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:50:56 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+------+-----+
|userId|count|
+------+-----+
|    14|    1|
|    23|    1|
|    17|    1|
|    13|    1|
|    19|    1|
+------+-----+
only showing top 5 rows



24/12/02 15:51:34 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:34 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:35 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:35 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+-----+
|movieId_int|count|
+-----------+-----+
|       1158|    1|
|        425|    1|
|       1498|    1|
|       1157|    2|
|        715|    2|
+-----------+-----+
only showing top 5 rows



                                                                                

Analysis with genres

## 1. Filter Users and Movies with Sufficient Interactions

In [20]:
# Filter users and movies with at least 5 interactions
df_final = df_final.join(
    df_final.groupBy("userId").count().filter("count >= 5").select("userId"),
    on="userId"
).join(
    df_final.groupBy("movieId_int").count().filter("count >= 5").select("movieId_int"),
    on="movieId_int"
)

In [21]:
from pyspark.sql.functions import col

# Filter users with at least 5 interactions
user_interactions = df_final.groupBy("userId").count().filter("count >= 5").select("userId")

# Filter movies with at least 5 interactions
movie_interactions = df_final.groupBy("movieId_int").count().filter("count >= 5").select("movieId_int")

# Join back to the main dataset
df_filtered = df_final.join(user_interactions, on="userId").join(movie_interactions, on="movieId_int")

# Check new counts
print(f"Filtered count: {df_filtered.count()}")

24/12/02 15:51:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:51:45 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

Filtered count: 2359777


                                                                                

## 3. Train the ALS Model

In [22]:
from pyspark.ml.recommendation import ALS

# Train-test split
(training, test) = df_final.randomSplit([0.8, 0.2])

from pyspark.ml.recommendation import ALS

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId_int",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True  # Enforce nonnegative factors
)

model = als.fit(training)

24/12/02 15:54:48 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:54:48 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:54:48 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:54:48 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:54:48 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

In [23]:
training.printSchema()


root
 |-- movieId_int: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: float (nullable = true)



## 4. Generate Predictions


In [24]:
predictions = model.transform(test)
predictions.show()

24/12/02 15:56:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:56:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:56:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:56:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 15:56:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+------+------+----------+
|movieId_int|userId|rating|prediction|
+-----------+------+------+----------+
|          1|  2327|   3.0| 1.8056486|
|          1|  3336|   2.5| 2.5912154|
|          1|  3416|   3.5| 3.3103907|
|          1|  4342|   3.0| 3.1841657|
|          1|  4672|   3.0| 3.2311711|
|          1|  5849|   2.5| 2.6152978|
|          1|  6011|   4.0| 3.1791594|
|          1|  6712|   4.5|  2.941024|
|          1|  8270|   3.0|  2.595817|
|          1|  8418|   3.5|  2.508561|
|          1|  8994|   3.5|  2.834766|
|          1| 10613|   1.0| 2.8775067|
|          1| 10621|   4.0| 3.1116025|
|          1| 10623|   4.0|  3.263725|
|          1| 10810|   2.5| 3.3972583|
|          1| 11387|   3.0| 2.7022038|
|          1| 12841|   3.0| 2.8283086|
|          1| 14799|   4.5|   3.49982|
|          1| 15778|   4.0|   3.77731|
|          1| 16269|   3.5| 3.4396787|
+-----------+------+------+----------+
only showing top 20 rows



                                                                                

In [25]:
model.explainParams()

"blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)\ncoldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)\nitemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId_int)\npredictionCol: prediction column name. (default: prediction)\nuserCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)"

In [26]:
#item factors Yes
model.itemFactors.show(10, truncate = False)

+---+----------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                              |
+---+----------------------------------------------------------------------------------------------------------------------+
|10 |[0.0, 0.0, 0.5587536, 0.4260583, 1.0419513, 0.79300445, 0.5416639, 0.059982933, 0.529111, 0.20405497]                 |
|20 |[0.34628347, 0.17921154, 0.0, 0.14491725, 0.5581346, 0.7882183, 0.40903693, 0.1079608, 1.2102369, 0.0]                |
|30 |[0.9325258, 0.6691315, 0.041025948, 0.0, 0.06938319, 0.4106829, 0.48541582, 0.63843083, 0.5566391, 0.688988]          |
|40 |[0.0, 0.33796015, 1.2438678, 0.7055885, 0.0, 0.0, 0.1560386, 0.0, 0.9549518, 0.29498246]                              |
|50 |[0.3227716, 0.7888561, 0.13605022, 0.4608307, 1.404714, 0.22631904, 0.0, 0.4129716, 0.4157511, 0.29806834]            |


In [27]:
# Check the schema of df_final and movie
df_final.printSchema()
movies.printSchema()

root
 |-- movieId_int: integer (nullable = true)
 |-- userId: integer (nullable = true)
 |-- rating: float (nullable = true)

root
 |-- movieTitle: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- quote: string (nullable = true)
 |-- reviewId: string (nullable = true)
 |-- isVerified: string (nullable = true)
 |-- isSuperReviewer: string (nullable = true)
 |-- hasSpoilers: string (nullable = true)
 |-- hasProfanity: string (nullable = true)
 |-- score: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- userDisplayName: string (nullable = true)
 |-- userRealm: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- movieYear: string (nullable = true)
 |-- movieURL: string (nullable = true)
 |-- critic_score: string (nullable = true)
 |-- critic_sentiment: string (nullable = true)
 |-- audience_score: string (nullable = true)
 |-- audience_sentiment: string (nullable = true)
 |-- release_date_the

In [30]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Add a synthetic `movieId_int` column to `movies`
movies = movies.withColumn("movieId_int", F.row_number().over(Window.orderBy(F.lit(1))) - 1)


In [31]:
# Repartition using the correct column
df_filtered = df_filtered.repartition(200, "movieId_int")
movies = movies.repartition(200, "movieId_int")


In [None]:
# Perform the join
result = df_filtered.join(movies, on="movieId_int", how="left")

# Show the result
result.show()


24/12/02 16:06:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:06:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:06:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:06:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:06:20 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+------+------+----------+--------------------+------+--------------------+--------------------+----------+---------------+-----------+------------+-----+--------------------+---------------+---------+------+---------+--------------------+------------+----------------+--------------+------------------+---------------------+----------------------+-----------------+----+---------+---------+----------------+---------+---------------+----+--------------------+--------------------+-----------+--------------+--------------------+--------------+
|movieId_int|userId|rating|movieTitle|             movieId|rating|               quote|            reviewId|isVerified|isSuperReviewer|hasSpoilers|hasProfanity|score|        creationDate|userDisplayName|userRealm|userId|movieYear|            movieURL|critic_score|critic_sentiment|audience_score|audience_sentiment|release_date_theaters|release_date_streaming|original_language|Rank|Worldwide| Domestic|Domestic_percent|  Foreign|Foreign_perce

                                                                                

## 5. Add Movie Metadata

In [64]:
predictions_with_metadata = predictions_with_metadata.na.drop()
predictions_with_metadata.show(10, truncate = False)

24/12/02 19:30:17 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 19:30:17 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 19:30:17 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 19:30:17 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 19:30:17 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+------+------+----------+
|movieId_int|userId|rating|prediction|
+-----------+------+------+----------+
|1          |2327  |3.0   |2.6354458 |
|1          |3336  |2.5   |2.7389197 |
|1          |3416  |3.5   |3.145992  |
|1          |4342  |3.0   |3.1080115 |
|1          |4672  |3.0   |3.0366933 |
|1          |5849  |2.5   |2.8740804 |
|1          |6011  |4.0   |3.1449518 |
|1          |6712  |4.5   |2.8929288 |
|1          |8270  |3.0   |2.5986078 |
|1          |8418  |3.5   |2.758317  |
+-----------+------+------+----------+
only showing top 10 rows



                                                                                

In [35]:
predictions = predictions.na.drop()
predictions.show(10, truncate = False)

24/12/02 16:13:16 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:13:16 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:13:16 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:13:16 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:13:16 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+------+------+----------+
|movieId_int|userId|rating|prediction|
+-----------+------+------+----------+
|1          |35254 |3.5   |3.3540092 |
|1          |50700 |3.0   |3.0752802 |
|1          |56907 |2.0   |2.8402524 |
|1          |70558 |4.0   |2.638674  |
|1          |76018 |2.0   |2.9413228 |
|1          |95320 |1.0   |2.6249087 |
|1          |103021|1.5   |2.661216  |
|1          |107831|2.5   |2.6878371 |
|1          |122044|4.0   |3.3329022 |
|1          |122462|3.0   |2.8830755 |
+-----------+------+------+----------+
only showing top 10 rows



                                                                                

## 6. Evaluate Model Performance


In [36]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error (RMSE): {rmse}")

24/12/02 16:14:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:14:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:14:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:14:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:14:10 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

Root-mean-square error (RMSE): 1.2216656912492991


                                                                                

## 7. Generate Top-N Recommendations


In [39]:
from pyspark.sql import functions as F

# Explode recommendations and select necessary columns
recommendations_with_metadata = user_recommendations.withColumn("recommendations", F.explode("recommendations")) \
    .select(F.col("userId").alias("recommendation_userId"),  # Rename `userId` from recommendations
            F.col("recommendations.movieId_int").alias("movieId_int"),
            F.col("recommendations.rating").alias("predicted_rating")) \
    .join(movies, on="movieId_int", how="left")

# Select the columns explicitly after the join to avoid ambiguity
recommendations_with_metadata = recommendations_with_metadata.select(
    F.col("recommendation_userId").alias("userId"),  # Rename back to `userId` for clarity
    "movieTitle",
    "theme",
    "predicted_rating"
)

# Show top recommendations
recommendations_with_metadata.show(truncate=False)


24/12/02 16:21:38 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:21:59 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:03 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:03 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 534:>                                                        (0 + 1) / 1]

+------+--------------+--------------------------------------------+----------------+
|userId|movieTitle    |theme                                       |predicted_rating|
+------+--------------+--------------------------------------------+----------------+
|316   |Halloween Ends|Terrifying, haunted, and supernatural horror|4.7743845       |
|2428  |Halloween Ends|Terrifying, haunted, and supernatural horror|5.091102        |
|7213  |Halloween Ends|Terrifying, haunted, and supernatural horror|4.978832        |
|14512 |Halloween Ends|Terrifying, haunted, and supernatural horror|4.831298        |
|31327 |Halloween Ends|Terrifying, haunted, and supernatural horror|4.7017183       |
|35831 |Halloween Ends|Terrifying, haunted, and supernatural horror|3.8768892       |
|38499 |Halloween Ends|Terrifying, haunted, and supernatural horror|3.8880491       |
|41044 |Halloween Ends|Terrifying, haunted, and supernatural horror|3.616927        |
|41402 |Halloween Ends|Terrifying, haunted, and supern

                                                                                

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize ALS model
als = ALS(
    userCol="userId",
    itemCol="movieId_int",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

# Define hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [10, 15, 20]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Define cross-validator
crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3  # 3-fold cross-validation
)

# Fit cross-validated model
cvModel = crossval.fit(training)

# Get the best model
bestModel = cvModel.bestModel

# Evaluate RMSE on test data
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Best RMSE after tuning: {rmse}")

24/12/02 16:22:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 16:22:47 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

Best RMSE after tuning: 1.1724722066558328


                                                                                

In [41]:
# Generate top 10 recommendations for each user
user_recommendations = bestModel.recommendForAllUsers(10)

# Show recommendations
user_recommendations.show(truncate=False)

24/12/02 17:22:27 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 19.0 in stage 16805.0 (TID 41080) (hub-hub-msca-bdp-dphub-student-irenegliu-sw-m10n.c.msca-bdp-student-ap.internal executor 70): java.lang.StackOverflowError
	at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2322)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectI

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                        |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|108   |[{287, 4.6287036}, {33, 4.4925776}, {893, 4.2635665}, {827, 4.2542906}, {973, 4.2146444}, {758, 4.1807256}, {1430, 4.152198}, {243, 4.1373477}, {1062, 4.13365}, {1438, 4.124554}]     |
|183   |[{626, 4.859819}, {1483, 4.8264127}, {425, 4.80347}, {1006, 4.7864857}, {974, 4.784842}, {1169, 4.771915}, {999, 4.763179}, {1324, 4.7601247}, {912, 4.7380548}, {190, 4.736757}]      |
|210   |[{758, 4.7943316}, {1092, 4

                                                                                

In [42]:
from pyspark.sql.functions import explode, col

# Explode the recommendations into separate rows
recommendations_exploded = user_recommendations.withColumn("recommendation", explode(col("recommendations"))) \
    .select(
        col("userId"),
        col("recommendation.movieId_int").alias("movieId_int"),
        col("recommendation.rating").alias("predicted_rating")
    )

# Show the exploded recommendations
recommendations_exploded.show(truncate=False)

24/12/02 17:24:40 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 40.0 in stage 16898.0 (TID 41206) (hub-hub-msca-bdp-dphub-student-irenegliu-sw-m10n.c.msca-bdp-student-ap.internal executor 71): java.lang.StackOverflowError
	at java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:3109)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1837)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at scala.collect

+------+-----------+----------------+
|userId|movieId_int|predicted_rating|
+------+-----------+----------------+
|108   |287        |4.6287036       |
|108   |33         |4.4925776       |
|108   |893        |4.2635665       |
|108   |827        |4.2542906       |
|108   |973        |4.2146444       |
|108   |758        |4.1807256       |
|108   |1430       |4.152198        |
|108   |243        |4.1373477       |
|108   |1062       |4.13365         |
|108   |1438       |4.124554        |
|183   |626        |4.859819        |
|183   |1483       |4.8264127       |
|183   |425        |4.80347         |
|183   |1006       |4.7864857       |
|183   |974        |4.784842        |
|183   |1169       |4.771915        |
|183   |999        |4.763179        |
|183   |1324       |4.7601247       |
|183   |912        |4.7380548       |
|183   |190        |4.736757        |
+------+-----------+----------------+
only showing top 20 rows



                                                                                

In [44]:
# Join recommendations with movie metadata
recommendations_with_metadata = recommendations_exploded.join(
    movies.select("movieId_int", "MovieTitle", "theme"),
    on="movieId_int",
    how="left"
)

# Show the recommendations with metadata
recommendations_with_metadata.select("userId", "MovieTitle", "theme", "predicted_rating").show(truncate=False)

24/12/02 17:26:08 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:26:17 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 29.0 in stage 16991.0 (TID 41347) (hub-hub-msca-bdp-dphub-student-irenegliu-sw-m10n.c.msca-bdp-student-ap.internal executor 74): java.lang.StackOverflowError
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2411)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.read

+-------+-----------------------------------+-------------------------------------------+----------------+
|userId |MovieTitle                         |theme                                      |predicted_rating|
+-------+-----------------------------------+-------------------------------------------+----------------+
|219745 |Beautiful Creatures                |Dreamlike, quirky, and surreal storytelling|2.8646429       |
|231767 |Beautiful Creatures                |Dreamlike, quirky, and surreal storytelling|2.9300952       |
|1104235|Beautiful Creatures                |Dreamlike, quirky, and surreal storytelling|4.5392985       |
|902388 |Beautiful Creatures                |Dreamlike, quirky, and surreal storytelling|3.3647702       |
|6482   |Captain America: The Winter Soldier|Adrenaline-fueled action and fast cars     |3.353751        |
|87560  |Captain America: The Winter Soldier|Adrenaline-fueled action and fast cars     |3.2008884       |
|94159  |Captain America: The Winter 

                                                                                

In [50]:
# Filter recommendations for a specific user
specific_user_id = 1104235 
user_specific_recommendations = recommendations_with_metadata.filter(col("userId") == specific_user_id)

# Show recommendations for the specific user
user_specific_recommendations.select("MovieTitle", "theme", "predicted_rating").show(truncate=False)


24/12/02 17:37:24 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:37:34 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:37:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:37:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:37:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-------------------------------+-------------------------------------------------------+----------------+
|MovieTitle                     |theme                                                  |predicted_rating|
+-------------------------------+-------------------------------------------------------+----------------+
|Alice in Wonderland            |Epic adventure and breathtaking battles                |4.5392985       |
|Widows                         |Suspenseful crime thrillers                            |4.693705        |
|The Super Mario Bros. Movie    |Fantasy adventure, heroism, and swordplay              |4.832755        |
|The Last Exorcism              |Gothic and eerie haunting horror                       |5.2730346       |
|Monster                        |Touching and sentimental family stories                |4.5331173       |
|Interstellar                   |Surreal and thought-provoking visions of life and death|4.7532506       |
|Monster Hunter                 |Fant

                                                                                

In [62]:
users = df_filtered.limit(1)  # Select only 1 user
userSubsetRecs = bestModel.recommendForUserSubset(users, 5)  # Reduce recommendations to 5
userSubsetRecs.show(5, truncate=False)


24/12/02 18:29:18 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 18:29:18 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 18:29:18 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 18:29:18 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 18:29:18 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+------+------------------------------------------------------------------------------------------+
|userId|recommendations                                                                           |
+------+------------------------------------------------------------------------------------------+
|12841 |[{1169, 4.179165}, {758, 4.090262}, {1348, 4.0093217}, {893, 3.9605584}, {1356, 3.948859}]|
+------+------------------------------------------------------------------------------------------+



In [None]:
# Generate top 10 user recommendations for a specified set of movies
movies = df_filtered.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = bestModel.recommendForItemSubset(movies, 10)

movieSubSetRecs.show(10, truncate=False)

24/12/02 17:52:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:52:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:52:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:52:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/02 17:52:36 WARN org.apache.spark.sql.execution.window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performanc

+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId_int|recommendations                                                                                                                                                                                                    |
+-----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|471        |[{903238, 5.740219}, {618004, 5.4523745}, {1296599, 5.2391396}, {780480, 5.2292323}, {599162, 5.215063}, {705797, 5.1975346}, {1207349, 5.189906}, {607575, 5.1779366}, {1306081, 5.1622267}, {1339490, 5.1390586}]|
|463        |[{656651, 5.778818}, {1162315, 5.7720027}, {594565, 5.7492023}, {331719, 5.743198},