In [2]:
# Project identity and the HDFS namenode that every path below is rooted at.
PROJECT_NAME: str = "recommender_system"
HDFS_NAMENODE: str = "hdfs://namenode:9000"

# Per-project input and output directories on HDFS.
INPUT_DIR: str = "/".join((HDFS_NAMENODE, "input", PROJECT_NAME))
OUTPUT_DIR: str = "/".join((HDFS_NAMENODE, "output", PROJECT_NAME))

# Spark master URL handed to SparkSession.builder.master().
MASTER_URI = "spark://spark-master:7077"

In [3]:
# Schemas
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
)

# Explicit column layouts for the three input files. Every column is declared
# nullable, so a malformed value becomes null instead of failing the read.

# users: one row per user with demographic attributes.
userSchema = (
    StructType()
    .add("UserID", IntegerType(), True)
    .add("Gender", StringType(), True)
    .add("Age", IntegerType(), True)
    .add("Occupation", StringType(), True)
    .add("Zip_code", StringType(), True)
)

# movies: one row per movie.
movieSchema = (
    StructType()
    .add("MovieID", IntegerType(), True)
    .add("Title", StringType(), True)
    .add("Genres", StringType(), True)
)

# ratings: one row per (user, movie) rating; Timestamp is kept as a string.
ratingSchema = (
    StructType()
    .add("UserID", IntegerType(), True)
    .add("MovieID", IntegerType(), True)
    .add("Rating", IntegerType(), True)
    .add("Timestamp", StringType(), True)
)


In [4]:
from pyspark.sql import SparkSession


def spark_session() -> SparkSession:
    """Create (or reuse) the SparkSession used by the cells of this notebook.

    The session is named after ``PROJECT_NAME`` (capitalized), connects to
    ``MASTER_URI`` and uses ``HDFS_NAMENODE`` as the default filesystem.

    Returns:
        The active SparkSession. Its version is printed as a quick
        connectivity check.
    """
    spark = (
        # capitalize() must be *called*: passing the bound method itself makes
        # Spark stringify it, so the application would be named after the
        # method's repr instead of the project.
        SparkSession.builder.appName(PROJECT_NAME.capitalize())
        .master(MASTER_URI)
        .config("spark.hadoop.fs.defaultFS", HDFS_NAMENODE)
        # HDFS client option forwarded through the spark.hadoop.* prefix.
        .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
        .getOrCreate()
    )
    print(f"Connected to Spark {spark.version}")
    return spark

In [4]:
%%time


# Open the session first: convert_dat_to_parquet below reads this `spark` global.
spark = spark_session()


def convert_dat_to_parquet(file_name: str, schema: StructType):
    """Convert one "::"-delimited ``.dat`` file under INPUT_DIR to Parquet.

    Reads ``{INPUT_DIR}/{file_name}.dat`` with the given schema and writes it
    to ``{INPUT_DIR}/{file_name}_parquet``, overwriting any previous output.
    Uses the notebook-level ``spark`` session.

    Args:
        file_name: Base name of the dataset, without extension.
        schema: Column layout applied while parsing the file.
    """
    source = f"{INPUT_DIR}/{file_name}.dat"
    target = f"{INPUT_DIR}/{file_name}_parquet"

    print(f"Processing: {source}")

    # The input uses a two-character field separator.
    reader = spark.read.option("sep", "::")
    reader.csv(source, schema=schema).write.mode("overwrite").parquet(target)

    print(f"Successfully converted to Parquet at: {target}")
    print("-" * 30)


print("Starting data conversion to Parquet...")
print("-" * 30)

# Convert each dataset with its matching schema, in the same order as before.
for dataset_name, dataset_schema in (
    ("ratings", ratingSchema),
    ("users", userSchema),
    ("movies", movieSchema),
):
    convert_dat_to_parquet(dataset_name, dataset_schema)

print("All files converted.")

spark.stop()

Connected to Spark 3.5.0
Starting data conversion to Parquet...
------------------------------
Processing: hdfs://namenode:9000/input/recommender_system/ratings.dat
Successfully converted to Parquet at: hdfs://namenode:9000/input/recommender_system/ratings_parquet
------------------------------
Processing: hdfs://namenode:9000/input/recommender_system/users.dat
Successfully converted to Parquet at: hdfs://namenode:9000/input/recommender_system/users_parquet
------------------------------
Processing: hdfs://namenode:9000/input/recommender_system/movies.dat
Successfully converted to Parquet at: hdfs://namenode:9000/input/recommender_system/movies_parquet
------------------------------
All files converted.
CPU times: user 39.9 ms, sys: 18.6 ms, total: 58.5 ms
Wall time: 9.48 s


