`python3.11 -m pip install jupyterlab PyEnchant scipy numpy spotipy scikit-learn --user`

# Init & Login

In [1]:
import time, pickle, os
now = time.time
from math import ceil
from random import randrange, choice, random
from time import sleep
from pprint import pprint
from datetime import datetime
from uuid import uuid4

import spotipy
import spotipy.util as util

from IPython.display import clear_output
import enchant
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from scipy.spatial import cKDTree

## Client Info ##
# Empty placeholders: both values are overwritten below from the local key files
CLIENT_ID     = ""
CLIENT_SECRET = ""
# Space-separated authorization scopes requested by `check_API_token`
CLIENT_SCOPE  = "user-follow-modify playlist-modify-private playlist-modify-public"
USER_NAME     = "31ytgsr7wdmiaroy77msqpiupdsi"
REDIR_URI     = "https://github.com/jwatson-CO-edu/yt_shuffle_so_good"
AUTH_URL      = 'https://accounts.spotify.com/api/token'
BASE_URL      = 'https://api.spotify.com/v1/'

## API Info ##
_RESPONSE_LIMIT =  100 # Page size passed as `limit` to the playlist track requests
_ARTIST_Q_LIM   =   50
_MAX_OFFSET     = 1000
_T_LOGIN_S      = 60.0 * 10 #20 # 25 # 50 # Token age [s] after which `check_API_token` logs in again
tLastAuth       = 0.0 # `now()` timestamp of the last login; 0.0 forces a login on the first check

# Credentials are kept outside the notebook; only the first line of each file is used
with open( "../keys/spot_ID.txt" , 'r' ) as f:
    CLIENT_ID = f.readlines()[0].strip()

with open( "../keys/spot_SECRET.txt" , 'r' ) as f:
    CLIENT_SECRET = f.readlines()[0].strip()

# Both are rebound by `check_API_token`: the user token and the `spotipy.Spotify` client built from it
token = None
spot  = None


def check_API_token():
    """ Log in again when the current login is older than `_T_LOGIN_S` seconds, otherwise report token age

    Side effects: on a successful login, rebinds the module-level `token`, `spot`, and `tLastAuth`.
    Raises: RuntimeError when the login prompt hands back no token; module state is left untouched
            so that the next call retries instead of waiting out `_T_LOGIN_S`.
    """
    global tLastAuth, token, spot
    tNow    = now()
    elapsed = tNow - tLastAuth
    if elapsed >= _T_LOGIN_S:
        nuToken = util.prompt_for_user_token(
            username      = USER_NAME,
            scope         = CLIENT_SCOPE,
            client_id     = CLIENT_ID,
            client_secret = CLIENT_SECRET,
            redirect_uri  = REDIR_URI
        )
        # A missing token must not be recorded as a login: every later request would go out unauthenticated
        if not nuToken:
            raise RuntimeError( "Spotify login failed: no user token was returned!" )
        token = nuToken
        spot  = spotipy.Spotify( auth = token )
        # The token is a credential: it is deliberately NOT echoed into the notebook output
        clear_output( wait = True )
        sleep( 2 )
        print( "TOKEN OBTAINED" )
        # Timestamp taken BEFORE the prompt, so the recorded age never under-estimates the token age
        tLastAuth = tNow
    else:
        print( f"TOKEN STILL VALID, AGE: {elapsed/60.0} MINUTES" )
        

In [2]:
# Log in (or report the age of the current token) so that `spot` is usable by the cells below
check_API_token()

TOKEN OBTAINED


## Playlists

In [3]:

# Collection playlists: display name -> Spotify playlist ID
playlist = {
    'study01' : "0a2qoe6S7lYeZ6nlhZdA0v",
    'study02' : "6gbtR2cBq5PvkghidCvvGk",
    'study03' : "3o3lN2qntdEV7UKTuuC77K",
    'study04' : "41sFSisljvBDMBXtpp5NIw",
    'study05' : "02iS5AFGp8YVuUUqcQf8ys",
    'study06' : "6KI7A4MWrSM7EyKRUjxIi1",
    'study07' : "3V055Md2JdrUT8tX0af7di",
    'study08' : "0tspdJlwSgiyf2O9PO6QaP",
    'study09' : "5mHRBFoQtYy2izeZ66pG95",
    'study10' : "3832xeKGEOAXFJqE4K8kIq",
    'study11' : "65MXR4dubPL9t0P4dgTWvn",
    'study12' : "0ecSAfnD4CulIVnLt26ukI",
    'study13' : "7K9ucByFRgDuZk8KMHeJkL",
}

