# Intro

In [1]:
# pip install spotipy

In [2]:
# audio_analysis(track_id)

In [3]:
import os
import re
import time
import math
import base64
import random
import pandas as pd
from datetime import datetime, timedelta

import spotipy
from spotipy.oauth2 import SpotifyOAuth
from dotenv import load_dotenv
from requests import post, get

In [4]:
# Static variables.
# Path names.
# The data directory can be overridden with the HITFINDER_DATA_LOC environment
# variable so the notebook also runs on other machines; the original location
# is kept as the fallback. Note: this cell runs before load_dotenv(), so only
# variables already present in the process environment are seen here.
DATA_LOC = os.getenv(
    'HITFINDER_DATA_LOC',
    r'C:\Users\enriq\OneDrive\Desktop\Work\Code\HitFinder\Data',
)
TRACKS_LOC = os.path.join(DATA_LOC, 'Tracks')
# Other.
MAX_TRACKS_PER_FN_CALL = 100    # batch size used by the track/audio-feature helpers
MY_USERNAME = 'rc3email'   # my profile
MY_PLAYLIST_ID = '0a9qA4m3BDqwafHpXxX1zh'
TOP_10_STR = 'is_top_10'        # column name for the artist-top-tracks flag
DO_EXPORT = False
RANDOM_STATE = 0
# Seed Python's RNG so any later use of `random` is repeatable.
random.seed(RANDOM_STATE)

# Obtain data

### Client definitions

In [5]:
# Populate the process environment from a local .env file (if there is one),
# then pick the two Spotify credentials out of it.
load_dotenv()
CLIENT_ID, CLIENT_SECRET = map(os.getenv, ('CLIENT_ID', 'CLIENT_SECRET'))
# Debug aid only -- leave commented out so the secrets never end up in saved output:
# print(CLIENT_ID, CLIENT_SECRET)

In [6]:
# Create the shared spotipy client once; re-running this cell keeps the
# existing `sp_global` instead of building a new one.
try:
    sp_global
except NameError:
    sp_global = spotipy.Spotify(
        auth_manager = SpotifyOAuth(
            scope = 'user-library-read',
            redirect_uri = 'http://localhost:8080',
            client_secret = CLIENT_SECRET,
            client_id = CLIENT_ID,
        )
    )
# Authorising may prompt you to open a link, then copy the address it lands on
# and paste it into the input box.

## Function definitions

### `spotipy`-based functions

