# Fetch Spotify Audio Features

This notebook fetches audio features and artist genres from the Spotify API.

## Setup

1. Go to https://developer.spotify.com/dashboard
2. Create an app (any name, select "Web API")
3. Copy Client ID and Client Secret
4. Add to your `.env` file:

```
SPOTIFY_CLIENT_ID=your_client_id
SPOTIFY_CLIENT_SECRET=your_client_secret
```


In [6]:
import pandas as pd
import numpy as np
from pathlib import Path
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from tqdm import tqdm
import time
import os
from dotenv import load_dotenv
from collections import Counter
import matplotlib.pyplot as plt

load_dotenv()


True

In [7]:
client_id = os.getenv("SPOTIFY_CLIENT_ID")
client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")

if not client_id or not client_secret:
    raise ValueError("Missing SPOTIFY_CLIENT_ID or SPOTIFY_CLIENT_SECRET in .env file")

sp = spotipy.Spotify(
    auth_manager=SpotifyClientCredentials(client_id=client_id, client_secret=client_secret),
    requests_timeout=10,
    retries=3
)

sp.search(q="test", limit=1)
print("Spotify API connected!")


Spotify API connected!


In [8]:
MODEL_DIR = Path("../models")
DATA_DIR = Path("../data/processed")
DATA_DIR.mkdir(parents=True, exist_ok=True)

track_df = pd.read_parquet(MODEL_DIR / "track_metadata.parquet")
print(f"Loaded {len(track_df):,} tracks")

def extract_id(uri):
    return uri.split(":")[-1]

track_df['track_id'] = track_df['track_uri'].apply(extract_id)
track_df['artist_id'] = track_df['artist_uri'].apply(extract_id)

track_ids = track_df['track_id'].unique().tolist()
artist_ids = track_df['artist_id'].unique().tolist()

print(f"Track IDs: {len(track_ids):,}")
print(f"Artist IDs: {len(artist_ids):,}")


Loaded 252,999 tracks
Track IDs: 252,999
Artist IDs: 49,124


## API Diagnostics

Let's test which endpoints we have access to. Spotify deprecated audio-features for new apps in Nov 2024.


In [9]:
test_track_id = track_ids[0]
test_artist_id = artist_ids[0]

print("Testing Spotify API endpoints...")
print("=" * 50)

# Test 1: Track info
try:
    track = sp.track(test_track_id)
    print(f"Track endpoint:    OK - {track['name']}")
except Exception as e:
    print(f"Track endpoint:    FAILED - {type(e).__name__}")

# Test 2: Artist info
try:
    artist = sp.artist(test_artist_id)
    print(f"Artist endpoint:   OK - {artist['name']}, genres: {artist['genres'][:2]}")
except Exception as e:
    print(f"Artist endpoint:   FAILED - {type(e).__name__}")

# Test 3: Audio features (blocked for new apps since Nov 2024)
AUDIO_FEATURES_AVAILABLE = False
try:
    features = sp.audio_features([test_track_id])
    if features and features[0]:
        print(f"Audio features:    OK - danceability: {features[0]['danceability']}")
        AUDIO_FEATURES_AVAILABLE = True
    else:
        print("Audio features:    BLOCKED (returns None)")
except Exception as e:
    print(f"Audio features:    BLOCKED ({type(e).__name__})")

print("=" * 50)

if not AUDIO_FEATURES_AVAILABLE:
    print("\nAudio features endpoint is blocked for new apps (Nov 2024 change).")
    print("We'll use artist genres + popularity instead - this is enough for")
    print("controllable recommendations!")

Testing Spotify API endpoints...
Track endpoint:    OK - Lose Control (feat. Ciara & Fat Man Scoop)


HTTP Error for GET to https://api.spotify.com/v1/audio-features/?ids=0UaMYEvWZi0ZqiDOoHU3YI with Params: {} returned 403 due to None


Artist endpoint:   OK - Missy Elliott, genres: ['hip hop']
Audio features:    BLOCKED (SpotifyException)

Audio features endpoint is blocked for new apps (Nov 2024 change).
We'll use artist genres + popularity instead - this is enough for
controllable recommendations!


In [10]:
def fetch_artist_info(artist_ids, batch_size=50):
    """Fetch artist info (genres, popularity, followers) in batches."""
    all_artists = []
    
    for i in tqdm(range(0, len(artist_ids), batch_size), desc="Fetching artists"):
        batch = artist_ids[i:i + batch_size]
        try:
            artists = sp.artists(batch)['artists']
            for a in artists:
                if a:
                    all_artists.append({
                        'artist_id': a['id'],
                        'artist_name': a['name'],
                        'genres': a['genres'],
                        'artist_popularity': a['popularity'],
                        'artist_followers': a['followers']['total']
                    })
        except Exception as e:
            print(f"Error: {e}")
            time.sleep(5)
        time.sleep(0.1)
    
    return all_artists