# Review playlists: display name -> Spotify playlist ID
review = {
    'zd_Over' : "0v26bHydUxcGC5EbMlkjzG",
    'ze_Over' : "6SqlfurCBP7eeMOojaDNtS",
    'zf_Over' : "5TtKaKCouyJp7Hhtu4YlYm",
}

# NOTE(review): neither name is used in the visible part of the notebook;
# presumably the playlist to top up and its target size -- confirm against later cells
backfill = review['zd_Over']
_N_BKFL  = 400


## Session Database

* FILTER TYPES: {'album', 'artist', 'track', 'year', 'upc', 'tag:hipster', 'tag:new', 'isrc', 'genre',}
* SEARCH TYPES: {"album", "artist", "playlist", "track", "show", "episode", "audiobook",}

In [4]:
##### Session Database Params #####
_MOD_T_DAY_S  = 60.0 * 60 * 24 # Seconds per day
_STALE_TIME_S = _MOD_T_DAY_S * 31 # Saved databases older than this (31 days) are rejected by `load_music_database`
_MIN_LEN_S    = 60.0 + 45.0 # 105 s; also enters the duration scaling `_V_DURATN_FACTOR`
_DATA_DIR     = "data/" # Directory searched for saved session files
_DATA_PREFIX  = "Study-Music-Data_" # Session file name = prefix + timestamp + postfix
_DATA_POSTFIX = ".pkl"
_NULL_GENRE   = "Music"

In [5]:
##### Init Session Database #####
# Empty in-memory database for this session; `load_music_database` merges a saved file into it
data = {
    'time'     : now()  , # Data Structure Creation Time
    'playlists': dict() , # Study Playlist Info
    'collectID': set([]), # Currently accepted track IDs
    'review'   : dict() , # Review Playlist Info
    'reviewID' : set([]), # Previously reviewed track IDs
    'artists'  : dict() , # Study Artist Info
    'queries'  : dict() , # Queries made during music searches
    'genres'   : dict() , # Study Genre Info
    # 2024-08-11: Track info does NOT contain play count
}
# Pre-emptively Name Session File #
# The ISO-like timestamp makes a reverse lexical sort of file names newest-first
timestamp = datetime.now().strftime( '%Y-%m-%dT%H:%M:%S' )
outFilNam = _DATA_PREFIX + timestamp + _DATA_POSTFIX
# Write where `load_music_database` looks by default, instead of a second hard-coded 'data/'
outPath   = os.path.join( _DATA_DIR, outFilNam )

## Settings

In [6]:

## Search 01 Params ##
_N_MAX_SEARCH = 50
_N_DEF_SEARCH = 10
_YEAR_PADDING =  5

## DBSCAN Params ##
# NOTE(review): not used in the visible cells; presumably the `eps` / `min_samples` of the imported DBSCAN -- confirm
_DBS_EPSILON  =  0.400 # 0.200 # 0.300 # 0.400 # 0.500 # 0.750
_DBS_MIN_MMBR =  2

## Mini-Genre Params ##
_MRG_D_FACTOR =  1.5 # 1.0 # 2.0 #3.0
_MIN_GNR_MMBR = 15
_ONE_BILLION  = 1e9

## Query History Keys ##
_NU_REL_Q_KEY = "NewReleases:Albums" # Query Key for New Releases
_FEAT_PL_Q_KY = "Featured:Playlists" # Query Key for Featured Playlists
_ART_Q_PREFIX = "ArtistTopTracks:" # - Query Key Prefix for Artist Top Tracks

