A jupyer notebook to process data from my spotify data export combined with data from https://everynoise.com/ and https://everynoise.com/curio.html

In [1]:
import csv
import json
import numpy as np
from datetime import datetime, time
from collections import defaultdict, Counter

In [2]:
#history[0]
#list(index['other tracks'].values())[0]
#list(index['artist genres'].values())[0]
#list(index['artists'].values())[0]

In [3]:
def aggregate_listens(history, index):
    song_listens = []
    for song in history:
        # check it's a song
        if not song['spotify_track_uri']:
            continue # podcast episode
        # get id and use it to fetch data from index
        track_id = song['spotify_track_uri'].rsplit(':', 1)[1]
        i_song = index['other tracks'].get(track_id)
        if not i_song:
            continue # no index data
            # TODO: get from spotify api
        
        song_listens.append({
            'id': track_id,
            'timestamp': song['ts'],
            'ms_played': song['ms_played'],
            'title': song['master_metadata_track_name'],
            'artist': song['master_metadata_album_artist_name'],
            'album': song['master_metadata_album_album_name'],
            'shuffle': song['shuffle'],
            'skipped': song['skipped'],
            'offline': song['offline'],
            'num_artists': len(i_song['artists']),
            #'artist_id_primary': i_song['artists'][0]['id'],
            'artist_ids': [i['id'] for i in i_song['artists']],
            'duration_ms': i_song['duration_ms'],
            'explicit': i_song['explicit'],
            'popularity': i_song['popularity'],
            'release_date': i_song['album']['release_date'],
        })

    return song_listens

In [4]:
def aggregate_artists(song_listens, index):
    artists = {}
    for song in song_listens:
        for artist_id in song['artist_ids']:
            # look up in output dict
            if artist_id in artists:
                # bump counters and add play timestamp
                artists[artist_id]['play_count'] += 1
                artists[artist_id]['skipped_count'] += int(song['skipped'])
                artists[artist_id]['ms_played'] += song['ms_played']
                artists[artist_id]['played_songs'].add(song['id'])
                #artists[artist_id]['play_timestamps'].append(song['timestamp'])
            else:
                # get genres from first index
                genres = set(index['artist genres'][artist_id]['genres'])
                # fetch other data from index
                i_artist = index['artists'].get(artist_id)
                if i_artist:
                    name = i_artist['name']
                    followers = i_artist['popularity']
                    genres.update(i_artist['genres'])
                    popularity = i_artist['followers']['total']
                else:
                    name = song['artist']
                    followers = 0
                    popularity = 0
                
                # set initial and save data
                artists[artist_id] = {
                    'id': artist_id,
                    'name': name,
                    'play_count': 1,
                    'skipped_count': int(song['skipped']),
                    'ms_played': song['ms_played'],
                    'played_songs': {song['id']},
                    #'play_timestamps': [song['timestamp']],
                    'followers': followers,
                    'genres': list(genres),
                    'popularity': popularity,
                }

    return artists

In [5]:
def aggregate_songs(song_listens):
    songs = {}
    for song in song_listens:
        song_id = song['id']
        if song_id in songs:
            # bump counters and add play timestamp
            songs[song_id]['play_count'] += 1
            songs[song_id]['skipped_count'] += int(song['skipped'])
            songs[song_id]['ms_played'] += song['ms_played']
            #songs[song_id]['play_timestamps'].append(song['timestamp'])
        else:        
            # set initial and save data
            songs[song_id] = {
                'id': song['id'],
                'play_count': 1,
                'skipped_count': int(song['skipped']),
                'ms_played': song['ms_played'],
                #'play_timestamps': [song['timestamp']],
                'title': song['title'],
                'artist': song['artist'],
                'album': song['album'],
                'num_artists': len(song['artist_ids']),
                #'artist_id_primary': song['artists'][0]['id'],
                'artist_ids': song['artist_ids'],
                'duration_ms': song['duration_ms'],
                'explicit': song['explicit'],
                'popularity': song['popularity'],
            }

    return songs

