In [1]:
import os

# JAVA_HOME must be set before the first SparkSession launches the JVM.
# This notebook targets the Homebrew OpenJDK 17 install; only export it when that
# directory actually exists, so other machines keep their own Java discovery
# instead of being pointed at a non-existent path.
JAVA_HOME_DEFAULT = '/opt/homebrew/opt/openjdk@17'
if os.path.isdir(JAVA_HOME_DEFAULT):
    os.environ['JAVA_HOME'] = JAVA_HOME_DEFAULT


In [None]:
# !pip3 install python-dotenv



In [None]:
# !pip3 install pyspark



In [5]:
import pyspark

# Display the installed PySpark version as this cell's output.
installed_pyspark_version = pyspark.__version__
installed_pyspark_version

'4.1.0'

In [None]:
# !pip3 install psycopg2-binary



In [7]:
import os

from pyspark.sql import SparkSession

# PostgreSQL JDBC driver jar, needed by the org.postgresql.Driver JDBC writes later
# in this notebook. Resolve it up front and fail here with a clear message rather
# than with a driver/class-not-found error at the first write.
POSTGRES_JDBC_JAR = os.path.abspath("postgresql-42.7.1.jar")
if not os.path.isfile(POSTGRES_JDBC_JAR):
    raise FileNotFoundError(f"PostgreSQL JDBC driver jar not found: {POSTGRES_JDBC_JAR}")

# spark.jars is a launch-time setting: if a session already exists in this kernel,
# getOrCreate() returns it and the jar setting does not take effect (restart the kernel).
spark = (
    SparkSession.builder
    .config("spark.jars", POSTGRES_JDBC_JAR)
    .getOrCreate()
)

# Suppress verbose logs immediately after creation
spark.sparkContext.setLogLevel("ERROR")