## Vector Representation ##
# One multiplicative scale per audio feature, applied in `get_track_vector`.
# `_V_NUM_FEATURES` must equal the number of components that function emits.
# NOTE(review): the literal divisors look like observed feature spans (see `analyze_db_vector_spread`) -- confirm
_V_NUM_FEATURES  = 10
_V_SPEECH_FACTOR =  (1.0/0.3724) * (2.0/3.0) * 5.0
_V_INSTR_FACTOR  =  1.0 * 5.0
_V_ACOUST_FACTOR =  1.0
_V_DANCE_FACTOR  =  1.0 /  0.8701
_V_DURATN_FACTOR =  1.0 / 1000.0 / _MIN_LEN_S / 18.7821619
_V_ENERGY_FACTOR =  1.0
_V_LIVENS_FACTOR =  1.0 /   0.9173
_V_LOUDNS_FACTOR =  1.0 /  38.53
_V_TEMPO_FACTOR  =  1.0 / (174.331-45.7) * (3.0/4.0) * 3.0 # 45.7 is also the offset subtracted from tempo in `get_track_vector`
_V_VALENC_FACTOR =  1.0


In [7]:

# NOTE(review): neither flag is read in the visible cells; presumably they gate the
# mini-genre build and merge stages further down -- confirm
_SKIP_GENRE_BUILD = False
_SKIP_GENRE_MERGE = False


# Query Functions

In [8]:

def get_playlist_length( playlist_ID ):
    """ Get the number of total tracks in the playlist

    Only the `total` field of a single-item page is requested, so no track payload is
    downloaded just to read the count.
    """
    response = spot.user_playlist_tracks(
        CLIENT_ID,
        playlist_ID,
        fields = 'total',
        limit  = 1
    )
    return response['total']
    

def fetch_entire_playlist( playlist_ID ):
    """ Get infodump on all playlist tracks

    Pages through the playlist `_RESPONSE_LIMIT` items at a time and returns the
    concatenated `items` of every page, in playlist order.
    """
    plTracks = []
    Ntracks  = None
    while True:
        response = spot.user_playlist_tracks(
            CLIENT_ID,
            playlist_ID,
            fields = 'items,uri,name,id,total',
            limit  = _RESPONSE_LIMIT,
            offset = len( plTracks )
        )
        # The expected length is taken from the first page only
        if Ntracks is None:
            Ntracks = response['total']

        page = response['items']
        # An empty page means there is nothing left to fetch, whatever `total` claimed;
        # without this check the offset would never advance and the loop would never end
        if not page:
            break
        plTracks.extend( page )

        if len( plTracks ) >= Ntracks:
            break
    return plTracks


def update_music_database( db, fDb ):
    """ Update the database without overwriting important info

    Merges the loaded database `fDb` into the live database `db`, in place:
      * 'playlists', 'review', 'genres': entries of `fDb` replace same-named entries of `db`
      * 'collectID', 'reviewID': set union
      * 'artists': for an artist known to both, the fields of `db` win, except 'releases',
        which becomes the releases of `fDb` followed by the releases of `db`
      * 'queries': per-query counts are added
    `fDb` itself is never modified and shares no release list with `db` afterwards.
    """

    db['playlists'].update( fDb['playlists'] )

    db['collectID'] = db['collectID'].union( fDb['collectID'] )

    db['review'].update( fDb['review'] )

    db['reviewID'] = db['reviewID'].union( fDb['reviewID'] )

    for artID, artDct in fDb['artists'].items():
        merged = dict( artDct )
        if artID in db['artists']:
            known = db['artists'][ artID ]
            merged.update( known )
            # Build a NEW list: extending `artDct['releases']` in place would corrupt `fDb`
            merged['releases'] = list( artDct['releases'] ) + list( known['releases'] )
        elif 'releases' in merged:
            # Copy, so that later appends to `db` do not leak into `fDb`
            merged['releases'] = list( merged['releases'] )
        db['artists'][ artID ] = merged

    for query in fDb['queries']:
        db['queries'][ query ] = db['queries'].get( query, 0 ) + fDb['queries'][ query ]

    db['genres'].update( fDb['genres'] )
    