In [6]:
def aggregate_genres(artists, index):
    genres = {}
    for artist in artists.values():
        for genre in artist['genres']:
            if genre in genre_attrs:
                genre_attr = genre_attrs[genre]
            else:
                genre_attr = {}

            if genre in genres:
                genres[genre]['artist_count'] += 1
                genres[genre]['artist_ids'] += [artist['id']]
                genres[genre]['play_count'] += artist['play_count']
                genres[genre]['skipped_count'] += artist['skipped_count']
                genres[genre]['ms_played'] += artist['ms_played']
                #genres[genre]['play_timestamps'] += artist['play_timestamps']
            else:
                genres[genre] = {
                    'genre': genre,
                    'artist_count': 1,
                    'artist_ids': [artist['id']],
                    'play_count': artist['play_count'],
                    'skipped_count': artist['skipped_count'],
                    'ms_played': artist['ms_played'],
                    #'play_timestamps': artist['play_timestamps'],
                    'x_norm': genre_attr.get('x_norm'),
                    'y_norm': genre_attr.get('y_norm'),
                    'r_norm': genre_attr.get('r_norm'),
                    'g_norm': genre_attr.get('g_norm'),
                    'b_norm': genre_attr.get('b_norm'),
                    'color': genre_attr.get('color'),
                }

    return genres

In [7]:
def get_top_genres(genres, n):
    genres_by_playtime = sorted(genres.values(), key=lambda x: x['ms_played'], reverse=True)
    top_genres = [
        {
            'genre': g['genre'], 
            'artist_count': g['artist_count'],
            'play_count': g['play_count'], 
            'skipped_count': g['skipped_count'], 
            'skipped_pct': round(g['skipped_count'] / g['play_count'], 4), 
            'hours_played': round(g['ms_played']/1000/3600, 1),
            'x_norm': g['x_norm'], 
            'y_norm': g['y_norm'], 
            'r_norm': g['r_norm'], 
            'g_norm': g['g_norm'], 
            'b_norm': g['b_norm'], 
            'color': g['color'], 
        } 
        for g in genres_by_playtime[:n]
    ]
    
    return top_genres


In [8]:
def backfill_genre_data(top_genres, artists, songs):
    default_color = "#cdac52"

    # add most specific genre to artists
    genres_by_specificity = [g['genre'] for g in top_genres][::-1] # reverse list
    for artist_id in artists:
        # get first (lowest) genre
        specific_genre = next((item for item in genres_by_specificity if item in artists[artist_id]['genres']), None)
        artists[artist_id]['specific_genre'] = specific_genre
        artists[artist_id]['genre_color'] = genre_attrs.get(specific_genre, {}).get('color', default_color)
        artists[artist_id]['num_played_songs'] = len(artists[artist_id]['played_songs'])
        del artists[artist_id]['played_songs']
        #top_level_genre = next((item['genre'] for item in genres_by_playtime if item['genre'] in artists[artist_id]['genres']), None)
        #artists[artist_id]['top_level_genre'] = top_level_genre
    
    # add most specific genre to songs
    for song_id in songs:
        song_artist_ids = songs[song_id]['artist_ids']
        if len(song_artist_ids) == 1:
            # only one artist, use them
            artist_id_primary = song_artist_ids[0]
        else:
            # if multiple, loop through and thry to find one with a genre
            artist_id_primary = None
            for artist_id in song_artist_ids:
                if artists[artist_id]['specific_genre']:
                    artist_id_primary = artist_id
                    break
            if artist_id_primary == None:
                artist_id_primary = song_artist_ids[0]
        songs[song_id]['artist_id_primary'] = artist_id_primary
        specific_genre = artists[artist_id_primary]['specific_genre']
        songs[song_id]['specific_genre'] = specific_genre
        songs[song_id]['genre_color'] = genre_attrs.get(specific_genre, {}).get('color', default_color)
        #songs[song_id]['top_level_genre'] = artists[artist_id_primary]['top_level_genre']
    
    # add artist data back into genres
    for genre in top_genres:
        genre_name = genre['genre']
        # get artist count per genre
        genre_artists_all = [a for a in artists.values() if genre_name in a['genres']]
        genre_artists_specific = [a for a in artists.values() if genre_name == a['specific_genre']]
        #genre_artists_top_level = [a for a in artists.values() if genre_name == a['top_level_genre']]
        # get top artist for the genre
        if len(genre_artists_specific):
            top_genre_artists = sorted(genre_artists_specific, key=lambda x: x['ms_played'], reverse=True)
        else:
            top_genre_artists = sorted(genre_artists_all, key=lambda x: x['ms_played'], reverse=True)
        # update genres
        genre['top_artist'] = top_genre_artists[0]['name']
        genre['top_artists'] = [a['name'] for a in top_genre_artists[:3]]
        genre['artist_count_specific'] = len(genre_artists_specific)
        #genre['artist_count_top_level'] = len(genre_artists_top_level)


