In [0]:
import base64
import json

import requests
from delta.tables import DeltaTable
from pyspark.sql.functions import col, split, element_at, rand
from pyspark.sql.types import LongType, StringType, StructField, StructType

In [0]:
# Spotify application credentials, read from the Databricks secret scope "SPOTIFY".
CLIENT_ID, CLIENT_SECRET = (
    dbutils.secrets.get(scope="SPOTIFY", key=secret_key)
    for secret_key in ("SPOTIFY_CLIENT_ID", "SPOTIFY_CLIENT_SECRET")
)

In [0]:
# get_spotify_token is defined in a later cell, so on a fresh kernel (Restart -> Run All)
# it does not exist yet when this cell runs. Only fetch the token when the helper is
# available; otherwise leave it as None instead of failing with NameError.
# NOTE(review): the function-definition cells should be moved above this cell.
access_token = (
    get_spotify_token(CLIENT_ID, CLIENT_SECRET)
    if "get_spotify_token" in globals()
    else None
)

In [0]:
# Run configuration.
IDS_PARA_PROCESSAR = 15  # tracks enriched per run; all sent in one /v1/tracks request (API max: 50 IDs)
TABELA_HISTORICO = "silver.spotify_eng.streaming_history_2014_202509"  # streaming history the track IDs come from
TABELA_DETALHES_FINAL = "silver.spotify_eng.getalbum"
# The rest of the notebook reads and writes the destination via TABELA_DESTINO, which
# was never defined (NameError on a fresh kernel); alias it to the details table.
TABELA_DESTINO = TABELA_DETALHES_FINAL

In [0]:
# ID CHECK AND SYNC: seed the destination table with every track ID found in the
# streaming history, so later cells can enrich them.
print("A sincronizar IDs entre o histórico e a tabela de destino...")

df_ids_historico = (
    spark.table(TABELA_HISTORICO)
    .filter(col("spotify_track_uri").isNotNull())
    # Keep the last ':'-separated segment of the URI (e.g. "spotify:track:<id>" -> "<id>").
    .withColumn("track_id", element_at(split(col("spotify_track_uri"), ":"), -1))
    .select("track_id")
    .distinct()
)
tabela_destino_delta = DeltaTable.forName(spark, TABELA_DESTINO)

# History IDs that are not in the destination table yet.
df_novos_ids_para_inserir = df_ids_historico.alias("hist").join(
    tabela_destino_delta.toDF().alias("dest"),
    col("hist.track_id") == col("dest.track_id"),
    "left_anti",
)

# Count once: every count() re-executes the join above.
total_novos_ids = df_novos_ids_para_inserir.count()
if total_novos_ids > 0:
    print(f"Encontrados {total_novos_ids} novos IDs no histórico. A inseri-los na tabela de destino...")
    # Only track_id is inserted; all other columns stay NULL until enriched.
    (tabela_destino_delta.alias("target")
      .merge(df_novos_ids_para_inserir.alias("source"), "target.track_id = source.track_id")
      .whenNotMatchedInsert(values={"track_id": "source.track_id"})
      .execute())
else:
    print("Sincronização concluída. Não foram encontrados novos IDs.")

In [0]:
print("\nA identificar faixas com detalhes em falta")

# A row still lacking track_name has not been enriched from the API yet.
df_ids_pendentes = (
    spark.table(TABELA_DESTINO)
    .where(col("track_name").isNull())
    .select("track_id")
)
total_pendentes = df_ids_pendentes.count()
print(f"Total de faixas pendentes de enriquecimento: {total_pendentes}")

# Nothing left to enrich: stop the notebook run here.
if not total_pendentes:
    print("Todas as faixas já foram enriquecidas. Job concluído.")
    dbutils.notebook.exit("Processo finalizado.")

In [0]:
# Shuffle the pending IDs (unseeded, so each run draws a different batch) and keep
# at most IDS_PARA_PROCESSAR of them.
df_lote_aleatorio = (
    df_ids_pendentes
    .orderBy(rand())
    .limit(IDS_PARA_PROCESSAR)
)
lote_rows = df_lote_aleatorio.collect()
ids_para_api = [linha["track_id"] for linha in lote_rows]
print(f"Selecionado um lote aleatório de {len(ids_para_api)} IDs para processar.")

