# Setup

Import all the necessary packages.

In [1]:
import os, platform

import re
from pprint import pprint 

import pandas as pd
from pandas import Timestamp
import numpy as np
import itertools
from datetime import datetime
from pytz import timezone

import multiprocessing as mp

import seaborn as sns
import matplotlib.pyplot as plt

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials

### Credentials

Establish my credentials for the Spotify API, and set up an object to use for calls to the API.

In [2]:
# Read the Spotify API credentials from the environment instead of embedding
# them in the notebook. Set SPOTIPY_CLIENT_ID and SPOTIPY_CLIENT_SECRET before
# starting the kernel; a missing variable fails loudly here with a KeyError.
# NOTE: the id/secret pair that used to be written in this cell was published
# with the notebook and should be revoked in the Spotify developer dashboard.
spotify_id = os.environ['SPOTIPY_CLIENT_ID']
spotify_secret = os.environ['SPOTIPY_CLIENT_SECRET']
REQUEST_TIMEOUT = 4

client_credentials_manager = SpotifyClientCredentials(
    client_id=spotify_id,
    client_secret=spotify_secret)
# Shared client used by every API call in this notebook
sp = spotipy.Spotify(
    client_credentials_manager=client_credentials_manager,
    requests_timeout=REQUEST_TIMEOUT)

# Explore

Last.FM data downloaded via https://benjaminbenben.com/lastfm-to-csv/.

Load the raw data. Notice how there are some missing album titles and timestamps. This is likely just the result of a bad script pulling from Last.FM, so we'll have to fix that. Below the counts is a random sample of the data, just to get a feel of what is in there.

In [None]:
# Raw Last.FM export, one row per scrobble
original_history = pd.read_csv('data/alexliebscher.csv')
# Non-null values per column; columns with lower counts have missing data
print(original_history.notna().sum())

Fixing missing timestamp data won't be hard, we will just backfill to take care of that. A very small percentage is missing, and I assume the missing values have a high probability of being similar to the song before.

In [None]:
# Backfill: each missing timestamp takes the value of the next row that has one.
# This overwrites the column on `original_history` in place.
original_history['timestamp'] = original_history['timestamp'].bfill()
# Number of non-null timestamps after the fill
print(original_history['timestamp'].count())

The timestamps are also missing timezone information, so we should add that to ensure our analysis reflects my local time. In this case, all timestamps are assumed to be UTC and are converted to US/Pacific, my local zone.

In [3]:
def correctTimestamp(timestamp):
    '''
    Correct missing timezone information to US/Pacific from UTC
    
    Parameters
    ----------
    timestamp : str, pandas.Timestamp
        The timestamp to correct. Strings are parsed with the format
        '%x %H:%M' and treated as UTC, naive Timestamps are treated as
        UTC, and aware Timestamps are converted as they are.
        
    Return
    ----------
    A corrected, US/Pacific aware timestamp
    '''
    # isinstance (rather than an exact type check) also accepts str
    # subclasses such as numpy.str_, which would otherwise reach the
    # `.tzinfo` lookup below and fail
    if isinstance(timestamp, str):
        # parse to a naive Timestamp; UTC is attached in one place below
        timestamp = Timestamp(datetime.strptime(timestamp, '%x %H:%M'))
    if timestamp.tzinfo is None:
        timestamp = timestamp.tz_localize('UTC')
        
    return timestamp.tz_convert('US/Pacific')

In [None]:
# Build a new frame rather than aliasing `original_history`: a plain assignment
# would make both names point at the same object, so converting the column
# would silently rewrite the raw data as well.
timezoned_history = original_history.assign(
    timestamp=original_history['timestamp'].apply(correctTimestamp))

The first song was recorded on December 18th, 2017 at roughly 7pm. This dataset covers the following 101 days after that. A random sample is available to see the corrected timestamps.

In [None]:
# Earliest and latest scrobble in the converted history
history_min = timezoned_history['timestamp'].min()
history_max = timezoned_history['timestamp'].max()