In [9]:
def aggregate_discovered_monthly(song_listens):
    # Step 1: Parse timestamps and prepare data
    parsed_listens = [
        {
            **entry,
            'month': datetime.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m'),
            'date': datetime.strptime(entry['timestamp'], '%Y-%m-%dT%H:%M:%SZ').date()
        }
        for entry in song_listens
    ]

    # Step 2: Identify the first time each song was played
    first_played = {}
    for entry in parsed_listens:
        song_id = entry['id']
        if song_id not in first_played or entry['date'] < first_played[song_id]:
            first_played[song_id] = entry['date']

    # Step 3: Group by month and count first-time plays
    monthly_plays = defaultdict(Counter)
    for entry in parsed_listens:
        song_id = entry['id']
        month = entry['month']
        if first_played[song_id] == entry['date']:
            monthly_plays[month][song_id] += 1

    # Step 4: Find the most played song for each month
    most_played_songs = {}
    for month, plays in monthly_plays.items():
        most_played_song = max(plays, key=plays.get)
        most_played_songs[month] = {
            'song_id': most_played_song,
            'title': songs[most_played_song]['title'],
            'artist': songs[most_played_song]['artist'],
            'plays': plays[most_played_song]
        }

    # Display & save results
    discovered_monthly = []
    for month, data in sorted(most_played_songs.items(), reverse=True):
        if data['plays'] > 2 and '202' in month:
            discovered_monthly.append({
                'month': month,
                'title': data['title'],
                'artist': data['artist'],
                'plays': data['plays'],
            })
    return discovered_monthly

In [10]:
def get_monthly_summary(song_listens):
    # Group data by month
    monthly_data = defaultdict(list)
    for listen in song_listens:
        month = datetime.fromisoformat(listen['timestamp'][:-1]).strftime('%Y-%m')
        monthly_data[month].append(listen)

    # Process each month
    monthly_summary = []
    for month, listens in monthly_data.items():
        total_plays = len(listens)
        total_skips = sum(1 for l in listens if l['skipped'])
        total_listen_ms = sum(l['ms_played'] for l in listens)
        total_listen_hours = round(total_listen_ms / (1000 * 60 * 60), 2)
        unique_tracks = len(set(l['id'] for l in listens))
        track_counter = Counter((l['title'], l['artist']) for l in listens)
        top_track, _ = track_counter.most_common(1)[0]
        unique_artists = len(set(l['artist'] for l in listens))
        artist_counter = Counter(l['artist'] for l in listens)
        top_artist, _ = artist_counter.most_common(1)[0]

        monthly_summary.append({
            'month': month,
            'total_plays': total_plays,
            'total_skips': total_skips,
            'total_listen_hours': total_listen_hours,
            'unique_tracks': unique_tracks,
            'top_track': f"{top_track[0]} by {top_track[1]}",
            'unique_artists': unique_artists,
            'top_artist': top_artist,
        })
    
    monthly_summary.sort(key=lambda x: x['month'], reverse=True)
    return monthly_summary

