In [None]:
import time

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    coalesce,
    col,
    countDistinct,
    lit,
    sum as spark_sum,
    when,
)

# Build (or reuse) the Spark session used by the whole notebook.
# Delta Lake is wired in through the package coordinate, the SQL extension
# and the catalog implementation; executors are sized as 2 x 2 cores x 1024M.
spark = (
    SparkSession.builder
    .appName("spotify-datalake")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "1024M")
    .getOrCreate()
)

# Only show warnings and errors in the Spark log output.
spark.sparkContext.setLogLevel("WARN")


In [None]:
# Locations of the sampled source files (three versions of playlists and tracks).
SAMPLED_DIR = '/shared/sampled'

playlists_v1_path = f'{SAMPLED_DIR}/playlists_v1.json'
playlists_v2_path = f'{SAMPLED_DIR}/playlists_v2.json'
playlists_v3_path = f'{SAMPLED_DIR}/playlists_v3.json'

tracks_v1_path = f'{SAMPLED_DIR}/tracks_v1.json'
tracks_v2_path = f'{SAMPLED_DIR}/tracks_v2.json'
tracks_v3_path = f'{SAMPLED_DIR}/tracks_v3.json'

In [1]:
# The v2 source files are read directly and treated as the bronze layer;
# no separate bronze copy is written, to avoid extra disk usage.
bronze_v2_reader = spark.read
playlists_v2_df = bronze_v2_reader.json(playlists_v2_path)
tracks_v2_df = bronze_v2_reader.json(tracks_v2_path)

NameError: name 'spark' is not defined

# Silver Layer

In [2]:
# Load the existing silver-layer Parquet tables that the new snapshots are
# merged into (playlists, playlist tracks) or joined against (songs).
SILVER_DIR = "/silver/parquet"

silver_playlists = spark.read.parquet(f"{SILVER_DIR}/playlists/")
silver_tracks = spark.read.parquet(f"{SILVER_DIR}/playlist_tracks")
silver_songs = spark.read.parquet(f"{SILVER_DIR}/songs/")

NameError: name 'spark' is not defined

In [None]:
# Merge the v2 track snapshot into the silver playlist tracks.
# A full outer join on (pid, pos) keeps rows that exist on only one side.
# For every output column the v2 ("new") value wins and the silver ("old")
# value is used as the fallback when the v2 value is null.
#
# The fallback is done with coalesce inside the select because
# DataFrame.na.fill only accepts literal fill values (not Column objects),
# and the "old" columns would no longer exist after the select anyway.
# NOTE(review): coalesce needs both sides to have compatible types -- confirm
# that the JSON-inferred v2 schema matches the silver Parquet schema.
updated_tracks = silver_tracks.alias("old").join(
    tracks_v2_df.alias("new"),
    (col("old.pid") == col("new.pid")) & (col("old.pos") == col("new.pos")),
    "outer"
).select(
    *[
        coalesce(col(f"new.{name}"), col(f"old.{name}")).alias(name)
        for name in ("pid", "pos", "track_uri", "album_uri", "artist_uri")
    ]
)

In [None]:
# Merge the v2 playlist snapshot into the silver playlists.
# The full outer join on "pid" keeps playlists present on only one side and
# produces a single merged "pid" column, which is selected directly.
# For every other column the v2 ("new") value wins and the silver ("old")
# value is used as the fallback when the v2 value is null.
#
# The fallback is done with coalesce inside the select because
# DataFrame.na.fill only accepts literal fill values (not Column objects),
# and the "old" columns would no longer exist after the select anyway.
# NOTE(review): coalesce needs both sides to have compatible types -- confirm
# that the JSON-inferred v2 schema matches the silver Parquet schema.
updated_playlists = silver_playlists.alias("old").join(
    playlists_v2_df.alias("new"),
    "pid",
    "outer"
).select(
    col("pid"),
    *[
        coalesce(col(f"new.{name}"), col(f"old.{name}")).alias(name)
        for name in ("name", "description", "collaborative", "num_followers", "last_modified")
    ]
)

In [None]:
# Persist the merged silver tables for task 2, replacing any previous output.
updated_tracks.write.parquet("/silver/task2/parquet/playlist_tracks/", mode="overwrite")
updated_playlists.write.parquet("/silver/task2/parquet/playlists/", mode="overwrite")