# First scrobble, then the total time span covered by the data
print(history_min, history_max - history_min, sep='\n')
timezoned_history.sample(5)

### Fetch full track data

In [3]:
# Raw strings keep the regex escapes from being read as (invalid) Python
# string escapes.

# brackets, braces, parentheses, '#' and quote marks: removed from the query
delimeter_pattern = re.compile(r"[\{\}\[\]\(\)\#\'\"]")
# opus / number markers in classical titles ("Op. 10", "No.3"); the leading \b
# requires the marker to start a word, so "candy shop 50 cent" is left alone
classical_pattern = re.compile(r"(\b(op\.?|no\.?)\s*\d{1,3}\s?)", re.IGNORECASE)
# a leading count of pieces, e.g. the "12 " in "12 Etudes, ..."
collections_pattern = re.compile(r"(^\d{1,3}\s*)")
# punctuation used for stylization: replaced by a single space
stylizations_pattern = re.compile(r"[\,\-\_\&\*]\s?|\:\s")

# markers that introduce collaborators, checked in this order
_COLLABORATOR_MARKERS = (" (feat", " (with", " (&")

# keys copied verbatim from the audio-features response
_AUDIO_FEATURE_KEYS = (
    'acousticness', 'danceability', 'duration_ms', 'energy', 'key',
    'liveness', 'loudness', 'mode', 'speechiness', 'tempo',
    'time_signature', 'valence')


def _strip_collaborators(text):
    '''Cut `text` at the first collaborator marker present, if any.'''
    for marker in _COLLABORATOR_MARKERS:
        if marker in text:
            return text.split(marker)[0]
    return text


def _build_query(track, artist, album):
    '''Compose the simplified search string from cleaned names.'''
    query = str(track + ' ' + artist + ' ' + album).strip()

    query = delimeter_pattern.sub("", query) # remove various delimeter chars
    query, subs = classical_pattern.subn("", query) # remove opus/number markers
    if subs > 0:
        # classical music often starts with the number of pieces in
        # a collection ("12 Etudes, Op. 10: No.10 in C minor")
        query = collections_pattern.sub("", query)

    return stylizations_pattern.sub(" ", query) # common stylizations in track/album names


def get_track_info(track, artist, album='', id_excl=False):
    '''
    With a track name and artist, and optionally an album name,
    search for a corresponding track via the Spotify API and
    build an object with possible descriptive data.
    
    Parameters
    ----------
    track : str
        The name of a track
    artist : str
        The name of the track's artist
    album : str, optional
        The name of the track's album. The literal string "nan" is
        treated as "no album".
    id_excl : bool, optional
        Return only the track's Spotify ID
    
    Return
    ----------
    Descriptive track data, or just the track ID, or an empty
    dict if no data could be found for the specified track
    '''
    # remove (feat. some artist) for cleaner search
    track = _strip_collaborators(track.lower())

    # clean album names too
    album = album.lower()
    album = "" if album == "nan" else _strip_collaborators(album)

    # compose a clean, simple query string
    query = _build_query(track, artist, album)

    # if the song exists in the Spotify catalog, fetch info
    try:
        meta = sp.search(q='track:' + query, type='track', limit=1)
        meta = meta['tracks']['items'][0]

        # audio features are only needed for the full record
        features = None if id_excl else sp.audio_features([meta['id']])[0]

    except Exception:
        # deliberately broad: any failure (no search hit, API error) is
        # treated as "not found" so one bad track cannot stop a long batch

        # if the track could not be found, try once more without the album,
        # keeping the caller's id_excl choice
        if album:
            retry = get_track_info(track, artist, id_excl=id_excl)
            # if the track couldn't be found without the album, give up
            if retry:
                return retry

        print('no data for: {} by {} ({})'.format(track, artist, album))
        print('query: {}\n'.format(query))
        return {}

    if id_excl:
        # a missing/empty id counts as "no data"
        return meta['id'] or {}

    # store relevant information and return the object
    try:
        _track = {
            'id': meta['id'],
            'name': meta['name'],
            'release': meta['album']['release_date'],
            'popularity': meta['popularity'],
            'explicit': int(meta['explicit']),
            'artists': [a['id'] for a in meta['artists']],
            'album': meta['album']['name'],
        }
        for key in _AUDIO_FEATURE_KEYS:
            _track[key] = features[key]
    except TypeError:
        # e.g. `features` is None because no audio features came back
        return {}

    return _track

