In [1]:
pip install pyspark


Note: you may need to restart the kernel to use updated packages.


In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType


In [3]:
# Build (or fetch) the SparkSession used by every cell below, pointed at the
# standalone cluster master.
spark = (
    SparkSession.builder
    .master("spark://192.168.61.131:7077")
    .appName("Movie Ratings Analysis")
    .getOrCreate()
)

In [4]:
# Schema for the ratings file: four nullable columns, in file order.
# NOTE(review): ml-100k's u.data is documented as holding Unix epoch seconds
# in its last field - confirm the loader can turn that into TimestampType.
data_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('user_id', IntegerType()),
        ('movie_id', IntegerType()),
        ('rating', IntegerType()),
        ('timestamp', TimestampType()),
    )
])

In [5]:
# Schema for the movie catalogue file: five nullable columns, in file order.
# Both date columns are kept as plain strings.
item_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('movie_id', IntegerType()),
        ('movie_title', StringType()),
        ('release_date', StringType()),
        ('video_release_date', StringType()),
        ('IMDb_URL', StringType()),
    )
])

In [6]:
# Schema for the user demographics file: five nullable columns, in file order.
# zip_code is declared as a string rather than a number.
user_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('user_id', IntegerType()),
        ('age', IntegerType()),
        ('gender', StringType()),
        ('occupation', StringType()),
        ('zip_code', StringType()),
    )
])

In [7]:
# Directory that holds the MovieLens 100K files. Set the ML100K_DIR
# environment variable to point somewhere else; when it is unset (or empty)
# the original location is used, so existing setups keep working.
DATA_DIR = os.environ.get("ML100K_DIR") or r"C:\Users\athit\Desktop\ml-100k"

# Individual input files, all resolved relative to DATA_DIR.
data_path = os.path.join(DATA_DIR, "u.data")    # ratings
item_path = os.path.join(DATA_DIR, "u.item")    # movie catalogue
users_path = os.path.join(DATA_DIR, "u.user")   # user demographics


In [8]:
# Ratings: u.data stores the rating time as Unix epoch seconds, which the CSV
# reader cannot parse into TimestampType (the column would come back null).
# Read timestamp columns as integers first, then cast every column to the type
# declared in data_schema; an integer -> timestamp cast is interpreted as
# seconds since the epoch. The resulting DataFrame has exactly data_schema's
# column names and types.
# (IntegerType is enough for epoch seconds up to the year 2038.)
ratings_read_schema = StructType([
    StructField(field.name, IntegerType(), field.nullable)
    if isinstance(field.dataType, TimestampType) else field
    for field in data_schema.fields
])
ratings_raw = spark.read.csv(data_path, sep='\t', header=False, schema=ratings_read_schema)
data = ratings_raw.select([
    ratings_raw[field.name].cast(field.dataType).alias(field.name)
    for field in data_schema.fields
])

# Movies: the stock ml-100k u.item file is Latin-1 (ISO-8859-1) encoded, so
# decode it explicitly to keep accented characters in titles intact.
item = spark.read.csv(item_path, sep='|', header=False, schema=item_schema, encoding='ISO-8859-1')

# Users: pipe-separated demographics, no header row.
users = spark.read.csv(users_path, sep='|', header=False, schema=user_schema)


In [9]:
# Register each DataFrame as a temporary view so the SQL cells can refer to
# it by table name.
for view_name, frame in (("ratings", data), ("movies", item), ("users", users)):
    frame.createOrReplaceTempView(view_name)

In [10]:
# Example 1: Listing all movies along with the number of ratings
# (LEFT JOIN keeps movies with no ratings; they get a count of 0.)
spark.sql("""
    SELECT m.movie_title, COUNT(r.rating) AS rating_count
    FROM movies m
    LEFT JOIN ratings r ON m.movie_id = r.movie_id
    GROUP BY m.movie_title
    ORDER BY rating_count DESC
""").show()

# Example 2: Displaying users and the count of ratings they've given for movies
spark.sql("""
    SELECT u.user_id, u.gender, COUNT(r.rating) AS rating_count
    FROM users u
    LEFT JOIN ratings r ON u.user_id = r.user_id
    GROUP BY u.user_id, u.gender
    ORDER BY rating_count DESC
""").show()

# The movie/rating-count query used to be repeated verbatim here; the duplicate
# was removed because it only re-ran the same Spark job and printed the same
# table a second time.

