In [2]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json, col, explode,lower, collect_list,concat_ws
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [3]:
spark = init_spark()
movies_df = spark.read.csv(os.path.join("data/movies_metadata.csv"), header=True, inferSchema=True)

# Select columns we are interested in
movies_df = movies_df.select("id", "title", "genres").distinct()
genres_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
])

# Convert the "genres" column to an array of structs
movies_df = movies_df.withColumn("genres", from_json(col("genres"), ArrayType(genres_schema)))

movies_df = movies_df.select("id", "title", explode(col("genres")).alias("genre")) \
               .select("id", "title", col("genre.name").alias("genre_name")) \
               .dropna()

# Group the data by id and title, and collect the genre names into a list
movies_df = movies_df.groupBy("id", "title").agg(collect_list("genre_name").alias("genres"))

# Combine the genre names into one string
movies_df = movies_df.withColumn("genres", concat_ws(",", "genres"))
movies_df.show(truncate=False)


+------+--------------------------------------------------+------------------------------------------------+
|id    |title                                             |genres                                          |
+------+--------------------------------------------------+------------------------------------------------+
|10001 |Young Einstein                                    |Comedy,Science Fiction                          |
|100010|Flight Command                                    |Drama,War                                       |
|10002 |Mona Lisa                                         |Drama,Crime,Romance                             |
|100024|Bloodwork                                         |Horror,Thriller                                 |
|100032|Released                                          |Drama,Action                                    |
|10004 |Desperation                                       |Drama,Fantasy,Horror,Thriller,Mystery           |
|10006 |Wild Seven 

In [4]:
# Split the genres column into multiple columns
genres = movies_df.select("genres").rdd.flatMap(lambda x: x[0].split(',')).distinct().collect()
for genre in genres:
    movies_df = movies_df.withColumn(genre, col("genres").contains(genre).cast("int"))
movies_df.show(truncate=False)



+------+--------------------------------------------------+------------------------------------------------+---------------+-----+-------+------+--------+------+-------+------+-----------+-------+--------+------+---+-----+-------+---------+---------+-------+-----+-------+
|id    |title                                             |genres                                          |Science Fiction|Drama|Romance|Horror|Thriller|Action|Fantasy|Family|Documentary|Western|TV Movie|Comedy|War|Crime|Mystery|Adventure|Animation|History|Music|Foreign|
+------+--------------------------------------------------+------------------------------------------------+---------------+-----+-------+------+--------+------+-------+------+-----------+-------+--------+------+---+-----+-------+---------+---------+-------+-----+-------+
|10001 |Young Einstein                                    |Comedy,Science Fiction                          |1              |0    |0      |0     |0       |0     |0      |0     |0    

In [5]:
# Create a feature vector for each movie
assembler = VectorAssembler(inputCols=genres, outputCol="features")
movies_df = assembler.transform(movies_df)
movies_df.show()

+------+--------------------+--------------------+---------------+-----+-------+------+--------+------+-------+------+-----------+-------+--------+------+---+-----+-------+---------+---------+-------+-----+-------+--------------------+
|    id|               title|              genres|Science Fiction|Drama|Romance|Horror|Thriller|Action|Fantasy|Family|Documentary|Western|TV Movie|Comedy|War|Crime|Mystery|Adventure|Animation|History|Music|Foreign|            features|
+------+--------------------+--------------------+---------------+-----+-------+------+--------+------+-------+------+-----------+-------+--------+------+---+-----+-------+---------+---------+-------+-----+-------+--------------------+
| 10001|      Young Einstein|Comedy,Science Fi...|              1|    0|      0|     0|       0|     0|      0|     0|          0|      0|       0|     1|  0|    0|      0|        0|        0|      0|    0|      0|(20,[0,11],[1.0,1...|
|100010|      Flight Command|           Drama,War|      

In [6]:
from pyspark.ml.clustering import KMeans

# Train and fit to kmeans model (can change k and seed later)
kmeans = KMeans(k=20)
model = kmeans.fit(movies_df.select("features"))

In [13]:
def recommend_movies(moviesInput, ratings):
    # Create a dataframe with the input movies and their ratings
    input_df = spark.createDataFrame(zip(moviesInput, ratings), schema=["title", "rating"])

    # Join the input movies with the movies dataframe to get their features
    input_features_df = input_df.join(movies_df, on="title", how="inner").select("id", "features", "rating")
    print("input features df")
    input_features_df.show()

    print("ids of inputs")

    inputIds = []
    for row in input_features_df.select("id").collect():
        inputIds.append(row["id"])
    print(inputIds)

    # Make predictions for the input movies using the kmeans model
    predictions_df = model.transform(input_features_df).drop("features")
    print("predictions df")
    predictions_df.show()

    # Get the cluster label for the input movies
    cluster_label = predictions_df.select("prediction").distinct().collect()[0]["prediction"]
    print("cluster label")
    print(cluster_label)

    # Get the ids of the movies in the same cluster as the input movies
    cluster_movies_df = model.transform(movies_df).filter(col("prediction") == cluster_label)
    cluster_movies_ids = cluster_movies_df.select("id").rdd.flatMap(lambda x: x).collect()
    print("cluster movies id")
    print(cluster_movies_ids)

    # Exclude the input movies from the recommendations AND keep ids in clust
    recommendations_df = movies_df.filter(col('id').isin(cluster_movies_ids))
    recommendations_df = recommendations_df.filter(~col('id').isin(inputIds))

    print("recommendations df (Exclude the input movies from the recommendations AND keep ids in clust)")
    recommendations_df.show(truncate=False)

    #----------------------------------------------------------------------------------


    # Join with the predictions dataframe to get the rating of the recommended movies
    # PROBLEM HERE
    recommendations_df = recommendations_df.join(predictions_df, on="id", how="inner").select("title", "genres", "prediction", "rating")
    print("recommendations df (Join with the predictions dataframe to get the rating of the recommended movies)")
    recommendations_df.show(truncate=False)

    # Sort the recommended movies by their rating and select the top 10
    top_recommendations = recommendations_df.sort(col("rating").desc()).limit(10)

    print("top recommendations df")
    top_recommendations.show(truncate=False)

    return top_recommendations.select("title", "genres").rdd.map(lambda x: (x[0], x[1])).collect()

# Input movies
moviesInput = ["The Notebook"]

# Ratings for the input movies
ratings = [1, 1, 0, 10]

# Call the recommend_movies function
recommended_movies = recommend_movies(moviesInput, ratings)

# Print the recommended movies
print("recommended: ")
print(recommended_movies)


input features df
+------+--------------------+------+
|    id|            features|rating|
+------+--------------------+------+
|   862|(20,[7,11,16],[1....|     1|
| 10009|(20,[7,15,16],[1....|     1|
| 72003|(20,[1,4,5,13],[1...|     0|
|   155|(20,[1,4,5,13],[1...|     0|
|199615|(20,[1,12],[1.0,1...|    10|
| 11036|(20,[1,2],[1.0,1.0])|    10|
+------+--------------------+------+

ids of inputs
['862', '10009', '72003', '155', '199615', '11036']
predictions df
+------+------+----------+
|    id|rating|prediction|
+------+------+----------+
|   862|     1|         3|
| 10009|     1|         3|
| 72003|     0|         9|
|   155|     0|         9|
|199615|    10|         1|
| 11036|    10|         6|
+------+------+----------+

cluster label
3
cluster movies id
['10009', '10112', '101806', '10530', '10555', '105875', '106629', '10674', '10693', '10800', '10837', '10865', '10895', '10948', '10996', '110416', '110420', '11135', '11360', '114718', '11497', '11544', '11619', '11625', '1