Extract a random sample of 50 tracks. I can use this to compare single processor efficiency with multiprocessor efficiency.

In [None]:
# A fixed seed makes the benchmark sample identical on every run, and the size
# is capped so a history with fewer than 50 rows does not raise.
sample = timezoned_history.sample(n=min(50, len(timezoned_history)), random_state=0)
sample.head()

## Baseline multiprocessor efficiency

Time and record serial processing and then time multiprocessor functionality. Let this be a simple measurement of how well we can do with multiprocessing when fetching track data from the API. I learned that this was the way to go, especially as I fetch 5,000+ tracks multiple times during testing, and as I update my notebook. Unfortunately, when beginning this project, I ran `get_track_info()` over 7,400 times with serial processing and that took about an hour and 58 minutes. Never again!

In [None]:
# CPU count as reported by multiprocessing; passed as the pool size to
# multiprocess() in the cells below
cpus = mp.cpu_count() # used later

In [None]:
# initialize a sharedctypes integer to count records
# ('i' = signed int, starting at 0; lock=False means no synchronizing lock
# wraps the value, so increments from several workers are not atomic)
v = mp.Value('i', 0, lock=False)

def async_fetch(track, artist, album):
    '''
    Count and display track searches and timing
    
    Reads the module-level counter `v` and start time `s`, and
    increments `v` once per call.
    
    Parameters
    ----------
    track : str
        The name of a track
    artist : str
        The name of the track's artist
    album : str
        The name of the track's album
    
    Return
    ----------
    Data about the track, if the track is found (otherwise, empty dict)
    '''
    # compare by value: `is not 0` tests object identity against a literal,
    # which is implementation-dependent and raises a SyntaxWarning
    if v.value != 0 and v.value % 10 == 0:
        # after every 10 tracks searched, print progress information
        print('record: #{} at ({})\n'.format(str(v.value), datetime.now() - s))
        
    v.value += 1
    return get_track_info(track, artist, album)

def serial(tracks):
    '''
    Look up every track one after another in the current process,
    as the baseline for the multiprocess comparison
    
    Parameters
    ----------
    tracks : pandas.DataFrame
        Rows to search; the 'track', 'artist' and 'album' columns are
        read and converted to str
    
    Return
    ----------
    A list with one get_track_info() result per row, in row order
    '''
    found = []
    for _, row in tracks.iterrows():
        name = str(row['track'])
        artist = str(row['artist'])
        album = str(row['album'])
        found.append(get_track_info(name, artist, album))
    return found

def multiprocess(processes, tracks):  
    '''
    Multiprocessing to utilize all cores for comparison's purpose
    
    Parameters
    ----------
    processes : int
        The number of processes to create in parallel
    tracks : pandas.DataFrame
        Rows to search; the 'track', 'artist' and 'album' columns are
        read and converted to str
    
    Return
    ----------
    A list of track data in dicts, in row order
    '''
    # the context manager shuts the worker processes down when the block is
    # left; previously the pool was never closed and its workers leaked
    with mp.Pool(processes=processes) as pool:
        pending = [pool.apply_async(async_fetch, args=(str(t['track']), str(t['artist']), str(t['album']))) for _, t in tracks.iterrows()]
        # collect the results before leaving the block, while the workers
        # are still alive
        results = [p.get() for p in pending]
        
    return results

