In [40]:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
from pyspark.sql.types import FloatType


In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("content-based") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/10 21:45:52 WARN Utils: Your hostname, Hungs-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.97.154 instead (on interface en0)
25/08/10 21:45:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/10 21:45:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# UDF to calculate cosine similarity
def cosine_similarity(v1, v2):
    """
    return cosine similirity of 2 two vector a and b
    """
    v1 = np.array(v1.toArray())
    v2 = np.array(v2.toArray())
    return float(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))

cosine_sim_udf = f.udf(cosine_similarity, FloatType())

# Convert array to vector
to_vector = f.udf(lambda row: Vectors.dense(row), VectorUDT())

# UDF to multiply vector by scalar
def scale_vector(scalar, vector):
    return Vectors.dense([scalar * x for x in vector])

scale_vector_udf = f.udf(scale_vector, VectorUDT())

# UDF to sum a list of vectors
def sum_vectors(vectors):
    if not vectors:
        return None
    length = len(vectors[0])
    summed = [0.0] * length
    for v in vectors:
        for i in range(length):
            summed[i] += v[i]
    return Vectors.dense(summed)

sum_vectors_udf = f.udf(sum_vectors, VectorUDT())

# UDF to normalize a vector (L2 norm)
def normalize_vector(vector):
    return vector / np.linalg.norm(vector)

normalize_vector_udf = f.udf(normalize_vector, VectorUDT())

In [99]:
user_ids = [1, 2, 3]
top_k = 5
rating_threshold = 2

In [100]:
# load movies and ratings data
movies_df = spark.read.parquet("../data/movie.parquet")
ratings_df = spark.read.parquet("../data/rating.parquet")

In [101]:

ratings_df = ratings_df.filter(f.col("user_id").isin(user_ids)).select(["user_id", "movie_id", "rating"])

In [102]:
movies_df.show(2)

+--------+----------------+--------------------+
|movie_id|           title|              genres|
+--------+----------------+--------------------+
|       1|Toy Story (1995)|Adventure|Animati...|
|       2|  Jumanji (1995)|Adventure|Childre...|
+--------+----------------+--------------------+
only showing top 2 rows


In [103]:
ratings_df.show(2)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|       2|   3.5|
|      1|      29|   3.5|
+-------+--------+------+
only showing top 2 rows


In [104]:
# Create movie-genre pairs
movie_genre_pairs_df = movies_df \
    .withColumn("genres_array", f.split(f.col("genres"), "\\|")) \
    .withColumn("genre", f.explode("genres_array")) \
    .select(["movie_id", "genre"])

movie_genre_pairs_df.show(2)

+--------+---------+
|movie_id|    genre|
+--------+---------+
|       1|Adventure|
|       1|Animation|
+--------+---------+
only showing top 2 rows


In [105]:
# Get unique genres
genres = movie_genre_pairs_df.select("genre").distinct().sort("genre").rdd.map(lambda x: x.genre).collect()
print(genres)

['(no genres listed)', 'Action', 'Adventure', 'Animation', 'Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'IMAX', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']


In [106]:
# Create movie features
movie_features_df = movie_genre_pairs_df \
                .groupBy("movie_id") \
                .pivot("genre") \
                .agg(f.lit(1)) \
                .na.fill(0) \
                .withColumn("feature", to_vector(f.array(genres))) \
                .select(["movie_id", "feature"])

movie_features_df.show(2)

[Stage 849:>                                                        (0 + 1) / 1]

