In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, BooleanType, LongType
from delta.tables import DeltaTable

CATALOGO_ORIGEM = "spotify_analytics"
SCHEMA_ORIGEM = "bronze"
TABELA_ORIGEM = "tb_bronze_search"

CATALOGO_DESTINO = "spotify_analytics"
SCHEMA_DESTINO = "silver"
TABELA_DESTINO = "tb_track_artists"
TABELA_INVALIDOS_DESTINO = "tb_track_artists_invalidos"

nome_tabela_origem = f"{CATALOGO_ORIGEM}.{SCHEMA_ORIGEM}.{TABELA_ORIGEM}"
nome_tabela_destino = f"{CATALOGO_DESTINO}.{SCHEMA_DESTINO}.{TABELA_DESTINO}"
nome_tabela_invalidos = f"{CATALOGO_DESTINO}.{SCHEMA_DESTINO}.{TABELA_INVALIDOS_DESTINO}"

### Configuração

Define origem (Bronze), destino (Silver) e tabela de auditoria.

### Schema Explícito para JSON (com Artists)

Além da estrutura de tracks, inclui array de artistas:
- `tracks.items[].artists[]`: Array de artistas por track
- Cada artista tem: id, name

Estratégia de Explode Duplo:
1. Primeiro explode: tracks.items → uma linha por track
2. Segundo explode: artists → uma linha por combinação track-artist

In [None]:
# Schema incluindo artists (array direto na raiz)
artist_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True)
])

spotify_schema = StructType([
    StructField("items", ArrayType(StructType([
        StructField("id", StringType(), True),
        StructField("artists", ArrayType(artist_schema), True)
    ])), True)
])

### Leitura Incremental + Double Explode

Lê última carga do Bronze e aplica transformações:
1. Parseia JSON
2. **Explode 1**: tracks.items
3. **Explode 2**: artists array
4. Remove duplicatas por (track_id, artist_id)

In [None]:
# Lê última carga do Bronze
df_bronze = spark.read.table(nome_tabela_origem)

max_dt_ingestao = (
    df_bronze
    .agg(F.max(F.col("ingestion_date")).alias("max_ts"))
    .first()["max_ts"]
)

# Parseia JSON
df_parsed = (
    df_bronze
    .filter(F.col("ingestion_date") == F.lit(max_dt_ingestao))
    .withColumn("parsed_data", F.from_json(F.col("raw_json"), spotify_schema))
    .select(
        F.explode(F.col("parsed_data.items")).alias("track"),
        "ingestion_date",
        "source_file"
    )
)

# Explode artistas (segundo explode)
df_exploded_artists = (
    df_parsed
    .select(
        F.col("track.id").alias("track_id"),
        F.explode(F.col("track.artists")).alias("artist"),
        "ingestion_date"
    )
)

# Extrai campos e limpa
df_limpo = (
    df_exploded_artists
    .select(
        F.col("track_id"),
        F.col("artist.id").alias("artist_id"),
        F.trim(F.col("artist.name")).alias("artist_name"),
        F.col("ingestion_date").alias("dt_ingestion"),
        F.lit("spotify_api_search").alias("dc_origem")
    )
    .dropDuplicates(["track_id", "artist_id"])
)

print(f"Registros de track-artists: {df_limpo.count()}")

### Validação de Qualidade

Adiciona flags de validação:
- **flag_track_id_valido**: track_id IS NOT NULL
- **flag_artist_id_valido**: artist_id IS NOT NULL
- **flag_artist_name_valido**: artist_name IS NOT NULL
- **flag_qualidade**: "OK" se todas flags TRUE, senão "ERRO"

In [None]:
df_validacao = (
    df_limpo
    .withColumn("flag_track_id_valido", F.col("track_id").isNotNull())
    .withColumn("flag_artist_id_valido", F.col("artist_id").isNotNull())
    .withColumn("flag_artist_name_valido", F.col("artist_name").isNotNull())
    .withColumn("flag_qualidade",
        F.when(
            F.col("flag_track_id_valido") &
            F.col("flag_artist_id_valido") &
            F.col("flag_artist_name_valido"),
            F.lit("OK")
        ).otherwise(F.lit("ERRO"))
    )
)

df_validos = df_validacao.filter(F.col("flag_qualidade") == "OK")
df_invalidos = df_validacao.filter(F.col("flag_qualidade") == "ERRO")

# Remove flags dos registros válidos
df_silver = df_validos.select(
    "track_id", "artist_id", "artist_name", "dt_ingestion", "dc_origem"
)

print(f"Registros válidos: {df_validos.count()}")
print(f"Registros inválidos: {df_invalidos.count()}")

### MERGE de Registros Válidos

Faz MERGE (UPSERT) em tb_track_artists:
- Match na chave composta (track_id, artist_id)
- **MATCHED**: Atualiza
- **NOT MATCHED**: Insere

In [None]:
delta_table = DeltaTable.forName(spark, nome_tabela_destino)

delta_table.alias("destino").merge(
    df_silver.alias("origem"),
    "destino.track_id = origem.track_id AND destino.artist_id = origem.artist_id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()

print(f"✅ Tabela {nome_tabela_destino} atualizada com sucesso!")

### OVERWRITE de Registros Inválidos

Sobrescreve tabela de auditoria com registros rejeitados.

In [None]:
df_invalidos.write.format("delta").mode("overwrite").saveAsTable(nome_tabela_invalidos)

print(f"✅ Tabela {nome_tabela_invalidos} atualizada para auditoria")

### Verificação Final

Exibe estatísticas da tabela bridge.

In [None]:
# Total de combinações track-artist
total = spark.table(nome_tabela_destino).count()
print(f"Total de track-artists na Silver: {total}")

# Top 5 artistas com mais tracks
spark.sql(f"""
    SELECT artist_name, COUNT(DISTINCT track_id) as qtd_tracks
    FROM {nome_tabela_destino}
    GROUP BY artist_name
    ORDER BY qtd_tracks DESC
    LIMIT 5
""").show(truncate=False)