In [1]:
import os
# Find the latest version of spark 3.x  from https://downloads.apache.org/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.5.5'
spark_version = 'spark-3.5.5'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import json

In [3]:
# Initialize Spark session
spark = SparkSession.builder.appName("MoviesData").getOrCreate()

# Load the JSON file into a PySpark DataFrame
df = spark.read.json("movie_results.json")  # Replace with your JSON file path

# Show the structure of the JSON to understand the data
df.printSchema()

root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- error: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- origin_country: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production

In [4]:
# Extract the genre names (explode the array of genres to a new row per genre)
df_genres = df.select(
    col("id").alias("movieId"),
    explode(col("genres")).alias("genre")
).select(
    "movieId",
    col("genre.name").alias("genre_name")
)
# Index the genre names (convert them to numerical values)
indexer = StringIndexer(inputCol="genre_name", outputCol="genre_index")

# Encode the genre indices using OneHotEncoder
encoder = OneHotEncoder(inputCol="genre_index", outputCol="genre_vec")

# Combine indexing and encoding in a pipeline
pipeline = Pipeline(stages=[indexer, encoder])
genre_model = pipeline.fit(df_genres)
df_genres_encoded = genre_model.transform(df_genres)

# Now join the original movie DataFrame with the encoded genres
df_selected = df.select(
    col("id").alias("movieId"),
    col("title").alias("name"),
    col("popularity")
)

df_with_genre = df_selected.join(df_genres_encoded, on="movieId", how="left")
df_with_genre.show(truncate=False)
df_with_genre.filter(df_with_genre.movieId == 85 ).show()



+-------+---------------------+----------+---------------+-----------+---------------+
|movieId|name                 |popularity|genre_name     |genre_index|genre_vec      |
+-------+---------------------+----------+---------------+-----------+---------------+
|950396 |The Gorge            |210.407   |Thriller       |3.0        |(18,[3],[1.0]) |
|950396 |The Gorge            |210.407   |Science Fiction|9.0        |(18,[9],[1.0]) |
|950396 |The Gorge            |210.407   |Romance        |5.0        |(18,[5],[1.0]) |
|1126166|Flight Risk          |199.997   |Crime          |7.0        |(18,[7],[1.0]) |
|1126166|Flight Risk          |199.997   |Thriller       |3.0        |(18,[3],[1.0]) |
|1126166|Flight Risk          |199.997   |Action         |1.0        |(18,[1],[1.0]) |
|1064213|Anora                |161.432   |Romance        |5.0        |(18,[5],[1.0]) |
|1064213|Anora                |161.432   |Comedy         |2.0        |(18,[2],[1.0]) |
|1064213|Anora                |161.432   |D

In [5]:
from pyspark.sql.functions import col
ratings_df = spark.read.csv('ratings.csv', header=True, inferSchema=True)
# Load links dataset
links_df = spark.read.csv('links.csv', header=True, inferSchema=True)

# Ensure tmdbId in links_df is an integer
links_df = links_df.withColumn("tmdbId", col("tmdbId").cast("int"))

# Rename movieId to tmdbId in df_with_genre before joining
df_with_genre = df_with_genre.withColumnRenamed("movieId", "tmdbId")

# Join df_with_genre with links_df using tmdbId
df_combined = df_with_genre.join(links_df, on="tmdbId", how="inner")

# Show the result
df_combined.show(10)


+------+--------------------+----------+----------+-----------+---------------+-------+------+
|tmdbId|                name|popularity|genre_name|genre_index|      genre_vec|movieId|imdbId|
+------+--------------------+----------+----------+-----------+---------------+-------+------+
|    85|Raiders of the Lo...|    15.823|    Action|        1.0| (18,[1],[1.0])|   1198| 82971|
|    85|Raiders of the Lo...|    15.823| Adventure|        4.0| (18,[4],[1.0])|   1198| 82971|
|   411|The Chronicles of...|     26.11|   Fantasy|        8.0| (18,[8],[1.0])|  41566|363771|
|   411|The Chronicles of...|     26.11|    Family|       10.0|(18,[10],[1.0])|  41566|363771|
|   411|The Chronicles of...|     26.11| Adventure|        4.0| (18,[4],[1.0])|  41566|363771|
|   808|               Shrek|    14.761|    Family|       10.0|(18,[10],[1.0])|   4306|126029|
|   808|               Shrek|    14.761| Adventure|        4.0| (18,[4],[1.0])|   4306|126029|
|   808|               Shrek|    14.761|   Fantasy