# Gold Layer

In [None]:
# Gold table: one row per playlist with total duration and distinct
# track/artist/album counts, plus the playlist name and description.

# Step 1: attach song attributes to every playlist track. The inner join
# drops tracks that have no matching song.
playlist_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.duration_ms"),
        col("ss.artist_uri").alias("song_artist_uri"),
        col("ss.album_uri"),
        col("ss.track_uri"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Step 2: aggregate per playlist. The grouping key is also exposed as "pid"
# so it can serve as the join key in step 3.
playlist_totals = playlist_song_rows.groupBy("playlist_id").agg(
    col("playlist_id").alias("pid"),
    spark_sum("duration_ms").alias("total_duration_ms"),
    countDistinct("track_uri").alias("num_tracks"),
    countDistinct("song_artist_uri").alias("num_artists"),
    countDistinct("album_uri").alias("num_albums"),
)

# Step 3: add the descriptive playlist columns and fix the output layout.
gold_playlist_info = playlist_totals.join(updated_playlists, "pid", "inner").select(
    col("pid").alias("playlist_id"),
    "total_duration_ms",
    "num_tracks",
    "num_artists",
    "num_albums",
    "name",
    "description",
)

In [None]:
# Gold table: one row per playlist position with track, artist and album names.
# NOTE(review): `silver_artists` and `silver_album` are not defined anywhere in
# the visible notebook -- they must be loaded (presumably from the silver
# layer) before this cell can run; confirm their source paths.

# Song attributes for each playlist track (inner join on track_uri).
track_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.artist_uri"),
        col("ss.album_uri"),
        col("ss.track_name"),
        col("spt.pos"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Look up artist_name via artist_uri, keeping album_uri for the next lookup.
track_artist_rows = track_song_rows.join(silver_artists, "artist_uri", "inner").select(
    "playlist_id", "pos", "track_name", "album_uri", "artist_name"
)

# Look up album_name via album_uri and fix the final column order.
gold_playlist_tracks = track_artist_rows.join(silver_album, "album_uri", "inner").select(
    "playlist_id", "pos", "track_name", "artist_name", "album_name"
)

In [None]:
# Persist both gold tables for task 2, replacing any previous output.
gold_playlist_info.write.parquet("/gold/task2/parquet/playlists/", mode="overwrite")
gold_playlist_tracks.write.parquet("/gold/task2/parquet/playlist_tracks/", mode="overwrite")

# Update playlist 11992

In [None]:
# Fix the incorrect data entry for playlist 11992: set its name to
# "GYM WORKOUT" and mark it as collaborative. All other rows keep their
# current values.
# `when` comes from pyspark.sql.functions and is imported in the notebook's
# import cell; without that import this cell fails with a NameError.
is_target_playlist = col("pid") == 11992

updated_playlists = updated_playlists.withColumn(
    "name", when(is_target_playlist, lit("GYM WORKOUT")).otherwise(col("name"))
).withColumn(
    "collaborative", when(is_target_playlist, lit(True)).otherwise(col("collaborative"))
)

In [None]:
# Re-write the silver playlists so the corrected row for playlist 11992 is persisted.
updated_playlists.write.parquet("/silver/task2/parquet/playlists/", mode="overwrite")

In [None]:
# Rebuild the per-playlist gold table from the current `updated_playlists`
# (one row per playlist: total duration, distinct track/artist/album counts,
# name and description).

# Step 1: attach song attributes to every playlist track. The inner join
# drops tracks that have no matching song.
playlist_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.duration_ms"),
        col("ss.artist_uri").alias("song_artist_uri"),
        col("ss.album_uri"),
        col("ss.track_uri"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Step 2: aggregate per playlist. The grouping key is also exposed as "pid"
# so it can serve as the join key in step 3.
playlist_totals = playlist_song_rows.groupBy("playlist_id").agg(
    col("playlist_id").alias("pid"),
    spark_sum("duration_ms").alias("total_duration_ms"),
    countDistinct("track_uri").alias("num_tracks"),
    countDistinct("song_artist_uri").alias("num_artists"),
    countDistinct("album_uri").alias("num_albums"),
)

# Step 3: add the descriptive playlist columns and fix the output layout.
gold_playlist_info = playlist_totals.join(updated_playlists, "pid", "inner").select(
    col("pid").alias("playlist_id"),
    "total_duration_ms",
    "num_tracks",
    "num_artists",
    "num_albums",
    "name",
    "description",
)

In [None]:
# Rebuild the per-position gold table (track, artist and album names).
# NOTE(review): `silver_artists` and `silver_album` are not defined anywhere in
# the visible notebook -- they must be loaded (presumably from the silver
# layer) before this cell can run; confirm their source paths.

# Song attributes for each playlist track (inner join on track_uri).
track_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.artist_uri"),
        col("ss.album_uri"),
        col("ss.track_name"),
        col("spt.pos"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Look up artist_name via artist_uri, keeping album_uri for the next lookup.
track_artist_rows = track_song_rows.join(silver_artists, "artist_uri", "inner").select(
    "playlist_id", "pos", "track_name", "album_uri", "artist_name"
)

# Look up album_name via album_uri and fix the final column order.
gold_playlist_tracks = track_artist_rows.join(silver_album, "album_uri", "inner").select(
    "playlist_id", "pos", "track_name", "artist_name", "album_name"
)


In [None]:
# Overwrite both gold tables with the rebuilt versions.
gold_playlist_info.write.parquet("/gold/task2/parquet/playlists/", mode="overwrite")
gold_playlist_tracks.write.parquet("/gold/task2/parquet/playlist_tracks/", mode="overwrite")

# Ingest new dataset

In [None]:
# Read the third sample (v3) of playlists and tracks; the same merge and
# gold rebuild steps are then repeated on it.
bronze_v3_reader = spark.read
playlists_v3_df = bronze_v3_reader.json(playlists_v3_path)
tracks_v3_df = bronze_v3_reader.json(tracks_v3_path)

In [None]:
# The v3 source files themselves serve as the bronze layer, so the bronze
# copies below are intentionally not written (saves disk space).

# playlists_v3_df.write.mode("overwrite").json("/bronze/playlists_v3/")
# tracks_v3_df.write.mode("overwrite").json("/bronze/tracks_v3/")


In [None]:
# Merge the v3 track snapshot into the current playlist tracks.
# A full outer join on (pid, pos) keeps rows that exist on only one side.
# For every output column the v3 ("new") value wins and the current ("old")
# value is used as the fallback when the v3 value is null.
#
# The fallback is done with coalesce inside the select because
# DataFrame.na.fill only accepts literal fill values (not Column objects),
# and the "old" columns would no longer exist after the select anyway.
# NOTE(review): coalesce needs both sides to have compatible types -- confirm
# that the JSON-inferred v3 schema matches the current table's schema.
updated_tracks = updated_tracks.alias("old").join(
    tracks_v3_df.alias("new"),
    (col("old.pid") == col("new.pid")) & (col("old.pos") == col("new.pos")),
    "outer"
).select(
    *[
        coalesce(col(f"new.{name}"), col(f"old.{name}")).alias(name)
        for name in ("pid", "pos", "track_uri", "album_uri", "artist_uri")
    ]
)

In [None]:
# Merge the v3 playlist snapshot into the current playlists.
# The full outer join on "pid" keeps playlists present on only one side and
# produces a single merged "pid" column, which is selected directly.
# For every other column the v3 ("new") value wins and the current ("old")
# value is used as the fallback when the v3 value is null.
#
# The fallback is done with coalesce inside the select because
# DataFrame.na.fill only accepts literal fill values (not Column objects),
# and the "old" columns would no longer exist after the select anyway.
# NOTE(review): coalesce needs both sides to have compatible types -- confirm
# that the JSON-inferred v3 schema matches the current table's schema.
updated_playlists = updated_playlists.alias("old").join(
    playlists_v3_df.alias("new"),
    "pid",
    "outer"
).select(
    col("pid"),
    *[
        coalesce(col(f"new.{name}"), col(f"old.{name}")).alias(name)
        for name in ("name", "description", "collaborative", "num_followers", "last_modified")
    ]
)

In [None]:
# Persist the silver tables after the v3 merge, replacing the previous output.
updated_tracks.write.parquet("/silver/task2/parquet/playlist_tracks/", mode="overwrite")
updated_playlists.write.parquet("/silver/task2/parquet/playlists/", mode="overwrite")


In [None]:
# Rebuild the per-playlist gold table from the current `updated_tracks` and
# `updated_playlists` (one row per playlist: total duration, distinct
# track/artist/album counts, name and description).

# Step 1: attach song attributes to every playlist track. The inner join
# drops tracks that have no matching song.
playlist_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.duration_ms"),
        col("ss.artist_uri").alias("song_artist_uri"),
        col("ss.album_uri"),
        col("ss.track_uri"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Step 2: aggregate per playlist. The grouping key is also exposed as "pid"
# so it can serve as the join key in step 3.
playlist_totals = playlist_song_rows.groupBy("playlist_id").agg(
    col("playlist_id").alias("pid"),
    spark_sum("duration_ms").alias("total_duration_ms"),
    countDistinct("track_uri").alias("num_tracks"),
    countDistinct("song_artist_uri").alias("num_artists"),
    countDistinct("album_uri").alias("num_albums"),
)

# Step 3: add the descriptive playlist columns and fix the output layout.
gold_playlist_info = playlist_totals.join(updated_playlists, "pid", "inner").select(
    col("pid").alias("playlist_id"),
    "total_duration_ms",
    "num_tracks",
    "num_artists",
    "num_albums",
    "name",
    "description",
)

In [None]:
# Rebuild the per-position gold table (track, artist and album names) from
# the current `updated_tracks`.
# NOTE(review): `silver_artists` and `silver_album` are not defined anywhere in
# the visible notebook -- they must be loaded (presumably from the silver
# layer) before this cell can run; confirm their source paths.

# Song attributes for each playlist track (inner join on track_uri).
track_song_rows = (
    updated_tracks.alias("spt")
    .join(silver_songs.alias("ss"), col("spt.track_uri") == col("ss.track_uri"), "inner")
    .select(
        col("ss.artist_uri"),
        col("ss.album_uri"),
        col("ss.track_name"),
        col("spt.pos"),
        col("spt.pid").alias("playlist_id"),
    )
)

# Look up artist_name via artist_uri, keeping album_uri for the next lookup.
track_artist_rows = track_song_rows.join(silver_artists, "artist_uri", "inner").select(
    "playlist_id", "pos", "track_name", "album_uri", "artist_name"
)

# Look up album_name via album_uri and fix the final column order.
gold_playlist_tracks = track_artist_rows.join(silver_album, "album_uri", "inner").select(
    "playlist_id", "pos", "track_name", "artist_name", "album_name"
)

In [None]:
# Overwrite both gold tables with the versions rebuilt after the v3 ingest.
# The target directories use the same "/gold/task2/parquet/..." layout as the
# earlier gold writes in this notebook (the previous "/gold/parquet/task2/..."
# ordering sent the final output to a different location and left the
# earlier gold tables stale).
gold_playlist_info.write.mode("overwrite").parquet("/gold/task2/parquet/playlists/")
gold_playlist_tracks.write.mode("overwrite").parquet("/gold/task2/parquet/playlist_tracks/")

A adoção do Parquet em Data Lakes traz desafios como a evolução do schema, que exige gerenciamento rigoroso para adaptar mudanças na estrutura de dados sem comprometer a compatibilidade. Além disso, o formato não suporta nativamente operações de atualização/exclusão eficientes, obrigando a regravação de arquivos inteiros, o que é custoso em grandes volumes. Particionamento inadequado pode gerar problemas de desempenho, e a concorrência em leitura/gravação exige mecanismos extras para garantir consistência.

Outros obstáculos incluem a complexidade no gerenciamento de metadados (essencial para consultas eficientes) e a escolha de estratégias de compressão que equilibrem espaço de armazenamento e desempenho. A integração com ferramentas de análise também pode demandar ajustes. Apesar dessas limitações, o Parquet permanece vantajoso para cenários com predominância de leitura (consultas analíticas), desde que seus desafios sejam mitigados com planejamento e boas práticas de arquitetura.