def load_music_database( dataDir = _DATA_DIR, forceLoad = False ):
    """ Find the latest music database, test for freshness, and merge it into the current db if fresh

    dataDir  : directory searched for files whose name contains `_DATA_PREFIX`
    forceLoad: when truthy, load the newest file even if it is older than `_STALE_TIME_S`
    Returns the path of the file that was merged into `data`, or None when nothing was loaded
    (missing directory, no matching file, or newest file stale).
    """
    # A fresh checkout has no data directory yet: that is "nothing to load", not an error
    if not os.path.isdir( dataDir ):
        print( f"No data directory at {dataDir}!" )
        return None
    # File names embed a sortable timestamp, so the reverse lexical order is newest-first
    dbFiles = sorted(
        ( os.path.join( dataDir, f ) for f in os.listdir( dataDir ) if (_DATA_PREFIX in str(f)) ),
        reverse = True
    )
    if not dbFiles:
        return None
    latest = dbFiles[0]
    # NOTE(review): `pickle.load` can execute arbitrary code; only load files this notebook wrote itself
    with open( latest, 'rb' ) as f:
        db = pickle.load( f )
    age_s = data['time'] - db['time']
    if (age_s <= _STALE_TIME_S) or forceLoad:
        # Merge rather than `data.update( db )`, so that info already gathered this session survives
        update_music_database( data, db )
        print( f"Loaded {latest}!" )
        return latest
    print( f"File {latest} was STALE by {(age_s-_STALE_TIME_S)/_MOD_T_DAY_S} days!" )
    return None


def save_music_database( dataDct ):
    """ Pickle `dataDct` to store current music collection data as well as search activity

    Writes to the module-level `outPath`, creating its parent directory when missing.
    """
    print( f"About to write {outPath} ..." )
    outDir = os.path.dirname( outPath )
    # `dirname` is empty for a bare file name; `makedirs( "" )` would raise
    if outDir:
        os.makedirs( outDir, exist_ok = True )
    with open( outPath, 'wb' ) as f:
        pickle.dump( dataDct, f )
    print( "COMPLETE!" )


def populate_playlist_data( dataDct, plDict, pause_s = 1.0 ):
    """ Gather data across specified playlists

    dataDct: session database; its 'playlists', 'collectID', and 'artists' entries are filled in place
    plDict : playlist name -> playlist ID
    pause_s: seconds to wait after each playlist
    When `load_music_database` finds a usable saved database, nothing is fetched.
    """
    print( "\n### READ MUSIC COLLECTION ###\n" )
    nuDB = load_music_database()
    if nuDB is not None:
        print( f"Found current collection data at {nuDB}!" )
    else:
        for plName_i, plID_i in plDict.items():
            print( plName_i, '-', plID_i, '...' )
            tracks_i = fetch_entire_playlist( plID_i )
            dataDct['playlists'][ plName_i ] = {
                'ID'    : plID_i,
                'tracks': tracks_i,
            }

            plSet_i = set([item['track']['id'] for item in tracks_i])
            dataDct['collectID'] = dataDct['collectID'].union( plSet_i )

            for track_j in tracks_i:
                release_j = track_j['track']['album']['release_date']

                for artist_k in track_j['track']['artists']:
                    artistID_k = artist_k['id']
                    if artistID_k not in dataDct['artists']:
                        dataDct['artists'][ artistID_k ] = {
                            # Name of THIS artist, not of the first artist credited on the track
                            'name'    : artist_k['name'],
                            'count'   : 1,
                            'releases': [release_j,],
                        }
                    else:
                        dataDct['artists'][ artistID_k ]['count'   ] += 1
                        dataDct['artists'][ artistID_k ]['releases'].append( release_j )

            sleep( pause_s )

    print( "\n### COMPLETE ###\n" )

# Helper Functions

In [10]:

########## CONTAINER FUNCTIONS #####################################################################

def sort_keys_by_value( dct, reverse = True ):
    """ List the keys of `dct` ordered by their (numeric) values, largest first unless `reverse` is False """
    # The sort is stable, so keys with equal values keep their insertion order
    return sorted( dct, key = dct.get, reverse = reverse )



########## STRING ANALYSIS #########################################################################