+--------------------+------------+
|         movie_title|rating_count|
+--------------------+------------+
|    Star Wars (1977)|         583|
|      Contact (1997)|         509|
|        Fargo (1996)|         508|
|Return of the Jed...|         507|
|    Liar Liar (1997)|         485|
|English Patient, ...|         481|
|       Scream (1996)|         478|
|    Toy Story (1995)|         452|
|Air Force One (1997)|         431|
|Independence Day ...|         429|
|Raiders of the Lo...|         420|
|Godfather, The (1...|         413|
| Pulp Fiction (1994)|         394|
|Twelve Monkeys (1...|         392|
|Silence of the La...|         390|
|Jerry Maguire (1996)|         384|
|  Chasing Amy (1997)|         379|
|    Rock, The (1996)|         378|
|Empire Strikes Ba...|         367|
|Star Trek: First ...|         365|
+--------------------+------------+
only showing top 20 rows

+-------+------+------------+
|user_id|gender|rating_count|
+-------+------+------------+
|    405|     F|    

In [11]:
# Milestone 2: Exploratory Data Analysis with SQL
#
# Each query is kept in a named string and then executed against the
# `movies`, `ratings` and `users` temp views; .show() prints the first rows.

# 1. Listing all movies along with the number of ratings
movie_rating_counts_sql = """
    SELECT m.movie_title, COUNT(r.rating) AS rating_count
    FROM movies m
    LEFT JOIN ratings r ON m.movie_id = r.movie_id
    GROUP BY m.movie_title
    ORDER BY rating_count DESC
"""
spark.sql(movie_rating_counts_sql).show()

# 2. Displaying users and the count of ratings they've given for movies
user_rating_counts_sql = """
    SELECT u.user_id, u.gender, COUNT(r.rating) AS rating_count
    FROM users u
    LEFT JOIN ratings r ON u.user_id = r.user_id
    GROUP BY u.user_id, u.gender
    ORDER BY rating_count DESC
"""
spark.sql(user_rating_counts_sql).show()

# 3. Identifying Movie IDs with at least one user rating
rated_movie_ids_sql = """
    SELECT DISTINCT movie_id
    FROM ratings
"""
spark.sql(rated_movie_ids_sql).show()

# 4. Listing users who have rated at least one movie
rating_user_ids_sql = """
    SELECT DISTINCT user_id
    FROM ratings
"""
spark.sql(rating_user_ids_sql).show()

# 5. Providing lists of users and movies with their maximum, minimum, and average ratings
# 5a. Per user (users without ratings appear with NULL statistics because of the LEFT JOIN)
user_rating_stats_sql = """
    SELECT 
        u.user_id,
        MAX(r.rating) AS max_rating,
        MIN(r.rating) AS min_rating,
        AVG(r.rating) AS avg_rating
    FROM users u
    LEFT JOIN ratings r ON u.user_id = r.user_id
    GROUP BY u.user_id
"""
spark.sql(user_rating_stats_sql).show()

# 5b. Per movie title
movie_rating_stats_sql = """
    SELECT 
        m.movie_title,
        MAX(r.rating) AS max_rating,
        MIN(r.rating) AS min_rating,
        AVG(r.rating) AS avg_rating
    FROM movies m
    LEFT JOIN ratings r ON m.movie_id = r.movie_id
    GROUP BY m.movie_title
"""
spark.sql(movie_rating_stats_sql).show()

# Milestone 3: User Ratings Analysis
#
# DataFrame-API analysis of the `users` and `data` (ratings) DataFrames.

# 1. How many of the users are artists?
is_artist = users.occupation == "artist"
num_artists = users.filter(is_artist).count()
print("Number of users who are artists:", num_artists)

# 2. How many of the users are artists at least 25 years old?
num_artists_25plus = users.filter(is_artist & (users.age >= 25)).count()
print("Number of artists aged 25 or older:", num_artists_25plus)

# 3. Number of ratings per movie (grouped by movie_id, not by title)
ratings_total = data.groupBy("movie_id").count()
ratings_total.show()

# 4. Average rating per movie
average_rating = data.groupBy("movie_id").avg("rating")
average_rating.show()

# 5. Average rating given for movie id #1 and #2
average_rating.filter("movie_id = 1").show()
average_rating.filter("movie_id = 2").show()

# 6. Gender and zip code of the user who rated the most movies
# Sort by rating count (descending) and break ties on user_id so the chosen
# user is the same on every run.
top_rater = (
    data.groupBy("user_id")
    .count()
    .orderBy(["count", "user_id"], ascending=[False, True])
    .first()
)

# .first() returns None when there are no ratings at all; in that case there
# is no user to look up, so skip the query instead of failing on None[0].
most_rated_user = top_rater["user_id"] if top_rater is not None else None
if most_rated_user is None:
    user_info = []
else:
    user_info = (
        users.filter(users["user_id"] == most_rated_user)
        .select("gender", "zip_code")
        .collect()
    )

# Printing the user's gender and zip code
print("Gender and Zip Code of the user who rated the most movies:")
for row in user_info:
    print("Gender:", row["gender"], ", Zip Code:", row["zip_code"])

+--------------------+------------+
|         movie_title|rating_count|
+--------------------+------------+
|    Star Wars (1977)|         583|
|      Contact (1997)|         509|
|        Fargo (1996)|         508|
|Return of the Jed...|         507|
|    Liar Liar (1997)|         485|
|English Patient, ...|         481|
|       Scream (1996)|         478|
|    Toy Story (1995)|         452|
|Air Force One (1997)|         431|
|Independence Day ...|         429|
|Raiders of the Lo...|         420|
|Godfather, The (1...|         413|
| Pulp Fiction (1994)|         394|
|Twelve Monkeys (1...|         392|
|Silence of the La...|         390|
|Jerry Maguire (1996)|         384|
|  Chasing Amy (1997)|         379|
|    Rock, The (1996)|         378|
|Empire Strikes Ba...|         367|
|Star Trek: First ...|         365|
+--------------------+------------+
only showing top 20 rows

+-------+------+------------+
|user_id|gender|rating_count|
+-------+------+------------+
|    405|     F|    