In [1]:
import pandas as pd
import requests
import spotipy
from dotenv import load_dotenv
import os
from spotipy.oauth2 import SpotifyOAuth

In [2]:
load_dotenv()

True

In [3]:
CLIENT_ID = os.getenv('client_id')
CLIENT_SECRET = os.getenv('client_secret')


In [4]:
sp = spotipy.Spotify(auth_manager=SpotifyOAuth(
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    redirect_uri='http://localhost:8888/callback',
    scope="user-library-read"
))

In [5]:
sp

<spotipy.client.Spotify at 0x222233974d0>

In [6]:
def get_artists_from_playlist(playlist_id):
    artist_ids = set()
    results = sp.playlist_tracks(playlist_id)
    
    while results:
        for item in results['items']:
            artist = item['track']['artists'][0]
            artist_ids.add(artist['id'])
        
        # Check if there's another page
        results = sp.next(results) if results['next'] else None

    return list(artist_ids)

# Example: Top 50 Global Playlist
top_50_global_playlist_id = '37i9dQZEVXbMDoHDwVN2tF'
artist_ids = get_artists_from_playlist(top_50_global_playlist_id)

In [24]:
def get_related_artists(artist_ids):
    related_artist_ids = set()
    
    for artist_id in artist_ids:
        results = sp.artist_related_artists(artist_id)
        for artist in results['artists']:
            related_artist_ids.add(artist['id'])

    return list(related_artist_ids)

# Expand the list by fetching related artists
all_artist_ids = set(artist_ids)
all_artist_ids.update(get_related_artists(artist_ids[0]))


In [7]:
all_artist_ids = set(artist_ids)
len(all_artist_ids)

42

In [8]:
genre_seeds = sp.recommendation_genre_seeds()

# Print the list of genres
print("Popular Genres on Spotify:")
for genre in genre_seeds['genres']:
    print(genre)

print(len(genre_seeds['genres']))

Popular Genres on Spotify:
acoustic
afrobeat
alt-rock
alternative
ambient
anime
black-metal
bluegrass
blues
bossanova
brazil
breakbeat
british
cantopop
chicago-house
children
chill
classical
club
comedy
country
dance
dancehall
death-metal
deep-house
detroit-techno
disco
disney
drum-and-bass
dub
dubstep
edm
electro
electronic
emo
folk
forro
french
funk
garage
german
gospel
goth
grindcore
groove
grunge
guitar
happy
hard-rock
hardcore
hardstyle
heavy-metal
hip-hop
holidays
honky-tonk
house
idm
indian
indie
indie-pop
industrial
iranian
j-dance
j-idol
j-pop
j-rock
jazz
k-pop
kids
latin
latino
malay
mandopop
metal
metal-misc
metalcore
minimal-techno
movies
mpb
new-age
new-release
opera
pagode
party
philippines-opm
piano
pop
pop-film
post-dubstep
power-pop
progressive-house
psych-rock
punk
punk-rock
r-n-b
rainy-day
reggae
reggaeton
road-trip
rock
rock-n-roll
rockabilly
romance
sad
salsa
samba
sertanejo
show-tunes
singer-songwriter
ska
sleep
songwriter
soul
soundtracks
spanish
study
summer
swe

In [9]:
import json

def search_artists_by_genre(genre, limit=100):
    artist_ids = set()
    results = sp.search(q=f'genre:{genre}', type='artist', limit=50)
    
    while results and len(artist_ids) < limit:
        for artist in results['artists']['items']:
            artist_ids.add(artist['id'])
        
        # Check if there's another page and update search results
        if results['artists']['next']:
            results = sp.next(results['artists'])
        else:
            results = None

    return list(artist_ids)

for genre in genre_seeds['genres']:
    all_artist_ids.update(search_artists_by_genre(genre, limit=10))


In [10]:
len(all_artist_ids)

4312

In [30]:
all_artist_ids.update(get_related_artists(artist_ids))

In [11]:
len(all_artist_ids)

4312

In [12]:
track_df = pd.DataFrame()


In [14]:
def get_albums_for_artist(artist_id):
    """
    Fetch all albums for a given artist ID.
    
    :param artist_id: The Spotify ID of the artist.
    """
    print("in get albums for artist")
    albums = []
    # results = api_call_wrapper(sp.artist_albums, artist_id, album_type='album', limit=50)
    results = sp.artist_albums(artist_id, album_type='album', limit=1)
    
    # Paginate through results if there are more than 50 albums
    while results:
        albums.extend(results['items'])
        if results['next']:
            # results = api_call_wrapper(sp.next, results)
            results = sp.next(results)
        else:
            results = None
    
    return albums

def download_all_albums_for_artists(artist_list):
    """
    Download all albums for a list of artists.
    
    :param artist_list: List of artist names or artist IDs.
    :return: Pandas DataFrame with album details.
    """
    all_albums_data = []
    for artist in artist_list:
        print("In download all albums for artists")
        albums = get_albums_for_artist(artist)
        for album in albums:
            all_albums_data.append({
                'artist_name': album['artists'][0]['name'],
                'artist_id': album['artists'][0]['id'],
                'album_id': album['id'],
                'album_name': album['name'],
                'release_date': album['release_date'],
                'total_tracks': album['total_tracks'],
                'album_type': album['album_type']
            })
        print(f"Downloaded {len(albums)} albums for artist: {artist}")
    
    # Convert the results to a Pandas DataFrame
    df_albums = pd.DataFrame(all_albums_data)
    return df_albums

df_albums = download_all_albums_for_artists(list(all_artist_ids)[:50])