In [None]:
ARTIST_PATH = DATA_DIR / "artist_info.parquet"

if ARTIST_PATH.exists():
    print("Loading cached artist info...")
    artist_df = pd.read_parquet(ARTIST_PATH)
else:
    print(f"Fetching {len(artist_ids):,} artists (~{len(artist_ids)//50//6} min)...")
    artists = fetch_artist_info(artist_ids)
    artist_df = pd.DataFrame(artists)
    artist_df.to_parquet(ARTIST_PATH, index=False)

print(f"Artist info: {len(artist_df):,} artists")
print(f"Artists with genres: {(artist_df['genres'].str.len() > 0).sum():,}")
artist_df.head()


Fetching 49,124 artists (~163 min)...


Fetching artists:   6%|â–Œ         | 57/983 [00:24<06:32,  2.36it/s]

## Explore Genres

In [None]:
all_genres = []
for genres in artist_df['genres']:
    if genres:
        all_genres.extend(genres)

genre_counts = Counter(all_genres)
top_genres = genre_counts.most_common(30)

print(f"Total unique genres: {len(genre_counts):,}")
print("\nTop 30 genres:")
for genre, count in top_genres:
    print(f"  {count:5d} | {genre}")


# Popularity distribution

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].hist(artist_df['artist_popularity'], bins=50, edgecolor='black')
axes[0].set_title('Artist Popularity Distribution')
axes[0].set_xlabel('Popularity (0-100)')

axes[1].hist(np.log10(artist_df['artist_followers'] + 1), bins=50, edgecolor='black')
axes[1].set_title('Artist Followers (log scale)')
axes[1].set_xlabel('log10(followers)')

plt.tight_layout()
plt.show()


## Create Enriched Dataset

Merge track metadata with artist info.


In [None]:
enriched_tracks = track_df.merge(artist_df, on='artist_id', how='left')

print(f"Enriched tracks: {len(enriched_tracks):,}")
print(f"\nCoverage:")
print(f"  Genres: {enriched_tracks['genres'].notna().sum():,} ({enriched_tracks['genres'].notna().mean()*100:.1f}%)")
print(f"  Popularity: {enriched_tracks['artist_popularity'].notna().sum():,} ({enriched_tracks['artist_popularity'].notna().mean()*100:.1f}%)")

enriched_tracks.head()


ENRICHED_PATH = DATA_DIR / "enriched_tracks.parquet"
enriched_tracks.to_parquet(ENRICHED_PATH, index=False)
print(f"Saved enriched tracks to {ENRICHED_PATH}")
print(f"\nColumns: {enriched_tracks.columns.tolist()}")


In [None]:
## Summary

We now have enriched track data with:
- **Artist genres** - for genre-based filtering
- **Artist popularity** (0-100) - for mainstream vs obscure recommendations
- **Artist followers** - another popularity metric

This is enough to build controllable recommendations in the next notebook!


In [None]:
# This notebook is now complete. Run cells 1-14 in order.


## Fetch Audio Features

Spotify API allows 100 tracks per request. With rate limiting, this may take a while.


In [None]:
def fetch_audio_features(track_ids, batch_size=100):
    """Fetch audio features for tracks in batches."""
    all_features = []
    
    for i in tqdm(range(0, len(track_ids), batch_size), desc="Fetching audio features"):
        batch = track_ids[i:i + batch_size]
        try:
            features = sp.audio_features(batch)
            for f in features:
                if f is not None:
                    all_features.append(f)
        except Exception as e:
            print(f"Error at batch {i}: {e}")
            time.sleep(5)
            continue
        
        time.sleep(0.1)  # Rate limiting
    
    return all_features


In [None]:
# Check if we already have cached features
AUDIO_FEATURES_PATH = DATA_DIR / "audio_features.parquet"

if AUDIO_FEATURES_PATH.exists():
    print("Loading cached audio features...")
    audio_features_df = pd.read_parquet(AUDIO_FEATURES_PATH)
    print(f"Loaded {len(audio_features_df):,} cached features")
else:
    print("Fetching audio features from Spotify API...")
    print(f"Estimated time: ~{len(track_ids) // 100 * 0.15:.0f} minutes")
    
    features = fetch_audio_features(track_ids)
    audio_features_df = pd.DataFrame(features)
    
    audio_features_df.to_parquet(AUDIO_FEATURES_PATH, index=False)
    print(f"Saved {len(audio_features_df):,} features to {AUDIO_FEATURES_PATH}")