In [5]:
# Locations of the Parquet datasets written by convert_dat_to_parquet.
RATINGS_FILE, USERS_FILE, MOVIES_FILE = (
    f"{INPUT_DIR}/{dataset}_parquet" for dataset in ("ratings", "users", "movies")
)

## Task 1
List the top-rated movies by all users. 

#### Output format
A list of <movie, score> pairs
sorted in descending order of ‘average’ rating score

In [None]:
%%time

from pyspark.sql.functions import col, avg

spark = spark_session()

df = spark.read.parquet(RATINGS_FILE)

# Mean rating per movie across all users.
agg_df = df.groupBy("MovieId").agg(avg("Rating").alias("AvgRating"))

sorted_df = agg_df.orderBy(col("AvgRating").desc())

# No cache()/unpersist() here: the result feeds exactly one write, so caching
# only adds memory pressure. The previous unpersist() also targeted the
# projected DataFrame below, which had never been cached.
sorted_df = sorted_df.select(
    col("MovieId").alias("movie"), col("AvgRating").alias("score")
)
print(f"Writing results to '{OUTPUT_DIR}/average_movie_ratings'\n")
# coalesce(1) so the result lands in a single CSV part file.
sorted_df.coalesce(1).write.mode("overwrite").csv(
    f"{OUTPUT_DIR}/average_movie_ratings", header=True, sep=","
)

spark.stop()

## Task 2
List the top-rated movies grouped by gender, by age group, and by occupation, respectively. 

#### Output format
Three sorted lists: `<movie, gender, score>` triples, `<movie, age group, score>` triples, and `<movie, occupation, score>` triples,
sorted in descending order of ‘average’ rating score grouped by gender, by age group, and by occupation 

In [None]:
%%time

from pyspark.sql.functions import broadcast, col, avg

spark = spark_session()

ratings_df = spark.read.parquet(RATINGS_FILE)
user_df = spark.read.parquet(USERS_FILE)

# Attach each rater's demographic attributes to their ratings. The users
# table is passed with a broadcast hint for the join.
ratings_users_df = (
    ratings_df
    .join(broadcast(user_df), on="UserId")
    .select("MovieId", "Rating", "Gender", "Age", "Occupation")
)


def top_rated_movies_grouped_by(category: str):
    """Write the average rating of every movie within each value of ``category``.

    Groups the notebook-level ``ratings_users_df`` by (category, MovieID),
    averages ``Rating``, sorts by that average in descending order and writes
    ``movie``, ``<category lower-cased>``, ``score`` columns as a single CSV
    under ``{OUTPUT_DIR}/top_movies_by_<category lower-cased>``.

    Args:
        category: Name of a column of ``ratings_users_df`` to group by.
    """
    print(f"Listing top rated movies by {category}...")

    label = category.lower()
    mean_rating = avg(col("Rating")).alias("AvgRating")

    ranked = (
        ratings_users_df.groupBy(category, "MovieID")
        .agg(mean_rating)
        .orderBy(col("AvgRating").desc())
        .select(
            col("MovieID").alias("movie"),
            col(category).alias(label),
            col("AvgRating").alias("score"),
        )
    )

    # Collapse to one partition so the result is a single CSV part file.
    writer = ranked.coalesce(1).write.mode("overwrite")
    writer.csv(f"{OUTPUT_DIR}/top_movies_by_{label}", header=True, sep=",")


# One output list per demographic attribute, in the same order as before.
for demographic in ("Gender", "Age", "Occupation"):
    top_rated_movies_grouped_by(demographic)

spark.stop()

## Task 3
List the average rating score of each user for all movies, and grouped by genre, respectively. 


#### Output format
Two sorted lists: `<user, score>` pairs and `<user, genre, score>` triples


In [None]:
%%time

# Imported here so the cell does not depend on names leaked from earlier cells.
from pyspark.sql.functions import avg, broadcast, col, explode_outer, split

spark = spark_session()

ratings_df = spark.read.parquet(RATINGS_FILE)
movies_df = spark.read.parquet(MOVIES_FILE)

# --- <user, score>: each user's mean rating over every movie they rated.
user_agg = ratings_df.groupBy("UserId").agg(avg(col("Rating")).alias("AvgRating"))
sorted_user_df = user_agg.orderBy(col("AvgRating").desc())

