# Exploring the MovieLens ratings dataset with PySpark DataFrames

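This project explores the MovieLens 100K dataset with PySpark DataFrame operations. Instead of writing SQL query strings for `spark.sql`, each step is expressed as DataFrame method calls such as `groupBy`, `agg`, `filter` and `join`, which is a more Pythonic way to manipulate and analyze the data. Both styles are part of Spark SQL and run on the same engine.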

In [1]:
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
# Create a SparkSession as the entry point to Spark SQL functionality
spark = SparkSession.builder.appName("DataFramesExploration").getOrCreate()

In [3]:
# Load the MovieLens ratings file (tab-separated, no header row) and rename the default _c0 to _c3 columns
data_df = spark.read.csv("ml-100k/ml-100k/u.data", inferSchema=True, sep="\t", header=False)
data_df = data_df.toDF("user_id", "movie_id", "rating", "timestamp")

# Print the schema to check the column names and the types chosen by inferSchema
print("Dataframe Schema:")
data_df.printSchema()
print("\nFirst few rows of the Dataframe:")
# take(5) returns the first five rows to the driver as a list of Row objects, whose fields can be accessed by name (e.g. row.rating)
data_df.take(5)

Dataframe Schema:
root
 |-- user_id: integer (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)


First few rows of the Dataframe:


[Row(user_id=196, movie_id=242, rating=3, timestamp=881250949),
 Row(user_id=186, movie_id=302, rating=3, timestamp=891717742),
 Row(user_id=22, movie_id=377, rating=1, timestamp=878887116),
 Row(user_id=244, movie_id=51, rating=2, timestamp=880606923),
 Row(user_id=166, movie_id=346, rating=1, timestamp=886397596)]
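Each line of u.data holds four tab-separated integers, which the reader above turns into the user_id, movie_id, rating and timestamp columns. As a plain-Python sketch of that mapping (not how Spark parses the file internally), here it is applied to the first line, whose values are taken from the output above:

```python
from typing import NamedTuple


class Rating(NamedTuple):
    """One row of u.data, using the column names chosen above."""
    user_id: int
    movie_id: int
    rating: int
    timestamp: int  # Unix epoch seconds


def parse_rating_line(line: str) -> Rating:
    # u.data is tab-separated with no header, so split on "\t" and convert every field to int
    user_id, movie_id, rating, timestamp = (int(field) for field in line.rstrip("\n").split("\t"))
    return Rating(user_id, movie_id, rating, timestamp)


first = parse_rating_line("196\t242\t3\t881250949\n")
print(first)  # Rating(user_id=196, movie_id=242, rating=3, timestamp=881250949)
```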

#### DataFrame operations

In [4]:
# Count the number of ratings
num_ratings = data_df.count()
print(f"Number of ratings: {num_ratings}")

Number of ratings: 100000


In [5]:
# Calculate the total rating and the number of ratings per movie
movie_rating_counts = data_df.groupBy("movie_id").agg(F.sum("rating").alias("total_rating"), F.count("rating").alias("rating_count"))
# Cache movie_rating_counts because several later cells reuse it.
# cache() is lazy: the data is stored the first time an action (show, count, collect) computes it,
# and later actions that depend on this dataframe read the cached data instead of re-running the aggregation
movie_rating_counts.cache()

# Calculate the average rating per movie; withColumn adds a new column to the dataframe
average_rating_per_movie = movie_rating_counts.withColumn("avg_rating", F.col("total_rating") / F.col("rating_count"))
# persist() works like cache() but lets you choose the storage level explicitly:
# memory only, disk only, or a combination of both.
# With MEMORY_AND_DISK, partitions that do not fit in memory are spilled to disk instead of being recomputed
average_rating_per_movie.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
print("Average rating per movie:")
average_rating_per_movie.show(5)

Average rating per movie:
+--------+------------+------------+------------------+
|movie_id|total_rating|rating_count|        avg_rating|
+--------+------------+------------+------------------+
|     496|         952|         231| 4.121212121212121|
|     471|         798|         221|3.6108597285067874|
|     463|         274|          71| 3.859154929577465|
|     148|         410|         128|          3.203125|
|    1342|           5|           2|               2.5|
+--------+------------+------------+------------------+
only showing top 5 rows



In [6]:
# Calculate the overall average rating. collect() returns the result to the driver as a list of Row objects;
# [0][0] picks the single value from the single row
average_rating = data_df.agg({"rating": "avg"}).collect()[0][0]
print(f"Average rating: {average_rating:.2f}")

# Count the unique users and movies
unique_users = data_df.select("user_id").distinct().count()
unique_movies = data_df.select("movie_id").distinct().count()
print("\nNumber of unique users:", unique_users)
print("Number of unique movies:", unique_movies)

Average rating: 3.53

Number of unique users: 943
Number of unique movies: 1682


In [7]:
# Select the number of ratings per movie from the cached aggregation
total_ratings_per_movie = movie_rating_counts.select("movie_id", "rating_count")
print("Total ratings per movie:")
total_ratings_per_movie.show(5)

# Keep only movies with at least 100 ratings (filter accepts a SQL expression string)
popular_movies = movie_rating_counts.filter("rating_count >= 100")
print("\nPopular movies:")
popular_movies.show(5)

Total ratings per movie:
+--------+------------+
|movie_id|rating_count|
+--------+------------+
|     496|         231|
|     471|         221|
|     463|          71|
|     148|         128|
|    1342|           2|
+--------+------------+
only showing top 5 rows


Popular movies:
+--------+------------+------------+
|movie_id|total_rating|rating_count|
+--------+------------+------------+
|     496|         952|         231|
|     471|         798|         221|
|     148|         410|         128|
|     243|         322|         132|
|      31|         559|         154|
+--------+------------+------------+
only showing top 5 rows
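The two steps above combine naturally: keep movies that pass the rating-count threshold, then rank them by average rating, so averages based on only a few ratings (movie 1342 has two) do not dominate the ranking. A plain-Python sketch using only the five popular rows displayed above, not the full result:

```python
# (movie_id, total_rating, rating_count) for the five popular movies shown above
popular_rows = [
    (496, 952, 231),
    (471, 798, 221),
    (148, 410, 128),
    (243, 322, 132),
    (31, 559, 154),
]

MIN_RATINGS = 100  # same threshold as the filter above

# Keep movies that meet the threshold, compute their average, and sort best-first
ranked = sorted(
    ((movie_id, total / count) for movie_id, total, count in popular_rows if count >= MIN_RATINGS),
    key=lambda pair: pair[1],
    reverse=True,
)

for movie_id, avg in ranked:
    print(f"{movie_id}: {avg:.2f}")
```

With DataFrames, the same idea would be `popular_movies.withColumn("avg_rating", F.col("total_rating") / F.col("rating_count")).orderBy(F.desc("avg_rating"))`.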



In [8]:
# Find the most and least rated movies
most_rated_movies = movie_rating_counts.orderBy(F.desc("rating_count"))
print("Most rated movies:")
most_rated_movies.show(5)

least_rated_movies = movie_rating_counts.orderBy("rating_count")
print("\nLeast rated movies:")
least_rated_movies.show(5)

Most rated movies:
+--------+------------+------------+
|movie_id|total_rating|rating_count|
+--------+------------+------------+
|      50|        2541|         583|
|     258|        1936|         509|
|     100|        2111|         508|
|     181|        2032|         507|
|     294|        1531|         485|
+--------+------------+------------+
only showing top 5 rows


Least rated movies:
+--------+------------+------------+
|movie_id|total_rating|rating_count|
+--------+------------+------------+
|    1645|           4|           1|
|    1507|           3|           1|
|    1653|           5|           1|
|    1675|           3|           1|
|    1460|           3|           1|
+--------+------------+------------+
only showing top 5 rows
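All five least-rated movies have a single rating, so their relative order is not guaranteed and may change between runs. Adding a secondary sort key, for example `orderBy("rating_count", "movie_id")`, makes the order deterministic. The same idea in plain Python, on the five rows shown above:

```python
# (movie_id, rating_count) for the least-rated movies shown above; all are tied at one rating
least_rated = [(1645, 1), (1507, 1), (1653, 1), (1675, 1), (1460, 1)]

# Sort by rating_count first, then by movie_id to break ties deterministically
ordered = sorted(least_rated, key=lambda row: (row[1], row[0]))
print([movie_id for movie_id, _ in ordered])  # [1460, 1507, 1645, 1653, 1675]
```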



In [9]:
# Find users who rated the most movies
most_active_users = data_df.groupBy("user_id").agg(F.count("*").alias("num_ratings")).orderBy(F.desc("num_ratings"))
print("Users who rated the most movies:")
most_active_users.show(5)

Users who rated the most movies:
+-------+-----------+
|user_id|num_ratings|
+-------+-----------+
|    405|        737|
|    655|        685|
|     13|        636|
|    450|        540|
|    276|        518|
+-------+-----------+
only showing top 5 rows



In [10]:
# Count how many times each rating value occurs and sort by rating value
rating_distribution = data_df.groupBy("rating").agg(F.count("*").alias("count")).orderBy("rating")
print("Rating distribution:")
rating_distribution.show()

Rating distribution:
+------+-----+
|rating|count|
+------+-----+
|     1| 6110|
|     2|11370|
|     3|27145|
|     4|34174|
|     5|21201|
+------+-----+
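The rating distribution is enough to recover the overall average from In [6]: weight each rating value by its count and divide by the total number of ratings. A plain-Python check using the counts above:

```python
# Rating value -> number of ratings, copied from the distribution above
distribution = {1: 6110, 2: 11370, 3: 27145, 4: 34174, 5: 21201}

total_ratings = sum(distribution.values())
weighted_sum = sum(rating * count for rating, count in distribution.items())
average = weighted_sum / total_ratings

print(total_ratings)     # 100000, matching data_df.count()
print(f"{average:.2f}")  # 3.53, matching the average computed with agg
```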



In [11]:
# Recompute the sum and count of ratings per movie (the same aggregation as movie_rating_counts above)
movie_rating_sum_count = data_df.groupBy("movie_id").agg(F.sum("rating").alias("total_rating"), F.count("rating").alias("rating_count"))

# Calculate the average rating for each movie
average_rating_per_movie = movie_rating_sum_count.withColumn("avg_rating", F.col("total_rating") / F.col("rating_count"))
print("Average rating per movie:")
# truncate=False prints full column values instead of cutting strings longer than 20 characters
average_rating_per_movie.show(5, truncate=False)

Average rating per movie:
+--------+------------+------------+------------------+
|movie_id|total_rating|rating_count|avg_rating        |
+--------+------------+------------+------------------+
|496     |952         |231         |4.121212121212121 |
|471     |798         |221         |3.6108597285067874|
|463     |274         |71          |3.859154929577465 |
|148     |410         |128         |3.203125          |
|1342    |5           |2           |2.5               |
+--------+------------+------------+------------------+
only showing top 5 rows



In [12]:
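# Read the pipe-delimited movie file (u.item) and keep the id, the title and column _c5.
# Note: u.item stores genres as 19 trailing 0/1 flag columns (_c5 to _c23), and _c5 is only the
# first of them (the 'unknown' genre), so `genres` here holds "0" or "1" rather than genre names
movie_titles_genres_df = spark.read.option("delimiter", "|").csv("ml-100k/ml-100k/u.item").selectExpr("_c0 as movie_id", "_c1 as title", "_c5 as genres")

# Define the schema for the movie titles and genres dataframe.
# Without a schema, the CSV reader loads every column as a string, and this schema keeps all three as strings;
# movie_id therefore stays a string, so the join below relies on Spark's implicit cast to match the integer movie_id
schema = StructType([
    StructField("movie_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)
])
# Apply the schema by rebuilding the dataframe from its underlying RDD of rows
movie_titles_genres_df = spark.createDataFrame(movie_titles_genres_df.rdd, schema)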

print("Values of movies:")
movie_titles_genres_df.show(5)

# Join movie_titles_genres with average_rating_per_movie by movie_id
movies_with_ratings = movie_titles_genres_df.join(average_rating_per_movie, on="movie_id", how="inner")

print("\nMovies with averaged rating:")
movies_with_ratings.show(5)

Values of movies:
+--------+-----------------+------+
|movie_id|            title|genres|
+--------+-----------------+------+
|       1| Toy Story (1995)|     0|
|       2| GoldenEye (1995)|     0|
|       3|Four Rooms (1995)|     0|
|       4|Get Shorty (1995)|     0|
|       5|   Copycat (1995)|     0|
+--------+-----------------+------+
only showing top 5 rows


Movies with averaged rating:
+--------+-----------------+------+------------+------------+------------------+
|movie_id|            title|genres|total_rating|rating_count|        avg_rating|
+--------+-----------------+------+------------+------------+------------------+
|       1| Toy Story (1995)|     0|        1753|         452|3.8783185840707963|
|       2| GoldenEye (1995)|     0|         420|         131|3.2061068702290076|
|       3|Four Rooms (1995)|     0|         273|          90| 3.033333333333333|
|       4|Get Shorty (1995)|     0|         742|         209| 3.550239234449761|
|       5|   Copycat (1995)|     0| 
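Because u.item stores genres as 19 trailing 0/1 flags (in the order listed in u.genre), decoding them needs every flag column rather than _c5 alone. A plain-Python sketch of the decoding for one line; the line below is a made-up example in the u.item layout, not a real entry from the file:

```python
from __future__ import annotations

# Genre names in the order of u.item's 19 flag columns (_c5 to _c23), as listed in u.genre
GENRES = [
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
]


def decode_item_line(line: str) -> tuple[int, str, list[str]]:
    """Return (movie_id, title, genre names) for one pipe-delimited u.item line."""
    fields = line.rstrip("\n").split("|")
    movie_id, title = int(fields[0]), fields[1]
    flags = fields[5:5 + len(GENRES)]  # the 19 genre flag columns
    genres = [name for name, flag in zip(GENRES, flags) if flag == "1"]
    return movie_id, title, genres


# Hypothetical line: id | title | release date | video release date | IMDb URL | 19 flags
example = "9999|Example Movie (1995)|01-Jan-1995||http://example.com|" + "|".join(
    ["0", "0", "0", "1", "1", "1"] + ["0"] * 13
)
print(decode_item_line(example))
```

In the DataFrame API, the same idea would mean selecting _c5 through _c23 and keeping the names whose flag is "1", instead of splitting a single column on commas.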

In [13]:
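# Split the genres column on commas and count the distinct values.
# Because `genres` holds a single 0/1 flag (column _c5 of u.item), this finds only the values "0" and "1"
distinct_genres_count = movie_titles_genres_df.select(F.explode(F.split("genres", ",")).alias("genre")).distinct().count()
print(f"Number of genres: {distinct_genres_count}")

# Calculate the average rating per genre value: the mean of the per-movie averages,
# so every movie counts equally regardless of how many ratings it received
average_rating_per_genre_df = (
    movie_titles_genres_df.join(average_rating_per_movie, on="movie_id", how="inner")
    .select(F.explode(F.split("genres", ",")).alias("genre"), "avg_rating")
    .groupBy("genre")
    .agg(F.avg("avg_rating").alias("avg_rating"))
)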

print("\nAverage rating per genre:")
average_rating_per_genre_df.show(5)

Number of genres: 2

Average rating per genre:
+-----+------------------+
|genre|        avg_rating|
+-----+------------------+
|    0|3.0770609634276855|
|    1|2.2222222222222223|
+-----+------------------+
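The per-genre figure above is an unweighted mean of per-movie averages, so a movie with two ratings counts as much as one with hundreds. Dividing the summed total_rating by the summed rating_count instead gives a rating-weighted average. A plain-Python comparison for two movies whose totals appear in the outputs above (movie 1 and movie 1342):

```python
# (total_rating, rating_count) per movie, taken from the outputs above
movies = {1: (1753, 452), 1342: (5, 2)}

# Unweighted: average of per-movie averages (what F.avg("avg_rating") computes per group)
unweighted = sum(total / count for total, count in movies.values()) / len(movies)

# Weighted: pool all ratings first, then divide
weighted = sum(total for total, _ in movies.values()) / sum(count for _, count in movies.values())

print(f"unweighted={unweighted:.4f} weighted={weighted:.4f}")
```

In PySpark, the weighted version would aggregate with `F.sum("total_rating") / F.sum("rating_count")` inside `agg`.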



In [14]:
# Define a user-defined function (UDF) that maps a rating to a sentiment label
def rating_sentiment(rating):
    rating = int(rating)
    if rating >= 4:
        return "Positive"
    elif rating <= 2:
        return "Negative"
    else:
        return "Neutral"

# Register the UDF with Spark SQL so it can be called by name inside SQL expressions
spark.udf.register("rating_sentiment", rating_sentiment, StringType())

# Calculate the sentiment distribution of ratings.
# F.expr parses a SQL expression string, which lets us call the registered UDF by name on the rating column
rating_sentiment_distribution = data_df.groupBy(F.expr("rating_sentiment(rating)").alias("sentiment")).agg(F.count("*").alias("count"))

print("Sentiment distribution of ratings:")
rating_sentiment_distribution.show()

Sentiment distribution of ratings:
+---------+-----+
|sentiment|count|
+---------+-----+
|  Neutral|27145|
| Positive|55375|
| Negative|17480|
+---------+-----+



In [15]:
# Define the schema for the users dataframe
schema_user_data = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("occupation", StringType(), True)
])

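# Load the users dataset (u.user) with the schema above. The file also has a fifth field (zip code);
# since the schema lists only four columns, the reader's default PERMISSIVE mode drops the extra token
user_df = spark.read.option("delimiter", "|").csv("ml-100k/ml-100k/u.user", schema=schema_user_data)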
print("Users data:")
user_df.show(5)

# Broadcast the collected user rows as a read-only variable.
# A broadcast variable is shipped to each executor once and kept in memory there, instead of being
# sent along with every task, which suits small-to-medium lookup data that many tasks read.
# The data must first be collected to the driver, so it has to fit in driver memory.
# Note that the calculations below read broadcast_user_data.value on the driver, so they are plain
# Python over the collected list; the broadcast only pays off when code running on executors
# (for example a UDF or an RDD map) references it
broadcast_user_data = spark.sparkContext.broadcast(user_df.collect())

# Perform analysis using the broadcasted data
# Calculate the average age of users
average_age = sum(row["age"] for row in broadcast_user_data.value) / len(broadcast_user_data.value)
print(f"\nAverage age of users: {average_age:.2f}")

# Find the maximum age among users
max_age = max(row["age"] for row in broadcast_user_data.value)
print(f"\nMaximum age among users: {max_age}")

# Calculate the total number of users
total_users = len(broadcast_user_data.value)
print(f"\nTotal number of users: {total_users}")

# Calculate the number of users aged 30 or below
users_aged_30_or_below = sum(1 for row in broadcast_user_data.value if row["age"] <= 30)
print(f"\nNumber of users aged 30 or below: {users_aged_30_or_below}")

# Find the most common occupation among users by counting every occupation in a single pass
from collections import Counter

occupation_counts = Counter(row["occupation"] for row in broadcast_user_data.value)
most_common_occupation = occupation_counts.most_common(1)[0][0]
print(f"\nMost common occupation among users: {most_common_occupation}")

Users data:
+-------+---+------+----------+
|user_id|age|gender|occupation|
+-------+---+------+----------+
|      1| 24|     M|technician|
|      2| 53|     F|     other|
|      3| 23|     M|    writer|
|      4| 24|     M|technician|
|      5| 33|     F|     other|
+-------+---+------+----------+
only showing top 5 rows


Average age of users: 34.05

Maximum age among users: 73

Total number of users: 943

Number of users aged 30 or below: 448

Most common occupation among users: student


In [16]:
spark.stop()