# Code Pudding 2024
***

The purpose of this notebook will be to analyse data retrieved from the [Spotify Web API](https://developer.spotify.com/documentation/web-api) in order to train various machine learning models to predict the genre of any given song. Once the models have been trained, validated and tested, a function will be built that feeds the data from the API to the best preforming model, and it's genre will be predicted.

## Initialization

In [3]:
import os
import math
import random
import pandas as pd
import numpy as np
from collections import Counter
from tqdm import tqdm
import plotly.express as px

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

from dotenv import dotenv_values

## Spotify API

The following cell reads the `.env` file into a dictionary with the following keys:

- `Client_ID`
- `Client_secret`

Ensure you have followed the directions from the [Web API](https://developer.spotify.com/documentation/web-api) and have both of those values.

In [4]:
config = dotenv_values(".env")  # config = {"Client_ID": "foo", "Client_secret": "foo"}
client_credentials_manager = SpotifyClientCredentials(client_id=config['Client_ID'], client_secret=config['Client_secret'])
sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)

## Data Gathering

In [5]:
# Increase timeout and add retry logic
session = sp._session
retry = Retry(
    total=5,  # Total number of retries
    backoff_factor=0.3,  # Wait time between retries
    status_forcelist=[500, 502, 503, 504],  # Retry on these HTTP status codes
    raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry)  # Increase timeout to 10 seconds
session.mount("https://", adapter)

# Set random seed for reproducibility in pulling data from sub genres in an unbiased way

random.seed(42)

### Functions

In [6]:
def test_genre_string(genre, debug=False):
    """
    Input:
        genre: str, genre string to test
        debug: bool, print debug messages
    Output:
        bool: True if the genre is valid, False if not
        
    Tests if a genre string is valid by searching for tracks with that genre. If no tracks are found, the genre is invalid.
    """
    results = sp.search(q=f'genre:{genre}', type='track', limit=1)
    number_of_tracks = len(results['tracks']['items'])
    if number_of_tracks == 0:
        print(f"No tracks found for genre: {genre}")
        return False
    else:
        if debug:
            print(f"Found {number_of_tracks} tracks for genre: {genre}")
        return True

In [7]:
def get_genres_of_interest(genres_dict, genre_record_limit, pagination_limit=45, debug=False):
    """
    Fetches track IDs for specified genres from Spotify, ensuring a balanced representation of each genre.
    Args:
        genres_dict (dict): A dictionary where keys are super genres and values are lists of sub-genres.
        genre_record_limit (int): The maximum number of tracks to fetch per super genre.
        pagination_limit (int, optional): The number of tracks to fetch per API call. Defaults to 45.
        debug (bool, optional): If True, prints debug information. Defaults to False.
    Returns:
        tuple: A tuple containing two lists:
            - track_ids (list): A list of track IDs fetched from Spotify.
            - track_genre (list): A list of super genres corresponding to each track ID.
    Raises:
        AssertionError: If duplicate track IDs are found in the final list.
    """
    
    # Initialize variables
    track_ids = []
    track_genre = []
    seen_track_ids = set()

    for super_genre, sub_genres in genres_dict.items():
        print(f"Getting records for super genre: {super_genre}")
        super_genre_track_ids = []

        # Dictionary to track how many tracks we pulled per sub-genre
        sub_genre_counts = {sub_genre: 0 for sub_genre in sub_genres}

        # Loop until we hit the genre_record_limit for the super genre
        total_tracks_pulled = 0
        
        while total_tracks_pulled < genre_record_limit and sub_genres:
            # Calculate remaining tracks needed for the super genre
            tracks_needed = genre_record_limit - total_tracks_pulled

            # Shuffle sub-genres to randomize the pulls
            random.shuffle(sub_genres)

            for sub_genre in sub_genres[:]:
                
                # Adjust the batch size to ensure we don't exceed the genre_record_limit
                batch_size = min(pagination_limit, tracks_needed)

                # Fetch a batch of tracks for the sub-genre
                results = sp.search(q=f'genre:{sub_genre}', type='track', limit=batch_size, offset=sub_genre_counts[sub_genre])
                
                # If no items are returned, remove the sub-genre and move on
                if not results['tracks']['items']:
                    if debug:
                        print(f"No items for sub-genre: {sub_genre}, removing from sub-genres")
                    sub_genres.remove(sub_genre)  # Remove sub-genre if no more tracks are returned
                    continue  # Skip the rest of the code for this sub-genre
            
                # Add new track IDs that are not already seen, but ensure we don't exceed the genre_record_limit
                new_track_ids = [track['id'] for track in results['tracks']['items'] if track['id'] not in seen_track_ids]
                new_tracks_needed = genre_record_limit - total_tracks_pulled
                
                # Only add as many tracks as needed to reach the limit
                new_track_ids = new_track_ids[:new_tracks_needed]
                
                for track_id in new_track_ids:
                    # Add the new track to the super genre's collection
                    super_genre_track_ids.append(track_id)
                    track_genre.append(super_genre)  # Label the track with the super genre
                    seen_track_ids.add(track_id)
                
                # Update counts and totals
                sub_genre_counts[sub_genre] += len(new_track_ids)
                total_tracks_pulled += len(new_track_ids)

                if debug:
                    print(f"Fetched {len(new_track_ids)} new tracks for sub-genre: {sub_genre}")

                # If we've reached the limit for the super genre, stop
                if total_tracks_pulled >= genre_record_limit:
                    break
                
            # Check again if we've exhausted all sub-genres
            if not sub_genres:
                if debug:
                    print(f"All sub-genres for super genre exhausted.")
                break

        print(f"{len(super_genre_track_ids)} records found for super genre: {super_genre}\n")
        # Add the super genre track ids to the main list
        track_ids.extend(super_genre_track_ids)

    print("Total number of records:", len(track_ids), "\n")

    # Check for duplicates in the final list
    track_id_counts = Counter(track_ids)
    duplicates = {track_id: count for track_id, count in track_id_counts.items() if count > 1}

    # Assert no duplicates
    assert not duplicates, f"Duplicate track IDs found: {duplicates}"

    return track_ids, track_genre


In [8]:
def get_other_genres(genres_of_interest, genre_record_limit, already_seen_ids, pagination_limit=45, debug=False):
    """
    Fetches tracks from genres not included in the genres_of_interest.
    Ensures an even distribution of tracks across genres and respects the genre_record_limit.
    Args:
        genres_of_interest (dict): A dictionary where keys are super genres and values are lists of sub-genres of interest.
        genre_record_limit (int): The maximum number of tracks to fetch.
        already_seen_ids (list or set): A list or set of track IDs that have already been gathered.
        pagination_limit (int, optional): The number of tracks to fetch per API call. Defaults to 45.
        debug (bool, optional): If True, prints debug information. Defaults to False.
    Returns:
        tuple: A tuple containing:
            - other_track_ids (list): A list of track IDs from the 'other' genres.
            - genre_labels (list): A list of genre labels corresponding to the fetched track IDs.
            - genre_counts (dict): A dictionary with genres as keys and the count of fetched tracks as values.
    Raises:
        AssertionError: If duplicate track IDs are found in the final list of track IDs.
    """

    print("Getting 'other' genres.")
    already_seen_ids = set(already_seen_ids)  # Ensure it's a set for fast lookup

    # Flatten the dictionary to get all sub-genres in genres_of_interest
    sub_genres_of_interest = {sub_genre for super_genre, sub_genres in genres_of_interest.items() for sub_genre in sub_genres}

    # All genres excluding genres_of_interest
    other_genres = [genre for genre in sp.recommendation_genre_seeds()['genres'] if genre not in sub_genres_of_interest]

    # Dictionary to track how many tracks we pulled per genre
    genre_counts = {genre: 0 for genre in other_genres}

    # List to hold unique track IDs for this function
    other_track_ids = []

    # Track total number of new tracks pulled
    total_tracks_pulled = 0

    # Loop until we hit the genre_record_limit
    while total_tracks_pulled < genre_record_limit and other_genres:
        # Calculate remaining tracks needed for the overall genre
        tracks_needed = genre_record_limit - total_tracks_pulled

        # Shuffle genres to randomize the pulls
        random.shuffle(other_genres)

        for genre in other_genres[:]:
            # Adjust the batch size to ensure we don't exceed the genre_record_limit
            batch_size = min(pagination_limit, tracks_needed)

            # Fetch a batch of tracks for the genre
            results = sp.search(q=f'genre:{genre}', type='track', limit=batch_size, offset=genre_counts[genre])

            # If no items are returned, remove the genre and move on
            if not results['tracks']['items']:
                if debug:
                    print(f"No items for genre: {genre}, removing from other_genres")
                other_genres.remove(genre)
                continue

            # Add new track IDs that are not already seen, but ensure we don't exceed the genre_record_limit
            new_track_ids = [track['id'] for track in results['tracks']['items'] if track['id'] not in already_seen_ids]
            new_tracks_needed = genre_record_limit - total_tracks_pulled

            # Only add as many tracks as needed to reach the limit
            new_track_ids = new_track_ids[:new_tracks_needed]

            for track_id in new_track_ids:
                # Add the new track to the other track IDs collection
                other_track_ids.append(track_id)
                already_seen_ids.add(track_id)  # Also add to already seen IDs to avoid duplicates
                genre_counts[genre] += 1
                total_tracks_pulled += 1

            if debug:
                print(f"Fetched {len(new_track_ids)} new tracks for genre: {genre}")

            # If we've reached the genre record limit, stop
            if total_tracks_pulled >= genre_record_limit:
                break

    print(f"Total number of new records: {len(other_track_ids)} new tracks")

    if debug:
        # Print genre counts only for genres with tracks > 0
        print(f"\nGenre Counts:")
        for genre, count in genre_counts.items():
            if count > 0:
                print(f"{genre}: {count}")

    # Create genre labels for the new tracks
    genre_labels = ["other"] * len(other_track_ids)

    # Check for duplicates in the final list of track IDs
    track_id_counts = Counter(other_track_ids)
    duplicates = {track_id: count for track_id, count in track_id_counts.items() if count > 1}

    # Assert no duplicates
    assert not duplicates, f"Duplicate track IDs found: {duplicates}"

    return other_track_ids, genre_labels, genre_counts


In [9]:
def amend_sub_genres(sub_genres):
    """
    Input: A list of sub-genres
    Output: The same list with any sub-genres removed that do not return results from the Spotify API
    """
    sub_genres = sub_genres.copy()
    # Modify the sub_genres list in place
    before = len(sub_genres)
    print(f"Number of sub genres before check: {before}")
    
    # Create a copy of the list to avoid modifying it while iterating
    for genre in sub_genres[:]:
        if not test_genre_string(genre):
            sub_genres.remove(genre)
            print(f"Removed {genre} from sub_genres.")
    
    print(f"Number of sub genres after check: {len(sub_genres)}, {before - len(sub_genres)} removed.")
    
    return sub_genres

### Define sub genres to super genre pairing

In [10]:
## You can put any string in the lists below, the next cell will test if it is a valid genre

genres_of_interest = {
    'rock': [
            'rock',
            'alt-rock',
            'hard-rock',
            'j-rock',
            'psych-rock',
            'punk-rock',
            'rock-n-roll',
            'rockabilly',
            'grunge',
            'punk'
            ],
    'pop': [
            "pop",
            "Dance Pop",
            "Electropop",
            "Indie Pop",
            "Synth-pop",
            "Pop Rock",
            "Teen Pop",
            "Power Pop",
            "Art Pop",
            "Pop Punk",
            "K-Pop",
            "J-Pop",
            "Latin Pop",
            "Dream Pop",
            "Bubblegum Pop",
            "Euro Pop",
            "Pop Rap",
            "Chamber Pop",
            "Baroque Pop",
            "Pop Soul",
            "Acoustic Pop",
            "j-pop",
            "k-pop",
            ],
    'rap/hip-hop': [
                "Hip Hop",
                "Hip-Hop",
                "Rap",
                "Trap",
                "Gangsta Rap",
                "East Coast Hip Hop",
                "West Coast Hip Hop",
                "Conscious Hip Hop",
                "Alternative Hip Hop",
                "Boom Bap",
                "Dirty South",
                "Crunk",
                "Drill",
                "Grime",
                "Cloud Rap",
                "Underground Hip Hop",
                "Emo Rap",
                "Hardcore Hip Hop",
                "Lofi Hip Hop",
                "Old School Hip Hop",
                "Christian Hip Hop",
                "Latin Hip Hop"
                ],
    'classical': [
                "Classical",
                "Baroque",
                "Romantic",
                "Classical",
                "Chamber Music",
                "Symphony",
                "Opera",
                "Choral",
                "Contemporary Classical",
                "Minimalism",
                "Orchestral",
                "Piano",
                "String Quartet",
                "Early Music",
                "Renaissance",
                "Modern Classical",
                "Neoclassical",
                "Impressionism",
                "Avant-Garde",
                "Sacred Classical",
                "Cantata",
                "Piano"
                ],
    'jazz': [
                "Jazz",
                "Bebop",
                "Swing",
                "Smooth Jazz",
                "Cool Jazz",
                "Hard Bop",
                "Free Jazz",
                "Fusion",
                "Modal Jazz",
                "Latin Jazz",
                "Avant-Garde Jazz",
                "Gypsy Jazz",
                "Vocal Jazz",
                "Jazz Funk",
                "Jazz Blues",
                "Soul Jazz",
                "Post-Bop",
                "Ragtime",
                "Big Band",
                "Dixieland",
                "Nu Jazz",
                "Jazz Fusion",
                ]
}

In [11]:
# If a sub genre string is invalid, it will be removed from the list to avoid unnecessary API calls

for super_genre in genres_of_interest:
    print(f"\nChecking sub-genres for {super_genre}")
    genres_of_interest[super_genre] = amend_sub_genres(genres_of_interest[super_genre])


Checking sub-genres for rock
Number of sub genres before check: 10
Number of sub genres after check: 10, 0 removed.

Checking sub-genres for pop
Number of sub genres before check: 23
Number of sub genres after check: 23, 0 removed.

Checking sub-genres for rap/hip-hop
Number of sub genres before check: 22
No tracks found for genre: Lofi Hip Hop
Removed Lofi Hip Hop from sub_genres.
Number of sub genres after check: 21, 1 removed.

Checking sub-genres for classical
Number of sub genres before check: 22
No tracks found for genre: Sacred Classical
Removed Sacred Classical from sub_genres.
No tracks found for genre: Cantata
Removed Cantata from sub_genres.
Number of sub genres after check: 20, 2 removed.

Checking sub-genres for jazz
Number of sub genres before check: 22
No tracks found for genre: Modal Jazz
Removed Modal Jazz from sub_genres.
Number of sub genres after check: 21, 1 removed.


### Data Retrieval

In [12]:
genre_record_limit = 1500 # Number of tracks to fetch per genre
pagination_limit = 45 # Number of tracks to fetch per API call, max is 50 but I found 45 to be more reliable

In [13]:
track_ids, track_genre = get_genres_of_interest(genres_of_interest, genre_record_limit, pagination_limit)

Getting records for super genre: rock
1500 records found for super genre: rock

Getting records for super genre: pop
1500 records found for super genre: pop

Getting records for super genre: rap/hip-hop
1500 records found for super genre: rap/hip-hop

Getting records for super genre: classical
1500 records found for super genre: classical

Getting records for super genre: jazz
1500 records found for super genre: jazz

Total number of records: 7500 



In [14]:
# Assert length of track_ids is equal to genre_record_limit * number of super genres
assert len(track_ids) == genre_record_limit * len(genres_of_interest), f"Expected {genre_record_limit * len(genres_of_interest)} tracks, but got {len(track_ids)}"

# Assert no duplicates
assert len(track_ids) == len(set(track_ids)), "Duplicate tracks found"

In [15]:
# Get other genres
other_track_ids, other_genre_labels, other_genre_counts = get_other_genres(genres_of_interest, genre_record_limit, track_ids)

# # This is helpful to see how many tracks were fetched for each super genre
# for genre, count in other_genre_counts.items():
#     if count > 0:
#         print(f"{genre}: {count}")


Getting 'other' genres.
Total number of new records: 1500 new tracks


In [16]:
# Assert no track_ids overlap between genres_of_interest and other genres
assert not set(track_ids).intersection(other_track_ids), "Overlap between genres_of_interest and other genres"

In [17]:
# Combine the two lists
all_track_ids = track_ids + other_track_ids
all_track_genre = track_genre + other_genre_labels

# Create a DataFrame
track_genres_df = pd.DataFrame({"track_id": all_track_ids, "genre": all_track_genre})

# Assert no duplicates
assert track_genres_df['track_id'].nunique() == len(track_genres_df), "Duplicate track IDs found in the final DataFrame"


In [18]:
# Get audio features for each track with a progress bar
track_features = []
for i in tqdm(range(0, len(track_genres_df), pagination_limit), desc="Fetching audio features"):
    features = sp.audio_features(all_track_ids[i:i+pagination_limit])
    
    # Raise error if no features are returned
    if not features:
        raise ValueError(f"No audio features returned for tracks: {all_track_ids[i:i+pagination_limit]}")
    
    for feature in features:
        # Raise error if no features are returned for individual tracks
        if not feature:
            raise ValueError(f"No audio features returned for track: {all_track_ids[i:i+pagination_limit]}")
        track_features.append(feature)



Fetching audio features: 100%|██████████| 200/200 [00:33<00:00,  5.99it/s]


In [19]:
# To DataFrame
track_features_df = pd.DataFrame(track_features)
# Rename id to track_id
track_features_df.rename(columns={'id': 'track_id'}, inplace=True)

# Assert no duplicates
assert track_features_df['track_id'].nunique() == len(track_features_df), "Duplicate track IDs found in the final DataFrame"

# Assert same length as track_genres_df
assert len(track_features_df) == len(track_genres_df), "Length of track_features_df and track_genres_df do not match"

In [20]:
# Merge the two DataFrames
all_data = pd.merge(track_genres_df, track_features_df, on='track_id')

print(len(all_data))

9000


In [20]:
# Check if "spotify_data.csv" already exists
if os.path.exists("spotify_data.csv"):
    # Option to rename
    input_text = input("File already exists. Do you want to overwrite it? (y/n): ")
    if input_text.lower() == "y":
        # Save the data
        all_data.to_csv("spotify_data.csv", index=False)
    else:
        input_text = input("Do you want to save the data to a new file? (y/n): ")
        if input_text.lower() == "y":
            file_name = input("Enter the new file name (.csv extension will be added): ")
            all_data.to_csv(file_name + ".csv", index=False)
        else:
            print("Data not saved.")
else:
    # Save the data
    all_data.to_csv("spotify_data.csv", index=False)

Data not saved.


## EDA

In [40]:
data = pd.read_csv('spotify_data.csv')
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5829 entries, 0 to 5828
Data columns (total 19 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   track_id          5829 non-null   object 
 1   genre             5829 non-null   object 
 2   danceability      5829 non-null   float64
 3   energy            5829 non-null   float64
 4   key               5829 non-null   int64  
 5   loudness          5829 non-null   float64
 6   mode              5829 non-null   int64  
 7   speechiness       5829 non-null   float64
 8   acousticness      5829 non-null   float64
 9   instrumentalness  5829 non-null   float64
 10  liveness          5829 non-null   float64
 11  valence           5829 non-null   float64
 12  tempo             5829 non-null   float64
 13  type              5829 non-null   object 
 14  uri               5829 non-null   object 
 15  track_href        5829 non-null   object 
 16  analysis_url      5829 non-null   object 


In [6]:
print(data.duplicated().sum())
print(data['track_id'].duplicated().sum())

0
0


In [41]:
data['genre'].value_counts()

genre
other        1500
rock          900
hip hop       900
jazz          898
classical     856
pop           775
Name: count, dtype: int64

## Search Function

Created by Isaiah Montoya

In [39]:
def get_track_features(song_title, artist_name):
    # Search for the song using Spotipy's search function
    result = sp.search(q=f"track:{song_title} artist:{artist_name}", type='track', limit=1)
    
    if result['tracks']['items']:
        # Extract the track ID from the search result
        track = result['tracks']['items'][0]
        track_id = track['id']
        track_name = track['name']
        artist_name = track['artists'][0]['name']
        
        print(f"Found track: {track_name} by {artist_name}")
        
        # Use the track ID to get the song's features
        features = sp.audio_features(track_id)
        return features[0]  # Return the features dictionary
    else:
        print(f"No results found for {song_title} by {artist_name}")
        return None

# Example usage
song_title = "Spybreak-Short One"
artist_name = "Propellerheads"
features = get_track_features(song_title, artist_name)

if features:
    print("Audio Features:")
    print(features)

Found track: Spybreak! - Short One by Propellerheads
Audio Features:
{'danceability': 0.552, 'energy': 0.929, 'key': 1, 'loudness': -8.626, 'mode': 1, 'speechiness': 0.0458, 'acousticness': 7.14e-06, 'instrumentalness': 0.846, 'liveness': 0.18, 'valence': 0.466, 'tempo': 127.834, 'type': 'audio_features', 'id': '6AyXbkn4cwrErFIMbRBRDs', 'uri': 'spotify:track:6AyXbkn4cwrErFIMbRBRDs', 'track_href': 'https://api.spotify.com/v1/tracks/6AyXbkn4cwrErFIMbRBRDs', 'analysis_url': 'https://api.spotify.com/v1/audio-analysis/6AyXbkn4cwrErFIMbRBRDs', 'duration_ms': 244400, 'time_signature': 4}
