In [1]:
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import pyspark
    
# get the sparkcontext to communicate with the cluster
cf = SparkConf()
cf.set("spark.submit.deployMode","client")
cf.set('spark.sql.repl.eagerEval.enabled', True)

cf.set('spark.driver.memory','16g')
cf.set("spark.executor.instances", "8")
sc = SparkContext.getOrCreate(cf)

spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/08 00:57:08 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/05/08 00:57:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/05/08 00:57:08 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/05/08 00:57:08 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [2]:
spark

In [3]:
anime_data_path = 'animedata/anime-dataset-2023_new.csv'
animedata = spark.read.format("csv").option("header", "true") \
                                   .option("headers", "true") \
                                   .option('escape','"') \
                                   .option("multiLine","true")\
                                   .option("inferSchema", "true") \
                                   .load(anime_data_path, sep=',')

                                                                                

In [4]:
animedata = animedata.limit(1000)


In [5]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, Normalizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import BucketedRandomProjectionLSH, StopWordsRemover
from pyspark.sql.functions import col
# Initialize Spark Session
spark = SparkSession.builder.appName("ContentBasedRecommender").getOrCreate()


animedata  = animedata.na.drop(subset=["Synopsis"])
animedata = animedata.filter(col("Synopsis").isNotNull() & (col("Synopsis") != "") & (col("Synopsis") != "UNKNOWN") & (col("Synopsis") != "No description available for this anime.") )
# Preprocessing: Tokenize the Synopsis text
tokenizer = Tokenizer(inputCol="Synopsis", outputCol="words")
words_data = tokenizer.transform(animedata)

# Optionally remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_df = remover.transform(words_data)

# Feature Extraction: Apply HashingTF and IDF
hashing_tf = HashingTF(inputCol="words", outputCol="raw_features")
featurized_data = hashing_tf.transform(filtered_df)
idf = IDF(inputCol="raw_features", outputCol="idf_features")
idf_model = idf.fit(featurized_data)
tfidf_data = idf_model.transform(featurized_data)

# Normalize the features
normalizer = Normalizer(inputCol="idf_features", outputCol="features")
normalized_data = normalizer.transform(tfidf_data)

# Approximate Similarity Join
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0, numHashTables=3)
model = brp.fit(normalized_data)
hashed_df = model.transform(normalized_data)

                                                                                

In [6]:
hashed_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- anime_id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- English name: string (nullable = true)
 |-- Other name: string (nullable = true)
 |-- Score: string (nullable = true)
 |-- Genres: string (nullable = true)
 |-- Synopsis: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Episodes: string (nullable = true)
 |-- Aired: string (nullable = true)
 |-- Premiered: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Producers: string (nullable = true)
 |-- Licensors: string (nullable = true)
 |-- Studios: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Duration: string (nullable = true)
 |-- Rating: string (nullable = true)
 |-- Rank: string (nullable = true)
 |-- Popularity: integer (nullable = true)
 |-- Favorites: integer (nullable = true)
 |-- Scored By: string (nullable = true)
 |-- Members: integer (nullable = true)
 |-- Image URL: string (nullable = true)
 |-- word

In [7]:
def recommend_anime(anime_id, top_n):
    # Filter for the given anime_id
    query_df = hashed_df.filter(hashed_df.anime_id == anime_id)

    # Compute the approx similarity join
    similar_anime = model.approxSimilarityJoin(query_df, hashed_df, threshold=1.5, distCol="EuclideanDistance")

    # Select the anime and their distances
    similar_anime_df = similar_anime.select(col("datasetB.anime_id").alias("anime_id"), col("EuclideanDistance"))
    
    # Filter out the query anime and limit to top N
    similar_anime_df = similar_anime_df.filter(similar_anime_df.anime_id != anime_id).orderBy("EuclideanDistance").limit(top_n)

    return similar_anime_df

# Example usage
specific_anime_id = 1  # Replace with an actual anime_id
recommended_anime = recommend_anime(specific_anime_id, 5)

recommended_anime.show()

24/05/08 00:58:24 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/05/08 00:58:25 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
24/05/08 00:58:29 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.5 MiB
24/05/08 00:58:34 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.1 MiB


+--------+------------------+
|anime_id| EuclideanDistance|
+--------+------------------+
|       5|1.2856004070547875|
|     693|1.3710705584445335|
|     977|1.3735517622751743|
|    1059|1.3739296232000533|
|     387|1.3775137152845998|
+--------+------------------+



                                                                                

In [8]:
final_recommendations = recommended_anime.join(animedata, "anime_id")

In [9]:
animedata.filter(col('anime_id')==specific_anime_id).select("anime_id","Name", "Synopsis").show(truncate = False)

