In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, split, explode

# Start a Spark session (getOrCreate reuses an already-running session)
spark = SparkSession.builder.appName("MovieRatingsAnalysis").getOrCreate()

# Reader options shared by both CSV files.
# - header / inferSchema: the first row holds the column names and Spark
#   infers the column types from the data.
# - quote / escape: Spark's CSV reader escapes with a backslash by default,
#   so a quoted field that contains a doubled quote ("") is not unescaped and
#   the raw quote characters end up in the parsed value. Using '"' as the
#   escape character makes such fields parse the RFC 4180 way.
csv_read_options = {
    "header": True,
    "inferSchema": True,
    "quote": '"',
    "escape": '"',
}

# Load the movies dataset
movies_file_path = "/content/movies.csv"  # Update with the correct path
movies_df = spark.read.options(**csv_read_options).csv(movies_file_path)

# Load the ratings dataset
ratings_file_path = "/content/ratings.csv"  # Update with the correct path
ratings_df = spark.read.options(**csv_read_options).csv(ratings_file_path)

# Show the schema to understand the structure
movies_df.printSchema()
ratings_df.printSchema()

# Create temporary views so the DataFrames can be queried with Spark SQL
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")

# ---------------------- 1. Find the most active users ----------------------

# Rank users by how many non-null ratings they have in the `ratings` table
# and keep the ten with the highest counts.
# NOTE: the ORDER BY has no secondary key, so users with equal counts are
# returned in whatever order Spark produces.
query_active_users = """
    SELECT userId, COUNT(rating) AS num_ratings
    FROM ratings
    GROUP BY userId
    ORDER BY num_ratings DESC
    LIMIT 10
"""

# Run the query, then print the result table (show() defaults spelled out)
most_active_users = spark.sql(query_active_users)
most_active_users.show(n=20, truncate=True)

# ---------------------- 2. Sort the movie names in alphabetical order ----------------------

# Select every movie's id and title from the `movies` table, ordered by
# title ascending.
query_sorted_movies = """
    SELECT movieId, title
    FROM movies
    ORDER BY title ASC
"""

# Run the query; show() prints only the first 20 rows and shortens long
# cell values (defaults spelled out).
sorted_movies = spark.sql(query_sorted_movies)
sorted_movies.show(n=20, truncate=True)

# ---------------------- 3. Calculate the average rating per genre ----------------------

# Split the pipe-delimited `genres` column and explode it so each movie yields
# one row per genre. split() takes a regular expression, so the pipe must be
# escaped; a raw string keeps the backslash without relying on Python passing
# through an invalid escape sequence (which triggers a SyntaxWarning on
# Python 3.12+).
movies_genres_df = movies_df.withColumn(
    "genre", explode(split(col("genres"), r"\|"))
)

# Join with ratings on movieId so every rating is paired with each genre of
# the rated movie (inner join: movies without ratings are dropped).
genre_ratings_df = movies_genres_df.join(ratings_df, on="movieId", how="inner")

# Create a temporary view for the joined DataFrame
genre_ratings_df.createOrReplaceTempView("genre_ratings")

# SQL query to calculate the average rating per genre, highest average first
query_avg_rating_per_genre = """
    SELECT genre, AVG(rating) AS avg_rating
    FROM genre_ratings
    GROUP BY genre
    ORDER BY avg_rating DESC
"""

# Execute the query and print the result table
avg_rating_per_genre = spark.sql(query_avg_rating_per_genre)
avg_rating_per_genre.show()


root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

+------+-----------+
|userId|num_ratings|
+------+-----------+
|   414|       2698|
|   599|       2478|
|   474|       2108|
|   448|       1864|
|   274|       1346|
|   610|       1302|
|    68|       1260|
|   380|       1218|
|   606|       1115|
|   288|       1055|
+------+-----------+

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   7789|"11'09""01 - Sept...|
| 117867|          '71 (2014)|
|  97757|'Hellboy': The Se...|
|  26564|'Round Midnight (...|
|  27751| 'Salem's Lot (2004)|
|    779|'Til There Was Yo...|
| 149380|'Tis the Season f...|
|   2072|  'burbs, The (1989)|
|   3112|'night Mother (1986)|
|  69757|(500) Days of Sum...|
|   8169|*batter