def levenshtein_dist( s1, s2 ):
    """ Edit distance between two strings: fewest single-character insertions, deletions, or substitutions """
    # Algorithm by Salvador Dali, https://stackoverflow.com/a/32558749
    # Rows are as wide as the shorter string, so only two short rows are ever held
    shorter, longer = (s2, s1) if len(s1) > len(s2) else (s1, s2)
    prevRow = list( range( len(shorter) + 1 ) )
    for rowNum, chLong in enumerate( longer, start = 1 ):
        currRow = [rowNum]
        for col, chShort in enumerate( shorter ):
            if chShort == chLong:
                cost = prevRow[col]
            else:
                cost = 1 + min( prevRow[col], prevRow[col + 1], currRow[-1] )
            currRow.append( cost )
        prevRow = currRow
    return prevRow[-1]



########## STATS & SAMPLING ########################################################################


def total_pop( odds ):
    """ Add up the weights of every category in `odds` (0 when there are none) """
    return sum( odds[category] for category in odds )


def normalize_dist( odds_ ):
    """ Return a copy of `odds_` with every weight divided by the total, so that the weights sum to 1.0 """
    denom = total_pop( odds_ )
    return { category: odds_[category] / denom for category in odds_ }


def roll_outcome( odds ):
    """ Get a random outcome from the distribution

    odds: outcome -> non-negative weight (need not be normalized)
    Returns one outcome, chosen with probability proportional to its weight, or None when
    there is no outcome with positive weight.
    """
    oddsNorm = normalize_dist( odds )
    roll     = random() # Uniform in [0.0, 1.0)
    total    = 0.0
    last     = None
    for outcome, prob in oddsNorm.items():
        # A zero-weight outcome must never be drawn, not even on a roll of exactly 0.0
        if prob <= 0.0:
            continue
        total += prob
        last   = outcome
        if roll < total:
            return outcome
    # Round-off can leave the cumulative sum a hair below 1.0; a roll landing in that gap
    # belongs to the last drawable outcome rather than to nobody
    return last


# Mini-Genre Extraction

In [11]:


def fetch_entire_playlist_with_audio_features( playlist_ID, pause_s = 1.00 ):
    """ Get maximum infodump on all playlist tracks

    playlist_ID: playlist to read
    pause_s    : seconds to wait after every API request
    Returns the playlist items in playlist order, each one updated in place with the keys of
    its audio-features record. Items without a track ID, or for which no audio-features
    record comes back, are left out (and counted in a printed notice), because they cannot
    be turned into a track vector.
    """

    plTracks = []
    Ntracks  = get_playlist_length( playlist_ID )
    trCount  = 0 # Items consumed so far == offset of the next page
    nSkipped = 0

    while trCount < Ntracks:

        response = spot.user_playlist_tracks(
            CLIENT_ID,
            playlist_ID,
            fields = 'items,uri,name,id,total',
            limit  = min( _RESPONSE_LIMIT, Ntracks-trCount ),
            offset = trCount
        )
        sleep( pause_s )

        resTracks = response['items']
        # An empty page cannot advance the offset: stop instead of requesting it forever
        if not resTracks:
            break
        trCount += len( resTracks )

        # Only items that carry a track ID can be looked up
        usable = [item for item in resTracks if item.get( 'track' ) and item['track'].get( 'id' )]
        nAdded = 0
        if usable:
            resFeatrs = spot.audio_features( [item['track']['id'] for item in usable] ) or []
            sleep( pause_s )
            for item, featrs in zip( usable, resFeatrs ):
                # The lookup answers with an empty slot for a track it has no record of
                if featrs is None:
                    continue
                item.update( featrs )
                plTracks.append( item )
                nAdded += 1
        nSkipped += len( resTracks ) - nAdded

    if nSkipped:
        print( f"Skipped {nSkipped} playlist items without audio features!" )

    return plTracks