In [None]:
print("Audio features columns:")
print(audio_features_df.columns.tolist())
audio_features_df.head()


In [None]:
def fetch_artist_info(artist_ids, batch_size=50):
    """Fetch artist info (including genres) in batches."""
    all_artists = []
    
    for i in tqdm(range(0, len(artist_ids), batch_size), desc="Fetching artist info"):
        batch = artist_ids[i:i + batch_size]
        try:
            artists = sp.artists(batch)['artists']
            for a in artists:
                if a is not None:
                    all_artists.append({
                        'artist_id': a['id'],
                        'artist_name': a['name'],
                        'genres': a['genres'],
                        'popularity': a['popularity'],
                        'followers': a['followers']['total']
                    })
        except Exception as e:
            print(f"Error at batch {i}: {e}")
            time.sleep(5)
            continue
        
        time.sleep(0.1)
    
    return all_artists


In [None]:
ARTIST_INFO_PATH = DATA_DIR / "artist_info.parquet"

if ARTIST_INFO_PATH.exists():
    print("Loading cached artist info...")
    artist_df = pd.read_parquet(ARTIST_INFO_PATH)
    print(f"Loaded {len(artist_df):,} cached artists")
else:
    print("Fetching artist info from Spotify API...")
    print(f"Estimated time: ~{len(artist_ids) // 50 * 0.15:.0f} minutes")
    
    artists = fetch_artist_info(artist_ids)
    artist_df = pd.DataFrame(artists)
    
    artist_df.to_parquet(ARTIST_INFO_PATH, index=False)
    print(f"Saved {len(artist_df):,} artists to {ARTIST_INFO_PATH}")


In [None]:
print(f"Artists with genres: {(artist_df['genres'].str.len() > 0).sum():,}")
print(f"Artists without genres: {(artist_df['genres'].str.len() == 0).sum():,}")
artist_df.head()


## Explore Audio Features


In [None]:
import matplotlib.pyplot as plt

feature_cols = ['danceability', 'energy', 'valence', 'tempo', 'acousticness', 'instrumentalness', 'loudness']
available_cols = [c for c in feature_cols if c in audio_features_df.columns]

fig, axes = plt.subplots(2, 4, figsize=(16, 8))
axes = axes.flatten()

for i, col in enumerate(available_cols):
    axes[i].hist(audio_features_df[col].dropna(), bins=50, edgecolor='black')
    axes[i].set_title(col)
    axes[i].set_xlabel(col)

for i in range(len(available_cols), len(axes)):
    axes[i].axis('off')

plt.tight_layout()
plt.show()


## Explore Genres


In [None]:
from collections import Counter

all_genres = []
for genres in artist_df['genres']:
    if genres:
        all_genres.extend(genres)

genre_counts = Counter(all_genres)
top_genres = genre_counts.most_common(30)

print(f"Total unique genres: {len(genre_counts):,}")
print("\nTop 30 genres:")
for genre, count in top_genres:
    print(f"  {count:5d} | {genre}")


## Create Enriched Track Dataset


In [None]:
# Merge track metadata with audio features
audio_features_df['track_id'] = audio_features_df['id']
enriched_tracks = track_df.merge(
    audio_features_df[['track_id', 'danceability', 'energy', 'valence', 'tempo', 
                       'acousticness', 'instrumentalness', 'loudness', 'speechiness', 'liveness']],
    on='track_id',
    how='left'
)

# Merge with artist genres
enriched_tracks = enriched_tracks.merge(
    artist_df[['artist_id', 'genres', 'popularity', 'followers']].rename(
        columns={'popularity': 'artist_popularity', 'followers': 'artist_followers'}
    ),
    on='artist_id',
    how='left'
)

print(f"Enriched tracks shape: {enriched_tracks.shape}")
enriched_tracks.head()


In [None]:
# Check coverage
print("Feature coverage:")
for col in ['danceability', 'energy', 'valence', 'genres', 'artist_popularity']:
    if col in enriched_tracks.columns:
        non_null = enriched_tracks[col].notna().sum()
        pct = non_null / len(enriched_tracks) * 100
        print(f"  {col}: {non_null:,} ({pct:.1f}%)")


In [None]:
# Save enriched dataset
ENRICHED_PATH = DATA_DIR / "enriched_tracks.parquet"
enriched_tracks.to_parquet(ENRICHED_PATH, index=False)
print(f"Saved enriched tracks to {ENRICHED_PATH}")
