In [1]:
import os
os.environ['SPARK_HOME'] = "/opt/spark/spark-3.4.3-bin-hadoop3"


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Set up Spark configuration
conf = SparkConf() \
    .setAppName("ALS Model") \
    .setMaster("spark://nhatdm2k4:7077") \
    .set("spark.driver.memory", "8g") \
    .set("spark.executor.memory", "8g")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.read.csv('final_dataset.csv', header=True, inferSchema=True)

24/05/28 07:12:37 WARN Utils: Your hostname, nhatdm2k4 resolves to a loopback address: 127.0.1.1; using 192.168.1.49 instead (on interface wlo1)
24/05/28 07:12:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/28 07:12:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
from pyspark.sql.functions import col

df = df.withColumn("User_Id", col("User_Id").cast("integer"))
df = df.withColumn("Movie_ID", col("Movie_ID").cast("integer"))
df = df.withColumn("Rating", col("Rating").cast("float"))

In [4]:
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [5]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="User_Id",
    itemCol="Movie_ID",
    ratingCol="Rating",
    coldStartStrategy="drop"
)

model = als.fit(training)

                                                                                

In [6]:
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)

predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")


                                                                                

Root-mean-square error = 0.8078979668028524


In [18]:
from pyspark.sql.functions import col, lit

# Assume 'user_id' is the user for whom we want to recommend movies
user_id = 69

# Generate a DataFrame with all possible user-movie combinations for the given user
user_movies = (
    training.select('Movie_ID').distinct()
    .withColumn('User_Id', lit(user_id))
)

# Predict ratings for these user-movie combinations
predictions = model.transform(user_movies)

# Filter out the movies the user has already rated
rated_movies = training.filter(col('User_Id') == user_id).select('Movie_ID')
predictions = predictions.join(rated_movies, 'Movie_ID', 'left_anti')

# Get top 5 recommendations
top_5_recommendations = predictions.orderBy(col('prediction').desc()).limit(5)

# Join with the original dataset to get movie details
top_5_details = top_5_recommendations.join(training.select('Movie_ID', 'Movie_Name', 'Year').distinct(), on='Movie_ID')

# Show the top 5 recommended movies with their names and years
top_5_details.select('Movie_Name', 'Year', 'prediction').show()




+--------------------+------+----------+
|          Movie_Name|  Year|prediction|
+--------------------+------+----------+
|Born in the USSR:...|2005.0| 5.8548536|
|          Adrenaline|1990.0|  6.451732|
|             Acı Aşk|2009.0| 6.0834537|
|           The Thorn|1971.0| 6.5986204|
|NOFX Backstage Pa...|  null| 6.2526855|
+--------------------+------+----------+



                                                                                

In [22]:
user_id = 22345

# Generate a DataFrame with all possible user-movie combinations for the given user
user_movies = (
    training.select('Movie_ID').distinct()
    .withColumn('User_Id', lit(user_id))
)

# Predict ratings for these user-movie combinations
predictions = model.transform(user_movies)

# Filter out the movies the user has already rated
rated_movies = training.filter(col('User_Id') == user_id).select('Movie_ID')
predictions = predictions.join(rated_movies, 'Movie_ID', 'left_anti')

# Get top 5 recommendations
top_5_recommendations = predictions.orderBy(col('prediction').desc()).limit(5)

# Join with the original dataset to get movie details
top_5_details = top_5_recommendations.join(training.select('Movie_ID', 'Movie_Name', 'Year').distinct(), on='Movie_ID')

# Show the top 5 recommended movies with their names and years
top_5_details.select('Movie_Name', 'Year', 'prediction').show()



+--------------------+------+----------+
|          Movie_Name|  Year|prediction|
+--------------------+------+----------+
|        Civilisation|1969.0| 6.0659013|
|   Truth and Justice|2019.0| 6.3914495|
|          Adrenaline|1990.0| 6.5918508|
|             Acı Aşk|2009.0|  7.182456|
|Täällä Pohjantähd...|1968.0| 6.0449433|
+--------------------+------+----------+



                                                                                

In [13]:
from pyspark.ml.recommendation import ALSModel

# Load the model from the specified directory
loaded_model = ALSModel.load("/home/nhatdm2k4/Documents/test/als_final_model")

# Example DataFrame for inference
new_data = spark.createDataFrame([
    (1, 10),  # (User_Id, Movie_ID)
    (1, 20),
    (2, 10)
], ["User_Id", "Movie_ID"])

# Make predictions using the loaded model
predictions = loaded_model.transform(new_data)
predictions.show()

                                                                                

+-------+--------+----------+
|User_Id|Movie_ID|prediction|
+-------+--------+----------+
|      1|      10| 3.6942236|
|      1|      20|  3.990606|
|      2|      10| 3.8788896|
+-------+--------+----------+



24/05/28 05:54:25 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
24/05/28 05:54:25 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:978)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce

In [7]:
all_movies = df.select("Movie_ID", "Movie_Name", "Year").distinct()


In [8]:
df.show()

+-------+--------------------+------+--------------------+------+--------+
|User_Id|          Movie_Name|Rating|               Genre|  Year|Movie_ID|
+-------+--------------------+------+--------------------+------+--------+
|      1|        Pulp Fiction|   5.0|Comedy|Crime|Dram...|1994.0|       1|
|      1|Three Colors: Red...|   3.5|               Drama|1994.0|       2|
|      1|Three Colors: Blu...|   5.0|               Drama|1993.0|       3|
|      1|         Underground|   5.0|    Comedy|Drama|War|1995.0|       4|
|      1| Singin' in the Rain|   3.5|Comedy|Musical|Ro...|1952.0|       5|
|      1|       Dirty Dancing|   4.0|Drama|Musical|Rom...|1987.0|       6|
|      1|        Delicatessen|   3.5|Comedy|Drama|Romance|1991.0|       7|
|      1|                 Ran|   3.5|           Drama|War|1985.0|       8|
|      1|Seventh Seal, The...|   5.0|               Drama|1957.0|       9|
|      1|Bridge on the Riv...|   4.0| Adventure|Drama|War|1957.0|      10|
|      1|                

In [9]:
all_movies.show()



+--------+--------------------+------+
|Movie_ID|          Movie_Name|  Year|
+--------+--------------------+------+
|     119|Star Trek: First ...|1996.0|
|     308|         Skulls, The|2000.0|
|     765|                Lucy|2014.0|
|    1273|Hudsucker Proxy, The|1994.0|
|    1495|       Wayne's World|1992.0|
|    1520|           Bamba, La|1987.0|
|    1580|Witness for the P...|1957.0|
|    1641|Edukators, The (D...|2004.0|
|    1690|      Batman Returns|1992.0|
|    1879|Visitors, The (Vi...|1993.0|
|    2022|Insidious: Chapter 2|2013.0|
|    2716|Squid and the Wha...|2005.0|
|    3179|       Summer of Sam|1999.0|
|    3241|       Maid to Order|1987.0|
|    3342|      Get the Gringo|2012.0|
|    3546|      One Trick Pony|1980.0|
|    3580|               Angus|1995.0|
|    3622| Crazy for Christmas|2005.0|
|    3681|Legend of the Gal...|1988.0|
|    3952|          Ride Along|2014.0|
+--------+--------------------+------+
only showing top 20 rows



                                                                                

In [None]:
#Fine-tuning
paramGrid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

cvModel = crossval.fit(training)

bestModel = cvModel.bestModel
predictions = bestModel.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Best model Root-mean-square error = {rmse}")