In [1]:
# Import necessary libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

# Start (or reuse) the Spark runtime for this notebook.
APP_NAME = "MovieRatingsAnalytics"

# "local[*]" runs Spark locally with as many worker threads as there are cores.
conf = (
    SparkConf()
    .setAppName(APP_NAME)
    .setMaster("local[*]")
)
sc = SparkContext.getOrCreate(conf=conf)

# The SparkSession is requested after the SparkContext already exists.
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

print("Init successfully!")

25/11/25 21:30:24 WARN Utils: Your hostname, tienloc-laptop resolves to a loopback address: 127.0.1.1; using 192.168.31.171 instead (on interface wlp0s20f3)
25/11/25 21:30:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 21:30:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Init successfully!


In [2]:
# Read data files
import os

# Location of the input files. Set the MOVIE_DATA_PATH environment variable to
# run the notebook against another directory; when it is unset the original
# local path is used.
data_path = os.environ.get("MOVIE_DATA_PATH", "file:///home/tienloc/lab2bigdata/data/")
if not data_path.endswith("/"):
    # File names are appended below by plain string concatenation.
    data_path += "/"

# movies.txt
movies_rdd = sc.textFile(data_path + "movies.txt")
print(f"Number of film: {movies_rdd.count()}")

# ratings_1.txt and ratings_2.txt
ratings_1_rdd = sc.textFile(data_path + "ratings_1.txt")
ratings_2_rdd = sc.textFile(data_path + "ratings_2.txt")

# users.txt
users_rdd = sc.textFile(data_path + "users.txt")

print(f"Number of rating from file 1: {ratings_1_rdd.count()}")
print(f"Number of rating từ file 2: {ratings_2_rdd.count()}")

# Preview the first five raw lines of each input file.
sample_sources = (
    ("\nData from file movies.txt (5 first line):", movies_rdd),
    ("\nData from ratings_1.txt (5 first line):", ratings_1_rdd),
    ("\nData from users.txt (5 first line):", users_rdd),
)
for header, source_rdd in sample_sources:
    print(header)
    for line in source_rdd.take(5):
        print(line)

Number of film: 50
Number of rating from file 1: 84
Number of rating từ file 2: 100

Data from file movies.txt (5 first line):
1001,The Godfather (1972),Crime|Drama
1002,The Shawshank Redemption (1994),Drama
1003,Schindler's List (1993),Biography|Drama|History
1004,Raging Bull (1980),Biography|Drama|Sport
1005,Casablanca (1942),Drama|Romance|War

Data from ratings_1.txt (5 first line):
7,1020,4.5,1577836800
23,1015,3.5,1577923200
45,1030,4.0,1578009600
12,1047,3.0,1578096000
38,1012,4.5,1578182400

Data from users.txt (5 first line):
21,M,28,3,12345
2,F,35,7,23456
3,M,42,2,34567
4,F,19,10,45678
5,M,31,1,56789


In [3]:
# Process users
# Parse users.txt: UserID, Gender, Age, Occupation, Zip-code
def parse_user_age(line):
    """Extract ``(user_id, age)`` from one comma-separated line of users.txt.

    Line layout: UserID, Gender, Age, Occupation, Zip-code. Only the first and
    third fields are read; both are converted to ``int``.
    """
    fields = line.split(',')
    return (int(fields[0]), int(fields[2]))

# Map an age to its age-group label
def get_age_group(age):
    """Return the age-group label for ``age``.

    Upper bounds are inclusive: up to 18 -> "0-18", up to 35 -> "18-35",
    up to 50 -> "35-50", anything above -> "50+".
    """
    # (inclusive upper bound, label), checked in ascending order.
    brackets = ((18, "0-18"), (35, "18-35"), (50, "35-50"))
    for upper_bound, label in brackets:
        if age <= upper_bound:
            return label
    return "50+"

# Parse every user line into (user_id, age).
users_parsed = users_rdd.map(parse_user_age)
print("Users parsed with age (5 records):")
for user_id, age in users_parsed.take(5):
    print(f"UserID: {user_id}, Age: {age}, Age Group: {get_age_group(age)}")

# Driver-side lookup: user id -> age.
users_age_dict = users_parsed.collectAsMap()
# Driver-side lookup: user id -> age-group label.
users_age_group_dict = {user_id: get_age_group(age) for user_id, age in users_age_dict.items()}

print(f"\nTotal number of user in dictionary: {len(users_age_dict)}")
print("Age distribution:")
# One label per user. This list is deliberately NOT called `age_groups`: that
# name is used by later cells for the list of distinct group labels, and
# re-running this cell must not overwrite it.
user_group_labels = list(users_age_group_dict.values())
for group in ["0-18", "18-35", "35-50", "50+"]:
    print(f"  {group}: {user_group_labels.count(group)} users")

Users parsed with age (5 records):
UserID: 21, Age: 28, Age Group: 18-35
UserID: 2, Age: 35, Age Group: 18-35
UserID: 3, Age: 42, Age Group: 35-50
UserID: 4, Age: 19, Age Group: 18-35
UserID: 5, Age: 31, Age Group: 18-35