+--------+--------------------+
|movie_id|             feature|
+--------+--------------------+
|   89844|[0.0,0.0,0.0,0.0,...|
|   90817|[0.0,0.0,1.0,0.0,...|
+--------+--------------------+
only showing top 2 rows


                                                                                

In [107]:
# Keep only high ratings to aggrergate user profiles (default is 3)

high_ratings = ratings_df.filter(f"rating >= {rating_threshold}")
high_ratings.show(2)

+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|       2|   3.5|
|      1|      29|   3.5|
+-------+--------+------+
only showing top 2 rows


In [108]:
# Create user profiles
# For each user do:
    # 1. calculate weighted vector for each movie the user has rated highly
    # 2. sum weighted vectors to get user profile
    # 3. normalize user profile using L2 norm
user_profiles_df = high_ratings.alias("hr") \
    .join(movie_features_df.alias("mf"), f.col("hr.movie_id") == f.col("mf.movie_id")) \
    .withColumn("weighted_feature", scale_vector_udf("hr.rating", "mf.feature")) \
    .groupBy("user_id") \
    .agg(sum_vectors_udf(f.collect_list("weighted_feature")).alias("sum_weighted_features")) \
    .withColumn("profile", normalize_vector_udf("sum_weighted_features")) \
    .select("user_id", "profile")

user_profiles_df.show(truncate=False)



+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|profile                                                                                                                                                                                                                                                                                                                                                                           |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [109]:
users_df = user_profiles_df.select("user_id").distinct()

users_df.show(2)

+-------+
|user_id|
+-------+
|      1|
|      3|
+-------+
only showing top 2 rows


In [110]:
user_movie_all = users_df.crossJoin(movie_features_df)

user_movie_all.show(2)

+-------+--------+--------------------+
|user_id|movie_id|             feature|
+-------+--------+--------------------+
|      1|   89844|[0.0,0.0,0.0,0.0,...|
|      3|   89844|[0.0,0.0,0.0,0.0,...|
+-------+--------+--------------------+
only showing top 2 rows


In [111]:
users_candidates_df = user_movie_all \
    .join(ratings_df.select("user_id", "movie_id"), on=["user_id", "movie_id"], how="left_anti") \
    .select(["user_id", "movie_id", f.col("feature").alias("movie_feature")])

    
users_candidates_df.show(truncate=False)

+-------+--------+---------------------------------------------------------------------------------+
|user_id|movie_id|movie_feature                                                                    |
+-------+--------+---------------------------------------------------------------------------------+
|1      |89844   |[0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|3      |89844   |[0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|2      |89844   |[0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|1      |90817   |[0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|3      |90817   |[0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|2      |90817   |[0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]|
|1      |5518    |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,

In [112]:
user_profiles_df.show(2)



+-------+--------------------+
|user_id|             profile|
+-------+--------------------+
|      1|[0.0,0.4155676657...|
|      3|[0.0,0.3793613604...|
+-------+--------------------+
only showing top 2 rows


                                                                                

In [113]:
user_profiles_candidates_features_df = users_candidates_df.alias("uc") \
    .join(user_profiles_df.alias("up"), f.col("uc.user_id") == f.col("up.user_id")) \
    .select(["uc.user_id", "uc.movie_id", "uc.movie_feature", "up.profile"]) \
   
user_profiles_candidates_features_df.show(2)

                                                                                

+-------+--------+--------------------+--------------------+
|user_id|movie_id|       movie_feature|             profile|
+-------+--------+--------------------+--------------------+
|      1|   89844|[0.0,0.0,0.0,0.0,...|[0.0,0.4155676657...|
|      1|   90817|[0.0,0.0,1.0,0.0,...|[0.0,0.4155676657...|
+-------+--------+--------------------+--------------------+
only showing top 2 rows


In [None]:
user_profiles_candidates_features_df \
    .withColumn("similarity", cosine_sim_udf(f.col("movie_feature"), f.col("profile"))) \
    .withColumn("rank", f.row_number().over(
        Window.partitionBy("user_id").orderBy(f.col("similarity").desc())
    )) \
    .filter(f"rank <= {top_k}").show()

[Stage 984:>                                                        (0 + 1) / 1]

+-------+--------+--------------------+--------------------+----------+----+
|user_id|movie_id|       movie_feature|             profile|similarity|rank|
+-------+--------+--------------------+--------------------+----------+----+
|      1|   72165|[0.0,1.0,1.0,0.0,...|[0.0,0.4155676657...|0.87137735|   1|
|      1|    2617|[0.0,1.0,1.0,0.0,...|[0.0,0.4155676657...|0.87137735|   2|
|      1|  117646|[0.0,1.0,1.0,0.0,...|[0.0,0.4155676657...| 0.8668946|   3|
|      1|   49593|[0.0,1.0,1.0,0.0,...|[0.0,0.4155676657...|0.83419394|   4|
|      1|    2429|[0.0,1.0,1.0,0.0,...|[0.0,0.4155676657...| 0.8340474|   5|
|      2|   91500|[0.0,1.0,1.0,0.0,...|[0.0,0.4162350293...|0.91169214|   1|
|      2|   48774|[0.0,1.0,1.0,0.0,...|[0.0,0.4162350293...|0.91169214|   2|
|      2|   27618|[0.0,1.0,1.0,0.0,...|[0.0,0.4162350293...|0.91169214|   3|
|      2|    8361|[0.0,1.0,1.0,0.0,...|[0.0,0.4162350293...|0.91169214|   4|
|      2|   58025|[0.0,1.0,1.0,0.0,...|[0.0,0.4162350293...|0.91169214|   5|

                                                                                