In [266]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Get the active Spark session, or start a new one named "MovieRatings".
spark = (
    SparkSession.builder
    .appName("MovieRatings")
    .getOrCreate()
)

In [267]:
# Load the tab-separated ratings file. No header is read, so the columns
# get Spark's default names (_c0, _c1, ...).
raw_ratings = spark.read.csv("../data/ml-100k/u.data", sep="\t", inferSchema=True)

# Keep only the first three columns and give them descriptive names.
column_names = ["user_id", "movie_id", "rating"]
movie_ratings = raw_ratings.select("_c0", "_c1", "_c2").toDF(*column_names)

movie_ratings.show(5)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|    196|     242|     3|
|    186|     302|     3|
|     22|     377|     1|
|    244|      51|     2|
|    166|     346|     1|
+-------+--------+------+
only showing top 5 rows



In [268]:
target_movie_id = 50

# The target rating of a user is the rating they gave to the target movie.
# This frame holds one row per rating of the target movie.
target_ratings = movie_ratings.filter(
    F.col("movie_id") == target_movie_id
).select("user_id", F.col("rating").alias("target_rating"))

# All ratings given by users who rated the target movie, with that user's
# target rating attached to every row.
# The inner join on user_id already drops every user without a target rating,
# so there is no need to collect the user ids to the driver and pre-filter
# with isin(): the result is the same and the work stays inside Spark.
users_ratings = movie_ratings.join(target_ratings, "user_id", "inner")

users_ratings.show(5)

+-------+--------+------+-------------+
|user_id|movie_id|rating|target_rating|
+-------+--------+------+-------------+
|     22|     377|     1|            5|
|    244|      51|     2|            5|
|    298|     474|     4|            5|
|    115|     265|     2|            5|
|    253|     465|     5|            4|
+-------+--------+------+-------------+
only showing top 5 rows



In [269]:
def rounded_rmse(col1, col2):
    """Build an aggregate expression for the RMSE between two columns.

    Args:
        col1: first numeric Spark column.
        col2: second numeric Spark column.

    Returns:
        A Column expression: sqrt(mean((col1 - col2) ** 2)) rounded to
        3 decimal places. It must be evaluated inside an aggregation.
    """
    squared_error = (col1 - col2) ** 2
    return F.round(F.sqrt(F.mean(squared_error)), 3)


def rounded_cosine_sim(col1, col2):
    """Build an aggregate expression for the cosine similarity of two columns.

    Within each aggregation group the two columns are treated as vectors:
    sum(col1 * col2) / (sqrt(sum(col1 ** 2)) * sqrt(sum(col2 ** 2))).

    Args:
        col1: first numeric Spark column.
        col2: second numeric Spark column.

    Returns:
        A Column expression holding the similarity rounded to 3 decimal
        places. It must be evaluated inside an aggregation.
    """
    numerator = F.sum(col1 * col2)
    denominator = F.sqrt(F.sum(col1**2)) * F.sqrt(F.sum(col2**2))
    return F.round(numerator / denominator, 3)


# Per-movie statistics over the users who also rated the target movie:
# similarity between the movie's ratings and the target ratings, the median
# rating of the movie, and how many of those users rated it.
movie_stats = users_ratings.groupBy("movie_id").agg(
    rounded_cosine_sim(F.col("rating"), F.col("target_rating")).alias("cosine_sim"),
    F.median(F.col("rating")).alias("median_rating"),
    F.count(F.col("rating")).alias("num_of_ratings"),
)

# only recommend movies rated by at least 50 people who also rated the target movie
recommendations = movie_stats.filter(F.col("num_of_ratings") >= 50)

recommendations.sort("cosine_sim", ascending=False).show(5)

+--------+----------+-------------+--------------+
|movie_id|cosine_sim|median_rating|num_of_ratings|
+--------+----------+-------------+--------------+
|      50|       1.0|          5.0|           583|
|     172|      0.99|          4.0|           345|
|     181|     0.986|          4.0|           480|
|     174|     0.982|          5.0|           380|
|     141|     0.979|          4.0|            68|
+--------+----------+-------------+--------------+
only showing top 5 rows



In [270]:
# Read the "|"-separated movie metadata. No header is read, so the columns
# get Spark's default names (_c0, _c1, ...).
movie_info_raw = spark.read.csv("../data/ml-100k/u.item", sep="|", inferSchema=True)

# Genre data is stored as 0/1 flag columns; we collapse it into one string.
# Meaning of columns _c5 to _c23, in order:
genre_columns = [
    "Unknown",
    "Action",
    "Adventure",
    "Animation",
    "Childrens",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "SciFi",
    "Thriller",
    "War",
    "Western",
]