# Describe the machine the benchmark runs on
environment = [
    ('# of CPUs:\t{}', cpus),
    ('Python version:\t{}', platform.python_version()),
    ('Compiler:\t{}', platform.python_compiler()),
    ('System:\t\t{}', platform.system()),
    ('Release:\t{}', platform.release()),
    ('Machine:\t{}', platform.machine()),
    ('Processor:\t{}', platform.processor()),
    ('Interpreter:\t{}', platform.architecture()[0]),
]

print('\n')
for template, value in environment:
    print(template.format(value))
print('\n')

# Serial run: `s` is the start time (also read by async_fetch for progress)
s = datetime.now()
serial_temp = pd.DataFrame(serial(sample)).dropna()
serial_t = datetime.now() - s

# Multiprocess run on the same sample, with one worker per CPU
s = datetime.now()
multi_temp = pd.DataFrame(multiprocess(cpus, sample)).dropna()
multi_t = datetime.now() - s

In [None]:
# Share of sampled tracks that produced a complete record, and wall-clock
# time, for each strategy; then the speed-up factor
print(
    'Serial Processing',
    'search ratio (found : expected): {}'.format(len(serial_temp)/len(sample)),
    'Time: {}'.format(serial_t),
    '\nMulti Processing',
    'search ratio (found : expected): {}'.format(len(multi_temp)/len(sample)),
    'Time: {}'.format(multi_t),
    '\n{0:.2f}x faster with multiprocess'.format(serial_t / multi_t),
    sep='\n')

## Fetch all track data from Spotify

Use all 4 of my CPUs to fetch track information in parallel.

In [None]:
# initialize sharedctype integers to count records
# `v` counts the tracks searched so far (no lock, so increments are not atomic)
v = mp.Value('i', 0, lock=False)
# placeholder: replaced by an mp.Value holding the number of tracks to fetch
# in the cell that calls updateTracks()
total = None

def async_fetch_real(track, artist, album, timestamp):
    '''
    Count and display track searches and timing
    
    Reads the module-level counter `v`, start time `s` and expected
    count `total`, and increments `v` once per call.
    
    Parameters
    ----------
    track : str
        The name of a track
    artist : str
        The name of the track's artist
    album : str
        The name of the track's album
    timestamp : str
        The timestamp of the track
    
    Return
    ----------
    Data about the track with its 'timestamp' attached; if the track is
    not found the dict holds only the 'timestamp'
    '''
    # compare by value: `is not 0` tests object identity against a literal,
    # which is implementation-dependent and raises a SyntaxWarning
    if v.value != 0 and v.value % 100 == 0:
        # after every 100 tracks searched, print progress information
        elap = datetime.now() - s
        # estimate: average time per track so far, times the expected total,
        # minus the time already spent
        print('record: #{} - remaining: {}\n'.format(str(v.value), ((elap/v.value) * total.value) - elap))
        
    v.value += 1
    
    _t = get_track_info(track, artist, album)
    # re-attach the timestamp to the track data
    _t.update({
        'timestamp': timestamp
    })
        
    return _t

def multiprocess(processes, tracks):
    '''
    Multiprocessing to utilize all cores for all listening history
    
    Parameters
    ----------
    processes : int
        The number of processes to create in parallel
    tracks : pandas.DataFrame
        Rows to search; the 'track', 'artist', 'album' and 'timestamp'
        columns are read and converted to str
    
    Return
    ----------
    A list of track data in dicts, in row order
    '''
    # the context manager shuts the worker processes down when the block is
    # left; previously the pool was never closed and its workers leaked
    with mp.Pool(processes=processes) as pool:
        pending = [pool.apply_async(async_fetch_real, args=(str(t['track']), str(t['artist']), str(t['album']), str(t['timestamp']),)) for _, t in tracks.iterrows()]
        # collect the results before leaving the block, while the workers
        # are still alive
        results = [p.get() for p in pending]
    
    return results