In [7]:
def get_track_features(
        input_dict: dict,
        use_tracks_dict: bool,
        call_num: int = -1,
        sp: spotipy.client.Spotify = sp_global,
        max_tracks_per_fn_call: int = MAX_TRACKS_PER_FN_CALL,
        top_10_str: str = TOP_10_STR,
):
    """
    Get basic (non-audio) track data for one batch of tracks.

    Parameters
    --------
    input_dict : dict
        Playlist JSON (its 'id' is used to fetch tracks) when
        `use_tracks_dict` is False; otherwise a tracks JSON whose 'items'
        each carry an 'id'.
    use_tracks_dict : bool
        Whether `input_dict` is a tracks JSON (True) or a playlist JSON (False).
    call_num : int, default=-1
        Which call number we are on for the function.
        Multiplied by `max_tracks_per_fn_call` to get the offset of this batch.
        For a playlist the offset is passed to `sp.playlist_tracks`.
        For a tracks JSON a non-negative value selects that slice of 'items';
        a negative value uses every item.
    sp : spotipy.client.Spotify
        spotipy object to retrieve track data.
    max_tracks_per_fn_call : int, default=MAX_TRACKS_PER_FN_CALL
        Maximum number of tracks handled per call.
    top_10_str : str, default=TOP_10_STR
        Name of feature for whether the song is in the artist's top tracks.

    Returns
    --------
    tracks : pandas.core.frame.DataFrame
        One row per track with columns: name, id, artist, artist_id, album,
        popularity, `top_10_str`, explicit. Has these columns but no rows
        when the batch holds no track data.

    """

    # Column layout, fixed up front so the empty case can return it too.
    feats = ['name', 'id', 'popularity', 'explicit']
    deep_feats = ['artist', 'artist_id', 'album']
    new_order = feats[:2] + deep_feats + [feats[2], top_10_str, feats[3]]

    # Get the current portion of tracks.
    offset = call_num * max_tracks_per_fn_call  # use offset to skip ahead
    if not use_tracks_dict:
        # Playlist JSON: ask Spotify for this page of the playlist.
        tracks_dict = sp.playlist_tracks(input_dict['id'], offset = offset)
        tracks_list = [item['track'] for item in tracks_dict['items']]  # subset to track data
    else:
        # Tracks JSON: look each listed track up individually.
        track_IDs = [t['id'] for t in input_dict['items']]
        if call_num >= 0:
            # Only this call's slice, so repeated calls never re-fetch
            # (and duplicate) the same items.
            track_IDs = track_IDs[offset:offset + max_tracks_per_fn_call]
        tracks_list = [sp.track(track_ID) for track_ID in track_IDs]
    tracks_list = [t for t in tracks_list if t is not None]  # remove tracks with no data

    # Nothing to describe: return an empty frame with the expected columns
    # rather than failing on a column-count mismatch below.
    if not tracks_list:
        return pd.DataFrame(columns = new_order)

    # Flatten each track into one record. 'artist' and 'album' sit deeper in
    # the dictionary; only the first listed artist is used.
    records = []
    for t in tracks_list:
        record = {f: t[f] for f in feats}
        first_artist = t['artists'][0]
        record['artist'] = first_artist['name']
        record['artist_id'] = first_artist['id']
        record['album'] = t['album']['name']
        records.append(record)
    tracks = pd.DataFrame(records, columns = feats + deep_feats)

    # Add top-track status: 1 if the track ID appears among the top tracks
    # returned for any artist in this batch, else 0.
    artist_IDs = tracks['artist_id'].unique()
    top_tracks_IDs = {
        item['id']
        for _ID in artist_IDs
        for item in sp.artist_top_tracks(_ID)['tracks']
    }
    tracks[top_10_str] = [1 if ID in top_tracks_IDs else 0 for ID in tracks['id']]

    # Reorder columns.
    tracks = tracks[new_order]

    return tracks

In [8]:
def get_audio_features(
        tracks: pd.DataFrame,
        call_num: int,
        sp: spotipy.client.Spotify = sp_global,
        max_tracks_per_fn_call: int = MAX_TRACKS_PER_FN_CALL,
):
    """
    Get audio features for one batch of tracks.

    Parameters
    --------
    tracks : pandas.core.frame.DataFrame
        Tracks data; must contain an 'id' column.
    call_num : int
        Which call number we are on for the function.
        Determines the row positions of the tracks we want features for:
        rows [call_num * max_tracks_per_fn_call, + max_tracks_per_fn_call).
    sp : spotipy.client.Spotify
        spotipy object to retrieve track data.
    max_tracks_per_fn_call : int, default=MAX_TRACKS_PER_FN_CALL
        Maximum number of tracks requested per call.

    Returns
    --------
    audio_features : pandas.core.frame.DataFrame
        Audio features of the tracks we searched for. Empty when the batch
        contains no rows or no features were returned.

    """

    # Get positions of tracks for this call.
    lower_idx = call_num * max_tracks_per_fn_call
    upper_idx = lower_idx + max_tracks_per_fn_call
    track_IDs = tracks['id'].iloc[lower_idx:upper_idx].tolist()

    # An empty batch would otherwise send a request with no IDs at all.
    if not track_IDs:
        return pd.DataFrame()

    # Get audio features for this set of tracks.
    audio_feat_tmp = sp.audio_features(track_IDs)
    # Remove tracks with no data.
    audio_feat_tmp = [f for f in audio_feat_tmp if f is not None]
    # Convert to dataframe type.
    audio_features = pd.DataFrame(audio_feat_tmp)

    return audio_features

