In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, BooleanType, DoubleType
from pyspark.sql.functions import col, regexp_replace, year, lit, when

# 🛠️ CONFIGURATIONS
CATALOG_URI = "http://nessie:19120/api/v1"  # passed as spark.sql.catalog.nessie.uri
WAREHOUSE = "s3a://warehouse/"  # passed as spark.sql.catalog.nessie.warehouse
STORAGE_URI = "http://172.21.0.3:9000"  # passed as spark.sql.catalog.nessie.s3.endpoint
CSV_PATH = "/workspace/seed-data/Top_spotify_songs.csv"  # CSV file loaded by this script

# 🔥 Initialize Spark
# Every Spark property lives in one mapping and is applied with a single
# setAll() call; dict insertion order is the order the properties are set.
spark_settings = {
    # Enable logging for Spark History Server
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "file:///tmp/spark-events",
    "spark.history.fs.logDirectory": "file:///tmp/spark-events",
    # Include necessary packages
    "spark.jars.packages": (
        "org.postgresql:postgresql:42.7.3,"
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
        "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,"
        "software.amazon.awssdk:bundle:2.24.8,"
        "software.amazon.awssdk:url-connection-client:2.24.8"
    ),
    # Enable Iceberg and Nessie extensions
    "spark.sql.extensions": (
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
        "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
    ),
    # Configure Nessie catalog
    "spark.sql.catalog.nessie": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.nessie.uri": CATALOG_URI,
    "spark.sql.catalog.nessie.ref": "main",
    "spark.sql.catalog.nessie.authentication.type": "NONE",
    "spark.sql.catalog.nessie.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    # Set Minio as the S3 endpoint for Iceberg storage
    "spark.sql.catalog.nessie.s3.endpoint": STORAGE_URI,
    "spark.sql.catalog.nessie.warehouse": WAREHOUSE,
    "spark.sql.catalog.nessie.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
}

conf = pyspark.SparkConf()
conf.setAppName("spotify_data_app")
conf.setMaster("local[*]")  # Runs locally with all available cores
conf.setAll(list(spark_settings.items()))

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("🚀 Spark Session Started")

# 📥 **STEP 1: READ & CLEAN CSV DATA**
# The first line of the file is treated as the header row.
df = spark.read.csv(CSV_PATH, header=True)

# ✅ Remove quotes from column values & names
# Pass 1: delete every double-quote character from each column's values,
# keeping each column under its current name.
unquoted_values = [
    regexp_replace(col(name), '"', '').alias(name) for name in df.columns
]
df = df.select(unquoted_values)
# Pass 2: trim leading/trailing double quotes from the column names themselves.
df = df.toDF(*(name.strip('"') for name in df.columns))

# ✅ Convert necessary columns safely
# Regular expressions (used with Column.rlike) describing the textual forms
# that safe_cast is willing to cast; any other value becomes NULL.
INTEGER_PATTERN = r"^[+-]?[0-9]+$"
DECIMAL_PATTERN = r"^[+-]?([0-9]+\.?[0-9]*|\.[0-9]+)([eE][+-]?[0-9]+)?$"


def safe_cast(df, col_name, dtype):
    """Return *df* with column *col_name* cast to *dtype*, or NULL where the value is not numeric.

    The previous guard (``^[0-9]+$``) only accepted unsigned digit strings, so
    negative values (e.g. ``-3``) and every value containing a decimal point
    (e.g. ``0.731``) were replaced by NULL instead of being cast.

    Args:
        df: DataFrame containing ``col_name``.
        col_name: Name of the column to convert in place.
        dtype: Target Spark data type. Fractional types (``DoubleType``,
            ``FloatType``, ``DecimalType``) accept signed decimals with an
            optional exponent; every other type accepts signed integers only.

    Returns:
        A new DataFrame in which ``col_name`` holds the cast value when the
        original text matched the applicable pattern, and NULL otherwise.
    """
    # Local import: FractionalType is not part of the file-level imports.
    from pyspark.sql.types import FractionalType

    pattern = DECIMAL_PATTERN if isinstance(dtype, FractionalType) else INTEGER_PATTERN
    source = col(col_name)
    return df.withColumn(col_name, when(source.rlike(pattern), source.cast(dtype)).otherwise(None))

# Whole-number columns are converted to INT and fractional ones to DOUBLE via
# safe_cast.
numeric_cols = ["duration_ms", "popularity", "daily_rank", "daily_movement", "weekly_movement", "key", "mode"]
for col_name in numeric_cols:
    df = safe_cast(df, col_name, IntegerType())

double_cols = ["danceability", "energy", "loudness", "speechiness", "acousticness", "instrumentalness", "liveness", "valence", "tempo"]
for col_name in double_cols:
    df = safe_cast(df, col_name, DoubleType())

df = df.withColumn("is_explicit", col("is_explicit").cast(BooleanType()))