In [0]:
def get_spotify_token(client_id, client_secret, timeout=30):
    """Request an app access token from Spotify via the Client Credentials flow.

    Args:
        client_id: Spotify application client ID.
        client_secret: Spotify application client secret.
        timeout: Seconds to wait for the token endpoint. Without a timeout
            ``requests`` can block indefinitely.

    Returns:
        The ``access_token`` string on HTTP 200, otherwise ``None``.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    auth_url = 'https://accounts.spotify.com/api/token'
    # Client credentials go in an HTTP Basic header: base64("<id>:<secret>").
    auth_header = base64.b64encode(f"{client_id}:{client_secret}".encode('utf-8')).decode('utf-8')
    headers = {'Authorization': f'Basic {auth_header}'}
    payload = {'grant_type': 'client_credentials'}
    response = requests.post(auth_url, headers=headers, data=payload, timeout=timeout)
    if response.status_code == 200:
        return response.json().get('access_token')
    # Report why no token was obtained instead of failing silently downstream.
    print(f"Falha ao obter o token do Spotify: HTTP {response.status_code}")
    return None

In [0]:
def get_track_details_json(track_ids, token, timeout=30):
    """Fetch full track objects for a batch of IDs from Spotify's "Get Several Tracks" endpoint.

    Args:
        track_ids: Spotify track IDs. They are sent comma-joined in a single request
            (Spotify documents a maximum of 50 IDs per call).
        token: Bearer access token.
        timeout: Seconds to wait for the response. Without a timeout
            ``requests`` can block indefinitely.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None``. Also returns
        ``None`` without making any request when ``token`` or ``track_ids`` is empty.

    Raises:
        requests.RequestException: on connection failure or timeout.
    """
    if not token or not track_ids:
        return None
    response = requests.get(
        'https://api.spotify.com/v1/tracks',
        headers={'Authorization': f'Bearer {token}'},
        params={'ids': ",".join(track_ids)},
        timeout=timeout,
    )
    if response.status_code == 200:
        return response.json()
    # Report the failure instead of returning None silently.
    print(f"Falha no pedido de faixas ao Spotify: HTTP {response.status_code} - {response.text[:200]}")
    return None

In [0]:
# Obtain a token if none is available yet (e.g. the token cell ran before
# get_spotify_token was defined, or the token request failed).
if not globals().get("access_token"):
    access_token = get_spotify_token(CLIENT_ID, CLIENT_SECRET)

json_response = get_track_details_json(ids_para_api, access_token)
if not json_response or not json_response.get('tracks'):
    dbutils.notebook.exit("API não retornou dados.")


def _flatten_track(track):
    """Flatten one Spotify track object into a single-level dict (one table row).

    Only the first listed artist is expanded into the ``artist_*`` columns; the names
    of all artists are joined into ``all_artist_names``. Album images are picked by
    position (0 -> large, 1 -> medium, 2 -> small); missing positions become None.
    """
    # `or {}` / `or []` also cover keys that are present but explicitly null.
    album_info = track.get('album') or {}
    artists = track.get('artists') or []
    primary_artist = artists[0] if artists else {}
    images = album_info.get('images') or []

    def image_url(position):
        return images[position].get('url') if len(images) > position else None

    return {
        "track_id": track.get('id'),
        "track_name": track.get('name'),
        "track_popularity": track.get('popularity'),
        "track_uri": track.get('uri'),
        "album_id": album_info.get('id'),
        "album_name": album_info.get('name'),
        "album_type": album_info.get('album_type'),
        "album_release_date": album_info.get('release_date'),
        "album_release_date_precision": album_info.get('release_date_precision'),
        "album_total_tracks": album_info.get('total_tracks'),
        "album_uri": album_info.get('uri'),
        "album_spotify_url": (album_info.get('external_urls') or {}).get('spotify'),
        "album_image_url_large": image_url(0),
        "album_image_url_medium": image_url(1),
        "album_image_url_small": image_url(2),
        "artist_id": primary_artist.get('id'),
        "artist_name": primary_artist.get('name'),
        "artist_uri": primary_artist.get('uri'),
        "artist_spotify_url": (primary_artist.get('external_urls') or {}).get('spotify'),
        "album_available_markets": ",".join(album_info.get('available_markets') or []),
        "all_artist_names": ", ".join(artist['name'] for artist in artists if artist.get('name')),
    }


# Explicit schema: with a small batch a column can be None in every row (e.g. no third
# album image), and Spark cannot infer a type for an all-null column. The types match
# what inference yields for these values (Python str -> string, Python int -> bigint).
_COLUNAS_NUMERICAS = {"track_popularity", "album_total_tracks"}
TRACK_DETAILS_SCHEMA = StructType([
    StructField(nome, LongType() if nome in _COLUNAS_NUMERICAS else StringType(), True)
    for nome in (
        "track_id", "track_name", "track_popularity", "track_uri",
        "album_id", "album_name", "album_type", "album_release_date",
        "album_release_date_precision", "album_total_tracks", "album_uri",
        "album_spotify_url", "album_image_url_large", "album_image_url_medium",
        "album_image_url_small", "artist_id", "artist_name", "artist_uri",
        "artist_spotify_url", "album_available_markets", "all_artist_names",
    )
])

# Falsy entries are skipped; presumably these are the API's null placeholders for IDs
# it could not resolve.
# NOTE(review): such IDs keep a NULL track_name and may be drawn again on later runs.
track_list_raw = [track for track in json_response.get('tracks', []) if track]
flattened_track_list = [_flatten_track(track) for track in track_list_raw]

if not flattened_track_list:
    dbutils.notebook.exit("Nenhum dado processado após achatamento.")

df_novos_detalhes = spark.createDataFrame(flattened_track_list, schema=TRACK_DETAILS_SCHEMA)
# One row per flattened dict, so the list length is the row count (no Spark job needed).
print(f"Foram obtidos detalhes para {len(flattened_track_list)} faixas.")

In [0]:
# Write the fetched details onto the existing destination rows, matched by track_id.
# Only an update branch is needed: the requested IDs were read from the destination
# table itself, so their rows already exist.
merge_builder = tabela_destino_delta.alias("target").merge(
    df_novos_detalhes.alias("source"),
    "target.track_id = source.track_id",
)
merge_builder.whenMatchedUpdateAll().execute()

print(f"A tabela '{TABELA_DESTINO}' foi atualizada com sucesso.")