In download all albums for artists
in get albums for artist


Current path for getting tracks is get genres, get artists by genres, get related artists of current artists, get artists top tracks, get audio features for each individual track.

Keep current path for getting artists, and batch audio features of the top tracks to reduce api calls

Get genres, get top artists, get related artists, get artists albums, get songs from albums,

Create Album Df, contains album name, artist, release date, included songs,

In [16]:
import time
from requests.exceptions import ReadTimeout


def fetch_with_retry(func, *args, **kwargs):
    retries = 3
    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except ReadTimeout:
            print(f"Timeout error, retrying... ({attempt + 1}/{retries})")
            time.sleep(2 ** attempt)  # Exponential backoff

def get_artist_tracks(artist_id):
    # Initialize a list to store track data
    tracks_data = {
        'artist_name': [],
        'artist_genres': [],
        'artist_id': [],
        'track_id': [],
        'track_name': [],
        'danceability': [],
        'energy': [],
        'key': [],
        'loudness': [],
        'mode': [],
        'speechiness': [],
        'acousticness': [],
        'instrumentalness': [],
        'liveness': [],
        'valence': [],
        'tempo': []
    }
    
    # Search for the artist's top tracks
    results = sp.artist_top_tracks(artist_id)
    track_ids = [track['id'] for track in results['tracks']]
    track_features = fetch_with_retry(sp.audio_features, track_ids)
    print(track_features)
    for i, track in enumerate(results['tracks']):
        track_id = track['id']
        track_name = track['name']
        artist_name = track['artists'][0]['name']
        artist_id = track['artists'][0]['id']
        # artist_genres = track['artists'][0]['genres']
        # Fetch track features
        
        
        # Append track data
        tracks_data['artist_name'].append(artist_name)
        tracks_data['artist_genres'].append(None)
        tracks_data['artist_id'].append(artist_id)
        tracks_data['track_id'].append(track_id)
        tracks_data['track_name'].append(track_name)
        tracks_data['danceability'].append(track_features[i]['danceability'] if track_features[i] else None)
        tracks_data['energy'].append(track_features[i]['energy'] if track_features[i] else None)
        tracks_data['key'].append(track_features[i]['key'] if track_features[i] else None)
        tracks_data['loudness'].append(track_features[i]['loudness'] if track_features[i] else None)
        tracks_data['mode'].append(track_features[i]['mode'] if track_features[i] else None)
        tracks_data['speechiness'].append(track_features[i]['speechiness'] if track_features[i] else None)
        tracks_data['acousticness'].append(track_features[i]['acousticness'] if track_features[i] else None)
        tracks_data['instrumentalness'].append(track_features[i]['instrumentalness'] if track_features[i] else None)
        tracks_data['liveness'].append(track_features[i]['liveness'] if track_features[i] else None)
        tracks_data['valence'].append(track_features[i]['valence'] if track_features[i] else None)
        tracks_data['tempo'].append(track_features[i]['tempo'] if track_features[i] else None)
        
    
    return tracks_data

# Example list of artist IDs (replace with your actual artist IDs

# Create DataFrame
# tracks_df = pd.DataFrame()
# for artist_id in all_artist_ids:
#         tracks_data = get_artist_tracks(artist_id)
#         track_df.append(tracks_data)

for artist in list(all_artist_ids):
    if artist not in track_df['artist_id'].unique():
        tracks_data = get_artist_tracks(artist)
        track_df = pd.concat([track_df,pd.DataFrame.from_dict(tracks_data)])
    



# Display the DataFrame
print(track_df.head())

KeyError: 'artist_id'

In [17]:
track_df.head()

In [None]:
import time
import datetime as dt
from requests.exceptions import ReadTimeout, HTTPError

def api_call_wrapper(func, *args, max_retries=3, delay=1, backoff_factor=2, calls=0, **kwargs):
    """
    Wrapper for Spotify API calls to handle rate limiting and retries.
    
    :param func: The Spotipy API function you want to call.
    :param args: Positional arguments for the API function.
    :param max_retries: Maximum number of retries before giving up.
    :param delay: Initial delay between retries (in seconds).
    :param backoff_factor: Exponential backoff factor for delays.
    :param kwargs: Keyword arguments for the API function.
    :return: API response or None if the call failed.
    """
    retries = 0
    current_delay = delay
    start_time = dt.datetime.now()
    
    while retries < max_retries:
        try:
            # Make the API call
            response = func(*args, **kwargs)
            calls += 1
            return response, calls, dt.timedelta(dt.datetime.now, start_time)
        
        except HTTPError as e:
            # Handle rate-limiting or server errors (status code 429 or 5xx)
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 1))  # Spotify may provide Retry-After
                print(f"Rate limit exceeded. Retrying after {retry_after} seconds...")
                time.sleep(retry_after)
            else:
                print(f"HTTP error: {e.response.status_code}. Retrying in {current_delay} seconds...")
                time.sleep(current_delay)
                retries += 1
                current_delay *= backoff_factor  # Exponential backoff
            
        except ReadTimeout:
            # Handle timeout errors
            print(f"Request timed out. Retrying in {current_delay} seconds...")
            time.sleep(current_delay)
            retries += 1
            current_delay *= backoff_factor
        
        except Exception as e:
            # Handle unexpected errors
            print(f"An error occurred: {e}. Retrying in {current_delay} seconds...")
            time.sleep(current_delay)
            retries += 1
            current_delay *= backoff_factor
    
    print(f"Max retries reached for {func.__name__}.")
    return None  # Return None if the API call fails