def updateTracks(original, unique):
    '''
    Fetch track data via the Spotify API and save the compiled output to JSON
    
    Relies on the module-level `cpus`, `s` (start time) and `total`
    (expected number of tracks).
    
    Parameters
    ----------
    original : pandas.DataFrame
        Previously compiled history; the new rows are appended after it
    unique : pandas.DataFrame
        Rows to look up, handed to multiprocess()
    
    Return
    ----------
    The combined frame that was written to 'data/history_comp.json',
    or None when no complete track record was fetched
    '''
    fetched = pd.DataFrame(multiprocess(cpus, unique)).dropna()
    elapsed = datetime.now() - s

    # nothing usable came back: leave the saved file untouched
    if fetched.empty:
        print('No new track data found')
        return None

    compiled = pd.concat([original, fetched], ignore_index=True)
    compiled.to_json('data/history_comp.json')
    print('Saved complete history')

    found_ratio = len(fetched)/total.value
    rate = total.value/elapsed.total_seconds()
    print('Search ratio (found : expected): {}'.format(found_ratio))
    print('Total time:\t\t\t {}'.format(elapsed))
    print('Songs/sec fetched:\t\t {}'.format(rate))

    return compiled

In [None]:
# Start from the previously compiled history when it exists on disk,
# otherwise from an empty frame that only has the 'timestamp' column
if os.path.isfile('data/history_comp.json'):
    last_full_history = pd.read_json('data/history_comp.json')
else:
    last_full_history = pd.DataFrame(columns=['timestamp'])
last_full_history['timestamp'] = last_full_history['timestamp'].apply(correctTimestamp)

# start time, read by the progress output and by updateTracks()
s = datetime.now()

if not (last_full_history.empty or last_full_history['timestamp'].max() < timezoned_history['timestamp'].max()):
    print('all records up to date')
else:
    # only the scrobbles whose timestamp is not in the saved history yet
    already_saved = timezoned_history['timestamp'].isin(last_full_history['timestamp'])
    unique = timezoned_history[~already_saved]
    
    total = mp.Value('i', len(unique), lock=False)
        
    updateTracks(last_full_history, unique)
    
# record: #9200 - remaining: 0:00:07.263164

# Saved complete history
# Search ratio (found : expected): 0.9913419913419913
# Total time:			 0:27:58.994141
# Songs/sec fetched:		 5.5032949635528245

In [None]:
# Artist metadata saved by an earlier run (this notebook writes the same file
# further down); the file has to exist already for this cell to run
artists = pd.read_json('data/artist_info.json')
artists.sample(5)

In [4]:
# Reload the compiled history and make its timestamps US/Pacific aware again
full_history = pd.read_json('data/history_comp.json').assign(
    timestamp=lambda frame: frame['timestamp'].apply(correctTimestamp))

In [None]:
# Artist ids that appear in the listening history but not in the saved artist
# table. A set makes each membership test constant-time instead of scanning
# the whole 'id' column per artist, and dict.fromkeys() removes repeats while
# keeping first-seen order, so every missing artist is requested only once.
known_artist_ids = set(artists['id'])
missing_artist_ids = list(dict.fromkeys(
    artist
    for track_artists in full_history['artists']
    for artist in track_artists
    if artist not in known_artist_ids))

len(missing_artist_ids)

In [None]:
def get_artist_info(id):
    '''
    Fetch descriptive data for one artist via the Spotify API
    
    Parameters
    ----------
    id : str
        The artist's Spotify ID
    
    Return
    ----------
    A dict with the artist's name, id, genres (numpy array), popularity
    and follower count, or an empty dict if the lookup failed
    '''
    try:
        result = sp.artist(id)
    except Exception:
        # best-effort lookup: a failed request means "no data". Catching
        # Exception (not a bare except) lets KeyboardInterrupt and SystemExit
        # through, so a long batch can still be interrupted.
        return {}
    
    return {'artist': result['name'],
            'id': result['id'], 
            'genres': np.array(result['genres']), 
            'popularity': result['popularity'], 
            'followers': result['followers']['total']}

In [None]:
# initialize a sharedctypes integer to count records
# `v` counts the artists requested so far; `total` is the number of ids to
# fetch. Both are created without a lock, so updates are not atomic.
v = mp.Value('i', 0, lock=False)
total = mp.Value('i', len(missing_artist_ids), lock=False)

