In [1]:
import os
import sys

# Point both the Spark driver and the Python workers at the interpreter that
# is running this kernel.
os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [2]:
from collections import Counter

from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, lit

In [3]:
# Obtain the Spark session for this notebook (an already-running session is
# reused, otherwise a new one named "CollaborativeBookSong" is started).
spark = (
    SparkSession.builder
    .appName("CollaborativeBookSong")
    .getOrCreate()
)

In [4]:
# Load the matched user/book/song table; later cells read the `user_id`,
# `song_id` and `book_id` columns from it.
df = spark.read.format("parquet").load(
    "hdfs://localhost:9000/golden/matched_user_book_song.parquet"
)

In [5]:
# This cell reassigns `df`, so on a second execution the index columns would
# already exist and StringIndexerModel.transform would refuse to overwrite
# them. Dropping the cell's own outputs first makes it safely re-runnable
# (DataFrame.drop ignores column names that are not present).
df = df.drop("userIndex", "songIndex", "bookIndex", "rating")

# Fit one string -> numeric-index mapping per id column; ALS needs numeric
# user/item columns.
user_indexer = StringIndexer(inputCol="user_id", outputCol="userIndex").fit(df)
song_indexer = StringIndexer(inputCol="song_id", outputCol="songIndex").fit(df)
book_indexer = StringIndexer(inputCol="book_id", outputCol="bookIndex").fit(df)

df = user_indexer.transform(df)
df = song_indexer.transform(df)
df = book_indexer.transform(df)

# Every observed row is treated as one interaction of strength 1.0
# (the model below is trained with implicitPrefs=True).
df = df.withColumn("rating", lit(1.0))

In [6]:
# Implicit-feedback ALS over (userIndex, songIndex) with the constant
# `rating` column as interaction strength.
als = ALS(
    userCol="userIndex",
    itemCol="songIndex",
    ratingCol="rating",
    coldStartStrategy="drop",  # drop rows whose prediction would be NaN
    nonnegative=True,
    implicitPrefs=True,
    seed=42,  # fixed seed so the fitted factors are reproducible across runs
)
als_model = als.fit(df)

In [7]:
# Driver-side lookup tables between raw ids and their numeric indices:
#   user_id_to_index: user_id   -> userIndex (as int)
#   song_index_to_id: songIndex -> song_id
user_id_to_index = {
    pair[0]: int(pair[1])
    for pair in df.select("user_id", "userIndex").distinct().collect()
}
song_index_to_id = {
    int(pair[1]): pair[0]
    for pair in df.select("song_id", "songIndex").distinct().collect()
}

In [8]:
def recommend_for_user_and_book(df_spark, als_model, target_user, target_book, top_n_users=5, top_n_songs=5):
    """Recommend songs for ``target_user`` in the context of ``target_book``.

    Steps:
      1. Rank every other user by the number of books shared with
         ``target_user`` and keep the ``top_n_users`` best. Users with zero
         shared books are not excluded, so they can fill the list when fewer
         than ``top_n_users`` users overlap.
      2. Collect the songs those users have on ``target_book`` and keep the
         ``top_n_songs * 2`` most frequent as candidates.
      3. Score the candidates for ``target_user`` with ``als_model`` and
         return the ``top_n_songs`` highest-scoring song ids.

    Relies on the notebook-level names ``spark`` (active SparkSession) and
    ``user_id_to_index`` (user_id -> userIndex).

    Args:
        df_spark: Spark DataFrame with at least the columns ``user_id``,
            ``book_id``, ``song_id`` and ``songIndex``.
        als_model: Fitted model whose ``transform`` accepts
            ``userIndex``/``songIndex`` columns and adds ``prediction``.
        target_user: Value of ``user_id`` to recommend for.
        target_book: Value of ``book_id`` giving the recommendation context.
        top_n_users: Number of most-similar users to draw candidates from.
        top_n_songs: Maximum number of songs to return.

    Returns:
        List of ``song_id`` values, best first. Empty (after printing a short
        message) when the user is unknown, has no ALS index, or no candidate
        songs exist for the book.
    """
    # Build {user_id: set(book_id)} on the driver.
    user_book_rows = (
        df_spark.select("user_id", "book_id")
        .distinct()
        .groupBy("user_id")
        .agg(collect_list("book_id").alias("books"))
        .collect()
    )
    user_books_dict = {row["user_id"]: set(row["books"]) for row in user_book_rows}

    if target_user not in user_books_dict:
        print(f"User {target_user} not found.")
        return []

    # Checked before the candidate query so no Spark job is wasted on a user
    # the model cannot score anyway.
    if target_user not in user_id_to_index:
        print("User not indexed in ALS.")
        return []
    user_idx = user_id_to_index[target_user]

    # Similarity = number of books in common with the target user.
    target_books = user_books_dict[target_user]
    similarities = [
        (other_user, len(target_books & books))
        for other_user, books in user_books_dict.items()
        if other_user != target_user
    ]
    top_users = [u for u, _ in sorted(similarities, key=lambda x: x[1], reverse=True)[:top_n_users]]

    # Fetch song id and index together so the id -> index mapping does not
    # need a second Spark job.
    candidate_rows = (
        df_spark.filter(col("user_id").isin(top_users) & (col("book_id") == target_book))
        .select("song_id", "songIndex")
        .collect()
    )
    song_id_to_idx = {row["song_id"]: int(row["songIndex"]) for row in candidate_rows}
    popular_songs = Counter(row["song_id"] for row in candidate_rows)
    # Over-fetch (2x) so the ALS re-ranking below has something to choose from.
    top_song_ids = [s for s, _ in popular_songs.most_common(top_n_songs * 2)]

    if not top_song_ids:
        # spark.createDataFrame cannot infer a schema from an empty list, so
        # bail out here instead of raising further down.
        print(f"No candidate songs found for book {target_book}.")
        return []

    idx_to_song_id = {song_id_to_idx[s]: s for s in top_song_ids}
    test_df = spark.createDataFrame(
        [(user_idx, s_idx) for s_idx in idx_to_song_id],
        ["userIndex", "songIndex"],
    )
    predictions = als_model.transform(test_df).dropna()

    # The candidate set is tiny: map indices back to ids on the driver rather
    # than shipping a lookup dict to the executors inside an RDD lambda.
    predicted = [
        (idx_to_song_id[int(row["songIndex"])], row["prediction"])
        for row in predictions.collect()
    ]
    top_predicted_songs = [s for s, _ in sorted(predicted, key=lambda x: x[1], reverse=True)[:top_n_songs]]
    return top_predicted_songs

In [9]:
# Example query: one user/book pair from the dataset.
target_user = "d8f55c9e774ddb880968a1ee57e3b86d"
target_book = "37470"

recommendations = recommend_for_user_and_book(
    df_spark=df,
    als_model=als_model,
    target_user=target_user,
    target_book=target_book,
)
print(f"Recommended songs for user '{target_user}' and book '{target_book}':")
print(recommendations)

Recommended songs for user 'd8f55c9e774ddb880968a1ee57e3b86d' and book '37470':
['Guilty', 'Speakeasy', 'Sweet Thing', 'You', 'Money On Fleek']
