# Overview

This notebook augments and validates the data we downloaded from the Spotify Charts website. The data is used to create a visualisation of popular artist nationalities within various regions, intended to shed light on which countries import music from, and export music to, which other countries.

In their raw form, each Spotify Charts file contains the weekly top-200 tracks for a given country & week. Here is a hard-coded example:

In [3]:
import pandas as pd
from io import StringIO

csv_data = """
rank,uri,artist_names,track_name,source,peak_rank,previous_rank,weeks_on_chart,streams
1,"spotify:track:3GD6eImRvT0zgr8cQnokUq","Bhavi, Seven Kayne, Milo j, Tiago PZK, KHEA, Neo Pistea","BESAME (feat. Tiago PZK, Khea & Neo Pistea) - Remix","GMR",1,13,2,"5641728"
2,"spotify:track:6XjDF6nds4DE2BBbagZol6","FloyyMenor, Cris Mj","Gata Only","FloyyMenor under exclusive license to UnitedMasters LLC",1,1,12,"3011118"
3,"spotify:track:4wS0TnQzVkY9ML1BPKpOk1","Tiago PZK, Ke Personajes","Piel","WEA Latina",2,2,12,"3008906"
4,"spotify:track:5rQSQlZXXjMcevPGoAfE1z","Salastkbron, Diel Paris","Un Besito Más","WEA Latina",3,4,6,"2955958"
5,"spotify:track:0GVPemmAwkXhFlYimhdDr3","El Turko, Mandale Flow","30 GRADOS","Sonar LLC",2,3,11,"2868533"
"""

df = pd.read_csv(StringIO(csv_data))
df.head()

| | rank | uri | artist_names | track_name | source | peak_rank | previous_rank | weeks_on_chart | streams |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | spotify:track:3GD6eImRvT0zgr8cQnokUq | Bhavi, Seven Kayne, Milo j, Tiago PZK, KHEA, N... | BESAME (feat. Tiago PZK, Khea & Neo Pistea) - ... | GMR | 1 | 13 | 2 | 5641728 |
| 1 | 2 | spotify:track:6XjDF6nds4DE2BBbagZol6 | FloyyMenor, Cris Mj | Gata Only | FloyyMenor under exclusive license to UnitedMa... | 1 | 1 | 12 | 3011118 |
| 2 | 3 | spotify:track:4wS0TnQzVkY9ML1BPKpOk1 | Tiago PZK, Ke Personajes | Piel | WEA Latina | 2 | 2 | 12 | 3008906 |
| 3 | 4 | spotify:track:5rQSQlZXXjMcevPGoAfE1z | Salastkbron, Diel Paris | Un Besito Más | WEA Latina | 3 | 4 | 6 | 2955958 |
| 4 | 5 | spotify:track:0GVPemmAwkXhFlYimhdDr3 | El Turko, Mandale Flow | 30 GRADOS | Sonar LLC | 2 | 3 | 11 | 2868533 |


### Changes to make

We would like to make a few basic changes to this data:
- Combine all of the data into a single CSV file
- Keep only the top 50 tracks for each country and week
- Remove unnecessary columns
- Keep only the primary artist for each track (the first artist listed)
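
As a minimal sketch of the last bullet, assuming the `artist_names` column separates artists with commas (as in the sample above), the primary artist can be taken as the first comma-separated entry. The `sample` DataFrame is illustrative only.

```python
import pandas as pd

# Illustrative rows only; real chart files contain 200 tracks each
sample = pd.DataFrame({
    "artist_names": ["FloyyMenor, Cris Mj", "Tiago PZK, Ke Personajes", "KHEA"],
})

# Take the text before the first comma and trim surrounding whitespace
sample["artist_name"] = sample["artist_names"].str.split(",").str[0].str.strip()

print(sample["artist_name"].tolist())
```

One caveat of this approach: an artist whose name itself contains a comma would be truncated at that comma.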

In addition, using the Spotify API and MusicBrainz API, we would like to add the following information to each of the records in our dataset:
- genre(s)
- artist country
- artist category (male/female/non-binary/group)

To start, let's import the relevant Python libraries and set up our Spotify API client (make sure to populate the `spotify_credentials.json` file with your API credentials):

In [None]:
###############
### Imports ###
###############

import json
import os
import time


import pandas as pd
import requests
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials


#############################################################
### Initialize Spotify API client with proper credentials ###
#############################################################

def load_config(file_path):
    with open(file_path, 'r') as f:
        return json.load(f)

spotify_config = load_config('spotify_credentials.json')
client_credentials_manager = SpotifyClientCredentials(client_id=spotify_config['SPOTIFY_CLIENT_ID'],
                                                      client_secret=spotify_config['SPOTIFY_CLIENT_SECRET'])
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)


######################################################################################################
### Helpful widget for displaying progress of an operation in Jupyter (will use later in Notebook) ###
######################################################################################################

def log_progress(sequence, every=None, size=None, name='Percentage'):
    from ipywidgets import IntProgress, HTML, VBox
    from IPython.display import display

    is_iterator = False
    if size is None:
        try:
            size = len(sequence)
        except TypeError:
            is_iterator = True
    if size is not None:
        if every is None:
            if size <= 200:
                every = 1
            else:
                every = int(size / 200)     # every 0.5%
    else:
        assert every is not None, 'sequence is iterator, set every'

    if is_iterator:
        progress = IntProgress(min=0, max=1, value=1)
        progress.bar_style = 'info'
    else:
        progress = IntProgress(min=0, max=size, value=0)
    label = HTML()
    box = VBox(children=[label, progress])
    display(box)

    index = 0
    try:
        for index, record in enumerate(sequence, 1):
            yield record
            if index % every == 0:
                percentage = round(index / size * 100, 2) if size else 100
                if is_iterator:
                    label.value = '{name}: {percentage:.2f}%'.format(
                        name=name,
                        percentage=percentage
                    )
                else:
                    progress.value = index
                    label.value = u'{name}: {percentage:.2f}%'.format(
                        name=name,
                        percentage=percentage
                    )
    except:
        progress.bar_style = 'danger'
        raise
    else:
        progress.bar_style = 'success'
        progress.value = index
        label.value = "{name}: 100.00%".format(
            name=name
        )

## Combining our original CSV files

Let's remove unnecessary information from our Spotify Charts CSV files and combine them all into a single CSV file:

In [None]:
import re
########################
### Helper functions ###
########################

def get_country_name_from_code(country_code):
    mapping = {"ar": "Argentina", "au": "Australia", "at": "Austria", "by": "Belarus", "be": "Belgium", "bo": "Bolivia", "br": "Brazil", "bg": "Bulgaria", "ca": "Canada", "cl": "Chile", "co": "Colombia", "cr": "Costa Rica", "cy": "Cyprus", "cz": "Czech Republic", "dk": "Denmark", "do": "Dominican Republic", "ec": "Ecuador", "eg": "Egypt", "sv": "El Salvador", "ee": "Estonia", "fi": "Finland", "fr": "France", "de": "Germany", "gr": "Greece", "gt": "Guatemala", "hn": "Honduras", "hk": "Hong Kong", "hu": "Hungary", "is": "Iceland", "in": "India", "id": "Indonesia", "ie": "Ireland", "il": "Israel", "it": "Italy", "jp": "Japan", "kz": "Kazakhstan", "lv": "Latvia", "lt": "Lithuania", "lu": "Luxembourg", "my": "Malaysia", "mx": "Mexico", "ma": "Morocco", "nl": "Netherlands", "nz": "New Zealand", "ni": "Nicaragua", "ng": "Nigeria", "no": "Norway", "pk": "Pakistan", "pa": "Panama", "py": "Paraguay", "pe": "Peru", "ph": "Philippines", "pl": "Poland", "pt": "Portugal", "ro": "Romania", "sa": "Saudi Arabia", "sg": "Singapore", "sk": "Slovakia", "za": "South Africa", "kr": "South Korea", "es": "Spain", "se": "Sweden", "ch": "Switzerland", "tw": "Taiwan", "th": "Thailand", "tr": "Turkey", "ae": "UAE", "ua": "Ukraine", "gb": "United Kingdom", "uy": "Uruguay", "us": "USA", "ve": "Venezuela", "vn": "Vietnam"}
    return mapping[country_code]

        
#############################################################################        
### Combine & filter all of the Spotify charts CSV files we've downloaded ###
#############################################################################  

def create_combined_charts_file(input_dir, output_file):
    # Define naming pattern of our Spotify charts CSV files
    chart_file_pattern = r'^regional-(\w{2})-weekly-(\d{4}-\d{2}-\d{2})\.csv$'
    
    # Create DataFrame for Spotify charts file
    filtered_dfs = []
    for root, dirs, files in os.walk(input_dir):
        for file_name in files:
            match = re.match(chart_file_pattern, file_name)

            if match:
                file_path = os.path.join(root, file_name)
                df = pd.read_csv(file_path)

                # Add new columns for country name & week
                country_code, week = match.group(1), match.group(2)
                df['country'] = get_country_name_from_code(country_code)
                df['week'] = week

                # Keep only the first artist listed for each record
                df['artist_name'] = df['artist_names'].str.split(',').str[0]

                # Keep only the top 50 tracks; each file covers a single country and week,
                # so no grouping is needed
                top_tracks = df.nsmallest(50, 'rank')

                # Drop fields that are no longer needed
                fields_to_drop = ['artist_names', 'rank', 'source', 'peak_rank', 'previous_rank', 'weeks_on_chart']
                top_tracks = top_tracks.drop(fields_to_drop, axis=1)

                filtered_dfs.append(top_tracks)

    # Combine DataFrames
    combined_df = pd.concat(filtered_dfs, ignore_index=True)
    
    # Drop duplicate rows
    combined_df.drop_duplicates(inplace=True)
    
    # Write combined DataFrame to a CSV file
    combined_df.to_csv(output_file, index=False)


# IMPORTANT: Set this to the correct path
chart_files_directory = './input_charts_files'

combined_charts_file_path = './tmp_combined_charts_data.csv'
create_combined_charts_file(chart_files_directory, combined_charts_file_path)
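
To make the file-name convention concrete, here is a small sketch that applies the same regular expression to two made-up file names. The helper `parse_chart_file_name` and both file names are hypothetical and only serve as illustration.

```python
import re

# Same naming pattern as used when combining the chart files
CHART_FILE_PATTERN = r'^regional-(\w{2})-weekly-(\d{4}-\d{2}-\d{2})\.csv$'

def parse_chart_file_name(file_name):
    """Return (country_code, week) for a chart file name, or None if it does not match."""
    match = re.match(CHART_FILE_PATTERN, file_name)
    if match is None:
        return None
    return match.group(1), match.group(2)

print(parse_chart_file_name("regional-ar-weekly-2024-05-02.csv"))      # ('ar', '2024-05-02')
print(parse_chart_file_name("regional-global-weekly-2024-05-02.csv"))  # None
```

Because the pattern expects exactly two characters for the region, files whose region part is longer (such as the made-up "global" name above) are skipped.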

## Augmenting our dataset

Now, let's augment our dataset with genre information, artist country, and artist category by leveraging the Spotify API and MusicBrainz API. We'll do this in 4 steps:

1. Use the Spotify API to fetch the artist id for each track (and create a mapping from track id to artist id)
2. Use the Spotify API to fetch the genre(s) for each artist
3. Use the MusicBrainz API to fetch the artist country and category for each artist
4. Add genre(s), artist country, and artist category to our combined dataset

#### Step 1: Create mapping from track to artist

In [None]:
# Given an array of track URIs, returns an array of track objects from the Spotify API.
def get_track_information_from_spotify(track_uris, retries=3):
    if len(track_uris) > 50:
        raise ValueError("Not allowed to request more than 50 tracks at a time!")

    track_ids = [uri.split(":")[2] for uri in track_uris]
    for attempt in range(1, retries + 1):
        try:
            return sp.tracks(track_ids)['tracks']
        except Exception as e:
            print(f"Error fetching track information: {e}")
            if attempt == retries:
                raise  # Out of retries: surface the error instead of silently returning None
            print(f"Retrying... ({retries - attempt} retries left)")
            time.sleep(5)  # Add a delay before retrying

                
# From our combined dataset, create batches of 50 track URIs
batch_size = 50
combined_df = pd.read_csv(combined_charts_file_path)
unique_spotify_uris = combined_df['uri'].unique()
uri_batches = [unique_spotify_uris[i:i+batch_size] for i in range(0, len(unique_spotify_uris), batch_size)]


# For each batch of tracks, fetch the corresponding primary artist ids from Spotify
track_artist_mapping = {}
for batch_num, uri_batch in log_progress(list(enumerate(uri_batches)), every=1):
    
    # Fetch track objects
    time.sleep(1) # Sleep to prevent Spotify rate limiting
    tracks = get_track_information_from_spotify(uri_batch)
    
    # Create mapping from track to artist
    for track in tracks:
        track_artist_mapping[track['id']] = track['artists'][0]['id']
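
The retry idea used in Step 1 can be tried offline with a stand-in for the API call. This is only a sketch: `call_with_retries` and `flaky_fetch` are hypothetical names, and the delay is set to zero so the example runs instantly.

```python
import time

def call_with_retries(func, retries=3, delay_seconds=0.0):
    """Call func(); on failure wait and retry, re-raising after the last attempt."""
    for attempt in range(1, retries + 1):
        try:
            return func()
        except Exception as error:
            if attempt == retries:
                raise  # no attempts left: let the caller see the error
            print(f"Attempt {attempt} failed ({error}); retrying...")
            time.sleep(delay_seconds)

# Stand-in for an API call that fails twice before succeeding
calls = {"count": 0}

def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return ["track-a", "track-b"]

result = call_with_retries(flaky_fetch, retries=3)
print(result)
```

Re-raising after the final attempt means a persistent failure stops the loop immediately, rather than passing `None` on to the code that iterates over the returned tracks.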

#### Step 2: Fetch genre(s) for each artist

In [None]:
# Fetch artist genre information
batch_size = 50
artist_ids = list(set(track_artist_mapping.values()))
artist_id_batches = [artist_ids[i:i+batch_size] for i in range(0, len(artist_ids), batch_size)]

artist_info = {}
for batch_num, artist_id_batch in log_progress(list(enumerate(artist_id_batches)), every=1):
    artist_infos = sp.artists(artist_id_batch)['artists']
    for artist in artist_infos:
        artist_info[artist['id']] = dict(id=artist['id'], name=artist['name'], genres=artist['genres'])
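
Steps 1 and 2 both split their inputs into batches of 50 before calling the Spotify API. A minimal sketch of that batching, with a hypothetical helper `make_batches` and made-up artist IDs:

```python
def make_batches(items, batch_size=50):
    """Split a sequence into consecutive batches of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]

artist_ids = [f"artist-{n}" for n in range(120)]  # made-up IDs
batches = make_batches(artist_ids)

print([len(batch) for batch in batches])  # [50, 50, 20]
```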

#### Step 3: Fetch artist country and category

In [None]:
#################################
### MusicBrainz API functions ###
#################################

def get_artist_info_from_musicbrainz(artist_name):
    # Set up the API endpoint
    base_url = "https://musicbrainz.org/ws/2/"
    endpoint = "artist"
    params = {
        "query": f'artist:"{artist_name}"',
        "fmt": "json"
    }
    response = requests.get(f"{base_url}{endpoint}", params=params)

    # Check if the request was successful
    if response.status_code == 200:
        # Extract artist information from the response
        data = response.json()
        if 'artists' in data and len(data['artists']) > 0:
            artist = data['artists'][0]
            return artist
        else:
            raise RuntimeError("No information found for artist {} on Musicbrainz".format(artist_name))
    else:
        raise RuntimeError('Got error from MusicBrainz ({}): {}'.format(response.status_code, response.text))


def get_country_from_musicbrainz(area_id):
    """Fetches country information from a given area ID."""
    url = f"https://musicbrainz.org/ws/2/area/{area_id}?inc=area-rels&fmt=json"
    response = requests.get(url)

    if response.status_code == 200:
        data = response.json()
        if data.get('type') == 'Country':
            return data.get('name')
        else:
            for relation in data.get('relations', []):
                if relation['type'] == 'part of' and relation['direction'] == 'backward':
                    rel_area = relation['area']
                    target_area_id = rel_area['id']
                    return get_country_from_musicbrainz(target_area_id)  # Recurse if needed
    else:
        raise RuntimeError('Got error from MusicBrainz ({}): {}'.format(response.status_code, response.text))


#########################################
### Fetch artist category and country ###
#########################################

def get_artist_category_and_country(artist_name):
    try:
        artist_info = get_artist_info_from_musicbrainz(artist_name)
        artist_type = artist_info.get('type', 'unknown')
        gender = artist_info.get('gender', 'unknown')
        
        if artist_type == 'Group':
            artist_category = 'group'
        else:
            artist_category = gender

        if 'area' in artist_info:
            # Fall back to 'unknown' when no enclosing country can be resolved
            country = get_country_from_musicbrainz(artist_info['area']['id']) or 'unknown'
            return artist_category, country
        else:
            return artist_category, 'unknown'
        
    except Exception as e:
        print("No information found on MusicBrainz for {}...".format(artist_name))
        return 'unknown', 'unknown'


for artist_id in log_progress(artist_info, every=1):
    time.sleep(2) # Don't want to get rate limited by MusicBrainz
    artist_name = artist_info[artist_id]['name']
    artist_category, artist_country = get_artist_category_and_country(artist_name)
    artist_info[artist_id]['category'] = artist_category
    artist_info[artist_id]['country'] = artist_country
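
The country lookup walks from an artist's area up through its "part of" relations until it reaches an area of type `Country`. The sketch below reproduces that walk on invented in-memory records, so it runs without network access. `MOCK_AREAS`, its IDs, and `resolve_country` are assumptions for illustration; the records only mimic the fields the notebook reads (`type`, `name`, `relations`).

```python
# Invented area records; IDs and hierarchy are for illustration only
MOCK_AREAS = {
    "city-1": {"type": "City", "name": "Example City",
               "relations": [{"type": "part of", "direction": "backward",
                              "area": {"id": "region-1"}}]},
    "region-1": {"type": "Subdivision", "name": "Example Region",
                 "relations": [{"type": "part of", "direction": "backward",
                                "area": {"id": "country-1"}}]},
    "country-1": {"type": "Country", "name": "Argentina", "relations": []},
    "island-1": {"type": "Island", "name": "Nowhere", "relations": []},
}

def resolve_country(area_id, areas, max_depth=10):
    """Follow backward 'part of' relations until a Country is found, else 'unknown'."""
    for _ in range(max_depth):
        area = areas.get(area_id)
        if area is None:
            return "unknown"
        if area.get("type") == "Country":
            return area.get("name", "unknown")
        parents = [rel["area"]["id"] for rel in area.get("relations", [])
                   if rel.get("type") == "part of" and rel.get("direction") == "backward"]
        if not parents:
            return "unknown"
        area_id = parents[0]  # climb one level up the hierarchy
    return "unknown"

print(resolve_country("city-1", MOCK_AREAS))
print(resolve_country("island-1", MOCK_AREAS))
```

Returning an explicit `'unknown'` when the walk dead-ends keeps the value consistent with the placeholder used elsewhere in the dataset.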

#### Step 4: Add genre(s), artist country, and artist category to our combined dataset

In [None]:
def update_dataframe_with_artist_info(df, track_artist_mapping, artist_info):
    # Parse Spotify track ID from uri
    df['spotify_track_id'] = df['uri'].apply(lambda x: x.split(':')[-1])
    
    # Fetch artist ID from track_artist_mapping
    df['artist_id'] = df['spotify_track_id'].map(track_artist_mapping)
    
    # Fetch artist information from artist_info
    # (use an empty genre list and lowercase 'unknown' so the defaults match the values used elsewhere)
    df['artist_genres'] = df['artist_id'].map(lambda x: artist_info.get(x, {}).get('genres', []))
    df['artist_category'] = df['artist_id'].map(lambda x: artist_info.get(x, {}).get('category', 'unknown'))
    df['artist_country'] = df['artist_id'].map(lambda x: artist_info.get(x, {}).get('country', 'unknown'))

    # Drop fields that are no longer needed
    fields_to_drop = ['spotify_track_id', 'artist_id']
    df = df.drop(fields_to_drop, axis=1)
    
    return df

# Read the original CSV file into a pandas DataFrame
df = pd.read_csv(combined_charts_file_path)

# Update DataFrame with artist information
df_w_artist_information = update_dataframe_with_artist_info(df, track_artist_mapping, artist_info)

# Save the updated DataFrame to a new CSV file (creating the output directory if needed)
os.makedirs('output', exist_ok=True)
df_w_artist_information.to_csv('output/augmented_dataset.csv', index=False)
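
To illustrate how the two lookups chain together, here is a toy version of Step 4 with made-up IDs. It shows that a track missing from either mapping ends up with the fallback value. The `lookup` helper is a hypothetical convenience, not part of the notebook.

```python
import pandas as pd

track_artist_mapping = {"t1": "a1", "t2": "a2"}  # track id -> artist id (made up)
artist_info = {"a1": {"genres": ["pop"], "category": "female", "country": "Chile"}}

df_toy = pd.DataFrame({"uri": ["spotify:track:t1", "spotify:track:t2", "spotify:track:t3"]})

# URI -> track id -> artist id (NaN when the track is not in the mapping)
track_ids = df_toy["uri"].str.split(":").str[-1]
artist_ids = track_ids.map(track_artist_mapping)

def lookup(field, default):
    # Missing artists, or artists without this field, fall back to the default
    return artist_ids.map(lambda artist_id: artist_info.get(artist_id, {}).get(field, default))

df_toy["artist_country"] = lookup("country", "unknown")
df_toy["artist_category"] = lookup("category", "unknown")

print(df_toy)
```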

Now, our dataset has all of the information it needs in a single CSV file:

In [None]:
df_w_artist_information.head(52)

### Splitting up artist genres

One final transformation we'll perform is splitting up artist genres so that there is only one genre per row (for example, if the artist genres for a particular row are ["pop", "hip hop"], then we'd like to split that row into a row with an artist genre of "pop" and a row with an artist genre of "hip hop").

The reason we want to do this is so that Tableau can easily count the number of streams per genre.

However, when doing so, we also want to ensure that we don't cause our visualisation to *over count* the number of streams for a particular country. Therefore, we will add an additional column called `adjusted_streams`, equal to the number of streams for a given entry *divided by* the number of artist genres for that entry, rounded down to a whole number (for instance, if an original entry has genres ["pop", "hip hop"] and 100 streams, then the resulting "pop" row and "hip hop" row should each have 50 adjusted streams).

In [None]:
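import ast

def parse_genres(genres):
    # Lists pass through unchanged; strings such as "['pop', 'hip hop']"
    # (produced when the dataset is reloaded from CSV) are parsed safely
    if isinstance(genres, list):
        return genres
    try:
        return list(ast.literal_eval(genres))
    except (ValueError, SyntaxError, TypeError):
        return []  # placeholder text or a missing value: treat as no genres

# Make sure artist_genres holds lists, whether it came from memory or from a CSV
df_w_artist_information['artist_genres'] = df_w_artist_information['artist_genres'].apply(parse_genres)

# Rename "artist_genres" column to "artist_genre"
df_w_artist_information = df_w_artist_information.rename(columns={'artist_genres': 'artist_genre'})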

# Create an empty list to store DataFrames
dfs = []

# Iterate through each row in the original DataFrame
for index, row in df_w_artist_information.iterrows():
    # Check if the list of genres is empty
    if not row['artist_genre']:
        new_row = row.copy()
        new_row['artist_genre'] = ''  # Assign an empty genre
        new_row['adjusted_streams'] = row['streams']  # Set adjusted streams to original value
        # Convert the row to a DataFrame and append it to the list
        dfs.append(pd.DataFrame([new_row]))
    else:
        # Explode the genres into separate rows
        for genre in row['artist_genre']:
            new_row = row.copy()
            new_row['artist_genre'] = genre.strip(" '")
            # Calculate adjusted streams
            new_row['adjusted_streams'] = new_row['streams'] // len(row['artist_genre'])
            # Convert the row to a DataFrame and append it to the list
            dfs.append(pd.DataFrame([new_row]))

# Concatenate all DataFrames in the list into a single DataFrame
transformed_df = pd.concat(dfs, ignore_index=True)

# Write the new DataFrame to a new CSV file
transformed_df.to_csv('output/actual_final_augmented_dataset.csv', index=False)
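
For larger datasets, the row-by-row loop can be slow. A vectorised sketch of the same idea using `DataFrame.explode` is shown below on two made-up rows; it is an alternative under the same assumptions (integer division, empty genre for artists without genres), not the notebook's own implementation.

```python
import pandas as pd

rows = pd.DataFrame({
    "track_name": ["Song A", "Song B"],
    "artist_genres": [["pop", "hip hop"], []],
    "streams": [100, 40],
})

# Number of genres per row, treating "no genres" as a single (empty) genre
genre_counts = rows["artist_genres"].map(len).clip(lower=1)
rows["adjusted_streams"] = rows["streams"] // genre_counts

# One output row per genre; an empty list becomes a single row with NaN
exploded = rows.explode("artist_genres", ignore_index=True)
exploded = exploded.rename(columns={"artist_genres": "artist_genre"})
exploded["artist_genre"] = exploded["artist_genre"].fillna("")

print(exploded[["track_name", "artist_genre", "streams", "adjusted_streams"]])
```

Because of the integer division, the adjusted streams of a split row can sum to slightly less than the original count when the number of genres does not divide it evenly.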

### End result

In [None]:
transformed_df.head()

# Validation

### Checking Unknowns

In [None]:
df = pd.read_csv("output/actual_final_augmented_dataset.csv")
list_countries = df['country'].unique()
threshold = 0.40
def is_unknown(column):
    # Missing values (e.g. empty genres read back from the CSV as NaN) and any
    # capitalisation of "unknown" all count as unknown
    return column.fillna('unknown').astype(str).str.lower().eq('unknown')

for cou in list_countries:
    # Calculate total number of streams linked to unknown artist countries/categories/genres
    in_country = df['country'] == cou
    unknown_countries = df[in_country & is_unknown(df['artist_country'])]['adjusted_streams'].sum()
    unknown_genders = df[in_country & is_unknown(df['artist_category'])]['adjusted_streams'].sum()
    unknown_genres = df[in_country & is_unknown(df['artist_genre'])]['adjusted_streams'].sum()
    
    total_streams = df[df['country'] == cou]['adjusted_streams'].sum()
    
    # Remove countries over the threshold for unknowns from our dataset
    if unknown_countries / total_streams > threshold or unknown_genders / total_streams > threshold or unknown_genres / total_streams > threshold:
        df = df[df['country'] != cou]
        
df.to_csv('output/known_final_dataset.csv', index=False)
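
The check above drops any chart country where more than 40% of adjusted streams belong to artists with unknown attributes. The following sketch computes the same kind of share for a single attribute (`artist_country`) on invented numbers, using a `groupby` instead of a loop; the data and variable names are illustrative only.

```python
import pandas as pd

data = pd.DataFrame({
    "country": ["Chile", "Chile", "Peru", "Peru"],
    "artist_country": ["Chile", "unknown", "unknown", "unknown"],
    "adjusted_streams": [70, 30, 50, 50],
})
threshold = 0.40

# Streams attributed to unknown artist countries (0 for known rows)
unknown_streams = data["adjusted_streams"].where(data["artist_country"] == "unknown", 0)

# Share of unknown streams per chart country
total_per_country = data.groupby("country")["adjusted_streams"].sum()
share_unknown = unknown_streams.groupby(data["country"]).sum() / total_per_country

# Keep only countries at or below the threshold
kept_countries = share_unknown[share_unknown <= threshold].index.tolist()
print(share_unknown)
print(kept_countries)
```

In this toy data, 30% of the first country's streams are unknown, so it is kept, while the second country is entirely unknown and is dropped.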