# pool size for multiprocess() below
cpus = mp.cpu_count()

def async_fetch_real(id):
    '''
    count and display artist searches and timing
    
    Reads the module-level counter `v`, start time `s` and expected
    count `total`, and increments `v` once per call.
    
    Parameters
    ----------
    id : str
        The artist's Spotify ID
    
    Return
    ----------
    The result of get_artist_info() for this id
    '''
    # compare by value: `is not 0` tests object identity against a literal,
    # which is implementation-dependent and raises a SyntaxWarning
    if v.value != 0 and v.value % 100 == 0:
        print('record: #{}'.format(v.value))
        elap = datetime.now() - s
        # estimate: average time per artist so far, times the expected total,
        # minus the time already spent
        print('time remaining: {}'.format(((elap/v.value) * total.value) - elap))
        
    v.value += 1
    
    return get_artist_info(id)

def multiprocess(processes, ids):
    '''
    multiprocessing to utilize all cores
    
    Parameters
    ----------
    processes : int
        The number of processes to create in parallel
    ids : iterable
        Artist IDs to look up; each is converted to str
    
    Return
    ----------
    A list with one async_fetch_real() result per id, in input order
    '''
    # the context manager shuts the worker processes down when the block is
    # left; previously the pool was never closed and its workers leaked
    with mp.Pool(processes=processes) as pool:
        pending = [pool.apply_async(async_fetch_real, args=(str(i),)) for i in ids]
        # collect the results before leaving the block, while the workers
        # are still alive
        results = [p.get() for p in pending]
    return results

# start time, also read by async_fetch_real for its progress output
s = datetime.now()
artists_temp = pd.DataFrame(multiprocess(cpus, missing_artist_ids)).dropna()
multi_t = datetime.now() - s

# DataFrame.append was removed in pandas 2.0; pd.concat (already used for the
# track history) does the same job
stitched_artists = pd.concat([artists, artists_temp], ignore_index=True)
stitched_artists = stitched_artists.drop_duplicates('id')

if not artists_temp.empty:
    # the path has no placeholders, so the former .format(...) call was a no-op
    stitched_artists.to_json('data/artist_info.json')
    print('Saved artist info')
    
print('{} artists added (expected {})'.format(len(stitched_artists)-len(artists), len(artists_temp)))

The service to export Last.FM data overcounts the most recently listened to song, so I choose to keep the first quarter of instances and drop the remainder. This prevents an artificial skewing toward a song that shouldn't be the mode of the data set. Drawback: if the first song _really_ is the mode of the dataset, I unknowingly change that.

In [5]:
# The most frequent track id, i.e. the one the export overcounts
track_mode = full_history['id'].mode()[0]
# Index labels of every row holding that track
L = list(full_history.loc[full_history['id'].eq(track_mode)].index)
# Skip the first quarter of those labels; what remains in L is the set of
# rows to drop in the next cell
first_dropped = int(len(L)*0.25)
L = L[first_dropped:]
print(full_history.loc[L[0]]['name'])
L

Wizard Of Finance


[3193,
 3392,
 3592,
 3790,
 397,
 3990,
 4188,
 4389,
 4587,
 4787,
 4988,
 5186,
 5386,
 5586,
 5786,
 598,
 5986,
 6185,
 6384,
 6582,
 6782,
 6978,
 7177,
 7377,
 7576,
 7775,
 797,
 7975,
 8173,
 8368,
 8566,
 8766,
 8964,
 997]

In [6]:
# drop=True discards the old labels instead of storing them in a new 'index'
# column. Without it that column is written to history_comp.json by the next
# cell, and a later run of this cell on the reloaded data fails because the
# 'index' column already exists.
full_history = full_history.drop(index=L).reset_index(drop=True)

In [7]:
# Overwrite the saved history with the frame from which the overcounted rows
# were dropped
full_history.to_json('data/history_comp.json')