In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os, re, time, math, random, threading, base64, requests
import json
from glob import glob

In [3]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("himanshuwagh/spotify-million")

print("Path to dataset files:", path)

Path to dataset files: /Users/lucasborges/.cache/kagglehub/datasets/himanshuwagh/spotify-million/versions/1


In [4]:
data_path = os.path.join(path, "data")

# Lista todos os arquivos slice (cada slice possui 1000 playlists)
files = sorted(glob(os.path.join(data_path, "mpd.slice.*.json")))
print(f"Qtd de slices encontrados: {len(files)}")
print("Exemplos:", [os.path.basename(f) for f in files[:5]])


subset_files = files[:100] # Carrega apenas os 100 primeiros slices (100.000 playlists)
print("Arquivos carregados:", [os.path.basename(f) for f in subset_files])

Qtd de slices encontrados: 1000
Exemplos: ['mpd.slice.0-999.json', 'mpd.slice.1000-1999.json', 'mpd.slice.10000-10999.json', 'mpd.slice.100000-100999.json', 'mpd.slice.101000-101999.json']
Arquivos carregados: ['mpd.slice.0-999.json', 'mpd.slice.1000-1999.json', 'mpd.slice.10000-10999.json', 'mpd.slice.100000-100999.json', 'mpd.slice.101000-101999.json', 'mpd.slice.102000-102999.json', 'mpd.slice.103000-103999.json', 'mpd.slice.104000-104999.json', 'mpd.slice.105000-105999.json', 'mpd.slice.106000-106999.json', 'mpd.slice.107000-107999.json', 'mpd.slice.108000-108999.json', 'mpd.slice.109000-109999.json', 'mpd.slice.11000-11999.json', 'mpd.slice.110000-110999.json', 'mpd.slice.111000-111999.json', 'mpd.slice.112000-112999.json', 'mpd.slice.113000-113999.json', 'mpd.slice.114000-114999.json', 'mpd.slice.115000-115999.json', 'mpd.slice.116000-116999.json', 'mpd.slice.117000-117999.json', 'mpd.slice.118000-118999.json', 'mpd.slice.119000-119999.json', 'mpd.slice.12000-12999.json', 'mpd.sl

In [6]:
# Separa playlists e tracks em datasets distintos

# Sub-datasets
playlist_rows = []
track_rows = []

# Campos comuns do MPD
playlist_fields = [
    "pid", "name", "description", "modified_at", "num_tracks", "num_albums", "num_artists",
    "num_followers", "num_edits", "duration_ms", "collaborative", "tracks"
]

track_fields = [
    "track_uri", "track_name", "artist_uri", "artist_name",
    "album_uri", "album_name", "duration_ms", "pos"
]

for f in subset_files:
    with open(f, "r", encoding="utf-8") as fp:
        blob = json.load(fp)

    playlists = blob.get("playlists", [])
    # Flatten de playlists
    for p in playlists:
        playlist_rows.append({k: p.get(k, None) for k in playlist_fields})

        # Flatten de tracks (uma linha por faixa por playlist)
        for t in p.get("tracks", []):
            row = {k: t.get(k, None) for k in track_fields}
            row["pid"] = p.get("pid")  # chave estrangeira para ligar com a playlist
            track_rows.append(row)

# DataFrames finais
df_playlists = pd.DataFrame(playlist_rows)
df_tracks = pd.DataFrame(track_rows)

print("Playlists — shape:", df_playlists.shape)
print("Colunas de playlists:", list(df_playlists.columns))

print("\nTracks — shape:", df_tracks.shape)
print("Colunas de tracks:", list(df_tracks.columns))


Playlists — shape: (100000, 12)
Colunas de playlists: ['pid', 'name', 'description', 'modified_at', 'num_tracks', 'num_albums', 'num_artists', 'num_followers', 'num_edits', 'duration_ms', 'collaborative', 'tracks']

Tracks — shape: (6685101, 9)
Colunas de tracks: ['track_uri', 'track_name', 'artist_uri', 'artist_name', 'album_uri', 'album_name', 'duration_ms', 'pos', 'pid']


In [7]:
df_playlists.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 12 columns):
 #   Column         Non-Null Count   Dtype 
---  ------         --------------   ----- 
 0   pid            100000 non-null  int64 
 1   name           100000 non-null  object
 2   description    1796 non-null    object
 3   modified_at    100000 non-null  int64 
 4   num_tracks     100000 non-null  int64 
 5   num_albums     100000 non-null  int64 
 6   num_artists    100000 non-null  int64 
 7   num_followers  100000 non-null  int64 
 8   num_edits      100000 non-null  int64 
 9   duration_ms    100000 non-null  int64 
 10  collaborative  100000 non-null  object
 11  tracks         100000 non-null  object
dtypes: int64(8), object(4)
memory usage: 9.2+ MB


In [8]:
df_tracks.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6685101 entries, 0 to 6685100
Data columns (total 9 columns):
 #   Column       Dtype 
---  ------       ----- 
 0   track_uri    object
 1   track_name   object
 2   artist_uri   object
 3   artist_name  object
 4   album_uri    object
 5   album_name   object
 6   duration_ms  int64 
 7   pos          int64 
 8   pid          int64 
dtypes: int64(3), object(6)
memory usage: 459.0+ MB


In [9]:
# Salvar datasets "originais"

df_playlists.to_parquet("TCC/data/raw/df_playlists_v0.parquet", index=False)
df_tracks.to_parquet("TCC/data/raw/df_tracks_v0.parquet", index=False)

## Audio features

In [13]:
# Extração do Spotify Track ID (Reccobeats)

import re

# ID do Spotify: 22 caracteres base62
_SPOTIFY_ID_RE = re.compile(r'^[A-Za-z0-9]{22}$')

# URL do Spotify (aceita variações com /intl-xx/)
_SPOTIFY_URL_RE = re.compile(
    r'^https?://open\.spotify\.com/(?:intl-[a-z]{2}/)?track/([A-Za-z0-9]{22})(?:\?.*)?$',
    re.IGNORECASE
)

# URI do Spotify
_SPOTIFY_URI_RE = re.compile(
    r'^spotify:track:([A-Za-z0-9]{22})$'
)

def extract_spotify_id(track_uri: object) -> str | None: #Extração do Spotify Track ID
    if not isinstance(track_uri, str):
        return None
    s = track_uri.strip()
    if not s:
        return None

    m = _SPOTIFY_URI_RE.match(s)
    if m:
        return m.group(1)

    m = _SPOTIFY_URL_RE.match(s)
    if m:
        return m.group(1)

    if _SPOTIFY_ID_RE.match(s):
        return s

    return None

# Aplicação no df_tracks
df_tracks = df_tracks.copy()
df_tracks["track_spotify_id"] = df_tracks["track_uri"].apply(extract_spotify_id)

# Verificar se alguma linha não possui Spotify ID válido
missing = df_tracks["track_spotify_id"].isna().sum()
print(f"Linhas sem Spotify ID válido: {missing}")

Linhas sem Spotify ID válido: 0


In [15]:
import time
import math
import random
import requests
from typing import List, Dict, Any, Iterable, Optional

# Regex para extrair Spotify ID do href 
_SPOTIFY_HREF_RE = re.compile(
    r'^https?://open\.spotify\.com/(?:intl-[a-z]{2}/)?track/([A-Za-z0-9]{22})(?:\?.*)?$',
    re.IGNORECASE
)

def _extract_spotify_id_from_href(href: Optional[str]) -> Optional[str]:
    if not href or not isinstance(href, str):
        return None
    m = _SPOTIFY_HREF_RE.match(href.strip())
    return m.group(1) if m else None

# Colunas esperadas (inclui UUID interno e spotify_id)
_EXPECTED_KEYS = [
    "id",          # UUID interno ReccoBeats (metadado)
    "spotify_id",  # <- chave útil para join depois
    "href",
    "acousticness","danceability","energy","instrumentalness",
    "key","liveness","loudness","mode","speechiness","tempo","valence",
]

def _ensure_keys(d: Dict[str, Any]) -> Dict[str, Any]:
    return {k: d.get(k, None) for k in _EXPECTED_KEYS}