Total number of user in dictionary: 49
Age distribution:
  0-18: 0 users
  18-35: 24 users
  35-50: 23 users
  50+: 2 users


In [4]:
# Process data movies
# Parse movies.txt: MovieID, Title, Genres
def parse_movie(line):
    """Extract ``(movie_id, title)`` from one line of movies.txt.

    Line layout: MovieID,Title,Genres. The id is split off from the left and
    the last comma-separated field (the genres) is dropped from the right, so
    a title that itself contains commas is returned whole instead of being cut
    at its first comma. Lines with at most two commas parse exactly as before.

    Raises:
        ValueError: if the id field is not an integer.
        IndexError: if the line contains no comma at all.
    """
    id_and_rest = line.split(',', 1)
    movie_id = int(id_and_rest[0])
    # Everything before the final comma is the title; a line without a genres
    # field (no further comma) keeps the whole remainder as the title.
    title = id_and_rest[1].rsplit(',', 1)[0]
    return (movie_id, title)

# Parse every movie line into (movie_id, title).
movies_parsed = movies_rdd.map(parse_movie)
print("Movies parsed (5 records):")
for sample_id, sample_title in movies_parsed.take(5):
    print(f"MovieID: {sample_id}, Title: {sample_title}")

# Driver-side lookup: movie id -> title.
movies_dict = movies_parsed.collectAsMap()
movie_total = len(movies_dict)
print(f"\nTotal number of movies in dictionary: {movie_total}")

Movies parsed (5 records):
MovieID: 1001, Title: The Godfather (1972)
MovieID: 1002, Title: The Shawshank Redemption (1994)
MovieID: 1003, Title: Schindler's List (1993)
MovieID: 1004, Title: Raging Bull (1980)
MovieID: 1005, Title: Casablanca (1942)

Total number of movies in dictionary: 50


In [5]:
# Process data ratings
# Parse ratings: UserID, MovieID, Rating, Timestamp
def parse_rating_with_user(line):
    """Turn one ratings line into ``(user_id, movie_id, rating)``.

    Line layout: UserID, MovieID, Rating, Timestamp. The timestamp is still
    converted to ``int`` so a line with a missing or malformed timestamp fails
    to parse, but its value is not part of the result.
    """
    fields = line.split(',')
    record = (int(fields[0]), int(fields[1]), float(fields[2]))
    int(fields[3])  # validate the timestamp field; the value itself is unused
    return record

# Parse both rating files with the same parser.
ratings_1_parsed = ratings_1_rdd.map(parse_rating_with_user)
ratings_2_parsed = ratings_2_rdd.map(parse_rating_with_user)

# Preview five parsed records from each file.
previews = (
    ("Ratings 1 parsed (5 records):", ratings_1_parsed),
    ("\nRatings 2 parsed (5 records):", ratings_2_parsed),
)
for heading, parsed_rdd in previews:
    print(heading)
    for uid, mid, score in parsed_rdd.take(5):
        print(f"UserID: {uid}, MovieID: {mid}, Rating: {score}")

# Combine both files into a single RDD of (user_id, movie_id, rating).
all_ratings = ratings_1_parsed.union(ratings_2_parsed)
print(f"\nTotal number of rating from both files: {all_ratings.count()}")

Ratings 1 parsed (5 records):
UserID: 7, MovieID: 1020, Rating: 4.5
UserID: 23, MovieID: 1015, Rating: 3.5
UserID: 45, MovieID: 1030, Rating: 4.0
UserID: 12, MovieID: 1047, Rating: 3.0
UserID: 38, MovieID: 1012, Rating: 4.5

Ratings 2 parsed (5 records):
UserID: 12, MovieID: 1012, Rating: 3.5
UserID: 34, MovieID: 1039, Rating: 4.0
UserID: 27, MovieID: 1043, Rating: 4.5
UserID: 8, MovieID: 1020, Rating: 3.0
UserID: 19, MovieID: 1050, Rating: 4.0

Total number of rating from both files: 184


In [6]:
# Add age groups into ratings
# all_ratings: (user_id, movie_id, rating)
# Adding age group: (movie_id, (rating, age_group))
def add_age_group_to_rating(record):
    """Re-key a ``(user_id, movie_id, rating)`` record by movie.

    Returns ``(movie_id, (rating, age_group))`` where the age group is looked
    up in the module-level ``users_age_group_dict``; users missing from that
    dictionary get the label "Unknown".
    """
    uid, mid, score = record
    return (mid, (score, users_age_group_dict.get(uid, "Unknown")))

# Re-key every rating as (movie_id, (rating, age_group)).
ratings_with_age_group = all_ratings.map(add_age_group_to_rating)

print("Ratings with age group (5 records):")
for record in ratings_with_age_group.take(5):
    print(f"MovieID: {record[0]}, Rating: {record[1][0]}, Age Group: {record[1][1]}")

# Keep only ratings whose user has a known age group.
valid_ratings = ratings_with_age_group.filter(lambda x: x[1][1] != "Unknown")

print(f"\nTotal number of ratings that has valid age groups: {valid_ratings.count()}")

