In [24]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col, udf
from pyspark.sql.types import DoubleType

# Content-based feature pipeline:
#   tags -> tokens -> tokens without stop words -> hashed term frequencies
#   -> TF-IDF -> normalised vectors, exposed as `selected_data`.
DATA_PATH = "new_data.csv"  # input CSV; must provide `name` and `tags` columns

spark = (
    SparkSession.builder.appName("ContentBasedFiltering")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", 50)
    .getOrCreate()
)

# No schema inference: every column is read as a string. Rows without tags
# cannot be vectorised, so they are dropped before any feature stage.
data = spark.read.option("header", "true").csv(DATA_PATH)
data = data.na.drop(subset=["tags"])

# Stateless text stages.
tokenizer = Tokenizer(inputCol="tags", outputCol="words")
data = tokenizer.transform(data)
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
data = remover.transform(data)
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures")
data = hashingTF.transform(data)

# IDF is the only stage here that must be fitted on the corpus.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(data)
data = idfModel.transform(data)

# NOTE(review): with a single input column this assembler appears to only copy
# `features`; it is kept so the `feature_vector` column remains available.
assembler = VectorAssembler(inputCols=["features"], outputCol="feature_vector")
data = assembler.transform(data)
# Normalizer is left at its default p-norm.
normalizer = Normalizer(inputCol="feature_vector", outputCol="normalized_features")
data = normalizer.transform(data)

# get_top_movies scans this DataFrame more than once per query (filter plus
# cross join). Caching it keeps Spark from re-reading the CSV and re-running
# every stage above on each call. The cache is filled lazily by the first action.
selected_data = data.select("name", "normalized_features").cache()

def get_top_movies(movie_name, top_n):
    """Print the ``top_n`` titles most similar to ``movie_name`` by tag content.

    Every row of the module-level ``selected_data`` DataFrame is compared with
    the row(s) whose ``name`` equals ``movie_name``. The comparison is the
    cosine similarity of their ``normalized_features`` vectors. Rows sharing
    the query title are excluded. Results are sorted by similarity (highest
    first, nulls last, ties broken by title), rounded to ``decimal(10,5)`` and
    printed with ``DataFrame.show``.

    Args:
        movie_name: Exact ``name`` value of the title to find neighbours for.
        top_n: Maximum number of recommendations to print.

    Returns:
        None. If no row matches ``movie_name``, a message is printed instead
        of an empty table.
    """
    query = (
        selected_data.filter(col("name") == movie_name)
        .withColumnRenamed("name", "movie_name_2")
        .withColumnRenamed("normalized_features", "normalized_features_2")
    )
    if not query.head(1):
        print(f"No movie named {movie_name!r} was found.")
        return

    def cosine_similarity(v1, v2):
        # Null or all-zero vectors (presumably titles whose tags were all stop
        # words) have no direction. Report null rather than divide by zero.
        if v1 is None or v2 is None:
            return None
        denominator = float(v1.norm(2)) * float(v2.norm(2))
        if denominator == 0.0:
            return None
        return float(v1.dot(v2)) / denominator

    # An explicit DoubleType return avoids the StringType default of
    # spark.udf.register. A plain udf() also avoids re-registering a global SQL
    # function on every call.
    cosine_similarity_udf = udf(cosine_similarity, DoubleType())

    # The query side holds only the rows named `movie_name`, so it is broadcast
    # to every executor instead of being joined as an unbroadcast cartesian product.
    scored = (
        selected_data.crossJoin(broadcast(query))
        .filter(col("name") != col("movie_name_2"))
        .withColumn(
            "similarity",
            cosine_similarity_udf("normalized_features", "normalized_features_2"),
        )
    )

    recommendations = (
        scored.select(col("name").alias("movie_name"), "similarity")
        # Order on the full-precision double and round only for display, so
        # near-ties keep their true order and equal scores sort by title.
        .orderBy(col("similarity").desc_nulls_last(), col("movie_name").asc())
        .limit(top_n)
        .withColumn("similarity", col("similarity").cast("decimal(10,5)"))
    )
    print(f"Recommendations for movie {movie_name} are:")
    # Show every kept row; a fixed row count would hide rows once top_n exceeds it.
    recommendations.show(top_n, truncate=False)


                                                                                

In [25]:
get_top_movies(movie_name="Loki", top_n=30)

23/12/02 20:59:51 WARN SimpleFunctionRegistry: The function cosine_similarity replaced a previously registered function.


Recommendations for movie Loki are:


23/12/02 20:59:51 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
23/12/02 20:59:59 WARN DAGScheduler: Broadcasting large task binary with size 18.7 MiB
[Stage 46:>                                                         (0 + 1) / 1]

+-----------------------------------------------------------------------------+----------+
|movie_name                                                                   |similarity|
+-----------------------------------------------------------------------------+----------+
|Thor & Loki: Blood Brothers                                                  |0.16683   |
|Westinghouse Desilu Playhouse                                                |0.16055   |
|Mythical Detective Loki Ragnarok                                             |0.14933   |
|Fate/Apocrypha                                                               |0.13609   |
|The Falcon and the Winter Soldier                                            |0.11557   |
|Don't Waste Your Time, Lovers                                                |0.09179   |
|Is It Wrong to Try to Pick Up Girls in a Dungeon? On the Side: Sword Oratoria|0.08925   |
|3 Body Problem                                                               |0.08489   |

                                                                                