# A Basic Movie Recommendation Program

In [1]:
from pyspark.sql import  SparkSession
from pyspark.sql.types import IntegerType, FloatType 
from pyspark.sql.functions import split, col


spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()

ratings = spark.read.text("ml-1m/ratings.dat")

ratings = ratings.withColumn("value", split(col("value"), "::")) \
    .select(
        col("value")[0].cast(IntegerType()).alias("user"),
        col("value")[1].cast(IntegerType()).alias("product"),
        col("value")[2].cast(FloatType()).alias("rating")
    )

ratings.show(10, truncate=False)



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/24 20:32:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+----+-------+------+
|user|product|rating|
+----+-------+------+
|1   |1193   |5.0   |
|1   |661    |3.0   |
|1   |914    |3.0   |
|1   |3408   |4.0   |
|1   |2355   |5.0   |
|1   |1197   |3.0   |
|1   |1287   |5.0   |
|1   |2804   |5.0   |
|1   |594    |4.0   |
|1   |919    |4.0   |
+----+-------+------+
only showing top 10 rows



In [2]:
movies = spark.read.text("ml-1m/movies.dat")

# Process the data
movies = movies.withColumn("value", split(col("value"), "::")) \
    .select(
        col("value")[0].cast(IntegerType()).alias("movie"),
        col("value")[1].alias("title"),
        split(col("value")[2], "\\|").alias("genre")
    )

movies.show(10, truncate=False)