In [6]:
# Now join with ratings DataFrame (assuming 'ratings.csv' exists)
# Join df_combined with rating_df using movieId
rating_df = spark.read.csv('ratings.csv', header=True, inferSchema=True)
final_df = df_combined.join(rating_df, on="movieId", how="inner")

# Select relevant columns
final_df_selected = final_df.select(
    col("userId"),
    col("movieId"),
    col("tmdbId"),
    col("name"),
    col("popularity"),
    col("rating"),
    col("genre_vec")  # Include the genre vector
)

# Show the final DataFrame
final_df_selected.show(10, truncate=False)

+------+-------+------+-----------------------+----------+------+--------------+
|userId|movieId|tmdbId|name                   |popularity|rating|genre_vec     |
+------+-------+------+-----------------------+----------+------+--------------+
|610   |1198   |85    |Raiders of the Lost Ark|15.823    |5.0   |(18,[1],[1.0])|
|606   |1198   |85    |Raiders of the Lost Ark|15.823    |3.5   |(18,[1],[1.0])|
|603   |1198   |85    |Raiders of the Lost Ark|15.823    |4.0   |(18,[1],[1.0])|
|601   |1198   |85    |Raiders of the Lost Ark|15.823    |4.0   |(18,[1],[1.0])|
|600   |1198   |85    |Raiders of the Lost Ark|15.823    |4.0   |(18,[1],[1.0])|
|599   |1198   |85    |Raiders of the Lost Ark|15.823    |3.5   |(18,[1],[1.0])|
|597   |1198   |85    |Raiders of the Lost Ark|15.823    |5.0   |(18,[1],[1.0])|
|596   |1198   |85    |Raiders of the Lost Ark|15.823    |3.5   |(18,[1],[1.0])|
|590   |1198   |85    |Raiders of the Lost Ark|15.823    |5.0   |(18,[1],[1.0])|
|586   |1198   |85    |Raide

In [7]:
# Ensure there are no missing ratings
final_df_selected = final_df_selected.dropna(subset=['rating'])

# Split data into training and test sets
(training_data, test_data) = final_df_selected.randomSplit([0.8, 0.2], seed=1234)

In [15]:
def train_evaluate_als(rank_value=150):
    als = ALS(
        userCol="userId",
        itemCol="tmdbId",
        ratingCol="rating",
        maxIter=10,  # Keeping iterations constant
        rank=rank_value,  # Fixed rank at 50
        regParam=0.05,  # Regularization
        alpha=0.5,  # For implicit feedback (if applicable)
        coldStartStrategy="drop"
    )

    # Train the model
    model = als.fit(training_data)

    # Get predictions
    predictions = model.transform(test_data)

    # Evaluate R-squared
    evaluator_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")
    r2 = evaluator_r2.evaluate(predictions)

    print(f"R-squared for rank={rank_value}: {r2}")

    # Save the trained model
    model.write().overwrite().save("als_movie_model")
    print("Model saved successfully")

    return r2

# Train and evaluate ALS with rank=150
r2_value = train_evaluate_als(rank_value=150)



R-squared for rank=150: 0.8115920628897665
Model saved successfully


In [20]:
from pyspark.ml.recommendation import ALSModel
from pyspark.ml.evaluation import RegressionEvaluator

# Load the saved model
model_path = "/content/als_movie_model"  # Path to the saved best model
model = ALSModel.load(model_path)  # Use ALSModel for loading the saved model
print("Best model loaded successfully!")

