# In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Create (or reuse) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("MovieClustering")
    .getOrCreate()
)

# Directory holding the MovieLens 1M .dat files
DATA_DIR = "/content/drive/MyDrive/ml-1m"


def _read_movielens_dat(session, path, *columns):
    """Read a '::'-separated, header-less MovieLens ``.dat`` file.

    Args:
        session: active SparkSession used for reading.
        path: location of the ``.dat`` file.
        *columns: names to assign to the file's columns, in order.

    Returns:
        A Spark DataFrame with all columns typed as strings and renamed to
        ``columns``.
    """
    return (
        session.read.format("csv")
        .option("delimiter", "::")
        .option("header", False)
        # NOTE(review): the ML-1M .dat files are commonly read as Latin-1
        # (accented movie titles are not valid UTF-8); Spark defaults to
        # UTF-8, so decode explicitly. Confirm against the dataset README.
        .option("encoding", "ISO-8859-1")
        .load(path)
        .toDF(*columns)
    )


# Load movies, users and ratings
movies_df = _read_movielens_dat(
    spark, f"{DATA_DIR}/movies.dat", "movie_id", "movie_name", "genre"
)
users_df = _read_movielens_dat(
    spark, f"{DATA_DIR}/users.dat", "user_id", "gender", "age", "occupation", "zip_code"
)
rating_df = _read_movielens_dat(
    spark, f"{DATA_DIR}/ratings.dat", "user_id", "movie_id", "rating", "time"
)

# Attach user and movie attributes to every rating
user_ratings = rating_df.join(users_df, "user_id").join(movies_df, "movie_id")

# Keep only the columns the genre analysis needs
data = user_ratings.select("user_id", "genre", "rating")

# The genre field is a pipe-separated string: turn it into an array of tags
from pyspark.sql.functions import split
data = data.withColumn("genre", split(data.genre, "\\|"))

# One row per (rating, genre tag) pair
from pyspark.sql.functions import explode
data = data.withColumn("genre", explode(data.genre))

# Cache the exploded frame; it is scanned again below
data.cache()

# Unique genre names. distinct() gives no ordering guarantee, so sort them to
# keep downstream column order and plot labels reproducible between runs.
genres = sorted(row["genre"] for row in data.select("genre").distinct().collect())

# Function to get genre ratings
from pyspark.sql.functions import avg
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

def get_genre_ratings(rating_df, movies_df, genre):
    """Return each user's average rating over movies tagged with ``genre``.

    Args:
        rating_df: ratings DataFrame with at least ``user_id``, ``movie_id``
            and ``rating`` columns (``rating`` may be a string column).
        movies_df: movies DataFrame with ``movie_id`` and a pipe-separated
            ``genre`` string column.
        genre: genre tag to match exactly against the pipe-separated tags.

    Returns:
        A DataFrame with one row per user who rated at least one movie of
        this genre. Its columns are ``user_id`` and a column named after
        ``genre`` holding that user's average rating as a double.
    """
    from pyspark.sql import functions as F

    # Match whole tags rather than substrings of the joined "A|B|C" string,
    # so a genre name cannot accidentally match inside a longer tag.
    genre_tags = F.split(F.col("genre"), r"\|")

    # No window/row_number step is needed: after groupBy("user_id") each user
    # already has exactly one row.
    return (
        rating_df.join(movies_df, "movie_id")
        .filter(F.array_contains(genre_tags, genre))
        .groupBy("user_id")
        .agg(F.avg(F.col("rating").cast("double")).alias(genre))
    )

# Genre name -> DataFrame(user_id, <genre>) holding each user's mean rating
# for that genre.
genre_ratings = dict()
for genre in genres:
    genre_ratings[genre] = get_genre_ratings(rating_df, movies_df, genre)

# Widen into a single user x genre table: start from the first genre's frame
# and full-outer-join every remaining one on user_id. A user with no ratings
# in some genre ends up with null in that genre's column.
user_genre_ratings = genre_ratings[genres[0]]
for genre in genres[1:]:
    user_genre_ratings = user_genre_ratings.join(
        genre_ratings[genre], on="user_id", how="outer"
    )

# Turn those nulls into 0 so every cell is numeric
user_genre_ratings = user_genre_ratings.fillna(0)

# First 30 rows, kept for visualization
user_genre_ratings_selection = user_genre_ratings.limit(30)

# Function to sort dataframe by rating density
def sort_by_rating_density(user_movie_ratings, n_movies, n_users):
    """Return the densest ``n_users`` x ``n_movies`` corner of a wide ratings table.

    ``user_movie_ratings`` must have a key column first (e.g. ``user_id``),
    followed by one numeric column per item (movie or genre). A cell counts
    as "rated" when it is neither null nor 0. Zero is treated as missing
    because upstream code may already have filled missing ratings with 0.

    Steps:
      1. Rank the item columns by how many users rated them and keep the
         top ``n_movies`` (ties keep their original column order).
      2. Rank users by how many of those kept columns they rated and keep
         the top ``n_users`` rows (ties broken by the key column).

    Args:
        user_movie_ratings: Spark DataFrame, key column first, then item columns.
        n_movies: maximum number of item columns to keep.
        n_users: maximum number of rows (users) to keep.

    Returns:
        A Spark DataFrame with only the selected item columns (the key
        column is dropped), densest rows first.
    """
    from functools import reduce
    from pyspark.sql import functions as F

    key_col, *item_cols = user_movie_ratings.columns
    if not item_cols:
        # Nothing to rank; keep the row limit but expose no item columns.
        return user_movie_ratings.drop(key_col).limit(n_users)

    def _is_rated(name):
        # A real rating: non-null and non-zero (0 is the fill value for "none").
        column = F.col(name)
        return column.isNotNull() & (column != 0)

    # 1) Rated-cell count per item column, computed in a single aggregation pass.
    column_counts = user_movie_ratings.agg(
        *[F.count(F.when(_is_rated(c), 1)).alias(c) for c in item_cols]
    ).first().asDict()
    top_cols = sorted(item_cols, key=lambda c: column_counts[c], reverse=True)[:n_movies]

    # 2) Per-row count of rated cells among the kept columns.
    row_density = reduce(
        lambda acc, c: acc + F.when(_is_rated(c), 1).otherwise(0),
        top_cols,
        F.lit(0),
    )
    density_col = "__rating_density__"

    return (
        user_movie_ratings
        .withColumn(density_col, row_density)
        .orderBy(F.col(density_col).desc(), F.col(key_col).asc())
        .limit(n_users)
        .select(*top_cols)
    )

# Keep the 18 most-rated genre columns and the 30 users with the most ratings
# among them
most_rated_movies_users_selection = sort_by_rating_density(user_genre_ratings, 18, 30)

# Draw heatmap
import numpy as np
import matplotlib.pyplot as plt

# Collect the small selection to the driver once and reuse it for both the
# pixel data and the tick labels.
heatmap_pdf = most_rated_movies_users_selection.toPandas()

plt.imshow(heatmap_pdf.values, cmap="hot", interpolation="nearest")
plt.xlabel("Genre")
plt.ylabel("User")
# Label the columns that were actually plotted. The selection may reorder or
# drop genres, so the global `genres` list is not a reliable label source.
plt.xticks(np.arange(len(heatmap_pdf.columns)), list(heatmap_pdf.columns), rotation=90)
# One tick per plotted row (may be fewer than 30 if there are fewer users)
plt.yticks(np.arange(len(heatmap_pdf)))
plt.colorbar()
plt.show()