sorted_user_df = sorted_user_df.select(
    col("UserId").alias("user"), col("AvgRating").alias("score")
)

# --- <user, genre, score>: each user's mean rating per individual genre.
# NOTE(review): assumes Genres holds a '|'-separated list (e.g. "Action|Comedy")
# -- confirm against movies.dat. Without the split, the grouping key is the
# whole genre *combination* rather than a single genre. If a value contains no
# '|', the split yields one element and the result is unchanged.
# explode_outer keeps movies whose Genres is null (as a null genre).
movie_genres_df = movies_df.select(
    col("MovieId"),
    explode_outer(split(col("Genres"), r"\|")).alias("Genre"),
)

ratings_movies_df = ratings_df.join(broadcast(movie_genres_df), on="MovieId").select(
    ["MovieId", "Genre", "UserId", "Rating"]
)
ratings_movies_agg = ratings_movies_df.groupBy("UserId", "Genre").agg(
    avg(col("Rating")).alias("AvgRating")
)
sorted_ratings_movies_df = ratings_movies_agg.orderBy(col("AvgRating").desc())
sorted_ratings_movies_df = sorted_ratings_movies_df.select(
    col("UserId").alias("user"),
    col("Genre").alias("genre"),
    col("AvgRating").alias("score"),
)

print("Computing average rating score of each user for all movies...")
sorted_user_df.coalesce(1).write.mode("overwrite").csv(
    f"{OUTPUT_DIR}/average_rating_by_user", header=True, sep=","
)
print("Computing average rating score of each user for all movies by genre...")
sorted_ratings_movies_df.coalesce(1).write.mode("overwrite").csv(
    f"{OUTPUT_DIR}/average_rating_by_user_and_genre", header=True, sep=","
)

spark.stop()

# Utility functions

Utility functions for computing similarity and building the utility matrix used by the following tasks.

In [None]:
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import first, col, avg, broadcast
from pyspark.sql.types import Row


def normalized_utility_matrix() -> DataFrame:
    """Build the mean-centred user x movie utility matrix.

    Reads the ratings Parquet dataset through the notebook-level ``spark``
    session, subtracts each user's average rating from their ratings, and
    pivots the result so there is one row per ``UserId`` and one column per
    distinct ``MovieId``. Cells with no rating are filled with 0.

    Returns:
        The pivoted DataFrame (``UserId`` plus one column per movie).

    Note:
        The intermediate long-format frame is cached and never unpersisted
        here; it is released when the session is stopped.
    """
    raw_ratings: DataFrame = spark.read.parquet(RATINGS_FILE).select(
        ["UserId", "MovieId", "Rating"]
    )

    # Per-user mean, joined back so every rating can be centred on it.
    user_means = raw_ratings.groupBy("UserId").agg(
        avg(col("Rating")).alias("AvgRating")
    )
    centred = raw_ratings.join(broadcast(user_means), "UserId").withColumn(
        "NormalizedRating", col("Rating") - col("AvgRating")
    )
    centred.cache()

    # Long -> wide: one column per MovieId; missing ratings become 0.
    wide = centred.groupBy(["UserId"]).pivot("MovieId").agg(first("NormalizedRating"))
    return wide.na.fill(0)

# Task 4

Given any user, list the top ‘similar’ users based on the cosine similarity of the ratings each user has previously given (sorted in descending order of ‘user’ similarity score).

### Output Format

A list of <user, score> pairs
sorted in descending order of ‘user’ similarity score

In [None]:
spark = spark_session()

# NOTE(review): USER_ID is not used anywhere in this cell yet, and it is a str
# while the UserID column is declared as an integer -- confirm the intended
# type before comparing against the matrix.
USER_ID: str = "148"

utility_df = normalized_utility_matrix()

# The matrix has one column per distinct MovieId, so a bare show() produces an
# unreadably wide dump. Preview only a small corner of it.
utility_df.select(utility_df.columns[:10]).show(5)

spark.stop()

Connected to Spark 3.5.0


Py4JJavaError: An error occurred while calling o1449.columnSimilarities.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 129.0 failed 4 times, most recent failure: Lost task 0.3 in stage 129.0 (TID 171) (172.22.0.6 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 111, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(d))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1261)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1262)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2493)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1199)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1193)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1286)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1253)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1239)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1239)
	at org.apache.spark.mllib.stat.Statistics$.colStats(Statistics.scala:58)
	at org.apache.spark.mllib.linalg.distributed.RowMatrix.columnSimilarities(RowMatrix.scala:639)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 111, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(d))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1261)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1262)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