In [9]:
def get_tracks_data(
        input_dict: dict = {},
        max_tracks_per_fn_call: int = MAX_TRACKS_PER_FN_CALL,
        tracks_obtained: pd.DataFrame = pd.DataFrame(),
):
    """
    Get track data (basic fields plus audio features) for a playlist or a
    tracks JSON.

    Parameters
    --------
    input_dict : dict
        Either the track JSON, or the playlist JSON to get track data from.
        It is treated as a playlist when it has a 'tracks' key.
    max_tracks_per_fn_call : int, default=MAX_TRACKS_PER_FN_CALL
        Batch size used to split the work into separate calls.
    tracks_obtained : pandas.core.frame.DataFrame, default=pd.DataFrame()
        Track data we have already obtained so far; tracks whose 'id' is
        already in it are skipped.

    Returns
    --------
    tracks : pandas.core.frame.DataFrame
        Track names, IDs, and audio features. Empty when the expected
        'total' key is missing, or when there are no (new) tracks.

    """

    # Determine dictionary type passed: tracks, or playlist.
    use_tracks_dict = 'tracks' not in input_dict
    # Read the number of tracks to fetch.
    try:
        if use_tracks_dict:
            num_tracks = input_dict['total']
        else:
            num_tracks = input_dict['tracks']['total']
    except KeyError:
        print('Key was not found; max API calls likely exceeded.')
        return pd.DataFrame()

    # Split the function calls into separate calls if > one batch of tracks.
    num_fn_calls = math.ceil(num_tracks / max_tracks_per_fn_call)
    track_batches = [
        get_track_features(input_dict, use_tracks_dict, call_num = n)
        for n in range(num_fn_calls)
    ]
    # Skip empty batches; with none left (e.g. total == 0) there is nothing
    # to concatenate, which pd.concat would reject.
    track_batches = [df for df in track_batches if not df.empty]
    if not track_batches:
        return pd.DataFrame()
    # Combine data obtained.
    tracks = pd.concat(track_batches, ignore_index = True)

    # Only get the tracks we need, if we already have some.
    if not tracks_obtained.empty:   # check if df has values
        tracks = tracks.loc[~tracks['id'].isin(tracks_obtained['id'])]
    # Do early stopping if no tracks.
    if tracks.empty:
        return pd.DataFrame()

    # Get track audio features in batches. The batch count is based on the
    # rows that remain after filtering, so no empty batch is requested.
    num_audio_calls = math.ceil(len(tracks) / max_tracks_per_fn_call)
    audio_batches = [
        get_audio_features(tracks, n, max_tracks_per_fn_call = max_tracks_per_fn_call)
        for n in range(num_audio_calls)
    ]
    audio_batches = [df for df in audio_batches if not df.empty]
    # No audio features at all: return the basic track data on its own
    # instead of failing on a merge with no 'id' column.
    if not audio_batches:
        return tracks.reset_index(drop = True)
    track_features = pd.concat(audio_batches, ignore_index = True)

    # Merge audio feature data to initial track data.
    tracks = pd.merge(left = tracks, right = track_features, how = 'left', on = ['id'])
    # Drop bookkeeping fields; tolerate any that are absent.
    tracks = tracks.drop(
        columns = ['type', 'uri', 'track_href', 'analysis_url'],
        errors = 'ignore',
    )

    return tracks

### Custom API functions