In [11]:
def get_monthly_listen_data(song_listens):
    # Group data by month
    monthly_data = defaultdict(list)
    for listen in song_listens:
        month = datetime.fromisoformat(listen['timestamp'][:-1]).strftime('%Y-%m')
        monthly_data[month].append(listen)

    # Process each month
    listen_data = []
    for month, listens in monthly_data.items():
        total_listen_ms = sum(l['ms_played'] for l in listens)

        # Skipped duration
        skipped_ms = sum(l['duration_ms'] - l['ms_played'] for l in listens)
        trend_pct = skipped_ms / (skipped_ms + total_listen_ms)
        listen_data.extend([
            {'month': month, 'trend': 'skipped', 'label': 'Skipped', 'value': round(trend_pct, 4)},
            {'month': month, 'trend': 'skipped', 'label': 'Played', 'value': round(1 - trend_pct, 4)}
        ])

        # Explicit duration
        explicit_ms = sum(l['ms_played'] for l in listens if l['explicit'])
        trend_pct = explicit_ms / total_listen_ms
        listen_data.extend([
            {'month': month, 'trend': 'explicit', 'label': 'Explicit', 'value': round(trend_pct, 4)},
            {'month': month, 'trend': 'explicit', 'label': 'Not Explicit', 'value': round(1 - trend_pct, 4)}
        ])

        # Shuffled duration
        shuffled_ms = sum(l['ms_played'] for l in listens if l['shuffle'])
        trend_pct = shuffled_ms / total_listen_ms
        listen_data.extend([
            {'month': month, 'trend': 'shuffle', 'label': 'Shuffled', 'value': round(trend_pct, 4)},
            {'month': month, 'trend': 'shuffle', 'label': 'Not Shuffled', 'value': round(1 - trend_pct, 4)}
        ])

        # Time of Day duration
        time_of_day = {'Morning': 0, 'Afternoon': 0, 'Night': 0}
        for listen in listens:
            listen_time = datetime.fromisoformat(listen['timestamp'][:-1]).time()
            if listen_time >= time(4, 0) and listen_time < time(12, 0):
                time_of_day['Morning'] += listen['ms_played']
            elif listen_time >= time(12, 0) and listen_time < time(20, 0):
                time_of_day['Afternoon'] += listen['ms_played']
            else:
                time_of_day['Night'] += listen['ms_played']
        for period, duration in time_of_day.items():
            trend_pct = duration / total_listen_ms
            listen_data.extend([
                {'month': month, 'trend': 'time_of_day', 'label': period, 'value': round(trend_pct, 4)}
            ])

        # Popularity duration
        popularity = {'Super': 0, 'High': 0, 'Medium': 0, 'Low': 0}
        for listen in listens:
            if listen['popularity'] > 75:
                popularity['Super'] += listen['ms_played']
            elif listen['popularity'] >= 50:
                popularity['High'] += listen['ms_played']
            elif listen['popularity'] >= 25:
                popularity['Medium'] += listen['ms_played']
            else:
                popularity['Low'] += listen['ms_played']
        for level, duration in popularity.items():
            trend_pct = duration / total_listen_ms
            listen_data.extend([
                {'month': month, 'trend': 'popularity', 'label': level, 'value': round(trend_pct, 4)}
            ])
        
        # Release Age duration
        release_age = {'Under 1 year': 0, '1-2 years': 0, '2-5 years': 0, '5+ years': 0}
        for listen in listens:
            listen_date = datetime.fromisoformat(listen['timestamp']).replace(tzinfo=None)
            release_date_str = listen['release_date']
            if len(release_date_str) == 4:
                release_date_str += '-01-01'
            try:
                release_date = datetime.fromisoformat(release_date_str)
            except ValueError:
                #print(listen)
                continue
            age_years = (listen_date - release_date).days // 365
            if age_years < 1:
                release_age['Under 1 year'] += listen['ms_played']
            elif 1 <= age_years < 2:
                release_age['1-2 years'] += listen['ms_played']
            elif 2 <= age_years < 5:
                release_age['2-5 years'] += listen['ms_played']
            else:
                release_age['5+ years'] += listen['ms_played']
        for age_range, duration in release_age.items():
            trend_pct = duration / total_listen_ms
            listen_data.extend([
                {'month': month, 'trend': 'release_age', 'label': age_range, 'value': round(trend_pct, 4)}
            ])
    
    return listen_data

In [12]:
def simple_get_top_genres(song_listens, index, n):
    artists = aggregate_artists(song_listens, index)
    genres = aggregate_genres(artists, index)
    top_genres = get_top_genres(genres, n)
    backfill_genre_data(top_genres, artists, {})
    return top_genres

def get_monthly_top_genres(song_listens, index):
    # Group data by month
    monthly_data = defaultdict(list)
    for listen in song_listens:
        month = datetime.fromisoformat(listen['timestamp'][:-1]).strftime('%Y-%m')
        monthly_data[month].append(listen)
    
    # Process each month
    monthly_top_genres = {}
    for month, listens in monthly_data.items():
        top_genres = simple_get_top_genres(listens, index, 25)
        total_hours = sum(genre['hours_played'] for genre in top_genres if genre['color'])
        centroid_x = int(sum(genre['x_norm'] * genre['hours_played'] for genre in top_genres if genre['color']) / total_hours)
        centroid_y = int(sum(genre['y_norm'] * genre['hours_played'] for genre in top_genres if genre['color']) / total_hours)
        centroid_r = int(sum(genre['r_norm'] * genre['hours_played'] for genre in top_genres if genre['color']) / total_hours)
        centroid_g = int(sum(genre['g_norm'] * genre['hours_played'] for genre in top_genres if genre['color']) / total_hours)
        centroid_b = int(sum(genre['b_norm'] * genre['hours_played'] for genre in top_genres if genre['color']) / total_hours)
        monthly_top_genres[month] = {
            'month': month,
            'top_genres': top_genres,
            'centroid_x': centroid_x,
            'centroid_y': centroid_y,
            'centroid_r': centroid_r,
            'centroid_g': centroid_g,
            'centroid_b': centroid_b,
            'centroid_color': f"#{centroid_r:02x}{centroid_g:02x}{centroid_b:02x}",
        }
    
    #monthly_top_genres.sort(key=lambda x: x['month'], reverse=True)
    return monthly_top_genres