def get_track_vector( track ):
    """ Build the scaled audio-feature vector of `track` as a 1-D array (one `_V_*_FACTOR` per feature) """
    # Features that are scaled directly, in vector order
    scaledOnly = (
        ( 'speechiness'     , _V_SPEECH_FACTOR ),
        ( 'instrumentalness', _V_INSTR_FACTOR  ),
        ( 'acousticness'    , _V_ACOUST_FACTOR ),
        ( 'danceability'    , _V_DANCE_FACTOR  ),
        ( 'duration_ms'     , _V_DURATN_FACTOR ),
        ( 'energy'          , _V_ENERGY_FACTOR ),
        ( 'liveness'        , _V_LIVENS_FACTOR ),
        ( 'loudness'        , _V_LOUDNS_FACTOR ),
    )
    components = [track[ name ] * factor for name, factor in scaledOnly]
    # Tempo is shifted by 45.7 before it is scaled
    components.append( (track['tempo']-45.7) * _V_TEMPO_FACTOR )
    components.append( track['valence'] * _V_VALENC_FACTOR )
    return np.array( components )


def get_tracks_as_vectors( tracks ):
    """ Stack the feature vectors of `tracks` into a matrix, one row per track; an empty list when there are no tracks """
    if len( tracks ) <= 0:
        return list()
    # Forced to float so that the result has the dtype of a zero-initialized matrix
    return np.array( [get_track_vector( trk ) for trk in tracks], dtype = float )


def vector_distance_to_genre( qVec, genreDct ):
    """ Distance from `qVec` to the closest track vector stored in the k-d tree of `genreDct` """
    nearest = genreDct['kdTree'].query( qVec )
    # The query answers with the distance first; only the distance is of interest here
    return nearest[0]
    

def track_distance_to_genre( qTrack, genreDct ):
    """ Distance from the feature vector of `qTrack` to the closest track vector of `genreDct` """
    qVec = get_track_vector( qTrack )
    return vector_distance_to_genre( qVec, genreDct )


def assign_vectors_to_tracks( tracks ):
    """ Compute the feature vector of every track and keep it under the 'vector' key of that track """
    # Row i of the matrix belongs to track i
    for track, row in zip( tracks, get_tracks_as_vectors( tracks ) ):
        track['vector'] = row
    print( f"Stored vectors for {len(tracks)} tracks!" )


def fetch_collection_with_audio_features( dataDct, plDct, rvDct = None, pause_s = 3.0, renewSets = True ):
    """ Get maximum infodump on all playlists

    dataDct  : session database, filled in place ('playlists', 'collectID', 'artists', and,
               when `rvDct` is given, 'review' and 'reviewID')
    plDct    : collection playlists, name -> playlist ID
    rvDct    : review playlists, name -> playlist ID, or None to skip them
    pause_s  : seconds to wait after each playlist
    renewSets: when truthy, 'collectID' is rebuilt from scratch instead of being added to
    """

    if renewSets:
        dataDct['collectID'] = set([])

    print( "##### Get Collection Data #####" )
    for plName_i, plID_i in plDct.items():
        print( plName_i, '-', plID_i, '...' )
        tracks_i = fetch_entire_playlist_with_audio_features( plID_i )
        assign_vectors_to_tracks( tracks_i )
        dataDct['playlists'][ plName_i ] = {
            'ID'    : plID_i,
            'tracks': tracks_i,
            'len'   : len( tracks_i ),
        }
        dataDct['collectID'] = dataDct['collectID'].union( set([trk['track']['id'] for trk in tracks_i]) )

        # Catalog artists from collection tracks (Req'd for (Sub-)Search 01)
        for track_j in tracks_i:
            release_j = track_j['track']['album']['release_date']
            for artist_k in track_j['track']['artists']:
                artistID_k = artist_k['id']
                if artistID_k not in dataDct['artists']:
                    dataDct['artists'][ artistID_k ] = {
                        # Name of THIS artist, not of the first artist credited on the track
                        'name'    : artist_k['name'],
                        'count'   : 1,
                        'releases': [release_j,],
                    }
                else:
                    dataDct['artists'][ artistID_k ]['count'   ] += 1
                    dataDct['artists'][ artistID_k ]['releases'].append( release_j )

        sleep( pause_s )

    if rvDct is not None:
        print( "\n##### Get Review Data #####" )
        for plName_i, plID_i in rvDct.items():
            print( plName_i, '-', plID_i, '...' )
            tracks_i = fetch_entire_playlist_with_audio_features( plID_i )
            assign_vectors_to_tracks( tracks_i )
            dataDct['review'][ plName_i ] = {
                'ID'    : plID_i,
                'tracks': tracks_i,
                'len'   : len( tracks_i ),
            }
            dataDct['reviewID'] = dataDct['reviewID'].union( set([trk['track']['id'] for trk in tracks_i]) )
            sleep( pause_s )

    print( "\n##### Complete #####" )