+--------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
final_recommendations.select("anime_id", "EuclideanDistance", "Name", "Synopsis").show(truncate = False)

24/05/08 00:58:45 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
24/05/08 00:58:47 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.5 MiB
24/05/08 00:58:48 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.1 MiB
                                                                                

+--------+------------------+---------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [11]:
from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

def get_top_5_recommendations_for_all_anime(df, model):
    # Perform the approx similarity join for the whole dataset
    similar_animes_df = model.approxSimilarityJoin(df, df, threshold=float("inf"), distCol="EuclideanDistance") \
                            .filter("datasetA.anime_id != datasetB.anime_id")  # Exclude same anime comparisons


    similar_animes_df = similar_animes_df.select(
        col("datasetA.anime_id").alias("anime_id"),
        col("datasetB.anime_id").alias("similar_anime_id"),
        col("EuclideanDistance")
    )

    # Add a rank column based on EuclideanDistance for each anime
    windowSpec = Window.partitionBy("anime_id").orderBy("EuclideanDistance")
    similar_animes_df = similar_animes_df.withColumn("rank", rank().over(windowSpec))

    # Filter to keep only top 5 similar animes for each anime
    top_5_similar_animes_df = similar_animes_df.filter(col("rank") <= 5)

    return top_5_similar_animes_df

# Apply the function to get top 5 recommendations for all animes
top_5_recommendations_for_all_animes = get_top_5_recommendations_for_all_anime(hashed_df, model)

In [12]:
top_5_recommendations_for_all_animes.show(5)

24/05/08 00:59:15 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
24/05/08 00:59:18 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.5 MiB
[Stage 27:>                                                         (0 + 1) / 1]

+--------+----------------+------------------+----+
|anime_id|similar_anime_id| EuclideanDistance|rank|
+--------+----------------+------------------+----+
|       1|               5|1.2856004070547875|   1|
|       1|             693|1.3710705584445335|   2|
|       1|             977|1.3735517622751743|   3|
|       1|            1059|1.3739296232000533|   4|
|       1|             387|1.3775137152845998|   5|
+--------+----------------+------------------+----+
only showing top 5 rows



                                                                                

In [13]:
top_5_recommendations_for_all_animes.printSchema()

root
 |-- anime_id: integer (nullable = true)
 |-- similar_anime_id: integer (nullable = true)
 |-- EuclideanDistance: double (nullable = false)
 |-- rank: integer (nullable = true)



In [14]:
from pyspark.sql.functions import col

# Join to add Names  primary Animes (Anime_id_A)
recommendations_with_details = top_5_recommendations_for_all_animes.alias("recs").join(
    animedata.alias("anime"),
    col("recs.anime_id") == col("anime.anime_id")
).select(
    col("recs.anime_id").alias("anime_id_A"),
    col("anime.Name").alias("Name_A"),
    col("recs.similar_anime_id"),
    col("recs.EuclideanDistance")
)

# Join to add Names  primary Animes (Anime_id_B)
final_recommendations = recommendations_with_details.alias("recs").join(
    animedata.alias("anime"),
    col("recs.similar_anime_id") == col("anime.anime_id")
).select(
    col("recs.anime_id_A"),
    col("recs.Name_A"),
    col("anime.Name").alias("Name_B"),
    col("recs.EuclideanDistance").alias("distCol")
)

# Show the final DataFrame
# final_recommendations.show(truncate=False)

In [16]:
content_based = final_recommendations.toPandas()
# content_based.to_csv("animedata/anime_tfidf_reco.csv")


24/05/08 01:03:19 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.2 MiB
24/05/08 01:03:22 WARN org.apache.spark.scheduler.DAGScheduler: Broadcasting large task binary with size 10.5 MiB
                                                                                

In [17]:
content_based[20:40]

Unnamed: 0,anime_id_A,Name_A,Name_B,distCol
20,8,Bouken Ou Beet,Akahori Gedou Hour Rabuge,1.369051
21,8,Bouken Ou Beet,Eyeshield 21,1.373967
22,8,Bouken Ou Beet,Bishoujo Senshi Sailor Moon S,1.375497
23,8,Bouken Ou Beet,Vampire Hunter D,1.375647
24,8,Bouken Ou Beet,Top wo Nerae 2! Diebuster,1.376359
25,15,Eyeshield 21,Witch Hunter Robin,1.359213
26,15,Eyeshield 21,Bouken Ou Beet,1.373967
27,15,Eyeshield 21,Hajime no Ippo,1.377655
28,15,Eyeshield 21,Mahoromatic: Automatic Maiden,1.37834
29,15,Eyeshield 21,Tetsuwan Birdy,1.379137