# Build one RDD of (movie_id, rating) per age group.
age_groups = ["0-18", "18-35", "35-50", "50+"]
ratings_by_age = {}

for age_group in age_groups:
    # `group=age_group` freezes the current label inside the lambda. RDDs are
    # evaluated lazily, so a lambda that read the loop variable directly would
    # use whatever value the global `age_group` holds when the RDD is finally
    # computed, not the label it was created for.
    ratings_by_age[age_group] = (
        valid_ratings
        .filter(lambda x, group=age_group: x[1][1] == group)
        .map(lambda x: (x[0], x[1][0]))  # (movie_id, rating)
    )
    print(f"Number of ratings from group {age_group}: {ratings_by_age[age_group].count()}")

Ratings with age group (5 records):
MovieID: 1020, Rating: 4.5, Age Group: 35-50
MovieID: 1015, Rating: 3.5, Age Group: 18-35
MovieID: 1030, Rating: 4.0, Age Group: 35-50
MovieID: 1047, Rating: 3.0, Age Group: 35-50
MovieID: 1012, Rating: 4.5, Age Group: 18-35

Total number of ratings that has valid age groups: 183
Number of ratings from group 0-18: 0
Number of ratings from group 18-35: 77
Number of ratings from group 35-50: 96
Number of ratings from group 50+: 10


In [7]:
# Average rating per movie, computed separately for each age group.
age_averages = {}

# NOTE(review): keep this loop variable named `age_group`. If the filter
# lambdas behind `ratings_by_age` read the global `age_group` (late binding),
# they are evaluated lazily from here and must see the matching label.
for age_group in age_groups:
    age_stats = (
        ratings_by_age[age_group]
        .mapValues(lambda rating: (rating, 1))  # (movie_id, (rating, 1))
        .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    )
    # (movie_id, average_rating)
    age_averages[age_group] = age_stats.mapValues(lambda total: total[0] / total[1])

# Every movie that received at least one rating from a known age group.
all_movie_ids = valid_ratings.map(lambda x: x[0]).distinct()
all_movie_ids_list = all_movie_ids.collect()

# Bring each age group's averages to the driver once, as {movie_id: average}.
# This replaces one filter().collect() Spark job per (movie, age group) pair
# with one job per age group followed by plain dictionary lookups.
age_average_maps = {group: age_averages[group].collectAsMap() for group in age_groups}

# One (movie_id, title, {age_group: average or None}) entry per rated movie,
# in the order the movie ids were collected.
final_results = []
for movie_id in all_movie_ids_list:
    movie_title = movies_dict.get(movie_id, f"Unknown Movie {movie_id}")
    # dict.get yields None when the age group has no rating for this movie.
    age_ratings = {group: age_average_maps[group].get(movie_id) for group in age_groups}
    final_results.append((movie_id, movie_title, age_ratings))

# Print one line per movie: title followed by the average for each age group
# (two decimals), or NA where the group has no rating for that movie.
for _movie_id, title, age_ratings in final_results:
    cells = []
    for label in age_groups:
        average = age_ratings[label]
        if average is None:
            cells.append(f"{label}: NA")
        else:
            cells.append(f"{label}: {average:.2f}")
    ratings_str = ", ".join(cells)
    print(f"{title} - [{ratings_str}]")

E.T. the Extra-Terrestrial (1982) - [0-18: NA, 18-35: 3.56, 35-50: 3.83, 50+: 3.00]
Psycho (1960) - [0-18: NA, 18-35: 4.50, 35-50: 3.50, 50+: NA]
Gladiator (2000) - [0-18: NA, 18-35: 3.44, 35-50: 3.81, 50+: 3.50]
Fight Club (1999) - [0-18: NA, 18-35: 3.50, 35-50: 3.50, 50+: 3.50]
The Lord of the Rings: The Fellowship of the Ring (2001) - [0-18: NA, 18-35: 4.00, 35-50: 3.83, 50+: NA]
The Terminator (1984) - [0-18: NA, 18-35: 4.17, 35-50: 4.05, 50+: 3.75]
The Godfather: Part II (1974) - [0-18: NA, 18-35: 3.78, 35-50: 4.25, 50+: NA]
The Silence of the Lambs (1991) - [0-18: NA, 18-35: 3.00, 35-50: 3.25, 50+: NA]
Mad Max: Fury Road (2015) - [0-18: NA, 18-35: 3.36, 35-50: 3.64, 50+: NA]
Lawrence of Arabia (1962) - [0-18: NA, 18-35: 3.60, 35-50: 3.29, 50+: 4.50]
Sunset Boulevard (1950) - [0-18: NA, 18-35: 4.17, 35-50: 4.50, 50+: NA]
The Social Network (2010) - [0-18: NA, 18-35: 4.00, 35-50: 3.67, 50+: NA]
No Country for Old Men (2007) - [0-18: NA, 18-35: 3.79, 35-50: 3.94, 50+: 4.00]
The Lord

In [8]:
# Release Spark resources: stop the context first, then the session.
for spark_handle in (sc, spark):
    spark_handle.stop()
print("Stopping Spark Context và Spark Session.")

Stopping Spark Context và Spark Session.