def analyze_db_vector_spread( db ):
    """ Find the smallest and largest value of every vector component over all collection and
    review playlists, print span and minimum per feature (for updating the scale factors by
    hand), and return them as a 2-row matrix: row 0 minima, row 1 maxima """
    spread = np.empty( (2, _V_NUM_FEATURES,) )
    spread[0,:] =  1e6 # Minima start far too high ...
    spread[1,:] = -1e6 # ... and maxima far too low, so that any real value replaces them
    # Same-named review playlists take precedence over collection playlists
    everyList = { **db['playlists'], **db['review'] }
    for entry in everyList.values():
        if not len( entry['tracks'] ):
            continue
        vecs  = get_tracks_as_vectors( entry['tracks'] )
        lows  = np.min( vecs, axis = 0 )
        highs = np.max( vecs, axis = 0 )
        for col in range( _V_NUM_FEATURES ):
            spread[0,col] = min( spread[0,col], lows[col]  )
            spread[1,col] = max( spread[1,col], highs[col] )
    for j, (low, high) in enumerate( zip( spread[0,:], spread[1,:] ) ):
        print( f"Feature {j+1}, Span: {high - low}, Min: {low}" )
    return spread
    

In [12]:
# Merge the newest saved session file into `data`; `forceLoad` accepts it even when it is stale.
# The commented calls below rebuild the data from the API, print feature spans, and save a new file.
load_music_database( forceLoad = 1 );
# fetch_collection_with_audio_features( data, playlist, review, pause_s = 3.0 )
# analyze_db_vector_spread( data );
# save_music_database( data )

Loaded data/Study-Music-Data_2024-09-01T20:53:35.pkl!


In [None]:
def genre_vector_ops( gnre ):
    """ Fill in the vector-derived fields of a genre, in place: 'vectors', 'len', 'center', 'kdTree'

    An empty genre gets a 'center' and 'kdTree' of None.
    """
    gnre['vectors'] = get_tracks_as_vectors( gnre['tracks'] )
    gnre['len']     = len( gnre['tracks'] )

    if gnre['len'] <= 0:
        gnre['center'] = None
        gnre['kdTree'] = None
        return

    points = gnre['vectors']
    if gnre['len'] > 1:
        # Start at the mean, then let every point pull the center toward itself,
        # the pull weakening exponentially with the distance of the point
        center = np.mean( points, axis = 0 )
        for point in points:
            gap    = np.linalg.norm( np.subtract( center, point ) )
            pull   = np.exp( -gap )
            center = center * (1.0 - pull) + point * pull
    else:
        # A single track is its own center
        center = points[0]

    gnre['center'] = center # 2024-08-16: This is probably guaranteed to be inside the convex hull
    gnre['kdTree'] = cKDTree( points, balanced_tree = False, compact_nodes = False )
    

In [None]:
from random import shuffle


def set_genre_membership( db ):
    """ Give every genre a 'trackIDs' set holding the ID of each of its tracks (existing sets are added to) """
    for genre in db['genres'].values():
        members = genre.setdefault( 'trackIDs', set([]) )
        members.update( track['id'] for track in genre['tracks'] )
    

def get_homeless_tracks( db ):
    """ Collect, in shuffled order, every playlist track that is not associated with any current mini-genre """
    # Membership sets must be current before they are consulted
    set_genre_membership( db )
    genres  = db['genres'].values()
    orphans = [
        track
        for playls in db['playlists'].values()
        for track in playls['tracks']
        if not any( track['id'] in genre['trackIDs'] for genre in genres )
    ]
    shuffle( orphans )
    print( f"Found {len(orphans)} unaffiliated tracks!" )
    return orphans