# Get predictions using the loaded best model
predictions = model.transform(test_data)  # Use 'model' instead of 'loaded_model'

# Evaluate R-squared
evaluator_r2 = RegressionEvaluator(metricName="r2", labelCol="rating", predictionCol="prediction")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared from loaded best model: {r2}")

# Show predictions for the best model
predictions.select("userId", "tmdbId", "rating", "prediction").show(5, truncate=False)




Best model loaded successfully!
R-squared from loaded best model: 0.8115920628897665
+------+------+------+----------+
|userId|tmdbId|rating|prediction|
+------+------+------+----------+
|1     |862   |4.0   |4.4017406 |
|1     |949   |4.0   |4.1285596 |
|1     |8068  |5.0   |4.467547  |
|1     |11017 |5.0   |3.5996635 |
|1     |8467  |5.0   |3.2101717 |
+------+------+------+----------+
only showing top 5 rows



In [21]:
from google.colab import files
import shutil

# Move the saved model to a downloadable location (e.g., compress it into a .zip)
shutil.make_archive("/content/als_movie_model", 'zip', model_path)

# Download the .zip file containing the model
files.download("als_movie_model.zip")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [22]:
from pyspark.sql import functions as F

# Generate top N recommendations for all users (e.g., Top 10)
top_n_recommendations = model.recommendForAllUsers(10)

# Explode the recommendations to get one row per movie recommendation
exploded_recommendations = top_n_recommendations.select(
    "userId",
    F.explode("recommendations").alias("recommendation")
)

# Extract tmdbId and predicted rating from the exploded recommendations
final_recommendations = exploded_recommendations.select(
    "userId",
    F.col("recommendation.tmdbId").alias("tmdbId"),
    F.col("recommendation.rating").alias("predicted_rating")
)

# Scale the predicted rating to a range of 1 to 5
min_rating = final_recommendations.agg({"predicted_rating": "min"}).collect()[0][0]
max_rating = final_recommendations.agg({"predicted_rating": "max"}).collect()[0][0]

scaled_recommendations = final_recommendations.withColumn(
    "scaled_predicted_rating",
    ((F.col("predicted_rating") - min_rating) / (max_rating - min_rating)) * 4 + 1
)

# Join with the movie metadata (movie names, genres, etc.)
final_with_metadata = scaled_recommendations.join(df_with_genre, "tmdbId", "inner")

# To avoid duplicates, we can aggregate genres into a single row per user/movie combination
aggregated_recommendations = final_with_metadata.groupBy(
    "userId", "tmdbId", "scaled_predicted_rating", "name"
).agg(
    F.concat_ws(", ", F.collect_list("genre_name")).alias("genre_name")
)

# Show the final recommendations with both original and scaled predicted ratings
aggregated_recommendations.show(truncate=False)

+------+------+-----------------------+--------------------------------------------------------------------+----------------------------------+
|userId|tmdbId|scaled_predicted_rating|name                                                                |genre_name                        |
+------+------+-----------------------+--------------------------------------------------------------------+----------------------------------+
|1     |278   |4.002888470755939      |The Shawshank Redemption                                            |Crime, Drama                      |
|1     |389   |3.8377558423754348     |12 Angry Men                                                        |Drama                             |
|1     |530   |3.8615396879121593     |A Grand Day Out                                                     |Comedy, Animation, Family         |
|1     |550   |3.8849722790358237     |Fight Club                                                          |Drama                       

In [23]:

output_path = '/content/top_n_recommendations.csv'  # Full path with the file name

# Overwrite the file if it already exists
aggregated_recommendations.write.mode("overwrite").option("header", "true").csv(output_path)


In [24]:
from google.colab import files
import os

# Get the CSV file from the output directory
csv_file = [f for f in os.listdir(output_path) if f.endswith('.csv')][0]  # Find the CSV part file

# Full path to the file to download
full_file_path = os.path.join(output_path, csv_file)

# Download the file to your local machine
files.download(full_file_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>