+-----+----------------------------------+--------------------------------+
|movie|title                             |genre                           |
+-----+----------------------------------+--------------------------------+
|1    |Toy Story (1995)                  |[Animation, Children's, Comedy] |
|2    |Jumanji (1995)                    |[Adventure, Children's, Fantasy]|
|3    |Grumpier Old Men (1995)           |[Comedy, Romance]               |
|4    |Waiting to Exhale (1995)          |[Comedy, Drama]                 |
|5    |Father of the Bride Part II (1995)|[Comedy]                        |
|6    |Heat (1995)                       |[Action, Crime, Thriller]       |
|7    |Sabrina (1995)                    |[Comedy, Romance]               |
|8    |Tom and Huck (1995)               |[Adventure, Children's]         |
|9    |Sudden Death (1995)               |[Action]                        |
|10   |GoldenEye (1995)                  |[Action, Adventure, Thriller]   |
+-----+-----

In [3]:
animated_movies = movies.rdd \
    .filter(lambda row: "Animation" in row["genre"]) \
    .take(10)

for movie in animated_movies:
    print(movie)

Row(movie=1, title='Toy Story (1995)', genre=['Animation', "Children's", 'Comedy'])
Row(movie=13, title='Balto (1995)', genre=['Animation', "Children's"])
Row(movie=48, title='Pocahontas (1995)', genre=['Animation', "Children's", 'Musical', 'Romance'])
Row(movie=239, title='Goofy Movie, A (1995)', genre=['Animation', "Children's", 'Comedy', 'Romance'])
Row(movie=244, title='Gumby: The Movie (1995)', genre=['Animation', "Children's"])
Row(movie=313, title='Swan Princess, The (1994)', genre=['Animation', "Children's"])
Row(movie=364, title='Lion King, The (1994)', genre=['Animation', "Children's", 'Musical'])
Row(movie=558, title='Pagemaster, The (1994)', genre=['Action', 'Adventure', 'Animation', "Children's", 'Fantasy'])
Row(movie=588, title='Aladdin (1992)', genre=['Animation', "Children's", 'Comedy', 'Musical'])
Row(movie=594, title='Snow White and the Seven Dwarfs (1937)', genre=['Animation', "Children's", 'Musical'])


In [4]:
personal_ratings = [
    ("Toy Story (1995)", 5.0),
    ("Jumanji (1995)", 4.0),
    ("Pocahontas (1995)", 5.0),
    ("Aladdin (1992)", 5.0),
    ("Inferno (1980)", 1.0),
    ("Balto (1995)", 5.0),
    ("Man of the Year (1995)", 5.0),
    ("Mortal Kombat (1995)", 1.0),
    ("Ace Ventura: When Nature Calls (1995)", 1.0),
    ("Heat (1995)", 1.0),
    ("Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)", 1.0),
    ("Saving Private Ryan (1998)", 1.0),
    ("Sixth Sense, The (1999)", 1.0),
    ("Wrong Trousers, The (1993)", 1.0),
    ("Indiana Jones and the Last Crusade (1989)", 1.0),
    ("Godfather, The (1972)", 1.0),
    ("Fugitive, The (1993)", 1.0),
    ("Silence of the Lambs, The (1991)", 2.0)
]

columns = ["title", "rating"]

personal_ratings = spark.createDataFrame(personal_ratings, columns)

In [5]:
from pyspark.sql.functions import lit


user_id = 0

# Join `personal_ratings` with `movies` on the "title" column
normalized_ratings = personal_ratings.join(movies, on="title") \
    .select(
        lit(user_id).alias("user"), 
        movies["movie"].alias("product"),
        personal_ratings["rating"]
    )



movie_ids = normalized_ratings.select("product").rdd.map(lambda row: row["product"]).collect()

print("Movie IDs:", movie_ids)

Movie IDs: [1, 2, 48, 588, 3587, 13, 137, 44, 19, 6, 2019, 2028, 2762, 1148, 1291, 858, 457, 593]


In [6]:

# Split the ratings DataFrame into 90% training and 10% test
set = ratings.randomSplit([0.9, 0.1], seed=12345)

training = normalized_ratings.union(set[0]).cache()


training.show(10)

test = set[1].cache()

test.show(5)

print(f"Training: {training.count()}, Test: {test.count()}")

                                                                                

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   0|      1|   5.0|
|   0|      2|   4.0|
|   0|     48|   5.0|
|   0|    588|   5.0|
|   0|   3587|   1.0|
|   0|     13|   5.0|
|   0|    137|   5.0|
|   0|     44|   1.0|
|   0|     19|   1.0|
|   0|      6|   1.0|
+----+-------+------+
only showing top 10 rows

+----+-------+------+
|user|product|rating|
+----+-------+------+
|   1|   1207|   4.0|
|   1|   1246|   4.0|
|   1|   2028|   5.0|
|   1|   2804|   5.0|
|   1|   3114|   4.0|
+----+-------+------+
only showing top 5 rows

Training: 900442, Test: 99785


In [7]:
from pyspark.ml.recommendation import ALS

# Define ALS model parameters
rank = 10
num_iterations = 40
regularization = 0.09

# Initialize ALS
als = ALS(
    rank=rank,
    maxIter=num_iterations,
    regParam=regularization,
    userCol="user",
    itemCol="product",
    ratingCol="rating",
    coldStartStrategy="drop" 
)

model = als.fit(training)

# Model type
print(type(model))

24/11/24 20:32:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/11/24 20:32:44 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


<class 'pyspark.ml.recommendation.ALSModel'>


In [8]:
from pyspark.ml.evaluation import RegressionEvaluator

users_products = movies.select(lit(user_id).alias("user"), movies["movie"]).rdd.map(lambda row: (row[0], row[1]))


users_products_df = users_products.toDF(["user", "product"])
joined_df = users_products_df.join(normalized_ratings, on=["user","product"])

predictions = model.transform(users_products_df)

predictions.show()

+----+-------+-----------+
|user|product| prediction|
+----+-------+-----------+
|   0|    148|-0.15345615|
|   0|    463|  2.3202193|
|   0|    471| 0.62753725|
|   0|    496| -1.6967096|
|   0|    833|  0.9868451|
|   0|   1088|  3.7997017|
|   0|   1238|  1.9333942|
|   0|   1342|  1.7283303|
|   0|   1580|  2.0095618|
|   0|   1591|  0.7100055|
|   0|   1645|  1.1646177|
|   0|   1829|  1.4399781|
|   0|   1959|  2.7971826|
|   0|   2122|  1.2953333|
|   0|   2142|   3.418547|
|   0|   2366|  2.3574135|
|   0|   2659|  1.8270967|
|   0|   2866|  2.3011346|
|   0|   3175|   2.460287|
|   0|   3749|  1.4143329|
+----+-------+-----------+
only showing top 20 rows



In [9]:
# Filter the predictions for user ID 0
df = predictions.filter(col("user") == 0)

sorted_df = df.orderBy(col("prediction").desc())

result_df = (
    sorted_df.join(movies, sorted_df["product"] == movies["movie"])
    .select("movie", "title", "genre", 'prediction')
)


result_df.show(truncate=False)


+-----+------------------------------------------+-------------------------------------+-----------+
|movie|title                                     |genre                                |prediction |
+-----+------------------------------------------+-------------------------------------+-----------+
|148  |Awfully Big Adventure, An (1995)          |[Drama]                              |-0.15345615|
|463  |Guilty as Sin (1993)                      |[Crime, Drama, Thriller]             |2.3202193  |
|471  |Hudsucker Proxy, The (1994)               |[Comedy, Romance]                    |0.62753725 |
|496  |What Happened Was... (1994)               |[Comedy, Drama, Romance]             |-1.6967096 |
|833  |High School High (1996)                   |[Comedy]                             |0.9868451  |
|1088 |Dirty Dancing (1987)                      |[Musical, Romance]                   |3.7997017  |
|1238 |Local Hero (1983)                         |[Comedy]                             |1.9

In [10]:
from pyspark.sql.functions import explode


# Generate top 10 recommendations for user 0
user_recommendations = model.recommendForAllUsers(10)
user_recommendations = user_recommendations.filter("user = 0")
user_recommendations = user_recommendations.select(col("user"), explode(col("recommendations").alias("recommendation"))) \
.select(col("user"),
        col("col.product").alias("product"),
        col("col.rating").alias("predicted_rating")
       )

user_recommendations = user_recommendations.join(
    movies,
    user_recommendations["product"] == movies["movie"]
).select("user", "movie", "title", "genre", 'predicted_rating')
user_recommendations.show(truncate=False)

# Generate top 10 recommendations for  product 2101
item_recommendations = model.recommendForAllItems(5)
item_recommendations = item_recommendations.filter("product = 138")
item_recommendations.show(truncate=False)

                                                                                

+----+-----+-----------------------------------------------+-----------------------+----------------+
|user|movie|title                                          |genre                  |predicted_rating|
+----+-----+-----------------------------------------------+-----------------------+----------------+
|0   |138  |Neon Bible, The (1995)                         |[Drama]                |5.7786355       |
|0   |687  |Country Life (1994)                            |[Drama, Romance]       |5.31579         |
|0   |3853 |Tic Code, The (1998)                           |[Drama]                |5.054108        |
|0   |2101 |Squanto: A Warrior's Tale (1994)               |[Adventure, Drama]     |4.991754        |
|0   |831  |Stonewall (1995)                               |[Drama]                |4.971883        |
|0   |137  |Man of the Year (1995)                         |[Documentary]          |4.7907467       |
|0   |2156 |Best Man, The (Il Testimone dello sposo) (1997)|[Comedy, Drama]       



+-------+-----------------------------------------------------------------------------------------+
|product|recommendations                                                                          |
+-------+-----------------------------------------------------------------------------------------+
|138    |[{0, 5.7786355}, {1213, 5.520504}, {1111, 5.206867}, {2151, 5.1378045}, {1657, 4.964856}]|
+-------+-----------------------------------------------------------------------------------------+



                                                                                

In [11]:
spark.stop()