# 0. Setup: Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, avg, count, round, to_timestamp, to_date
)

# Create the Spark session used by the cells below, registered under the
# application name "MusicRatingsETL".
spark = (
    SparkSession.builder
    .appName("MusicRatingsETL")
    .getOrCreate()
)

print("Spark session created.")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/22 17:20:18 WARN Utils: Your hostname, Manis-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.99 instead (on interface en0)
25/11/22 17:20:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/22 17:20:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session created.


# 1. Load TSV Files (Cleaned Subsets)

In [2]:
# Directory holding the cleaned TSV subsets. The trailing slash is kept
# because the file names below are appended to it directly.
data_path = "data/subsets/cleaned/"

album_path = f"{data_path}album_info_cleaned.tsv"
critic_path = f"{data_path}critic_ratings_cleaned.tsv"
user_path = f"{data_path}user_ratings_cleaned.tsv"

# Reader options shared by all three files: tab-separated, first row is the
# header, column types are inferred, and quote/escape are set to empty strings.
tsv_options = dict(sep="\t", header=True, inferSchema=True,
                   quote="", escape="")

album_df = spark.read.csv(album_path, **tsv_options)
critic_df = spark.read.csv(critic_path, **tsv_options)
user_df = spark.read.csv(user_path, **tsv_options)

# Preview the first five rows of each table, in load order.
for loaded_frame in (album_df, critic_df, user_df):
    loaded_frame.show(5)

+--------------------+--------------+--------------------+------------+----------+------------+------------+--------------------+
|                slug|        artist|               album|critic_score|user_score|release_date|release_year|              genres|
+--------------------+--------------+--------------------+------------+----------+------------+------------+--------------------+
|6647-john-coltran...| John Coltrane|         Giant Steps|          95|        86|  January 27|        1960|            Hard Bop|
|6041-miles-davis-...|   Miles Davis|   Sketches of Spain|          96|        83|     July 18|        1960|        Third Stream|
|6654-charles-ming...|Charlie Mingus|       Blues & Roots|          90|        84|       March|        1960|            Post-Bop|
|7022-etta-james-a...|    Etta James|            At Last!|         100|        81| November 15|        1960|Soul|Rhythm and B...|
|21774-max-roach-w...|     Max Roach|We Insist! Max Ro...|          93|        82|    Dece

# 2. Basic Cleaning & Type Fixing

In [3]:
# Parse the "date" column of both ratings tables into timestamps.
# NOTE(review): 'GMT' is quoted in the pattern, so it is matched as literal
# text; presumably the zone then comes from the Spark session time zone —
# confirm this is the intended interpretation.
rating_date_format = "dd MMM yyyy HH:mm:ss 'GMT'"
critic_df = critic_df.withColumn(
    "date", to_timestamp(col("date"), rating_date_format))
user_df = user_df.withColumn(
    "date", to_timestamp(col("date"), rating_date_format))

# Cast every score column to double so later numeric work sees one type.
for score_column in ("critic_score", "user_score"):
    album_df = album_df.withColumn(
        score_column, col(score_column).cast("double"))

critic_df = critic_df.withColumn("score", col("score").cast("double"))
user_df = user_df.withColumn("score", col("score").cast("double"))

# Show the resulting schemas: album, critic, then user.
print("Schemas after type fixing:")
for typed_frame in (album_df, critic_df, user_df):
    typed_frame.printSchema()

Schemas after type fixing:
root
 |-- slug: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- critic_score: double (nullable = true)
 |-- user_score: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- slug: string (nullable = true)
 |-- publication: string (nullable = true)
 |-- author: string (nullable = true)
 |-- snippet: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- score: double (nullable = true)

root
 |-- slug: string (nullable = true)
 |-- username: string (nullable = true)
 |-- score: double (nullable = true)
 |-- date: timestamp (nullable = true)



# 3. Join Datasets on "slug"

In [4]:
# Attach each ratings table to the album metadata separately. These are left
# joins keyed on "slug", so every album row is kept even when it has no
# matching rating (the rating columns are then NULL).
album_critic = album_df.join(critic_df, "slug", "left")
album_user = album_df.join(user_df, "slug", "left")

# Print a caption and a five-row preview for each joined table.
joined_previews = (
    ("Album + Critic Ratings:", album_critic),
    ("Album + User Ratings:", album_user),
)
for caption, joined_frame in joined_previews:
    print(caption)
    joined_frame.show(5)

Album + Critic Ratings:
+--------------------+--------------+--------------------+------------+----------+------------+------------+--------------------+-----------+------+-------+----+-----+
|                slug|        artist|               album|critic_score|user_score|release_date|release_year|              genres|publication|author|snippet|date|score|
+--------------------+--------------+--------------------+------------+----------+------------+------------+--------------------+-----------+------+-------+----+-----+
|6647-john-coltran...| John Coltrane|         Giant Steps|        95.0|      86.0|  January 27|        1960|            Hard Bop|       NULL|  NULL|   NULL|NULL| NULL|
|6041-miles-davis-...|   Miles Davis|   Sketches of Spain|        96.0|      83.0|     July 18|        1960|        Third Stream|       NULL|  NULL|   NULL|NULL| NULL|
|6654-charles-ming...|Charlie Mingus|       Blues & Roots|        90.0|      84.0|       March|        1960|            Post-Bop|       

# 4. Aggregation: Compare Critic vs User Ratings (Overall)

In [5]:
# Overall comparison of critic vs. user scores across all albums.
# `round` here is pyspark.sql.functions.round (imported at the top of the
# notebook), not the Python builtin.
overall_summary = (
    album_df
    .agg(
        round(avg("critic_score"), 2).alias("avg_critic_rating"),
        round(avg("user_score"), 2).alias("avg_user_rating"),
    )
    .withColumn(
        "difference",
        # Subtracting two doubles can leave binary floating-point residue
        # (e.g. 1.9500000000000028), so round the difference to the same
        # two decimals as the averages it is derived from.
        round(col("avg_critic_rating") - col("avg_user_rating"), 2),
    )
)

overall_summary.show()

+-----------------+---------------+------------------+
|avg_critic_rating|avg_user_rating|        difference|
+-----------------+---------------+------------------+
|            73.09|          71.14|1.9500000000000028|
+-----------------+---------------+------------------+