In [10]:
def get_token(
        client_id: str = CLIENT_ID,
        client_secret: str = CLIENT_SECRET,
):
    """
    Get token to access Spotify API.

    Parameters
    --------
    client_id : str, default=CLIENT_ID
        Personal Spotify client ID.
    client_secret : str, default=CLIENT_SECRET
        Personal Spotify client secret.

    Returns
    --------
    token : str
        Token that can be used to access Spotify API.

    Raises
    --------
    ValueError
        If either credential is missing or empty.
    requests.exceptions.HTTPError
        If the token endpoint answers with an error status.

    """

    # Fail early with a readable message instead of a TypeError from
    # concatenating None below.
    if not client_id or not client_secret:
        raise ValueError(
            'Spotify client ID and client secret must both be set '
            '(check the CLIENT_ID / CLIENT_SECRET environment variables).'
        )

    # Build the HTTP Basic credential: base64("<id>:<secret>").
    auth_string = client_id + ':' + client_secret
    auth_base64 = base64.b64encode(auth_string.encode('utf-8')).decode('utf-8')

    # Request a token using the client-credentials grant.
    url = 'https://accounts.spotify.com/api/token'
    headers = {
        'Authorization': 'Basic ' + auth_base64,
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    data = {'grant_type': 'client_credentials'}
    result = post(url, headers = headers, data = data)
    # Surface HTTP failures directly rather than as a KeyError on
    # 'access_token' further down.
    result.raise_for_status()
    token = result.json()['access_token']

    return token

In [11]:
def get_auth_header(
        token: str,
):
    """
    Build the HTTP header that carries a bearer token.

    Parameters
    --------
    token : str
        Token that can be used to access Spotify API.

    Returns
    --------
    authorization_header : dict[str]
        Single-entry mapping: 'Authorization' -> 'Bearer <token>'.

    """

    bearer_value = 'Bearer ' + token
    return dict(Authorization = bearer_value)

In [12]:
def search_for_artist(
        token: str,
        artist_name: str,
):
    """
    Search for an artist by their name using API.

    Parameters
    --------
    token : str
        Token that can be used to access Spotify API.
    artist_name: str
        Artist name to search for.

    Returns
    --------
    json_result : dict or None
        JSON of artist information for the first search result, or None
        (after printing a message) when the search returns no artists.

    """

    # Query artist name. The query goes through `params` so that requests
    # URL-encodes it; names containing '&', '#' or '+' would otherwise be
    # read as URL syntax and change the query.
    url = 'https://api.spotify.com/v1/search'
    headers = get_auth_header(token)
    params = {'q': artist_name, 'type': 'artist', 'limit': 1}
    result = get(url, headers = headers, params = params)
    json_result = result.json()['artists']['items']
    if len(json_result) == 0:
        print('No artist with this name exists.')
        return None
    json_result = json_result[0]    # get first search result

    return json_result

In [13]:
def get_album_tracks(
        token: str,
        album_ID: str,
):
    """
    Get track information from a given album.

    Parameters
    --------
    token : str
        Token that can be used to access Spotify API.
    album_ID: str
        Album ID to search for to get track(s) JSON.

    Returns
    --------
    json_result : dict
        JSON of album track info as returned by the API. Empty dict when
        the request was rate limited (HTTP 429) or the response body is
        not valid JSON.

    """

    # Get the tracks of an album using the album ID.
    url = f'https://api.spotify.com/v1/albums/{album_ID}/tracks'
    headers = get_auth_header(token)
    result = get(url, headers = headers)

    # Check for rate limiting first, so the wait time is reported whether
    # or not the 429 response happens to carry a JSON body.
    if result.status_code == 429:
        retry_after = int(result.headers.get('Retry-After', 1))  # seconds
        time_in_min = round(retry_after / 60)
        time_in_hrs = round(retry_after / (60 ** 2), 1)
        # Use the exact number of seconds; rounded hours can be off by minutes.
        datetime_out = datetime.now() + timedelta(seconds = retry_after)
        print(
            'Rate limited. Timed out for', time_in_min, 'min (' +
            str(time_in_hrs), 'hrs). Retry at:', datetime_out
        )
        return {}

    try:
        json_result = result.json()
    # Body is not JSON: JSON decoding errors are ValueError subclasses.
    except ValueError:
        json_result = {}

    return json_result

In [14]:
# Request one API token up front (client-credentials grant, using the default
# CLIENT_ID / CLIENT_SECRET). The custom API helpers search_for_artist and
# get_album_tracks take it as their first argument.
token = get_token()
# Debug aid only -- keep commented out so the token is not saved in the output:
# print(token)

## Get track data

### Global top artists tracks

In [15]:
# Read global top artists' names in.
# Data source: https://kworb.net/spotify/listeners.html
# Path parts are passed separately so the separator is chosen by the OS
# (a literal backslash inside a component only works on Windows).
artist_names_path = os.path.join(DATA_LOC, 'tmp', 'global_top_artists_names.csv')
artist_names = pd.read_csv(artist_names_path)
artist_names.head()

Unnamed: 0,Artist,Listeners,Daily Trend,Peak,PkListeners
0,Billie Eilish,105759656,6626996,1,105759656
1,The Weeknd,105046690,6464509,1,117203987
2,Taylor Swift,95647057,4983541,1,116229071
3,Coldplay,87339317,7695998,4,87403624
4,Post Malone,84787711,5067028,3,98466468


In [16]:
# Get artist IDs for the artists in this file.
artist_names_IDs_path = os.path.join(DATA_LOC, 'tmp', 'global_top_artists_names_IDs.csv')
# Use custom function if we don't have the data already.
if not os.path.exists(artist_names_IDs_path):
    print('Using custom functions to get data.')
    artist_names_list = artist_names['Artist'].tolist()
    # search_for_artist returns None when nothing matches, so look the ID up
    # defensively instead of indexing the result directly.
    search_results = [search_for_artist(token, name) for name in artist_names_list]
    artist_names_IDs = pd.DataFrame({
        'artist_name': artist_names_list,
        'artist_ID': [
            result['id'] if result is not None else None
            for result in search_results
        ],
    })
    # Leave out artists without a match so later cells only see real IDs.
    num_missing = int(artist_names_IDs['artist_ID'].isna().sum())
    if num_missing > 0:
        print('Artists skipped because no match was found:', num_missing)
    artist_names_IDs = artist_names_IDs.dropna(subset = ['artist_ID']).reset_index(drop = True)
    artist_IDs = artist_names_IDs['artist_ID'].tolist()
    # Export.
    artist_names_IDs.to_csv(artist_names_IDs_path, index = False)
# Read in the data otherwise.
else:
    print('Reading in previous data at', artist_names_IDs_path)
    artist_names_IDs = pd.read_csv(artist_names_IDs_path)

artist_names_IDs.head()

Reading in previous data at C:\Users\enriq\OneDrive\Desktop\Work\Code\HitFinder\Data\tmp\global_top_artists_names_IDs.csv


Unnamed: 0,artist_name,artist_ID
0,Billie Eilish,6qqNVTkY8uBg9cP3Jd7DAH
1,The Weeknd,1Xyo4u8uXC1ZmMpatF05PJ
2,Taylor Swift,06HL4z0CvFAxyc27GXpf02
3,Coldplay,4gzpq5DPGxSnKTe4SA8HAU
4,Post Malone,246dkjvS1zLTtiykXe5h60


In [17]:
# Get artist's album IDs.
album_IDs_path = os.path.join(DATA_LOC, 'tmp', 'global_top_artists_album_IDs.csv')
# Use spotipy to get data if we don't have the data already.
# The check must look at the album file itself: testing the artist-ID file
# (as before) skips the download whenever that other file exists and then
# tries to read an album file that may not be there.
if not os.path.exists(album_IDs_path):
    print('Using spotipy to get data.')
    artist_IDs = artist_names_IDs['artist_ID'].tolist()
    # One flat list of album IDs across all artists, in artist order.
    album_IDs = pd.DataFrame({
        'album_ID': [
            album['id']
            for _ID in artist_IDs
            for album in sp_global.artist_albums(_ID)['items']
        ]
    })
    # Export.
    album_IDs.to_csv(album_IDs_path, index = False)
# Read in the data otherwise.
else:
    print('Reading in previous data at', album_IDs_path)
    album_IDs = pd.read_csv(album_IDs_path)

album_IDs.head()

Reading in previous data at C:\Users\enriq\OneDrive\Desktop\Work\Code\HitFinder\Data\tmp\global_top_artists_album_IDs.csv


Unnamed: 0,album_ID
0,7aJuG4TFXa2hmE4z1yxc3n
1,0JGOiO34nwfUdDrD612dOp
2,0S0KGZnfBGSIssfF54WSJh
3,3ThlxfLSy4bfKzxWqmC7VN
4,4YCeHlXgJTKlzuwHmvZZo8


In [18]:
# Load the log of chunk IDs fetched so far, or start an empty log.
fname = 'global_top_artist_tracks_chunk_IDs_obtained.csv'
chunk_IDs_obtained_path = os.path.join(TRACKS_LOC, fname)
chunk_IDs_obtained = (
    pd.read_csv(chunk_IDs_obtained_path)
    if os.path.exists(chunk_IDs_obtained_path)
    else pd.DataFrame(columns = ['chunk_ID'])
)

# Load the combined track table built from those chunks, or start empty.
fname = 'global_top_artist_tracks.csv'
all_tracks_obtained_path = os.path.join(TRACKS_LOC, fname)
all_tracks_obtained = (
    pd.read_csv(all_tracks_obtained_path)
    if os.path.exists(all_tracks_obtained_path)
    else pd.DataFrame()
)

In [19]:
# The album list is long, so take a thinned-out subset and split it into chunks.
# Pick the smallest chunk ID that is not in the log yet as the starting row.
obtained_chunk_IDs = set(chunk_IDs_obtained['chunk_ID'])
chunk_ID_start = 0
while chunk_ID_start in obtained_chunk_IDs:
    chunk_ID_start += 1
# Keep every n_step-th album from that starting row onwards.
n_step = 20
album_IDs_subset = album_IDs.iloc[chunk_ID_start::n_step]
album_IDs_list = list(album_IDs_subset['album_ID'])
# Cut the subset into runs of chunk_size albums, one run per batch of calls.
chunk_size = 50
chunk_starts = range(0, len(album_IDs_list), chunk_size)
album_IDs_chunks = [
    album_IDs_list[start:start + chunk_size]
    for start in chunk_starts
]
album_IDs_chunks[0][:5]

['4qZBW3f2Q8y0k1A84d4iAO',
 '3lS1y25WAhcqJDATJK70Mq',
 '2OkEsqGTfu8PWRrNHzfr0m',
 '3HHNR44YbP7XogMVwzbodx',
 '1Kw1bVd07oRqcjrcjQKC8T']

In [20]:
# Get tracks from albums specified.
# For each chunk of album IDs: skip it if its chunk ID is already logged,
# otherwise download its tracks, write them to a per-chunk CSV, and add the
# chunk ID to the log file.
chunk_added = False  # becomes True once a chunk has been downloaded in this run
for idx, album_ID_chunk in enumerate(album_IDs_chunks):

    # Set file name using chunk we're currently on.
    # Chunk IDs advance in steps of n_step from the starting chunk ID.
    chunk_ID = (idx * n_step) + chunk_ID_start
    fname = 'global_top_artist_tracks_chunk_' + str(chunk_ID) + '.csv'
    global_top_artist_tracks_path = os.path.join(TRACKS_LOC, fname)

    # Use spotipy to get data if we don't have the data already.
    # Note: "found" is decided from the logged chunk IDs, not by checking
    # whether the chunk file exists on disk.
    global_top_artist_tracks = pd.DataFrame()  # reset for every chunk
    if chunk_ID not in chunk_IDs_obtained.values:
        print('Checking for', fname + '...\nFile not found. Getting data from Spotify.')
        if chunk_added:
            # NOTE(review): a 0-second sleep adds no pause between chunks --
            # confirm the intended delay.
            time.sleep(0)  # space out calls
        for album_ID in album_ID_chunk:
            tracks_dict = get_album_tracks(token, album_ID)
            # Pass what this chunk already holds so those track IDs are skipped.
            new_tracks = get_tracks_data(tracks_dict, tracks_obtained = global_top_artist_tracks)
            concat_dt = [global_top_artist_tracks, new_tracks]
            global_top_artist_tracks = pd.concat(concat_dt, ignore_index = True)
        # Export if data was returned.
        # An empty chunk stops the whole loop; remaining chunks are not tried.
        if global_top_artist_tracks.empty:
            break
        print('Exporting chunk ID', chunk_ID, 'to', global_top_artist_tracks_path)
        global_top_artist_tracks.to_csv(global_top_artist_tracks_path, index = False)

        # Update chunk IDs obtained.
        # Append the new ID, keep the log sorted as int32, and save it right
        # away so progress is recorded after every chunk.
        chunk_IDs_obtained.loc[len(chunk_IDs_obtained), 'chunk_ID'] = chunk_ID
        chunk_IDs_obtained = chunk_IDs_obtained.sort_values('chunk_ID').astype('int32')
        chunk_IDs_obtained.to_csv(chunk_IDs_obtained_path, index = False)
        chunk_added = True
    else:
        print('Checking for', fname + '...\nFile found.')

# global_top_artist_tracks.head()

Checking for global_top_artist_tracks_chunk_26.csv...
File not found. Getting data from Spotify.


In [21]:
# Update chunks/tracks obtained.
# Chunk files are the per-chunk track CSVs; the chunk-ID log is excluded.
tracks_fnames = os.listdir(TRACKS_LOC)
chunk_files = [f for f in tracks_fnames if '_chunk_' in f and 'IDs_obtained' not in f]

if chunk_files:
    # Combine chunked track output dataframes.
    chunk_paths = [os.path.join(TRACKS_LOC, f) for f in chunk_files]
    new_data = pd.concat([pd.read_csv(p) for p in chunk_paths], ignore_index = True)
    all_tracks_obtained = pd.concat([all_tracks_obtained, new_data], ignore_index = True)
    all_tracks_obtained.to_csv(all_tracks_obtained_path, index = False)

    # Move old chunked files.
    # Make sure the archive folder exists first: without it every rename
    # fails and is reported as if the source file were missing.
    old_chunks_dir = os.path.join(TRACKS_LOC, 'old')
    os.makedirs(old_chunks_dir, exist_ok = True)
    chunk_paths_new = [os.path.join(old_chunks_dir, f) for f in chunk_files]
    for old_path, new_path in zip(chunk_paths, chunk_paths_new):
        try:
            os.rename(old_path, new_path)
            print(f"File '{old_path}' moved to '{new_path}'.")
        except FileNotFoundError:
            print(f"File '{old_path}' does not exist.")

### Spotify playlists

In [None]:
# Get one of my playlists :-).
my_playlist_path = os.path.join(TRACKS_LOC, 'my_playlist_tracks.csv')
# Use spotipy to get data if we don't have the data already.
if not os.path.exists(my_playlist_path):
    print('Using spotipy to get track data.')
    playlists = sp_global.user_playlists(MY_USERNAME)
    # Find the playlist by ID and fail with a clear message if it is absent.
    # NOTE(review): only the items in this one response are searched --
    # confirm the playlist is always included in it.
    playlist = next(
        (p for p in playlists['items'] if p['id'] == MY_PLAYLIST_ID),
        None,
    )
    if playlist is None:
        raise ValueError(
            f"Playlist ID '{MY_PLAYLIST_ID}' was not found among the "
            f"playlists returned for user '{MY_USERNAME}'."
        )
    print('Playlist:', playlist['name'], '\nID:', playlist['id'])
    my_playlist_tracks = get_tracks_data(playlist)
    # Export, but never cache an empty result: an empty CSV cannot be read
    # back and would block a fresh download on the next run.
    if my_playlist_tracks.empty:
        print('No track data returned; nothing exported.')
    else:
        my_playlist_tracks.to_csv(my_playlist_path, index = False)
# Read in the data otherwise.
else:
    print('Reading in previous data at', my_playlist_path)
    my_playlist_tracks = pd.read_csv(my_playlist_path)

# my_playlist_tracks.head()

In [None]:
# Get many playlists for larger data pool.
spotify_sample_tracks_path = os.path.join(TRACKS_LOC, 'spotify_sample_tracks.csv')
# Use spotipy to get data if we don't have the data already.
if not os.path.exists(spotify_sample_tracks_path):
    print('Using spotipy to get track data.')
    playlists = sp_global.user_playlists('spotify')
    spotify_sample_tracks = pd.DataFrame()
    for idx, playlist in enumerate(playlists['items']):
        print(f'Playlist #{idx+1}:', playlist['name'])
        # The running table is passed in so tracks already collected from
        # earlier playlists are skipped; it therefore has to grow per playlist.
        new_tracks = get_tracks_data(playlist, tracks_obtained = spotify_sample_tracks)
        spotify_sample_tracks = pd.concat([spotify_sample_tracks, new_tracks], ignore_index = True)
    # Export, but never cache an empty result (same rule as the chunk loop):
    # an empty CSV cannot be read back and would block a fresh download.
    if spotify_sample_tracks.empty:
        print('No track data returned; nothing exported.')
    else:
        spotify_sample_tracks.to_csv(spotify_sample_tracks_path, index = False)
# Read in the data otherwise.
else:
    print('Reading in previous data at', spotify_sample_tracks_path)
    spotify_sample_tracks = pd.read_csv(spotify_sample_tracks_path)

# spotify_sample_tracks.head()