# Pair every flag column with its genre name: ("_c5", "Unknown"), ("_c6", "Action"), ...
genre_flags = [(f"_c{offset + 5}", name) for offset, name in enumerate(genre_columns)]

# Single genre per movie. When several flags are set, the genre that appears
# LAST in genre_columns wins (hence the reversed order: coalesce returns the
# first non-null value); "Unknown" is the fallback when no flag is set.
# This is one expression instead of 19 chained withColumn calls, which would
# stack one projection per call and bloat the query plan.
genre_expr = F.coalesce(
    *[F.when(F.col(flag) == 1, name) for flag, name in reversed(genre_flags)],
    F.lit("Unknown"),
)

# Keep only movie_id (_c0), movie_name and genre.
# movie_name is the part of _c1 before the first " (". The pattern is a regex,
# so it is written as a raw string to keep the "\(" escape valid Python.
movie_info = movie_info_raw.select(
    F.col("_c0").alias("movie_id"),
    F.split("_c1", pattern=r" \(")[0].alias("movie_name"),
    genre_expr.alias("genre"),
)

movie_info.show(5)

+--------+----------+--------+
|movie_id|movie_name|   genre|
+--------+----------+--------+
|       1| Toy Story|  Comedy|
|       2| GoldenEye|Thriller|
|       3|Four Rooms|Thriller|
|       4|Get Shorty|   Drama|
|       5|   Copycat|Thriller|
+--------+----------+--------+
only showing top 5 rows



In [271]:
# Attach movie_name and genre to every recommendation. The left join keeps
# recommendations that have no matching row in movie_info. Most similar first.
named_recommendations = (
    recommendations.join(movie_info, on="movie_id", how="left")
    .sort("cosine_sim", ascending=False)
)
named_recommendations.show(10)

+--------+----------+-------------+--------------+--------------------+---------+
|movie_id|cosine_sim|median_rating|num_of_ratings|          movie_name|    genre|
+--------+----------+-------------+--------------+--------------------+---------+
|      50|       1.0|          5.0|           583|           Star Wars|      War|
|     172|      0.99|          4.0|           345|Empire Strikes Ba...|      War|
|     181|     0.986|          4.0|           480|  Return of the Jedi|      War|
|     174|     0.982|          5.0|           380|Raiders of the Lo...|Adventure|
|     141|     0.979|          4.0|            68|20,000 Leagues Un...|    SciFi|
|     178|     0.978|          4.0|           109|        12 Angry Men|    Drama|
|     408|     0.978|          5.0|            92|      Close Shave, A| Thriller|
|     498|     0.976|          4.0|           138|  African Queen, The|      War|
|     194|     0.975|          4.0|           204|          Sting, The|    Crime|
|     169|     0

In [272]:
# Possible ideas for better recommendations

# Only recommend movies whose median rating is at least the median of all ratings.
# The aggregate gets an explicit alias so the lookup does not depend on
# Spark's auto-generated column name.
median_rating = movie_ratings.agg(
    F.median("rating").alias("median_rating")
).first()["median_rating"]
better_recommendations = named_recommendations.filter(
    named_recommendations["median_rating"] >= median_rating
)

# Only recommend movies with the same genre as the target movie
target_movie_row = movie_info.filter(
    movie_info["movie_id"] == target_movie_id
).first()
if target_movie_row is None:
    # first() returns None on an empty result; fail with a clear message
    raise ValueError(f"movie_id {target_movie_id} not found in movie_info")
target_genre = target_movie_row["genre"]
better_recommendations = better_recommendations.filter(
    better_recommendations["genre"] == target_genre
)

better_recommendations.show(10)

+--------+----------+-------------+--------------+--------------------+-----+
|movie_id|cosine_sim|median_rating|num_of_ratings|          movie_name|genre|
+--------+----------+-------------+--------------+--------------------+-----+
|      50|       1.0|          5.0|           583|           Star Wars|  War|
|     172|      0.99|          4.0|           345|Empire Strikes Ba...|  War|
|     181|     0.986|          4.0|           480|  Return of the Jedi|  War|
|     498|     0.976|          4.0|           138|  African Queen, The|  War|
|     199|     0.973|          4.0|           145|Bridge on the Riv...|  War|
|     483|     0.973|          5.0|           214|          Casablanca|  War|
|     651|     0.972|          4.0|           155|               Glory|  War|
|     511|     0.969|          4.0|           153|  Lawrence of Arabia|  War|
|     176|     0.967|          4.0|           259|              Aliens|  War|
|     318|     0.966|          5.0|           247|    Schindler'

In [273]:
# Stop the Spark session now that the analysis is finished.
spark.stop()