#get_monthly_top_genres(song_listens, index)

In [13]:
# load source files
with open('input/listening_history.json', 'r') as file:
    history = json.load(file)
with open('input/index.json', 'r') as file:
    index = json.load(file)
with open('cache/genre_attrs.json', 'r') as file:
    genre_attrs = json.load(file)

# remove old items
history = [i for i in history if not '2012' in i['ts']]

# run aggregations
song_listens = aggregate_listens(history, index)
print('Total Listen Count:', len(song_listens))

artists = aggregate_artists(song_listens, index)
print('Unique Artist Count:', len(artists))
print('Artists Missing Data:', len([i for i in artists.values() if i['followers'] == 0]), '/', len(artists))
print('Artists Missing Genres:', len([i for i in artists.values() if len(i['genres']) == 0]), '/', len(artists))

songs = aggregate_songs(song_listens)
print('Unique Song Count:', len(songs))

genres = aggregate_genres(artists, index)
print('Unique Genre Count:', len(genres))
print('Genres Missing Attributes:', len([g for g in genres.values() if not g['color']]), '/', len(genres))

top_genres = get_top_genres(genres, 75)
print('Top Genre Count:', len(top_genres))
print('Top Genres Missing Attributes:', len([g for g in top_genres if not g['color']]), '/', len(top_genres))

backfill_genre_data(top_genres, artists, songs)
print('Artists Missing Specific Genre:', len([a for a in artists.values() if not a['specific_genre']]), '/', len(artists))
print('Songs Missing Specific Genre:', len([s for s in songs.values() if not s['specific_genre']]), '/', len(songs))
print('Median Songs per Genre:', np.median([g['artist_count_specific'] for g in top_genres]))

discovered_monthly = aggregate_discovered_monthly(song_listens)
print('Months with Discoveries:', len(discovered_monthly))
print('Median Plays per Discovery:', np.median([d['plays'] for d in discovered_monthly]))

monthly_summary = get_monthly_summary(song_listens)
print('Months of Summary Data:', len(monthly_summary))

monthly_listen_data = get_monthly_listen_data(song_listens)
print('Months of Listening Data:', len(set([i['month'] for i in monthly_listen_data])))
print('Monthly Listening Trend Count:', len(set([i['trend'] for i in monthly_listen_data])))
print('Monthly Listening Data Points:', len(monthly_listen_data))

monthly_top_genres = get_monthly_top_genres(song_listens, index)
print('Months of Genre Data:', len(set([i for i in monthly_top_genres.keys()])))

# dump data to disk
with open('output/genre_attrs.json', 'w') as file:
    json.dump(list(genre_attrs.values()), file)
#with open('output/song_listens.json', 'w') as file:
#    json.dump(song_listens, file)
with open('output/artists.json', 'w') as file:
    json.dump(list(artists.values()), file)
with open('output/songs.json', 'w') as file:
    json.dump(list(songs.values()), file)
#with open('output/genres.json', 'w') as file:
#    json.dump(list(genres.values()), file)
with open('output/top_genres.json', 'w') as file:
    json.dump(top_genres, file)
with open('output/discovered_monthly.json', 'w') as file:
    json.dump(discovered_monthly, file)
with open('output/monthly_summary.json', 'w') as file:
    json.dump(monthly_summary, file)
with open('output/monthly_listen_data.json', 'w') as file:
    json.dump(monthly_listen_data, file)
with open('output/monthly_top_genres.json', 'w') as file:
    json.dump(monthly_top_genres, file)

Total Listen Count: 99457
Unique Artist Count: 5463
Artists Missing Data: 3394 / 5463
Artists Missing Genres: 1388 / 5463
Unique Song Count: 13745
Unique Genre Count: 1481
Genres Missing Attributes: 58 / 1481
Top Genre Count: 75
Top Genres Missing Attributes: 0 / 75
Artists Missing Specific Genre: 3467 / 5463
Songs Missing Specific Genre: 4555 / 13745
Median Songs per Genre: 21.0
Months with Discoveries: 39
Median Plays per Discovery: 4.0
Months of Summary Data: 79
Months of Listening Data: 79
Monthly Listening Trend Count: 6
Monthly Listening Data Points: 1343
Months of Genre Data: 79