# Derive the partition column `release_year` from the release date.
# The table schema and column list used later in this script call the date
# column `album_release_date`; checking only for `release_date` meant every row
# fell back to the constant 2000 and ended up in a single partition.
# `release_date` is still tried first so inputs that do carry it behave as before.
release_date_col = next(
    (name for name in ("release_date", "album_release_date") if name in df.columns),
    None,
)
if release_date_col is not None:
    df = df.withColumn("release_year", year(col(release_date_col)))
else:
    # No release date available at all: keep the original placeholder year.
    df = df.withColumn("release_year", lit(2000))

print("✅ CSV data cleaned successfully.")

# 🏗️ **STEP 2: CREATE ICEBERG NAMESPACE**
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.spotify")
print("✅ Iceberg namespace 'nessie.spotify' verified.")

# 🔍 **STEP 3: CHECK & CREATE ICEBERG TABLE WITH PARTITIONING**
# List the namespace's tables and compare names case-insensitively; the table
# is only created when no table named "tracks" is listed.
table_exists = spark.sql("SHOW TABLES IN nessie.spotify").collect()
known_tables = {row["tableName"].lower() for row in table_exists}
if "tracks" not in known_tables:
    create_tracks_ddl = """
        CREATE TABLE nessie.spotify.tracks (
            spotify_id STRING,
            name STRING,
            artists STRING,
            daily_rank INT,
            daily_movement INT,
            weekly_movement INT,
            country STRING,
            snapshot_date STRING,
            popularity INT,
            is_explicit BOOLEAN,
            duration_ms INT,
            album_name STRING,
            album_release_date STRING,
            danceability DOUBLE,
            energy DOUBLE,
            key INT,
            loudness DOUBLE,
            mode INT,
            speechiness DOUBLE,
            acousticness DOUBLE,
            instrumentalness DOUBLE,
            liveness DOUBLE,
            valence DOUBLE,
            tempo DOUBLE,
            release_year INT
        ) USING iceberg
        PARTITIONED BY (release_year)
        LOCATION 's3a://warehouse/nessie/spotify/tracks';
    """
    spark.sql(create_tracks_ddl)
    print("✅ Iceberg table `nessie.spotify.tracks` created with partitioning.")

# 🏗️ **STEP 4: CLEAN DATA & WRITE TO ICEBERG WITH PARTITIONING**
# Columns to keep, listed in the same order as the CREATE TABLE statement.
expected_columns = [
    "spotify_id",
    "name",
    "artists",
    "daily_rank",
    "daily_movement",
    "weekly_movement",
    "country",
    "snapshot_date",
    "popularity",
    "is_explicit",
    "duration_ms",
    "album_name",
    "album_release_date",
    "danceability",
    "energy",
    "key",
    "loudness",
    "mode",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
    "release_year",
]
df = df.select(expected_columns)

# **Write data to MinIO with partitioning**
iceberg_writer = df.write.format("iceberg")
iceberg_writer = iceberg_writer.mode("overwrite").partitionBy("release_year")
iceberg_writer.save("nessie.spotify.tracks")
print("✅ Data successfully written to Iceberg table `nessie.spotify.tracks` with partitioning.")

# 🧐 **STEP 5: VERIFY WRITTEN DATA**
# Read the table back and print the first five rows.
iceberg_reader = spark.read.format("iceberg")
df_read = iceberg_reader.load("nessie.spotify.tracks")
df_read.show(5)

# 📌 **STEP 6: CHECK TABLE HISTORY**
# Rows are sorted newest-first by `made_current_at`.
history_query = "SELECT * FROM nessie.spotify.tracks.history ORDER BY made_current_at DESC"
snapshots = spark.sql(history_query)
snapshots.show()

# 🕒 **STEP 7: TIME TRAVEL QUERY**
# Use the first (most recent) history row's snapshot id to read the table
# as of that snapshot.
history_rows = snapshots.collect()
latest_snapshot = history_rows[0]["snapshot_id"]
time_travel_reader = spark.read.format("iceberg").option("snapshot-id", latest_snapshot)
df_time_travel = time_travel_reader.load("nessie.spotify.tracks")
df_time_travel.show(5)

# 🛑 **STEP 8: STOP SPARK SESSION**
spark.stop()
print("✅ Spark Session Stopped")

🚀 Spark Session Started


                                                                                

✅ CSV data cleaned successfully.
✅ Iceberg namespace 'nessie.spotify' verified.


                                                                                

✅ Data successfully written to Iceberg table `nessie.spotify.tracks` with partitioning.
+--------------------+------------------+--------------------+----------+--------------+---------------+-------+-------------+----------+-----------+-----------+--------------------+------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+------------+
|          spotify_id|              name|             artists|daily_rank|daily_movement|weekly_movement|country|snapshot_date|popularity|is_explicit|duration_ms|          album_name|album_release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|release_year|
+--------------------+------------------+--------------------+----------+--------------+---------------+-------+-------------+----------+-----------+-----------+--------------------+------------------+------------+------+---+--------+----+-----------+------------+------------