In [35]:
import requests
from requests import Session
import json
import base64
import os
from typing import Dict, List, Tuple
import time
import sqlite3
from sqlite3 import Connection
import pandas as pd
from pandas import DataFrame

# Download Audio Features

This notebook enables you to download data on artists, songs, and audio_features of songs from the public Spotify API. Documentation for the API can be found [here](https://developer.spotify.com/documentation/).

#### a) API download and Database Storage Functions

We'll start by defining some functions, mainly to process either API requests or SQLite insert statements for the data we request.

First, we will define search_artists() so we can get the top artists in each genre recognized by Spotify, which we filtered down based on the genres that were most reflected in our initial test of the API download  (all reflected in spotify_genres.txt).

In [36]:
def search_artists(genre: str, limit: int, access_token: str, offset):
    """Search the Spotify API for artists tagged with a genre.

    Args:
        genre: Genre name, sent as a quoted ``genre:"..."`` search filter.
        limit: Maximum number of artists to request for this page.
        access_token: Bearer token sent in the Authorization header.
        offset: Index of the first result to return (used for paging).

    Returns:
        The decoded JSON body of an HTTP 200 response.

    Raises:
        requests.exceptions.HTTPError: On a 401 or 404 response; the message
            is the response body.
        RuntimeError: On any other non-200 status code.

    Note:
        Relies on a module-level ``session`` object that must exist before
        this function is called.
    """
    # Define the Spotify search URL and query parameters
    search_url = 'https://api.spotify.com/v1/search'
    params = {
        'q': f'genre:"{genre}"',
        'type': 'artist',
        'limit': limit,
        'offset': offset
    }

    # Define the request headers with the access token
    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    # Send a GET request to the search URL with the query parameters and headers
    response = session.get(search_url, params=params, headers=headers)

    if response.status_code == 200:
        return response.json()

    if response.status_code in (401, 404):
        # The exception must actually be raised: previously it was only
        # constructed, so the function silently returned None and callers
        # failed later when subscripting the result.
        raise requests.exceptions.HTTPError(response.text)

    print(response)
    raise RuntimeError('Encountered an unexpected error.')

With the appropriate artist-level data in hand, we'll get the top tracks for each artist. Because of the way the API is structured, we can only get the top ten tracks for each artist.

In [37]:
def get_artist_top_tracks(artist_id, market, access_token):
    """Fetch an artist's top tracks from the Spotify API.

    Args:
        artist_id: Spotify ID of the artist, interpolated into the URL.
        market: Value sent as the ``market`` query parameter.
        access_token: Bearer token sent in the Authorization header.

    Returns:
        The decoded JSON body when the response status is 200. For any other
        status the response text is printed and ``None`` is returned.

    Note:
        Relies on a module-level ``session`` object that must exist before
        this function is called.
    """
    response = session.get(
        f'https://api.spotify.com/v1/artists/{artist_id}/top-tracks',
        headers={'Authorization': f'Bearer {access_token}'},
        params={'market': market},
    )

    # Guard clause: anything other than success is reported and yields None.
    if response.status_code != 200:
        print(response.text)
        return None

    return response.json()

Now we're finally ready to scrape audio features (such as danceability, energy, and valence) from the songs we have Spotify IDs for. 

We'll start by finding song IDs in the database that don't have audio features downloaded, and then iterate over these song IDs to scrape their audio features using the Spotify API.

get_audio_features() lets us get a set of audio features for each Spotify ID str we pass to the API. Here are the features we can get back:

- acousticness
- danceability
- duration_ms
- energy
- instrumentalness
- key
- liveness
- loudness
- mode
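- song_ids_fk (not returned by the API; this is the song's Spotify ID, which store_audio_features() adds before the insert)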
- speechiness
- tempo
- time_signature
- valence

In [38]:
def get_audio_features(track_id: str, access_token) -> Dict:
    """Fetch the audio features of a single track from the Spotify API.

    Args:
        track_id: Spotify ID of the track, interpolated into the URL.
        access_token: Bearer token sent in the Authorization header.

    Returns:
        The decoded JSON body of an HTTP 200 response.

    Raises:
        requests.exceptions.HTTPError: On a 401 or 404 response; the message
            is the response body.
        RuntimeError: On any other non-200 status code (the track ID, body
            and status code are printed first).

    Note:
        Relies on a module-level ``session`` object that must exist before
        this function is called.
    """
    # Construct the API endpoint URL for the audio features of the track
    api_url = f"https://api.spotify.com/v1/audio-features/{track_id}"
    headers = {"Authorization": f"Bearer {access_token}"}

    response = session.get(api_url, headers=headers)
    if response.status_code == 200:
        return response.json()

    if response.status_code in (401, 404):
        # Raise instead of merely constructing the exception; otherwise the
        # failure is swallowed and the function silently returns None.
        raise requests.exceptions.HTTPError(response.text)

    print(track_id)
    print(response.text, response.status_code)
    raise RuntimeError('Encountered an unexpected error.')

Once we get that data, we can store it using the parameterized query in this function.

In [39]:
def store_audio_features(audio_features: List[Tuple], spotify_song_id: str) -> None:
    """Insert one row of audio features into the ``audio_features`` table.

    Args:
        audio_features: ``(name, value)`` pairs. Pairs whose name is not in
            ``FEATURES_TO_STORE`` are ignored. The list is NOT modified.
        spotify_song_id: Stored unchanged in the ``song_ids_fk`` column.

    Every stored value other than ``song_ids_fk`` is converted with
    ``float()``. The row is written through the module-level ``cursor`` and
    committed on the module-level ``conn``.
    """
    # Names of the values to store. The INSERT below lists its columns in
    # alphabetical order, which is why the pairs are sorted by name first.
    FEATURES_TO_STORE = (
        'acousticness',
        'danceability',
        'duration_ms',
        'energy',
        'instrumentalness',
        'key',
        'liveness',
        'loudness',
        'mode',
        'song_ids_fk',
        'speechiness',
        'tempo',
        'time_signature',
        'valence',
    )

    # Build a sorted copy rather than appending to / sorting the caller's
    # list: mutating the argument meant a second call with the same list
    # carried two 'song_ids_fk' entries and broke the 14-placeholder INSERT.
    ordered_features = sorted(
        [*audio_features, ('song_ids_fk', spotify_song_id)],
        key=lambda pair: pair[0],
    )

    # Keep only the values we store; the song ID stays as-is, the rest
    # become floats.
    row_values: List = []
    for feature in ordered_features:
        name = feature[0]
        if name not in FEATURES_TO_STORE:
            continue
        if name == 'song_ids_fk':
            row_values.append(feature[1])
        else:
            row_values.append(float(feature[1]))

    # Use cursor.execute() to execute an INSERT query with the filtered audio feature values
    cursor.execute("""INSERT INTO audio_features (
                        acousticness,
                        danceability,
                        duration_ms,
                        energy,
                        instrumentalness,
                        key,
                        liveness,
                        loudness,
                        mode,
                        song_ids_fk,
                        speechiness,
                        tempo,
                        time_signature,
                        valence)
                VALUES (?, ?, ?, ?, ?, 
                        ?, ?, ?, ?, ?, 
                        ?, ?, ?, ?)""",
                tuple(row_values))
    conn.commit()

As we go, we'll need to download access tokens from the API, which expire hourly. By supplying a client_id and client_secret, we can get new tokens programmatically.

In [40]:
def request_client_credentials_token(client_id, client_secret) -> str:
    """Request an access token using the client-credentials grant.

    Args:
        client_id: Client ID, joined with the secret as ``id:secret``.
        client_secret: Client secret.

    Returns:
        The ``access_token`` field of the JSON body of a 200 response.

    Raises:
        requests.exceptions.HTTPError: On any non-200 response; the message
            is the response body.

    Note:
        Relies on a module-level ``session`` object that must exist before
        this function is called.
    """
    # "id:secret", base64-encoded, forms the Basic authorization credential.
    raw_credentials = f"{client_id}:{client_secret}".encode("ascii")
    basic_credential = base64.b64encode(raw_credentials).decode("ascii")

    # POST the grant type together with the Basic authorization header.
    response = session.post(
        "https://accounts.spotify.com/api/token",
        data={"grant_type": "client_credentials"},
        headers={"Authorization": f"Basic {basic_credential}"},
    )

    if response.status_code != 200:
        raise requests.exceptions.HTTPError(response.text)
    return response.json()['access_token']

#### b) Download Data

We'll start by opening up a connection to a local SQLite3 file that will allow us to store the data we download in a structured way.

In [41]:
# Connect to the expanded database.
# `conn` and `cursor` are module-level names: store_audio_features() and
# store_top_tracks() execute their INSERTs on `cursor` and commit on `conn`.
conn: Connection = sqlite3.connect('data/song_data_2.0.db')
cursor: sqlite3.Cursor = conn.cursor()

We'll also download our client_id and client_secret (generated as part of Spotify API sign-up), which will allow us to generate access tokens.

To keep this as safe as possible, these credentials are stored locally as OS environmental variables. You can get your own client id and secret by signing up for the Spotify API and registering an app.

In [42]:
# Read the Spotify credentials from the environment.
# Each value is None when its variable is not set.
client_id = os.getenv('CLIENT_ID')
client_secret = os.getenv('CLIENT_SECRET')

The following code will let us create or reset the table in our database that will hold our data from Spotify.

In [43]:
# # Create tables.
# cursor.execute("DROP TABLE IF EXISTS tracks_top_genres")
# conn.commit()

# cursor.execute("""CREATE TABLE IF NOT EXISTS tracks_top_genres (
#                         id INTEGER PRIMARY KEY,
#                         danceability REAL,
#                         energy REAL,
#                         key REAL,
#                         loudness REAL,
#                         mode REAL,
#                         speechiness REAL,
#                         acousticness REAL,
#                         instrumentalness REAL,
#                         liveness REAL,
#                         valence REAL,
#                         tempo REAL,
#                         duration_ms REAL,
#                         time_signature REAL,
#                         artist_name TEXT,
#                         song_name TEXT,
#                         genre TEXT,
#                         spotify_track_id TEXT
#             )""")
# conn.commit()

When we get the top tracks, we'll use the following function to insert it as a parameterized query into our local SQLite database.

In [44]:
def store_top_tracks(track: Dict, artist_name: str, genre: str, track_name: str) -> None:
    """Insert one track and its audio features into ``tracks_top_genres``.

    Args:
        track: Mapping holding the audio-feature values and the track ``id``.
        artist_name: Stored in the ``artist_name`` column.
        genre: Stored in the ``genre`` column.
        track_name: Stored in the ``song_name`` column.

    If reading from ``track`` raises ``TypeError``, the arguments and the
    error are printed and nothing is written. Otherwise the row is inserted
    through the module-level ``cursor`` and committed on the module-level
    ``conn``.
    """
    # Audio-feature keys, in the same order as the INSERT's leading columns.
    feature_keys = (
        'danceability', 'energy', 'key', 'loudness', 'mode',
        'speechiness', 'acousticness', 'instrumentalness', 'liveness',
        'valence', 'tempo', 'duration_ms', 'time_signature',
    )

    try:
        feature_values = [track[feature_key] for feature_key in feature_keys]
        row = (*feature_values, artist_name, genre, track_name, track['id'])
    except TypeError as te:
        # Report the offending input and bail out without inserting.
        print(track, artist_name, genre, track_name)
        print(te)
        return None

    # Parameterized insert of the assembled row.
    cursor.execute("""INSERT INTO tracks_top_genres (
                        danceability,
                        energy,
                        key,
                        loudness,
                        mode,
                        speechiness,
                        acousticness,
                        instrumentalness,
                        liveness,
                        valence,
                        tempo,
                        duration_ms,
                        time_signature,
                        artist_name,
                        genre,
                        song_name,
                        spotify_track_id) 
                        VALUES (?, ?, ?, ?, ?,
                                ?, ?, ?, ?, ?,
                                ?, ?, ?, ?, ?, ?, ?
                        )""", row)
    conn.commit()

The Spotify API has a rate limit. Spotify discloses that they rate limit in a rolling 30 second window. By trial and error, the limit seems to be about 150 requests within the window.

By instantiating this class and calling its .consult_limiter() method between each API call, we can ensure that we aren't making too many calls and getting locked out.

In [45]:
class RateLimiter:
    """Sliding-window rate limiter that blocks the program when needed.

    Call ``consult_limiter()`` once per API request. Each call is
    timestamped; when the number of calls still inside the window reaches
    ``max_calls``, the program sleeps until the oldest of them has left the
    window.

    Args:
        max_calls: Number of calls within one window at which to pause.
        window: Length of the rolling window, in seconds.
    """

    def __init__(self, max_calls: int, window: float):
        # Timestamps (time.monotonic()) of recent calls, oldest first.
        self.__calls: List[float] = []
        self.__max_calls = max_calls
        self.__window_size = window

    def __pause_program(self) -> None:
        """Sleep until the oldest recorded call drops out of the window."""
        if self.__calls:
            # Time left before the oldest call expires -- NOT its age, which
            # is what was slept for previously.
            oldest_age = time.monotonic() - self.__calls[0]
            remaining = self.__window_size - oldest_age
            if remaining > 0:
                time.sleep(remaining)
        # Drop whatever has expired while we were asleep.
        self.__clear_queue()

    def __update_calls(self) -> None:
        """Record a call made now and discard expired ones."""
        self.__calls.append(time.monotonic())
        self.__clear_queue()

    def __clear_queue(self) -> None:
        """Remove every call that is at least one window old."""
        cutoff = time.monotonic() - self.__window_size
        # Timestamps are appended in order, so the expired ones form a
        # prefix. Count them first and delete in one step; popping while
        # iterating skipped entries and removed the wrong ones.
        expired = 0
        for stamp in self.__calls:
            if stamp > cutoff:
                break
            expired += 1
        del self.__calls[:expired]

    def __is_limit_exceeded(self) -> bool:
        """Return True once the calls in the window reach the maximum."""
        return len(self.__calls) >= self.__max_calls

    def consult_limiter(self):
        """Record a call and pause if the window is full."""
        self.__update_calls()
        # Pause the program if the limit has been exceeded
        if self.__is_limit_exceeded():
            self.__pause_program()

We'll also create an ErrorLimiter class, which will keep track of the successive errors we encounter. If we encounter too many in a row, we'll close the connection. (Again, to avoid running afoul of the API's policies.)

In [46]:
class ErrorLimiter:
    """Budget of errors tolerated between resets.

    ``add_error()`` may be called ``allowed_errors`` times after
    construction or after ``reset_errors()``; the next call raises.

    Args:
        allowed_errors: Number of errors tolerated before raising.
    """

    def __init__(self, allowed_errors):
        # The remaining budget starts at the full allowance.
        self.__allowed_errors = allowed_errors
        self.__remaining_errors = allowed_errors

    def add_error(self) -> None:
        """Record one error.

        Raises:
            RuntimeError: When this error exceeds the allowed number.
        """
        self.__remaining_errors -= 1
        # Raise only once the budget is overdrawn. The previous `<= 0` check
        # fired on the last *allowed* error, i.e. one error too early.
        if self.__remaining_errors < 0:
            raise RuntimeError('Encountered more than the allowable number of errors.')

    def reset_errors(self) -> None:
        """Restore the remaining budget to the original allowance."""
        self.__remaining_errors = self.__allowed_errors


The function below searches the Spotify API for up to 1000 artists that match the specified genre, using the search_artists() function. It consults the RateLimiter before each API request to avoid being rate-limited by the Spotify API.

In [47]:
def get_artists_by_genre(genre: str, limiter: RateLimiter, error_checker: ErrorLimiter) -> None:
    """Download and store audio features for the top tracks of a genre's artists.

    Artists are requested in pages of ``BATCH_SIZE``, up to ``TOTAL_TO_GET``.
    For each artist the top tracks are fetched, and for each track the audio
    features are fetched and passed to ``store_top_tracks()``.

    Args:
        genre: Genre name passed to ``search_artists()``.
        limiter: Consulted before every API call.
        error_checker: Receives ``add_error()`` for each handled failure.

    Returns:
        None. Returns early if an artist search raises ``HTTPError``.

    Note:
        Uses the module-level ``client_id`` and ``client_secret`` to request
        one access token for the whole genre.
    """
    # Number of artists to search for at a time.
    BATCH_SIZE = 50
    STARTING_INDEX = 0
    TOTAL_TO_GET = 1000

    # Get an access token using the client ID and secret
    access_token: str = request_client_credentials_token(client_id, client_secret)

    # Loop through batches of artists
    for offset in range(STARTING_INDEX, TOTAL_TO_GET, BATCH_SIZE):
        try:
            # Check if the rate limit has been reached before making the API call
            limiter.consult_limiter()

            # Search for artists in the given genre
            artists_batch: List[Dict] = search_artists(genre,
                                                       BATCH_SIZE,
                                                       access_token,
                                                       offset
                                                       )['artists']['items']
        except requests.exceptions.HTTPError as http_error:
            # If there's an HTTP error, print the error and add one to the error count.
            print(http_error)
            error_checker.add_error()
            return None

        # An empty page means the search results are exhausted; requesting
        # the remaining offsets would only spend rate-limited API calls.
        if not artists_batch:
            break

        # Loop through each artist in the batch
        for artist in artists_batch:
            artist_id = artist['id']
            artist_name = artist['name']

            try:
                # Get top ten tracks
                limiter.consult_limiter()
                top_tracks: List[Dict] = get_artist_top_tracks(artist_id,
                                                               market='US',
                                                               access_token=access_token
                                                               )['tracks']
                # Loop through each track and get its audio features
                for track in top_tracks:
                    track_name = track['name']
                    track_id = track['id']
                    limiter.consult_limiter()
                    audio_features: Dict = get_audio_features(track_id, access_token)

                    # If there are audio features, store the track information
                    if audio_features:
                        store_top_tracks(audio_features, artist_name, genre, track_name)
                    else:
                        print('No audio features:', track_name, track_id, genre, artist_name, audio_features)
            except requests.exceptions.HTTPError as http_error:
                # If there's an HTTP error, print the error and add one to the error count
                print(http_error, artist_name, genre)
                error_checker.add_error()

            except TypeError as te:
                # Raised e.g. when get_artist_top_tracks() returned None and
                # the result was subscripted; count it as an error.
                print(te, artist_name, genre)
                error_checker.add_error()


In [48]:
# # Store each line (i.e., each genre) in a list called 'genres'.
# genres = []
# with open('data/selected_genres.txt', 'rt', encoding='utf-8') as fin:
#     for line in fin:
#         genres.append(line.strip())

# # Connect to the expanded database.
# conn = sqlite3.connect('data/song_data_2.0.db')
# cursor = conn.cursor()
# session = requests.Session()

# # Set the maximum calls per minute and window size, and create a rate limiter 
# # and error checker object
# ALLOWED_ERRORS = 3
# error_checker = ErrorLimiter(ALLOWED_ERRORS)
# MAX_PER_MINUTE = 150
# WINDOW_SIZE = 30.0
# limiter = RateLimiter(MAX_PER_MINUTE, WINDOW_SIZE)

# print('Beginning download:', time.time())

# # Set the starting index in the text file
# START_FROM = 0

# try:
#     # Loop through the genres starting from the specified index
#     for i, genre in enumerate(genres[START_FROM:]):       
#         # Add the starting index to the current index
#         i = i + START_FROM
#         try:
#             # Print a message indicating the genre and index being processed
#             print('Getting genre/index:', genre, i)
            
#             # Call the get_artists_by_genre function with the rate limiter and error checker
#             get_artists_by_genre(genre, limiter, error_checker)

#             # If success, reset error counter.
#             error_checker.reset_errors()

#             print('Successfully got:', genre, i)
#             print('Time stamp:', time.time())
#         except requests.exceptions.HTTPError as http_error:
#             print('Failed to get genre/index', genre, i)
#             print(http_error)
#             error_checker.add_error()

# finally:
#     session.close()
#     conn.close()

With our data now downloaded, we will export to a .csv for easier access.

In [49]:
# Export to CSV.
# Open a fresh connection for the export and rebind the module-level
# `conn` / `cursor` names to it.
conn = sqlite3.connect('data/song_data_2.0.db')
cursor = conn.cursor()

query = """
    SELECT *
    FROM tracks_top_genres
"""

try:
    # Read every stored row into a DataFrame and write it out as CSV.
    features_df = pd.read_sql(query, conn)
    features_df.to_csv('data/audio_features_4.0.csv', index=False, encoding='utf-8')
finally:
    # Always release the database handle, even if the read or write fails.
    conn.close()

# References

- https://stackoverflow.com/questions/12737740/python-requests-and-persistent-sessions
- https://docs.python.org/3/library/base64.html
- https://www.freecodecamp.org/news/python-json-how-to-convert-a-string-to-json/