def _flatten_item_from_content(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    A resposta real vem como {"content": [ {...}, ... ]}.
    Cada item já traz as features no próprio nível + 'href' (URL Spotify).
    'id' é UUID interno; extraímos 'spotify_id' do 'href'.
    """
    out: Dict[str, Any] = {}
    out["id"] = item.get("id")
    out["href"] = item.get("href")
    out["spotify_id"] = _extract_spotify_id_from_href(out["href"])

    out["acousticness"]     = item.get("acousticness")
    out["danceability"]     = item.get("danceability")
    out["energy"]           = item.get("energy")
    out["instrumentalness"] = item.get("instrumentalness")
    out["key"]              = item.get("key")
    out["liveness"]         = item.get("liveness")
    out["loudness"]         = item.get("loudness")
    out["mode"]             = item.get("mode")
    out["speechiness"]      = item.get("speechiness")
    out["tempo"]            = item.get("tempo")
    out["valence"]          = item.get("valence")
    return _ensure_keys(out)

def _normalize_response(data: Any) -> List[Dict[str, Any]]:
    """
    Normaliza para lista de dicts com _EXPECTED_KEYS.
    Suporta o formato real: {"content": [ {...}, ... ]}.
    """
    rows: List[Dict[str, Any]] = []
    if data is None:
        return rows

    if isinstance(data, dict):
        if "content" in data and isinstance(data["content"], list):
            for el in data["content"]:
                if isinstance(el, dict):
                    rows.append(_flatten_item_from_content(el))
            return rows
        # Fallback defensivo
        for alt in ("audio_features","audioFeatures","items","results","tracks"):
            if alt in data and isinstance(data[alt], list):
                for el in data[alt]:
                    if isinstance(el, dict):
                        rows.append(_flatten_item_from_content(el))
                return rows
        # Objeto único (raro aqui)
        if data:
            rows.append(_flatten_item_from_content(data))
        return rows

    if isinstance(data, list):
        for el in data:
            if isinstance(el, dict):
                rows.append(_flatten_item_from_content(el))
        return rows

    return rows

def chunked(seq: Iterable[Any], n: int) -> Iterable[list]:
    """Quebra uma sequência em blocos de tamanho n."""
    buf = []
    for item in seq:
        buf.append(item)
        if len(buf) == n:
            yield buf
            buf = []
    if buf:
        yield buf

# Tratamento de rate-limit (429)
class RateLimit(Exception):
    def __init__(self, retry_after: float | None = None):
        self.retry_after = retry_after or 1.0
        super().__init__(f"Rate limited. Retry after ~{self.retry_after}s")

def _fetch_features_batch(ids_batch: List[str], timeout: int = 20) -> List[Dict[str, Any]]:
    """
    Endpoint oficial de múltiplos:
      GET https://api.reccobeats.com/v1/audio-features?ids=id1,id2,...,idN
    Levanta RateLimit se 429, usando Retry-After (com jitter).
    """
    base_url = "https://api.reccobeats.com/v1/audio-features"
    headers = {"Accept": "application/json"}
    params = {"ids": ",".join(ids_batch)}  

    r = requests.get(base_url, headers=headers, params=params, timeout=timeout)

    if r.status_code == 429:
        retry_after = r.headers.get("Retry-After")
        try:
            retry_after = float(retry_after)
        except (TypeError, ValueError):
            retry_after = 2.0  # fallback conservador
        retry_after *= (1.0 + 0.2 * random.random())  # jitter até 20%
        raise RateLimit(retry_after)

    r.raise_for_status()
    return _normalize_response(r.json())

def _read_checkpoint(path: str) -> Optional[pd.DataFrame]:
    try:
        return pd.read_parquet(path)
    except Exception:
        try:
            return pd.read_csv(path)
        except Exception:
            return None

def _migrate_checkpoint(df: pd.DataFrame) -> pd.DataFrame:
    """
    Se o checkpoint antigo não tiver 'spotify_id', cria a coluna a partir de 'href'
    e deduplica por 'spotify_id'. Retorna DF pronto para uso como cache.
    """
    df = df.copy()
    if "spotify_id" not in df.columns:
        if "href" in df.columns:
            df["spotify_id"] = df["href"].apply(_extract_spotify_id_from_href)
        else:
            df["spotify_id"] = None
    # manter apenas colunas conhecidas (quando possível) e deduplicar por spotify_id
    for col in _EXPECTED_KEYS:
        if col not in df.columns:
            df[col] = None
    df = df[_EXPECTED_KEYS]
    df = df.dropna(subset=["spotify_id"]).drop_duplicates(subset=["spotify_id"], keep="first").reset_index(drop=True)
    return df

def _safe_concat_cache(prev: Optional[pd.DataFrame], cur: Optional[pd.DataFrame]) -> pd.DataFrame:
    """
    Concatena prev + cur de forma segura (evita FutureWarning com DFs vazios/NA).
    Deduplica por 'spotify_id'.
    """
    if prev is None or prev.empty:
        base = pd.DataFrame(columns=_EXPECTED_KEYS) if cur is None else cur
    else:
        base = prev
    if cur is None or cur.empty:
        out = base
    else:
        # drop colunas all-NA para evitar dtype indefinido
        base2 = base.dropna(axis=1, how='all')
        cur2 = cur.dropna(axis=1, how='all')
        out = pd.concat([base2, cur2], ignore_index=True)
    if "spotify_id" in out.columns:
        out = out.dropna(subset=["spotify_id"]).drop_duplicates(subset=["spotify_id"], keep="first")
    # garantir todas as colunas esperadas
    for col in _EXPECTED_KEYS:
        if col not in out.columns:
            out[col] = None
    return out[_EXPECTED_KEYS].reset_index(drop=True)

def fetch_audio_features_in_batches(
    track_ids: Iterable[Optional[str]],
    batch_size: int = 20,               #conservador por causa do 429
    pause_seconds: float = 0.5,         #pausa base entre lotes
    timeout: int = 20,
    max_retries: int = 2,               #para erros != 429
    backoff_factor: float = 0.75,
    progress_every: int = 100,          #imprime a cada 100 lotes
    checkpoint_every: int = 2_000,      #salva a cada 2k lotes
    checkpoint_path: Optional[str] = "features_checkpoint.parquet",
    jitter_base_pause: bool = True,     #aplica jitter na pausa base
    early_stop_after_batches: Optional[int] = None,  #ex.: 600
    min_hit_rate: Optional[float] = None,            #ex.: 5.0 (%)
) -> pd.DataFrame:
    """
    Busca audio features em lotes e retorna um DataFrame com colunas _EXPECTED_KEYS.

    - Respeita 429 (Retry-After) repetindo o mesmo lote.
    - Retries + backoff para outros erros.
    - Logs de progresso + hit-rate (conta apenas linhas com spotify_id).
    - Checkpoints periódicos (parquet/csv) e retomada com migração automática.
    - Deduplica por spotify_id.
    - Segunda passada para 5xx com batch_size=5.
    """
    # IDs únicos e válidos (lista)
    ids = pd.Series(track_ids).dropna().astype(str)
    ids = ids[ids.str.len() > 0].unique().tolist()

    # Carregar checkpoint (se existir) + MIGRAR se faltar 'spotify_id'
    prev_ck = None
    collected_spotify_ids: set[str] = set()
    if checkpoint_path and os.path.exists(checkpoint_path):
        ck = _read_checkpoint(checkpoint_path)
        if ck is not None:
            ck = _migrate_checkpoint(ck)
            prev_ck = ck
            collected_spotify_ids = set(ck["spotify_id"].astype(str).unique())
            # pular IDs já coletados
            ids = [x for x in ids if x not in collected_spotify_ids]
            print(f"[Resume] Checkpoint: {len(collected_spotify_ids):,} já coletados | Pendentes: {len(ids):,}")
        else:
            print("[Resume] Falha ao ler checkpoint; seguindo...")

    if not ids:
        # Nada a fazer; devolve o checkpoint migrado como resultado (se existir)
        if prev_ck is not None:
            return prev_ck
        return pd.DataFrame(columns=_EXPECTED_KEYS)

    total_batches = math.ceil(len(ids) / batch_size)
    all_rows: List[Dict[str, Any]] = []

    # métricas
    total_sent = 0
    total_received = 0  #só com spotify_id válido
    last_checkpoint_batch = 0
    failed_ids_5xx: List[str] = []  #para segunda passada

    for b_idx, batch in enumerate(chunked(ids, batch_size), start=1):
        attempt = 0
        while True:
            try:
                rows = _fetch_features_batch(batch, timeout=timeout)
                # contar apenas válidos (com spotify_id)
                valid_rows = [r for r in rows if r.get("spotify_id")]
                all_rows.extend(valid_rows)
                total_sent += len(batch)
                total_received += len(valid_rows)
                attempt = 0
                break
            except RateLimit as rl:
                wait_s = max(rl.retry_after, pause_seconds)
                print(f"[429] Lote {b_idx}/{total_batches}: aguardando {wait_s:.2f}s (Retry-After)")
                time.sleep(wait_s)
                continue
            except requests.HTTPError as e:
                # se for 5xx e estourou os retries, guardamos o lote
                status = getattr(getattr(e, "response", None), "status_code", None)
                attempt += 1
                if attempt > max_retries:
                    if isinstance(status, int) and 500 <= status < 600:
                        failed_ids_5xx.extend(batch)
                    print(f"[ERRO] Lote {b_idx}/{total_batches}: HTTP {e}. Sem mais retries, seguindo.")
                    break
                sleep_s = backoff_factor * (attempt ** 2)
                print(f"[WARN] Lote {b_idx}: HTTPError. Retry {attempt}/{max_retries} em {sleep_s:.2f}s...")
                time.sleep(sleep_s)
            except requests.RequestException as e:
                attempt += 1
                if attempt > max_retries:
                    print(f"[ERRO] Lote {b_idx}/{total_batches}: {e}. Sem mais retries, seguindo.")
                    break
                sleep_s = backoff_factor * (attempt ** 2)
                print(f"[WARN] Lote {b_idx}: NetworkError. Retry {attempt}/{max_retries} em {sleep_s:.2f}s...")
                time.sleep(sleep_s)

        # progresso + hit-rate
        if (b_idx % progress_every == 0) or (b_idx == 1):
            hit_rate = (total_received / total_sent * 100.0) if total_sent else 0.0
            print(f"[Progresso] {b_idx}/{total_batches} lotes | enviados: {total_sent:,} | recebidos: {total_received:,} | hit-rate: {hit_rate:.2f}%")

        # early-stop opcional
        if early_stop_after_batches and min_hit_rate is not None and b_idx % early_stop_after_batches == 0:
            hit_rate = (total_received / total_sent * 100.0) if total_sent else 0.0
            if hit_rate < min_hit_rate:
                print(f"[HALT] Hit-rate {hit_rate:.2f}% abaixo de {min_hit_rate:.2f}% após {b_idx} lotes. Interrompendo para reavaliar.")
                break

        # checkpoint periódico
        if checkpoint_path and (b_idx % checkpoint_every == 0 or b_idx == total_batches):
            df_ck_new = pd.DataFrame(all_rows, columns=_EXPECTED_KEYS)
            if not df_ck_new.empty:
                df_ck_new = df_ck_new.dropna(subset=["spotify_id"]).drop_duplicates(subset=["spotify_id"], keep="first")
            # concat segura com o checkpoint anterior
            to_write = _safe_concat_cache(prev_ck, df_ck_new)
            # salvar
            if checkpoint_path.lower().endswith(".parquet"):
                to_write.to_parquet(checkpoint_path, index=False)
            else:
                to_write.to_csv(checkpoint_path, index=False)
            prev_ck = to_write
            last_checkpoint_batch = b_idx
            print(f"[Checkpoint] Salvo em '{checkpoint_path}' (lote {b_idx}/{total_batches}).")

        # pausa base (com jitter opcional)
        if jitter_base_pause:
            time.sleep(pause_seconds * (0.9 + 0.2 * random.random()))
        else:
            time.sleep(pause_seconds)

    # Segunda passada para 5xx (com lotes menores)
    if failed_ids_5xx:
        print(f"[Retry pass] Reprocessando {len(failed_ids_5xx)} IDs de 5xx com batch_size=5...")
        for b2_idx, batch2 in enumerate(chunked(failed_ids_5xx, 5), start=1):
            attempt = 0
            while True:
                try:
                    rows2 = _fetch_features_batch(batch2, timeout=timeout)
                    valid2 = [r for r in rows2 if r.get("spotify_id")]
                    all_rows.extend(valid2)
                    total_sent += len(batch2)
                    total_received += len(valid2)
                    break
                except RateLimit as rl:
                    wait_s = max(rl.retry_after, pause_seconds)
                    print(f"[429][retry pass] Lote {b2_idx}: aguardando {wait_s:.2f}s...")
                    time.sleep(wait_s)
                    continue
                except requests.RequestException as e2:
                    attempt += 1
                    if attempt > max_retries:
                        print(f"[ERRO][retry pass] Lote {b2_idx}: {e2}. desistindo deste mini-lote.")
                        break
                    sleep_s = backoff_factor * (attempt ** 2)
                    print(f"[WARN][retry pass] Lote {b2_idx}: retry {attempt}/{max_retries} em {sleep_s:.2f}s...")
                    time.sleep(sleep_s)

        # salvar checkpoint após retry pass
        if checkpoint_path:
            df_ck_retry = pd.DataFrame(all_rows, columns=_EXPECTED_KEYS)
            if not df_ck_retry.empty:
                df_ck_retry = df_ck_retry.dropna(subset=["spotify_id"]).drop_duplicates(subset=["spotify_id"], keep="first")
            to_write = _safe_concat_cache(prev_ck, df_ck_retry)
            if checkpoint_path.lower().endswith(".parquet"):
                to_write.to_parquet(checkpoint_path, index=False)
            else:
                to_write.to_csv(checkpoint_path, index=False)
            prev_ck = to_write
            print(f"[Checkpoint] Salvo após retry pass em '{checkpoint_path}'.")

    # DF final
    df_features = pd.DataFrame(all_rows, columns=_EXPECTED_KEYS)
    if not df_features.empty and "spotify_id" in df_features.columns:
        df_features = df_features.dropna(subset=["spotify_id"]).drop_duplicates(subset=["spotify_id"], keep="first").reset_index(drop=True)

    # Se houver checkpoint acumulando mais dados, devolve a união segura
    if prev_ck is not None:
        df_features = _safe_concat_cache(prev_ck, df_features)

    return df_features

In [16]:
# Número de IDs únicos no df_tracks

n_valid = df_tracks["track_spotify_id"].notna().sum()
n_unique = df_tracks["track_spotify_id"].dropna().nunique()

print(f"IDs válidos: {n_valid:,}")
print(f"IDs únicos : {n_unique:,}")

IDs válidos: 6,685,101
IDs únicos : 679,889


In [17]:
# Coleta dos dados:

subset_tracks = df_tracks["track_spotify_id"].dropna().unique()
df_features = fetch_audio_features_in_batches(
    track_ids=subset_tracks,
    batch_size=40,
    pause_seconds=0.25, 
    progress_every=1000,
    checkpoint_every=2_000,
    checkpoint_path="features_checkpoint.parquet",
)
print(df_features.shape)
display(df_features.head())
df_features.to_parquet("TCC/data/raw/df_audio_features.parquet", index=False)

[Resume] Checkpoint: 572,008 já coletados | Pendentes: 292,609
[Progresso] 1/7316 lotes | enviados: 40 | recebidos: 0 | hit-rate: 0.00%


KeyboardInterrupt: 

## Albums features

In [13]:
# Spotify - dim_albums (paralelo, com auto-refresh, checkpoint, rate limit e cap de Retry-After)

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Iterable, Optional

# Constantes e helpers
DIM_COLS = ["id", "release_date", "release_date_precision", "album_type", "total_tracks", "label", "popularity"]
_ALBUM_URI_RE = re.compile(r"^spotify:album:([A-Za-z0-9]{22})$")

def extract_album_id(uri: str) -> Optional[str]:
    if isinstance(uri, str):
        m = _ALBUM_URI_RE.match(uri.strip())
        if m:
            return m.group(1)
    return None

def chunked(seq: Iterable[Any], n: int):
    buf = []
    for x in seq:
        buf.append(x)
        if len(buf) == n:
            yield buf; buf = []
    if buf:
        yield buf

# -------- Exceções --------
class RateLimit(Exception):
    def __init__(self, retry_after: float = 1.0):
        self.retry_after = retry_after
        super().__init__(f"429 – espere ~{self.retry_after}s")

class TokenExpired(Exception):
    pass

# -------- Auth (Client Credentials) --------
def get_spotify_token(client_id: str, client_secret: str) -> tuple[str, int]:
    if not client_id or not client_secret:
        raise ValueError("Defina SPOTIFY_CLIENT_ID e SPOTIFY_CLIENT_SECRET.")
    auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    headers = {"Authorization": f"Basic {auth}", "Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials"}
    r = requests.post("https://accounts.spotify.com/api/token", headers=headers, data=data, timeout=20)
    try:
        r.raise_for_status()
    except requests.HTTPError as e:
        try: detail = r.json()
        except Exception: detail = r.text
        raise RuntimeError(f"Falha ao obter token ({r.status_code}). Detalhes: {detail}") from e
    j = r.json()
    return j["access_token"], int(j.get("expires_in", 3600))

def _normalize_albums_response(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    out = []
    for a in data.get("albums") or []:
        if not isinstance(a, dict) or a is None:
            continue
        out.append({
            "id": a.get("id"),
            "release_date": a.get("release_date"),
            "release_date_precision": a.get("release_date_precision"),
            "album_type": a.get("album_type"),
            "total_tracks": a.get("total_tracks"),
            "label": a.get("label"),
            "popularity": a.get("popularity"),
        })
    return out

def _read_checkpoint(path: str) -> Optional[pd.DataFrame]:
    try:
        return pd.read_parquet(path)
    except Exception:
        try: return pd.read_csv(path)
        except Exception: return None

def _safe_concat(prev: Optional[pd.DataFrame], cur: Optional[pd.DataFrame]) -> pd.DataFrame:
    if prev is None or prev.empty:
        base = pd.DataFrame(columns=DIM_COLS) if cur is None else cur
    else:
        base = prev
    if cur is None or cur.empty:
        out = base
    else:
        out = pd.concat([base, cur], ignore_index=True)
    out = out.dropna(subset=["id"]).drop_duplicates(subset=["id"])
    if not out.empty:
        out["total_tracks"] = pd.to_numeric(out["total_tracks"], errors="coerce").astype("Int16")
        out["popularity"]   = pd.to_numeric(out["popularity"], errors="coerce").astype("Int16")
    return out[DIM_COLS].reset_index(drop=True)

# -------- Rate limit global (RPS) e cooldown 429 --------
class GlobalCooldown:
    """Coordena espera global quando qualquer worker recebe 429."""
    def __init__(self):
        self._until = 0.0
        self._lock = threading.Lock()
    def wait(self):
        while True:
            with self._lock:
                until = self._until
            now = time.time()
            if now >= until:
                return
            time.sleep(min(1.0, until - now))
    def set(self, seconds: float):
        with self._lock:
            self._until = max(self._until, time.time() + float(seconds))

class GlobalRateLimiter:
    """Limita o número de requests por segundo no conjunto de threads."""
    def __init__(self, rps: float):
        self.rps = max(0.0, float(rps))
        self.lock = threading.Lock()
        self.next_t = 0.0
    def wait(self):
        if self.rps <= 0:
            return
        interval = 1.0 / self.rps
        with self.lock:
            now = time.time()
            if now < self.next_t:
                time.sleep(self.next_t - now)
                now = time.time()
            # agenda próximo slot
            self.next_t = max(now, self.next_t) + interval

# -------- Token manager (thread-safe) --------
class TokenManager:
    def __init__(self, client_id: str, client_secret: str, ttl_margin: int = 120):
        self.client_id = client_id
        self.client_secret = client_secret
        self.ttl_margin = ttl_margin
        self._lock = threading.RLock()
        self._token = None
        self._exp = 0.0
        self.refresh()
    def refresh(self):
        with self._lock:
            tok, ttl = get_spotify_token(self.client_id, self.client_secret)
            self._token = tok
            self._exp = time.time() + (ttl or 3600)
    def get(self) -> str:
        with self._lock:
            if time.time() > self._exp - self.ttl_margin:
                old = self._token
                self.refresh()
                if old != self._token:
                    print("[Auth] Token renovado proativamente.")
            return self._token
    def force_refresh(self):
        with self._lock:
            self.refresh()
            print("[Auth] Token renovado após 401.")

# -------- requests.Session por thread --------
_thread_local = threading.local()
def _get_session() -> requests.Session:
    s = getattr(_thread_local, "session", None)
    if s is None:
        s = requests.Session()
        _thread_local.session = s
    return s

def _fetch_batch_worker(
    ids_batch: List[str],
    token_mgr: TokenManager,
    cooldown: GlobalCooldown,
    ratelimiter: GlobalRateLimiter,
    timeout: int = 20,
    max_retries: int = 3,
    backoff_factor: float = 0.75,
    max_retry_after_seconds: float = 120.0,
) -> Dict[str, Any]:
    """Busca 1 batch (<=20 IDs). Respeita 429 global, RPS global e renova token em 401.
       Se Retry-After > max_retry_after_seconds, sinaliza 'hard_429' para interromper a coleta."""
    sess = _get_session()
    url = "https://api.spotify.com/v1/albums"
    params = {"ids": ",".join(ids_batch[:20])}
    attempt = 0
    while True:
        cooldown.wait()       # espera global por 429 anterior
        ratelimiter.wait()    # impõe RPS global
        token = token_mgr.get()
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        try:
            r = sess.get(url, headers=headers, params=params, timeout=timeout)
        except requests.RequestException as e:
            attempt += 1
            if attempt > max_retries:
                return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"NetworkError: {e}"}
            time.sleep(backoff_factor * (attempt ** 2) * (1.0 + 0.2*random.random()))
            continue

        if r.status_code == 200:
            data = r.json()
            rows = _normalize_albums_response(data)
            return {"rows": rows, "sent": len(ids_batch), "recv": sum(1 for x in rows if x.get("id")), "error": None}

        if r.status_code == 401:
            token_mgr.force_refresh()
            continue

        if r.status_code == 429:
            ra_hdr = r.headers.get("Retry-After")
            try:
                wait = float(ra_hdr) if ra_hdr is not None else 2.0
            except ValueError:
                wait = 2.0
            # se vier um Retry-After absurdo (p. ex., ~90.000s), interrompe graciosamente
            if wait > max_retry_after_seconds:
                return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"429 Retry-After={wait}", "hard_429": True, "retry_after": wait}
            wait *= (1.0 + 0.2*random.random())
            print(f"[429] Aguardando {wait:.2f}s (batch de {len(ids_batch)})")
            cooldown.set(wait)
            time.sleep(wait)
            continue

        attempt += 1
        if attempt > max_retries:
            return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"HTTP {r.status_code}: {r.text[:200]}"}
        time.sleep(backoff_factor * (attempt ** 2) * (1.0 + 0.2*random.random()))

def fetch_dim_albums_parallel(
    album_ids: Iterable[str],
    client_id: str,
    client_secret: str,
    batch_size: int = 20,           # Spotify: máx 20
    max_workers: int = 4,           # comece baixo
    in_flight_multiplier: int = 2,  # poucos pedidos simultâneos
    rate_limit_per_sec: float = 0.5,# ex.: 0.5 = 1 req a cada 2s
    timeout: int = 12,
    progress_every: int = 50,
    checkpoint_every: int = 1000,
    checkpoint_path: str = "dim_albums_checkpoint.parquet",
    max_retry_after_seconds: float = 120.0,  # se vier maior que isso, interrompe e devolve
) -> pd.DataFrame:

    ids = pd.Series(album_ids).dropna().astype(str)
    ids = ids[ids.str.len() > 0].unique().tolist()

    prev_ck = None
    collected_ids = set()
    if checkpoint_path and os.path.exists(checkpoint_path):
        ck = _read_checkpoint(checkpoint_path)
        if ck is not None and "id" in ck.columns:
            prev_ck = ck[DIM_COLS].copy()
            prev_ck = prev_ck.dropna(subset=["id"]).drop_duplicates("id")
            collected_ids = set(prev_ck["id"].astype(str))
            ids = [x for x in ids if x not in collected_ids]
            # Aqui não temos mapeamento album_id->coletado; continuamos buscando todos,
            # mas o dedupe evita duplicar; se quiser otimizar, mantenha um mapa das ids colhidas.
            print(f"[Resume] Checkpoint: {len(prev_ck):,} já coletados.")

    if not ids:
        return prev_ck if prev_ck is not None else pd.DataFrame(columns=DIM_COLS)

    batch_size = max(1, min(int(batch_size), 20))
    batches: List[List[str]] = [ids[i:i+batch_size] for i in range(0, len(ids), batch_size)]
    total_batches = len(batches)

    token_mgr = TokenManager(client_id, client_secret, ttl_margin=120)
    cooldown = GlobalCooldown()
    ratelimiter = GlobalRateLimiter(rate_limit_per_sec)

    all_rows: List[Dict[str, Any]] = []
    completed = 0
    total_sent = 0
    total_recv = 0

    inflight_limit = max_workers * in_flight_multiplier
    i = 0
    futures = set()

    def flush_checkpoint():
        nonlocal all_rows, prev_ck
        if not all_rows:
            return
        df_new = pd.DataFrame(all_rows, columns=DIM_COLS)
        df_new = df_new.dropna(subset=["id"]).drop_duplicates("id")
        to_write = _safe_concat(prev_ck, df_new)
        if checkpoint_path.lower().endswith(".parquet"):
            to_write.to_parquet(checkpoint_path, index=False)
        else:
            to_write.to_csv(checkpoint_path, index=False)
        prev_ck = to_write
        all_rows = []

    print(f"[Dispatch] Total batches: {total_batches} | max_workers={max_workers} | in_flight={inflight_limit} | rps={rate_limit_per_sec}")

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # pré-enche o pipeline
        while i < total_batches and len(futures) < inflight_limit:
            fut = ex.submit(_fetch_batch_worker, batches[i], token_mgr, cooldown, ratelimiter,
                            timeout, 3, 0.75, max_retry_after_seconds)
            futures.add(fut); i += 1
        print(f"[Dispatch] Primeiros {len(futures)} futures submetidos.")

        # processa conforme termina
        while futures:
            for fut in as_completed(list(futures), timeout=None):
                futures.remove(fut)
                res = fut.result()
                total_sent += res.get("sent", 0)
                total_recv += res.get("recv", 0)
                rows = res.get("rows") or []
                if rows:
                    all_rows.extend(rows)

                # hard_429: Retry-After muito alto → salva checkpoint e interrompe
                if res.get("hard_429"):
                    wait = res.get("retry_after")
                    print(f"[HARD 429] Retry-After ~{wait:.0f}s. Interrompendo agora e salvando checkpoint. Retome mais tarde.")
                    flush_checkpoint()
                    # monta DF parcial e retorna
                    df_dim = pd.DataFrame(all_rows, columns=DIM_COLS)
                    if not df_dim.empty:
                        df_dim = df_dim.dropna(subset=["id"]).drop_duplicates("id").reset_index(drop=True)
                        df_dim["total_tracks"] = pd.to_numeric(df_dim["total_tracks"], errors="coerce").astype("Int16")
                        df_dim["popularity"]   = pd.to_numeric(df_dim["popularity"], errors="coerce").astype("Int16")
                    if prev_ck is not None and not prev_ck.empty:
                        df_dim = _safe_concat(prev_ck, df_dim)
                    return df_dim

                completed += 1
                if (completed % progress_every == 0) or (completed == 1):
                    hit = (100.0 * total_recv / total_sent) if total_sent else 0.0
                    print(f"[Progresso] {completed}/{total_batches} lotes | enviados: {total_sent:,} | recebidos: {total_recv:,} | hit-rate: {hit:.2f}%")

                if checkpoint_path and (completed % checkpoint_every == 0 or completed == total_batches):
                    flush_checkpoint()

                # reabastece
                if i < total_batches:
                    fut2 = ex.submit(_fetch_batch_worker, batches[i], token_mgr, cooldown, ratelimiter,
                                     timeout, 3, 0.75, max_retry_after_seconds)
                    futures.add(fut2); i += 1

    # DF final
    df_dim = pd.DataFrame(all_rows, columns=DIM_COLS)
    if not df_dim.empty:
        df_dim = df_dim.dropna(subset=["id"]).drop_duplicates("id").reset_index(drop=True)
        df_dim["total_tracks"] = pd.to_numeric(df_dim["total_tracks"], errors="coerce").astype("Int16")
        df_dim["popularity"]   = pd.to_numeric(df_dim["popularity"], errors="coerce").astype("Int16")

    if prev_ck is not None and not prev_ck.empty:
        df_dim = _safe_concat(prev_ck, df_dim)

    return df_dim

In [15]:
%env SPOTIFY_CLIENT_ID=9c9030a3c0234c128f8a9d1302c4a695
%env SPOTIFY_CLIENT_SECRET=20d8ceab184a4cff9d66c9a749f31053

env: SPOTIFY_CLIENT_ID=9c9030a3c0234c128f8a9d1302c4a695
env: SPOTIFY_CLIENT_SECRET=20d8ceab184a4cff9d66c9a749f31053


In [16]:
# Credenciais Spotify (no Jupyter, SEM aspas nas %env)
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")

# Garante a coluna de IDs de álbuns
if "album_spotify_id" not in df_tracks.columns:
    df_tracks = df_tracks.copy()
    df_tracks["album_spotify_id"] = df_tracks["album_uri"].apply(extract_album_id)

album_ids_unique = df_tracks["album_spotify_id"].dropna().unique().tolist()
print(f"Álbuns únicos no dataset: {len(album_ids_unique):,}")

# Rodada "leve" para destravar (pode aumentar depois)
dim_albums = fetch_dim_albums_parallel(
    album_ids=album_ids_unique,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    batch_size=20,                 # máx 20 por chamada
    max_workers=4,                 
    in_flight_multiplier=3,        
    rate_limit_per_sec=1,        
    timeout=12,
    progress_every=100,
    checkpoint_every=500,
    checkpoint_path="dim_albums_checkpoint.parquet",
    max_retry_after_seconds=120.0, # se vier Retry-After > 120s, interrompe e salva
)

print(dim_albums.shape)
display(dim_albums.head())
dim_albums.to_parquet("TCC/data/raw/dim_albums.parquet", index=False)

Álbuns únicos no dataset: 270,156
[Resume] Checkpoint: 270,082 já coletados.
[Dispatch] Total batches: 4 | max_workers=4 | in_flight=12 | rps=1
[Dispatch] Primeiros 4 futures submetidos.
[Progresso] 1/4 lotes | enviados: 20 | recebidos: 0 | hit-rate: 0.00%
(270082, 7)


Unnamed: 0,id,release_date,release_date_precision,album_type,total_tracks,label,popularity
0,6vV5UrXcfyQD1wu4Qo2I9K,2005-07-04,day,album,16,Atlantic Records/ATG,57
1,0z7pVBGOD7HCIB7S8eLkLI,2003-11-13,day,album,13,Jive,74
2,25hVFAxTlDvXbx2X2QkUkE,2003-06-23,day,album,16,Columbia,11
3,6QPkyl04rXwTGlGlcYaRoW,2002-11-04,day,album,13,Jive,75
4,6NmFmPX56pcLBOFMhIiKvF,2000,year,album,15,Geffen,0


## Artists features

In [17]:
# Spotify - dim_artists (paralelo, com auto-refresh, checkpoint, rate limit e cap de Retry-After)

import os, re, time, math, random, threading, base64, requests
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Iterable, Optional

# Constantes e helpers 
ARTIST_DIM_COLS = ["id", "followers_total", "popularity", "genres"]
_ARTIST_URI_RE = re.compile(r"^spotify:artist:([A-Za-z0-9]{22})$")

def extract_artist_id(uri: str) -> Optional[str]:
    if isinstance(uri, str):
        m = _ARTIST_URI_RE.match(uri.strip())
        if m:
            return m.group(1)
    return None

def chunked(seq: Iterable[Any], n: int):
    buf = []
    for x in seq:
        buf.append(x)
        if len(buf) == n:
            yield buf; buf = []
    if buf:
        yield buf

# -------- Exceções --------
class RateLimit(Exception):
    def __init__(self, retry_after: float = 1.0):
        self.retry_after = retry_after
        super().__init__(f"429 – espere ~{self.retry_after}s")

class TokenExpired(Exception):
    pass

# -------- Auth (Client Credentials) --------
def get_spotify_token(client_id: str, client_secret: str) -> tuple[str, int]:
    if not client_id or not client_secret:
        raise ValueError("Defina SPOTIFY_CLIENT_ID e SPOTIFY_CLIENT_SECRET.")
    auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    headers = {"Authorization": f"Basic {auth}", "Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials"}
    r = requests.post("https://accounts.spotify.com/api/token", headers=headers, data=data, timeout=20)
    r.raise_for_status()
    j = r.json()
    return j["access_token"], int(j.get("expires_in", 3600))

# -------- Normalização do /v1/artists --------
def _normalize_artists_response(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    out = []
    for a in data.get("artists") or []:
        if not isinstance(a, dict) or a is None:
            continue
        followers = a.get("followers") or {}
        row = {
            "id": a.get("id"),
            "followers_total": followers.get("total"),
            "popularity": a.get("popularity"),
            "genres": a.get("genres") or [],
        }
        out.append(row)
    return out

# -------- Checkpoint helpers --------
def _read_checkpoint(path: str) -> Optional[pd.DataFrame]:
    try:
        return pd.read_parquet(path)
    except Exception:
        try:
            return pd.read_csv(path)
        except Exception:
            return None

def _safe_concat_artists(prev: Optional[pd.DataFrame], cur: Optional[pd.DataFrame]) -> pd.DataFrame:
    if prev is None or prev.empty:
        base = pd.DataFrame(columns=ARTIST_DIM_COLS) if cur is None else cur
    else:
        base = prev
    if cur is None or cur.empty:
        out = base
    else:
        out = pd.concat([base, cur], ignore_index=True)

    # limpa e dtypes
    out = out.dropna(subset=["id"]).drop_duplicates(subset=["id"])
    if not out.empty:
        out["followers_total"] = pd.to_numeric(out["followers_total"], errors="coerce").astype("Int64")
        out["popularity"]      = pd.to_numeric(out["popularity"], errors="coerce").astype("Int16")
        # genres permanece como list[object] (dtype 'object')
    # garante ordem/colunas
    for c in ARTIST_DIM_COLS:
        if c not in out.columns:
            out[c] = pd.NA
    return out[ARTIST_DIM_COLS].reset_index(drop=True)

# -------- Rate limit global (RPS) e cooldown 429 --------
class GlobalCooldown:
    """Coordena espera global quando qualquer worker recebe 429."""
    def __init__(self):
        self._until = 0.0
        self._lock = threading.Lock()
    def wait(self):
        while True:
            with self._lock:
                until = self._until
            now = time.time()
            if now >= until:
                return
            time.sleep(min(1.0, until - now))
    def set(self, seconds: float):
        with self._lock:
            self._until = max(self._until, time.time() + float(seconds))

class GlobalRateLimiter:
    """Limita o número de requests por segundo no conjunto de threads."""
    def __init__(self, rps: float):
        self.rps = max(0.0, float(rps))
        self.lock = threading.Lock()
        self.next_t = 0.0
    def wait(self):
        if self.rps <= 0:
            return
        interval = 1.0 / self.rps
        with self.lock:
            now = time.time()
            if now < self.next_t:
                time.sleep(self.next_t - now)
                now = time.time()
            self.next_t = max(now, self.next_t) + interval

# -------- Token manager (thread-safe) --------
class TokenManager:
    def __init__(self, client_id: str, client_secret: str, ttl_margin: int = 120):
        self.client_id = client_id
        self.client_secret = client_secret
        self.ttl_margin = ttl_margin
        self._lock = threading.RLock()
        self._token = None
        self._exp = 0.0
        self.refresh()
    def refresh(self):
        with self._lock:
            tok, ttl = get_spotify_token(self.client_id, self.client_secret)
            self._token = tok
            self._exp = time.time() + (ttl or 3600)
    def get(self) -> str:
        with self._lock:
            if time.time() > self._exp - self.ttl_margin:
                old = self._token
                self.refresh()
                if old != self._token:
                    print("[Auth] Token renovado proativamente.")
            return self._token
    def force_refresh(self):
        with self._lock:
            self.refresh()
            print("[Auth] Token renovado após 401.")

# -------- requests.Session por thread --------
_thread_local = threading.local()
def _get_session() -> requests.Session:
    s = getattr(_thread_local, "session", None)
    if s is None:
        s = requests.Session()
        _thread_local.session = s
    return s

def _fetch_artists_batch_worker(
    ids_batch: List[str],
    token_mgr: TokenManager,
    cooldown: GlobalCooldown,
    ratelimiter: GlobalRateLimiter,
    timeout: int = 20,
    max_retries: int = 3,
    backoff_factor: float = 0.75,
    max_retry_after_seconds: float = 120.0,
) -> Dict[str, Any]:
    """Busca 1 batch (<=50 IDs) do /v1/artists. Respeita 429 global, RPS global e renova token em 401.
       Se Retry-After > max_retry_after_seconds, sinaliza 'hard_429' para interromper a coleta."""
    sess = _get_session()
    url = "https://api.spotify.com/v1/artists"
    params = {"ids": ",".join(ids_batch[:50])}
    attempt = 0
    while True:
        cooldown.wait()
        ratelimiter.wait()
        token = token_mgr.get()
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
        try:
            r = sess.get(url, headers=headers, params=params, timeout=timeout)
        except requests.RequestException as e:
            attempt += 1
            if attempt > max_retries:
                return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"NetworkError: {e}"}
            time.sleep(backoff_factor * (attempt ** 2) * (1.0 + 0.2*random.random()))
            continue

        if r.status_code == 200:
            data = r.json()
            rows = _normalize_artists_response(data)
            return {"rows": rows, "sent": len(ids_batch), "recv": sum(1 for x in rows if x.get("id")), "error": None}

        if r.status_code == 401:
            token_mgr.force_refresh()
            continue

        if r.status_code == 429:
            ra_hdr = r.headers.get("Retry-After")
            try:
                wait = float(ra_hdr) if ra_hdr is not None else 2.0
            except ValueError:
                wait = 2.0
            if wait > max_retry_after_seconds:
                return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"429 Retry-After={wait}", "hard_429": True, "retry_after": wait}
            wait *= (1.0 + 0.2*random.random())
            print(f"[429] Aguardando {wait:.2f}s (batch de {len(ids_batch)})")
            cooldown.set(wait)
            time.sleep(wait)
            continue

        attempt += 1
        if attempt > max_retries:
            return {"rows": [], "sent": len(ids_batch), "recv": 0, "error": f"HTTP {r.status_code}: {r.text[:200]}"}
        time.sleep(backoff_factor * (attempt ** 2) * (1.0 + 0.2*random.random()))

def fetch_dim_artists_parallel(
    artist_ids: Iterable[str],
    client_id: str,
    client_secret: str,
    batch_size: int = 50,           # Spotify: máx 50 por chamada
    max_workers: int = 4,           # comece baixo
    in_flight_multiplier: int = 2,  # poucos pedidos simultâneos
    rate_limit_per_sec: float = 0.5,# ex.: 0.5 = 1 req a cada 2s
    timeout: int = 12,
    progress_every: int = 50,
    checkpoint_every: int = 1000,
    checkpoint_path: str = "dim_artists_checkpoint.parquet",
    max_retry_after_seconds: float = 120.0,  # se vier maior que isso, interrompe e devolve
) -> pd.DataFrame:

    ids = pd.Series(artist_ids).dropna().astype(str)
    ids = ids[ids.str.len() > 0].unique().tolist()

    # checkpoint / retomar removendo já coletados
    prev_ck = None
    collected_ids = set()
    if checkpoint_path and os.path.exists(checkpoint_path):
        ck = _read_checkpoint(checkpoint_path)
        if ck is not None and "id" in ck.columns:
            prev_ck = ck[ARTIST_DIM_COLS].copy()
            prev_ck = prev_ck.dropna(subset=["id"]).drop_duplicates("id")
            collected_ids = set(prev_ck["id"].astype(str))
            ids = [x for x in ids if x not in collected_ids]
            print(f"[Resume] Checkpoint: {len(collected_ids):,} já coletados | Pendentes: {len(ids):,}")

    if not ids:
        return prev_ck if prev_ck is not None else pd.DataFrame(columns=ARTIST_DIM_COLS)

    batch_size = max(1, min(int(batch_size), 50))
    batches: List[List[str]] = [ids[i:i+batch_size] for i in range(0, len(ids), batch_size)]
    total_batches = len(batches)

    token_mgr = TokenManager(client_id, client_secret, ttl_margin=120)
    cooldown = GlobalCooldown()
    ratelimiter = GlobalRateLimiter(rate_limit_per_sec)

    all_rows: List[Dict[str, Any]] = []
    completed = 0
    total_sent = 0
    total_recv = 0

    inflight_limit = max_workers * in_flight_multiplier
    i = 0
    futures = set()

    def flush_checkpoint():
        nonlocal all_rows, prev_ck
        if not all_rows:
            return
        df_new = pd.DataFrame(all_rows, columns=ARTIST_DIM_COLS)
        df_new = df_new.dropna(subset=["id"]).drop_duplicates("id")
        to_write = _safe_concat_artists(prev_ck, df_new)
        if checkpoint_path.lower().endswith(".parquet"):
            to_write.to_parquet(checkpoint_path, index=False)
        else:
            to_write.to_csv(checkpoint_path, index=False)
        prev_ck = to_write
        all_rows = []

    print(f"[Dispatch] Total batches: {total_batches} | max_workers={max_workers} | in_flight={inflight_limit} | rps={rate_limit_per_sec}")

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # pré-enche o pipeline
        while i < total_batches and len(futures) < inflight_limit:
            fut = ex.submit(_fetch_artists_batch_worker, batches[i], token_mgr, cooldown, ratelimiter,
                            timeout, 3, 0.75, max_retry_after_seconds)
            futures.add(fut); i += 1
        print(f"[Dispatch] Primeiros {len(futures)} futures submetidos.")

        # processa conforme termina
        while futures:
            for fut in as_completed(list(futures), timeout=None):
                futures.remove(fut)
                res = fut.result()
                total_sent += res.get("sent", 0)
                total_recv += res.get("recv", 0)
                rows = res.get("rows") or []
                if rows:
                    all_rows.extend(rows)

                if res.get("hard_429"):
                    wait = res.get("retry_after")
                    print(f"[HARD 429] Retry-After ~{wait:.0f}s. Interrompendo agora e salvando checkpoint. Retome mais tarde.")
                    flush_checkpoint()
                    df_dim = pd.DataFrame(all_rows, columns=ARTIST_DIM_COLS)
                    if not df_dim.empty:
                        df_dim = df_dim.dropna(subset=["id"]).drop_duplicates("id").reset_index(drop=True)
                        df_dim["followers_total"] = pd.to_numeric(df_dim["followers_total"], errors="coerce").astype("Int64")
                        df_dim["popularity"]      = pd.to_numeric(df_dim["popularity"], errors="coerce").astype("Int16")
                    if prev_ck is not None and not prev_ck.empty:
                        df_dim = _safe_concat_artists(prev_ck, df_dim)
                    return df_dim

                completed += 1
                if (completed % progress_every == 0) or (completed == 1):
                    hit = (100.0 * total_recv / total_sent) if total_sent else 0.0
                    print(f"[Progresso] {completed}/{total_batches} lotes | enviados: {total_sent:,} | recebidos: {total_recv:,} | hit-rate: {hit:.2f}%")

                if checkpoint_path and (completed % checkpoint_every == 0 or completed == total_batches):
                    flush_checkpoint()

                if i < total_batches:
                    fut2 = ex.submit(_fetch_artists_batch_worker, batches[i], token_mgr, cooldown, ratelimiter,
                                     timeout, 3, 0.75, max_retry_after_seconds)
                    futures.add(fut2); i += 1

    df_dim = pd.DataFrame(all_rows, columns=ARTIST_DIM_COLS)
    if not df_dim.empty:
        df_dim = df_dim.dropna(subset=["id"]).drop_duplicates("id").reset_index(drop=True)
        df_dim["followers_total"] = pd.to_numeric(df_dim["followers_total"], errors="coerce").astype("Int64")
        df_dim["popularity"]      = pd.to_numeric(df_dim["popularity"], errors="coerce").astype("Int16")

    if prev_ck is not None and not prev_ck.empty:
        df_dim = _safe_concat_artists(prev_ck, df_dim)

    return df_dim

# -------- Pré-voo (opcional) --------
def spotify_window_ready_artists(client_id: str, client_secret: str, test_artist_id: str = "3TVXtAsR1Inumwj472S9r4"):  # Drake
    tok, _ = get_spotify_token(client_id, client_secret)
    r = requests.get(
        "https://api.spotify.com/v1/artists",
        headers={"Authorization": f"Bearer {tok}"},
        params={"ids": test_artist_id},
        timeout=15,
    )
    if r.status_code == 200:
        return True, 0
    if r.status_code == 429:
        ra = r.headers.get("Retry-After")
        try:
            secs = int(float(ra))
        except Exception:
            secs = 60
        return False, max(secs, 1)
    return False, 60

In [18]:
# Credenciais Spotify (no Jupyter, SEM aspas nas %env)
CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")

# Garante a coluna de IDs de artistas
if "artist_spotify_id" not in df_tracks.columns:
    df_tracks = df_tracks.copy()
    df_tracks["artist_spotify_id"] = df_tracks["artist_uri"].apply(extract_artist_id)

artist_ids_unique = df_tracks["artist_spotify_id"].dropna().unique().tolist()
print(f"Artistas únicos no dataset: {len(artist_ids_unique):,}")

# Pré-voo: evita começar se a janela estiver fechada
ready, wait_s = spotify_window_ready_artists(CLIENT_ID, CLIENT_SECRET)
if not ready:
    from datetime import datetime, timedelta
    eta = datetime.now() + timedelta(seconds=wait_s)
    print(f"[Pré-voo] Ainda em rate limit. Retry-After ~{wait_s}s. Tente de novo por volta de: {eta:%Y-%m-%d %H:%M:%S}")
else:
    print("[Pré-voo] OK! Janela aberta, iniciando coleta...")

    dim_artists = fetch_dim_artists_parallel(
        artist_ids=artist_ids_unique,
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        batch_size=50,                 # Spotify: máx 50 p/ /v1/artists
        max_workers=4,                 # comece baixo; suba se não houver 429
        in_flight_multiplier=3,
        rate_limit_per_sec=1,        # 1 req a cada 2s
        timeout=12,
        progress_every=100,
        checkpoint_every=200,
        checkpoint_path="dim_artists_checkpoint.parquet",
        max_retry_after_seconds=120.0, # aborta se Retry-After > 2 min (salvando checkpoint)
    )

    print(dim_artists.shape)
    display(dim_artists.head())
    dim_artists.to_parquet("TCC/data/raw/dim_artists.parquet", index=False)

Artistas únicos no dataset: 109,043
[Pré-voo] OK! Janela aberta, iniciando coleta...
[Resume] Checkpoint: 108,140 já coletados | Pendentes: 1,675
[Dispatch] Total batches: 34 | max_workers=4 | in_flight=12 | rps=1
[Dispatch] Primeiros 12 futures submetidos.
[Progresso] 1/34 lotes | enviados: 50 | recebidos: 50 | hit-rate: 100.00%
(108140, 4)


Unnamed: 0,id,followers_total,popularity,genres
0,2wIVse2owClT7go1WT98tk,2718135,73,[hip hop]
1,26dSoYclwsYLMAKD3tpOr4,17545559,84,[pop]
2,6vWDO969PvNqNYHIOW5v0m,41008233,88,[]
3,31TPClRtHm23RisEBtV3X7,15690525,85,[]
4,5EvFsr3kj42KNv97ZEnqij,2462659,76,[reggae]


## Merge

In [3]:
# 1) Preparar os dataframes de lookup (dimensions)
def prepare_lookup_dataframes(df_features: pd.DataFrame, dim_artists: pd.DataFrame, dim_albums: pd.DataFrame) -> tuple:
    
    # df_features
    df_features_prepared = df_features.rename(columns={
        "id": "reccobeats_id", 
        "href": "spotify_href"
    })
    
    # Converter tipos para eficiência
    feature_cols_float = [
        "acousticness", "danceability", "energy", "instrumentalness",
        "liveness", "loudness", "speechiness", "tempo", "valence"
    ]
    feature_cols_int = ["key", "mode"]
    
    for col in feature_cols_float:
        if col in df_features_prepared.columns:
            df_features_prepared[col] = pd.to_numeric(df_features_prepared[col], errors="coerce").astype("float32")
    
    for col in feature_cols_int:
        if col in df_features_prepared.columns:
            df_features_prepared[col] = pd.to_numeric(df_features_prepared[col], errors="coerce").astype("Int8")
    
    # Criar índice para merge rápido
    df_features_indexed = df_features_prepared.set_index("spotify_id")
    
    # dim_artists
    dim_artists_prepared = dim_artists.rename(columns={
        "id": "artist_spotify_id",
        "followers_total": "artist_followers",
        "popularity": "artist_popularity",
        "genres": "artist_genres"
    })
    dim_artists_indexed = dim_artists_prepared.set_index("artist_spotify_id")
    
    # dim_albums  
    dim_albums_prepared = dim_albums.rename(columns={
        "id": "album_spotify_id",
        "popularity": "album_popularity"
    })
    dim_albums_indexed = dim_albums_prepared.set_index("album_spotify_id")
    
    return df_features_indexed, dim_artists_indexed, dim_albums_indexed


# 2) Função principal de merge em chunks

def extract_spotify_id(uri: str, pattern): # extrair Spotify IDs
    if isinstance(uri, str):
        m = pattern.match(uri.strip())
        if m: return m.group(1)
    return None

def merge_dataframes_chunked(
    df_tracks: pd.DataFrame,
    df_features: pd.DataFrame, 
    dim_artists: pd.DataFrame,
    dim_albums: pd.DataFrame,
    chunk_size: int = 100000,
    output_path: str = "df_tracks_complete_enriched.parquet"
) -> pd.DataFrame:
    
    # Preparar df_tracks com IDs extraídos
    df_tracks_prepared = df_tracks.copy()
    df_tracks_prepared["track_spotify_id"] = df_tracks_prepared["track_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_URI_RE)
    )
    df_tracks_prepared["artist_spotify_id"] = df_tracks_prepared["artist_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_ARTIST_URI_RE)
    )
    df_tracks_prepared["album_spotify_id"] = df_tracks_prepared["album_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_ALBUM_URI_RE)
    )
    
    # Preparar dataframes de lookup com índices
    df_features_idx, dim_artists_idx, dim_albums_idx = prepare_lookup_dataframes(
        df_features, dim_artists, dim_albums
    )
    
    # Definir colunas que serão adicionadas em cada merge
    features_cols = ["reccobeats_id", "spotify_href", "acousticness", "danceability", 
                    "energy", "instrumentalness", "liveness", "loudness", 
                    "speechiness", "tempo", "valence", "key", "mode"]
    
    artists_cols = ["artist_followers", "artist_popularity", "artist_genres"]
    
    albums_cols = ["release_date", "release_date_precision", "album_type", 
                  "total_tracks", "label", "album_popularity"]
    
    total_chunks = (len(df_tracks_prepared) + chunk_size - 1) // chunk_size
    chunks_processed = 0
    all_chunks = []
    
    print(f"Iniciando merge em {total_chunks} chunks de {chunk_size} linhas...")
    
    for chunk_idx in range(0, len(df_tracks_prepared), chunk_size):
        # Pegar chunk atual
        chunk_end = min(chunk_idx + chunk_size, len(df_tracks_prepared))
        chunk = df_tracks_prepared.iloc[chunk_idx:chunk_end].copy()
        
        # Merge com features (usando join com índice para eficiência)
        chunk = chunk.join(
            df_features_idx[features_cols],
            on="track_spotify_id",
            how="left",
            rsuffix="_features"
        )
        
        # Merge com artists
        chunk = chunk.join(
            dim_artists_idx[artists_cols],
            on="artist_spotify_id", 
            how="left",
            rsuffix="_artist"
        )
        
        # Merge com albums
        chunk = chunk.join(
            dim_albums_idx[albums_cols],
            on="album_spotify_id",
            how="left", 
            rsuffix="_album"
        )
        
        # Remover colunas duplicadas de sufixos (se houver)
        chunk = chunk.loc[:, ~chunk.columns.duplicated()]
        
        all_chunks.append(chunk)
        chunks_processed += 1
        
        # # Estatísticas do chunk atual
        # chunk_with_features = chunk["reccobeats_id"].notna().sum()
        # chunk_with_artists = chunk["artist_followers"].notna().sum()
        # chunk_with_albums = chunk["album_popularity"].notna().sum()
        
        # # print(f"Chunk {chunks_processed}/{total_chunks} processado: "
        # #       f"Features: {chunk_with_features:,} | "
        # #       f"Artists: {chunk_with_artists:,} | "
        # #       f"Albums: {chunk_with_albums:,}")
        
        # Salvar checkpoint a cada 10 chunks para evitar perda de dados
        if chunks_processed % 10 == 0:
            checkpoint_temp = pd.concat(all_chunks, ignore_index=True)
            checkpoint_temp.to_parquet(f"checkpoint_chunk_{chunks_processed}.parquet", index=False)
            
            # Limpar chunks da memória após salvar checkpoint
            all_chunks = [checkpoint_temp.copy()]
            del checkpoint_temp
    
    # Concatenar todos os chunks finais
    print("Concatenando chunks finais...")
    df_final = pd.concat(all_chunks, ignore_index=True)
    
    # Resultados
    total_tracks = len(df_final)
    tracks_with_features = df_final["reccobeats_id"].notna().sum()
    tracks_with_artist_info = df_final["artist_followers"].notna().sum()
    tracks_with_album_info = df_final["album_popularity"].notna().sum()
    
    print(f"\nESTATÍSTICAS FINAIS DO MERGE")
    print(f"Total de tracks: {total_tracks:,}")
    print(f"Tracks com audio features: {tracks_with_features:,} ({tracks_with_features/total_tracks*100:.2f}%)")
    print(f"Tracks com info de artista: {tracks_with_artist_info:,} ({tracks_with_artist_info/total_tracks*100:.2f}%)")
    print(f"Tracks com info de álbum: {tracks_with_album_info:,} ({tracks_with_album_info/total_tracks*100:.2f}%)")
    
    # Salvar resultado final
    print(f"Salvando resultado final em: {output_path}")
    df_final.to_parquet(output_path, index=False)
    
    # Limpar checkpoints temporários
    for i in range(5, chunks_processed + 1, 5):
        temp_path = f"checkpoint_chunk_{i}.parquet"
        if os.path.exists(temp_path):
            os.remove(temp_path)
    
    return df_final

# 3) Versão alternativa com salvamento direto em Parquet (para datasets MUITO grandes)
def merge_and_save_directly(
    df_tracks: pd.DataFrame,
    df_features: pd.DataFrame,
    dim_artists: pd.DataFrame, 
    dim_albums: pd.DataFrame,
    chunk_size: int = 50000,
    output_path: str = "df_tracks_complete_enriched.parquet"
) -> None:
    """
    Para datasets extremamente grandes: processa e salva diretamente sem manter tudo em memória
    """
    
    print("Modo de salvamento direto - processando e salvando incrementalmente...")
    
    # Preparar dataframes (mesmo código anterior)
    df_tracks_prepared = df_tracks.copy()
    df_tracks_prepared["track_spotify_id"] = df_tracks_prepared["track_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_URI_RE)
    )
    df_tracks_prepared["artist_spotify_id"] = df_tracks_prepared["artist_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_ARTIST_URI_RE)
    )
    df_tracks_prepared["album_spotify_id"] = df_tracks_prepared["album_uri"].apply(
        lambda x: extract_spotify_id(x, _SPOTIFY_ALBUM_URI_RE)
    )
    
    df_features_idx, dim_artists_idx, dim_albums_idx = prepare_lookup_dataframes(
        df_features, dim_artists, dim_albums
    )
    
    total_chunks = (len(df_tracks_prepared) + chunk_size - 1) // chunk_size
    chunks_processed = 0
    
    # Iniciar arquivo Parquet (primeiro chunk define o schema)
    first_chunk = True
    
    for chunk_idx in range(0, len(df_tracks_prepared), chunk_size):
        chunk_end = min(chunk_idx + chunk_size, len(df_tracks_prepared))
        chunk = df_tracks_prepared.iloc[chunk_idx:chunk_end].copy()
        
        # Fazer merges (mesma lógica anterior)
        chunk = chunk.join(df_features_idx, on="track_spotify_id", how="left", rsuffix="_features")
        chunk = chunk.join(dim_artists_idx, on="artist_spotify_id", how="left", rsuffix="_artist")
        chunk = chunk.join(dim_albums_idx, on="album_spotify_id", how="left", rsuffix="_album")
        chunk = chunk.loc[:, ~chunk.columns.duplicated()]
        
        chunks_processed += 1
        
        # Salvar incrementalmente
        if first_chunk:
            chunk.to_parquet(output_path, index=False)
            first_chunk = False
        else:
            chunk.to_parquet(output_path, index=False, append=True)
    
    print(f"Processamento concluído! Arquivo final: {output_path}")

In [6]:
import re

# Track URI
_SPOTIFY_URI_RE = re.compile(r"^spotify:track:([A-Za-z0-9]{22})$")

# Artist URI
_SPOTIFY_ARTIST_URI_RE = re.compile(r"^spotify:artist:([A-Za-z0-9]{22})$")

# Album URI
_SPOTIFY_ALBUM_URI_RE = re.compile(r"^spotify:album:([A-Za-z0-9]{22})$")

In [4]:
df_features = pd.read_parquet("TCC/data/raw/features_checkpoint.parquet")
df_tracks = pd.read_parquet("TCC/data/raw/df_tracks_v0.parquet")
dim_artists = pd.read_parquet("TCC/data/raw/dim_artists.parquet")
dim_albums = pd.read_parquet("TCC/data/raw/dim_albums.parquet")

In [7]:
df_final = merge_dataframes_chunked(
    df_tracks=df_tracks,
    df_features=df_features,
    dim_artists=dim_artists,
    dim_albums=dim_albums,
    chunk_size=100000,  # Ajuste conforme RAM
    output_path="TCC/data/raw/df_tracks_complete.parquet"
)

Iniciando merge em 67 chunks de 100000 linhas...
Concatenando chunks finais...

ESTATÍSTICAS FINAIS DO MERGE
Total de tracks: 6,685,101
Tracks com audio features: 3,691,508 (55.22%)
Tracks com info de artista: 6,669,050 (99.76%)
Tracks com info de álbum: 6,684,922 (100.00%)
Salvando resultado final em: TCC/data/raw/df_tracks_complete.parquet


In [8]:
df_final.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6685101 entries, 0 to 6685100
Data columns (total 34 columns):
 #   Column                  Dtype  
---  ------                  -----  
 0   track_uri               object 
 1   track_name              object 
 2   artist_uri              object 
 3   artist_name             object 
 4   album_uri               object 
 5   album_name              object 
 6   duration_ms             int64  
 7   pos                     int64  
 8   pid                     int64  
 9   track_spotify_id        object 
 10  artist_spotify_id       object 
 11  album_spotify_id        object 
 12  reccobeats_id           object 
 13  spotify_href            object 
 14  acousticness            float32
 15  danceability            float32
 16  energy                  float32
 17  instrumentalness        float32
 18  liveness                float32
 19  loudness                float32
 20  speechiness             float32
 21  tempo                   float32