print("✓ Spark initialized!")
print(f"Version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/03 16:22:48 WARN Utils: Your hostname, jamess-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.68.128 instead (on interface en0)
26/01/03 16:22:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/03 16:22:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


✓ Spark initialized!
Version: 4.1.0


In [8]:
from pyspark.sql.functions import col, input_file_name, regexp_extract, to_date

# The chart date is taken from the file name only (the last path segment), so a
# date-like string in a parent directory name cannot be picked up by mistake.
CHART_DATE_PATTERN = r"(\d{4}-\d{2}-\d{2})[^/]*$"

# Read CSVs with proper quote handling
df = (
    spark.read.csv(
        "spotify_data/*.csv",
        header=True,
        inferSchema=True,
        quote='"',       # quoted fields can contain commas
        escape='"',      # a doubled quote inside a quoted field is literal, e.g. ""El Father""
        multiLine=True,  # quoted fields may span multiple lines
    )
    .withColumn("filename", input_file_name())
    .withColumn("chart_date", regexp_extract("filename", CHART_DATE_PATTERN, 1))
    .withColumn("chart_date", to_date("chart_date", "yyyy-MM-dd"))
    .drop("filename")
)

# Every row must carry a chart date: it becomes the fact table's date_key, and rows
# without one would otherwise be dropped silently by the inner join to dim_date.
_undated_rows = df.filter(col("chart_date").isNull()).count()
if _undated_rows:
    raise ValueError(
        f"{_undated_rows:,} rows have no YYYY-MM-DD date in their source file name"
    )

print(f"Total rows: {df.count():,}")
print(f"Total columns: {len(df.columns)}")
df.printSchema()


Total rows: 6,000
Total columns: 10
root
 |-- rank: integer (nullable = true)
 |-- uri: string (nullable = true)
 |-- artist_names: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- source: string (nullable = true)
 |-- peak_rank: integer (nullable = true)
 |-- previous_rank: integer (nullable = true)
 |-- days_on_chart: integer (nullable = true)
 |-- streams: integer (nullable = true)
 |-- chart_date: date (nullable = false)



In [9]:
from pyspark.sql.functions import explode, split, trim, monotonically_increasing_id, col
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, quarter, date_format

# 1. Create dim_date
# One row per distinct chart date, keyed by an integer yyyyMMdd date_key.
# full_date and chart_date hold the same value; chart_date is kept so the fact
# table can join back on it. day_of_week follows Spark's dayofweek (1 = Sunday).
dim_date = (
    df.select("chart_date")
    .distinct()
    .select(
        date_format("chart_date", "yyyyMMdd").cast("int").alias("date_key"),
        col("chart_date").alias("full_date"),
        "chart_date",
        year("chart_date").alias("year"),
        month("chart_date").alias("month"),
        quarter("chart_date").alias("quarter"),
        dayofmonth("chart_date").alias("day"),
        dayofweek("chart_date").alias("day_of_week"),
        date_format("chart_date", "MMMM").alias("month_name"),
        date_format("chart_date", "EEEE").alias("day_name"),
    )
    .orderBy("date_key")
)



In [10]:
# 2. Create dim_artists
# One row per distinct artist. artist_names is a comma-separated list, so it is
# exploded and trimmed with the same split(",") + trim used when building the fact
# table, which keeps the names joinable 1:1.
# NOTE(review): an artist whose own name contains a comma (e.g. "Tyler, The Creator")
# is split into two entries -- an inherent limitation of splitting on ",".
from pyspark.sql import Window
from pyspark.sql.functions import row_number

dim_artists = (df
    .select("artist_names")
    .distinct()
    .select(explode(split(col("artist_names"), ",")).alias("artist_name"))
    .select(trim(col("artist_name")).alias("artist_name"))
    # Drop empty fragments produced by stray commas (e.g. "A, , B" or a trailing ",").
    .filter(col("artist_name") != "")
    .distinct()
    # Surrogate key 1..N in artist_name order. row_number() over a unique sort key is
    # deterministic; monotonically_increasing_id() is not consecutive and depends on
    # partitioning, so its values could differ between the job that writes this
    # dimension and the job that computes the fact table's artist_sk.
    # (A Window without partitionBy uses a single partition -- fine for a small dimension.)
    .withColumn("artist_sk", row_number().over(Window.orderBy("artist_name")).cast("long"))
    .select("artist_sk", "artist_name")
)


In [11]:
# 3. Create dim_tracks
# Exactly one row per uri. The fact table joins to this dimension on uri alone, so a
# uri seen with two different track_name/source values would -- with a plain
# distinct() over all three columns -- yield two dimension rows and duplicate every
# matching fact row. The descriptive attributes are taken from the track's most
# recent chart_date instead.
# NOTE(review): assumes a uri appears at most once per chart_date; on ties max_by
# may pick either value.
from pyspark.sql import Window
from pyspark.sql.functions import max_by, row_number

dim_tracks = (df
    .groupBy("uri")
    .agg(
        max_by("track_name", "chart_date").alias("track_name"),
        max_by("source", "chart_date").alias("source"),
    )
    # Surrogate key 1..N ordered by track_name, with uri as a tie-breaker so the
    # numbering is identical every time this lazy DataFrame is re-evaluated
    # (unlike monotonically_increasing_id()).
    .withColumn(
        "track_sk",
        row_number().over(Window.orderBy("track_name", "uri")).cast("long"),
    )
    .select("track_sk", "uri", "track_name", "source")
)

In [12]:
# 4. Create fact_chart_rankings (with exploded artists)
# Grain: one row per (source chart row, credited artist). A track with N artists on a
# given chart_date produces N fact rows that carry the same rank/streams measures.
# All joins are inner: a source row whose uri, chart_date or artist name has no match
# in the corresponding dimension is dropped rather than loaded with a NULL key.
fact_chart_rankings = (df
    # Join with tracks dimension
    # (assumes uri is unique in dim_tracks -- duplicate uris there would duplicate fact rows)
    .join(dim_tracks.select("track_sk", "uri"), on="uri", how="inner")
    # Join with date dimension
    .join(dim_date.select("date_key", "chart_date"), on="chart_date", how="inner")
    # Explode artists - creates one row per artist per track
    # (same split(",") + trim as dim_artists, so the names match exactly)
    .withColumn("artist_name", explode(split(col("artist_names"), ",")))
    .withColumn("artist_name", trim(col("artist_name")))
    # Join with artists dimension
    .join(dim_artists.select("artist_sk", "artist_name"), on="artist_name", how="inner")
    # Select fact table columns
    .select(
        "track_sk",
        "artist_sk",
        "date_key",
        "rank",
        "streams",
        "previous_rank",
        "peak_rank",
        "days_on_chart"
    )
    .orderBy("date_key", "rank", "artist_sk")
    # Add surrogate key
    # NOTE(review): monotonically_increasing_id() is unique but not consecutive (the
    # partition id occupies the upper bits), so ranking_sk can contain large gaps and
    # its values depend on how this plan is partitioned. The target column must be
    # wide enough (BIGINT) -- confirm against schema.sql.
    .withColumn("ranking_sk", monotonically_increasing_id() + 1)
    # Cast peak_rank to int
    # (inferSchema already reads peak_rank as integer, so this is currently a no-op safeguard)
    .withColumn("peak_rank", col("peak_rank").cast("int"))
    # Final column selection
    .select(
        "ranking_sk",
        "track_sk",
        "artist_sk",
        "date_key",
        "rank",
        "streams",
        "previous_rank",
        "peak_rank",
        "days_on_chart"
    )
)


In [13]:

print("VALIDATING REFERENTIAL INTEGRITY")

# Check 1: No null foreign keys
null_tracks = fact_chart_rankings.filter(col("track_sk").isNull()).count()
null_artists = fact_chart_rankings.filter(col("artist_sk").isNull()).count()
null_dates = fact_chart_rankings.filter(col("date_key").isNull()).count()

print("\nNull foreign keys:")
print(f"  track_sk:  {null_tracks}")
print(f"  artist_sk: {null_artists}")
print(f"  date_key:  {null_dates}")

if null_tracks > 0 or null_artists > 0 or null_dates > 0:
    print("\n INTEGRITY VIOLATION: Null foreign keys found!")
else:
    print("\n✓ No null foreign keys")


# Check 2: All foreign keys exist in dimensions
def _count_orphans(fact, dim, key):
    """Return how many `fact` rows have a `key` value with no match in `dim`.

    A left_anti join keeps only the fact rows without a matching dimension row.
    NULL keys never match, so they are counted here as well.
    """
    return fact.join(dim.select(key), on=key, how="left_anti").count()


# NOTE: both sides are recomputed from their lazy plans for this check, so it cannot
# catch surrogate keys that differ between *separate* Spark jobs (e.g. the later JDBC
# writes); that requires the key generation itself to be deterministic.
invalid_tracks = _count_orphans(fact_chart_rankings, dim_tracks, "track_sk")
invalid_artists = _count_orphans(fact_chart_rankings, dim_artists, "artist_sk")
invalid_dates = _count_orphans(fact_chart_rankings, dim_date, "date_key")

print("\nInvalid foreign key references:")
print(f"  track_sk:  {invalid_tracks}")
print(f"  artist_sk: {invalid_artists}")
print(f"  date_key:  {invalid_dates}")

if invalid_tracks > 0 or invalid_artists > 0 or invalid_dates > 0:
    print("\n INTEGRITY VIOLATION: Invalid foreign keys found!")
else:
    print("\n✓ All foreign keys valid!")

# Only declare success when both checks passed; otherwise stop the notebook here so
# the PostgreSQL load is not run against a broken fact table.
total_violations = (null_tracks + null_artists + null_dates
                    + invalid_tracks + invalid_artists + invalid_dates)
if total_violations:
    raise AssertionError(
        f"REFERENTIAL INTEGRITY: FAILED - {total_violations} bad foreign-key row(s)"
    )
print("REFERENTIAL INTEGRITY: VERIFIED ✓")


VALIDATING REFERENTIAL INTEGRITY

Null foreign keys:
  track_sk:  0
  artist_sk: 0
  date_key:  0

✓ No null foreign keys

Invalid foreign key references:
  track_sk:  0
  artist_sk: 0
  date_key:  0

✓ All foreign keys valid!
REFERENTIAL INTEGRITY: VERIFIED ✓


In [14]:
# Summary: row counts of every star-schema table (each count() runs a Spark job).
_rule = "=" * 60
print("\n" + _rule)
print("STAR SCHEMA SUMMARY")
print(_rule)
print("\nDimension Tables:")
for _dim_name, _dim_df in (
    ("dim_date", dim_date),
    ("dim_artists", dim_artists),
    ("dim_tracks", dim_tracks),
):
    # Pad "name:" to 14 characters so the counts line up in one column.
    print(f"  - {_dim_name + ':':<14}{_dim_df.count():,} rows")
print("\nFact Table:")
print(f"  - fact_chart_rankings: {fact_chart_rankings.count():,} rows")
print("\n" + _rule)


STAR SCHEMA SUMMARY

Dimension Tables:
  - dim_date:     30 rows
  - dim_artists:  332 rows
  - dim_tracks:   395 rows

Fact Table:
  - fact_chart_rankings: 8,601 rows



In [15]:
# Print the Spark schema of each star-schema table under a banner header;
# every banner after the first is preceded by a blank line.
for _idx, (_table_name, _table_df) in enumerate([
    ("dim_date", dim_date),
    ("dim_artists", dim_artists),
    ("dim_tracks", dim_tracks),
    ("fact_chart_rankings", fact_chart_rankings),
]):
    print(("\n" if _idx else "") + "=" * 60)
    print(f"{_table_name} SCHEMA")
    print("=" * 60)
    _table_df.printSchema()

dim_date SCHEMA
root
 |-- date_key: integer (nullable = true)
 |-- full_date: date (nullable = false)
 |-- chart_date: date (nullable = false)
 |-- year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- quarter: integer (nullable = false)
 |-- day: integer (nullable = false)
 |-- day_of_week: integer (nullable = false)
 |-- month_name: string (nullable = false)
 |-- day_name: string (nullable = false)


dim_artists SCHEMA
root
 |-- artist_sk: long (nullable = false)
 |-- artist_name: string (nullable = false)


dim_tracks SCHEMA
root
 |-- track_sk: long (nullable = false)
 |-- uri: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- source: string (nullable = true)


fact_chart_rankings SCHEMA
root
 |-- ranking_sk: long (nullable = false)
 |-- track_sk: long (nullable = false)
 |-- artist_sk: long (nullable = false)
 |-- date_key: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- streams: integer (nullable = true)
 |-- previou

In [16]:
# STEP 2: CREATE DATABASE SCHEMA (Run SQL DDL)

import os

import psycopg2

# Database connection details, read from the standard libpq environment variables
# (e.g. loaded from a .env file) so credentials are not hard-coded in the notebook.
# The fallbacks are the original local-development values.
db_config = {
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5432"),
    "database": os.environ.get("PGDATABASE", "spotify_db"),
    "user": os.environ.get("PGUSER", "james"),
    "password": os.environ.get("PGPASSWORD", ""),
}

# Read schema.sql before connecting, so a missing file fails without opening a connection.
print("\nExecuting schema.sql...")
with open('./sql/schema.sql', 'r', encoding='utf-8') as f:
    schema_sql = f.read()

# Connect and create schema. `with conn` commits on success and rolls back on error,
# but does not close the connection -- the finally clause does that, even if the DDL fails.
conn = psycopg2.connect(**db_config)
try:
    with conn, conn.cursor() as cur:
        cur.execute(schema_sql)
finally:
    conn.close()

print("✓ Database schema created!")
print("  - Tables created with PKs and FKs")
print("  - Indexes created")
print("  - Constraints enforced")


Executing schema.sql...
✓ Database schema created!
  - Tables created with PKs and FKs
  - Indexes created
  - Constraints enforced


In [17]:
# STEP 3: LOAD DATA INTO POSTGRESQL
# ============================================================================
import os

print("\n" + "="*60)
print("LOADING DATA TO POSTGRESQL")
print("="*60)

# Connection settings come from the standard libpq environment variables
# (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD) instead of being hard-coded;
# the fallbacks are the original local-development values.
jdbc_url = "jdbc:postgresql://{host}:{port}/{db}".format(
    host=os.environ.get("PGHOST", "localhost"),
    port=os.environ.get("PGPORT", "5432"),
    db=os.environ.get("PGDATABASE", "spotify_db"),
)
connection_properties = {
    "user": os.environ.get("PGUSER", "james"),
    "password": os.environ.get("PGPASSWORD", ""),
    "driver": "org.postgresql.Driver",
}


def _load_table(step, table, frame):
    """Append `frame` to the existing PostgreSQL table `table` and print progress.

    mode="append" because the tables are expected to already exist from schema.sql.
    The reported number is a fresh count() of `frame`, not a count read back from
    PostgreSQL.
    """
    print(f"\n{step}. Loading {table}...")
    frame.write.jdbc(
        url=jdbc_url,
        table=table,
        mode="append",
        properties=connection_properties,
    )
    print(f"   ✓ {frame.count():,} rows loaded")


# Load dimensions first and the fact table last: fact rows reference dimension keys.
# NOTE(review): appending twice inserts a second copy (or fails on primary keys,
# depending on schema.sql) -- make sure the tables are empty before re-running this cell.
for _step, (_table, _frame) in enumerate(
    [
        ("dim_date", dim_date),
        ("dim_artists", dim_artists),
        ("dim_tracks", dim_tracks),
        ("fact_chart_rankings", fact_chart_rankings),
    ],
    start=1,
):
    _load_table(_step, _table, _frame)

print("\n" + "="*60)
print("DATA LOAD COMPLETE!")
print("="*60)
print("\nStar schema successfully loaded with:")
print("  ✓ Primary keys")
print("  ✓ Foreign keys")
print("  ✓ Indexes")
print("  ✓ Referential integrity enforced")


LOADING DATA TO POSTGRESQL

1. Loading dim_date...
   ✓ 30 rows loaded

2. Loading dim_artists...
   ✓ 332 rows loaded

3. Loading dim_tracks...
   ✓ 395 rows loaded

4. Loading fact_chart_rankings...
   ✓ 8,601 rows loaded

DATA LOAD COMPLETE!

Star schema successfully loaded with:
  ✓ Primary keys
  ✓ Foreign keys
  ✓ Indexes
  ✓ Referential integrity enforced


In [18]:
pip freeze > requirements.txt

Note: you may need to restart the kernel to use updated packages.


26/01/03 17:27:28 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:359)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:131)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:707)
	at org.apache.spark.storage.BlockManagerMasterE