# Spotify API Data Retrieval

Since the initial dataset contains only limited information, we decided to use the ***Spotify Web API*** to retrieve more data.

According to the official documentation (https://developer.spotify.com/documentation/web-api/), the ***Spotify Web API*** endpoints can be used to return JSON metadata about music artists, albums, and tracks, directly from the Spotify Data Catalogue.


##  Setup
We import all the necessary libraries and we set the paths to the input/output files. In particular, we create a CSV file for each type of data.

In [None]:
# Imports

import os
import time
from pathlib import Path
import pandas as pd
import datetime
import spotifyCredentials
import requests
import base64

In [None]:
# Absolute path of the working directory; all datasets live in its "datasets" folder
absPath = str(Path.cwd())
datasetsPath = os.path.join(absPath, "datasets")

# Create the dataset directory if needed. exist_ok=True avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(datasetsPath, exist_ok=True)

# Input: the reduced Spotify charts export
spotifyChartsPath = os.path.join(datasetsPath, "reducedSpotifyCharts.csv")

# Outputs: one CSV file per type of data
chartsPath = os.path.join(datasetsPath, "charts.csv")
genresPath = os.path.join(datasetsPath, "genres.csv")
marketsPath = os.path.join(datasetsPath, "markets.csv")
tracksPath = os.path.join(datasetsPath, "tracks.csv")
albumsPath = os.path.join(datasetsPath, "albums.csv")
artistsPath = os.path.join(datasetsPath, "artists.csv")

# Country lookup tables (canonical names and alternative names)
countriesPath = os.path.join(datasetsPath, "countries.csv")
altCountriesPath = os.path.join(datasetsPath, "altCountries.csv")


## Data Processing

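We reshape the raw charts dataset into the format we need: the "Global" chart is dropped, dates are converted to the `YYYY-MM-DD` format, country names are mapped to their country codes, and every row is given a chart ID and a chart name.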

In [None]:
def loadCountries():
    """Read the two country lookup tables from disk.

    Returns:
        A ``(countries, altCountries)`` tuple of DataFrames read from
        ``countriesPath`` and ``altCountriesPath``. The columns of the
        second frame are renamed to ``AlternativeName`` and ``Name``.
    """
    countryTable = pd.read_csv(countriesPath, sep=",")

    aliasTable = pd.read_csv(altCountriesPath, sep=",")
    # Give the alias table the column names the lookup code relies on
    aliasTable.columns = ["AlternativeName", "Name"]

    return countryTable, aliasTable


def _findCountryRows(table, column, name):
    """Return the rows of ``table`` whose ``column`` matches ``name``.

    An exact match is preferred, so "Niger" is not resolved to "Nigeria".
    Only when no exact match exists does it fall back to a substring match.
    The substring is matched literally (``regex=False``) so names containing
    characters such as parentheses or dots are not read as a regular
    expression, and missing values never match (``na=False``).
    """
    values = table[column]

    exactRows = table[values == name]
    if not exactRows.empty:
        return exactRows

    return table[values.str.contains(name, regex=False, na=False)]


def getCountryCode(countryName, countries, altCountries):
    """Resolve a country name to its code.

    Args:
        countryName: Name of the country as found in the charts dataset.
        countries: DataFrame with at least the ``Name`` and ``Code`` columns.
        altCountries: DataFrame with the ``AlternativeName`` and ``Name``
            columns, mapping alternative names to the names in ``countries``.

    Returns:
        A ``(countryCode, countryName)`` tuple. ``countryName`` is returned
        unchanged when it matched ``countries`` directly; otherwise it is the
        ``Name`` found through the alternative-names table.

    Raises:
        IndexError: if the name matches neither table.
    """
    try:
        matchedCountries = _findCountryRows(countries, "Name", countryName)
        countryCode = matchedCountries["Code"].iloc[0]
    except IndexError:
        # No direct match: look the name up among the alternative names
        alternativeMatchedCountries = _findCountryRows(
            altCountries, "AlternativeName", countryName)
        countryName = alternativeMatchedCountries["Name"].iloc[0]

        matchedCountries = countries[countries["Name"] == countryName]
        countryCode = matchedCountries["Code"].iloc[0]

    return countryCode, countryName


In [None]:
# Country lookup tables: canonical names plus alternative names
countries, altCountries = loadCountries()

# Read the reduced Spotify charts export into memory
chartsDF = pd.read_csv(spotifyChartsPath, sep=",")

# Keep only the per-country charts by discarding the "Global" rows
chartsDF = chartsDF[chartsDF["country"] != "Global"]

# Discard the first column of the export
# NOTE(review): presumably a leftover index column; verify against the CSV
firstColumn = chartsDF.columns[0]
chartsDF = chartsDF.drop(columns=firstColumn)

# Empty frame with the layout of the final charts dataset
realChartsCols = ["id", "name", "country_code", "date", "type", "position", "trackID"]
realChartsDF = pd.DataFrame(data=[], columns=realChartsCols)


In [None]:
dataToInsert = []

# Every chart in the dataset is a TOP 100
topNumType = 100

# Country names and dates repeat across many rows, so each distinct value is
# resolved once and reused instead of being recomputed for every row
countryCodeCache = {}
chartDateCache = {}

# Iterate through the charts DataFrame, building one output row per input row
for index, row in chartsDF.iterrows():

    # Retrieve country and date
    countryName = row["country"]
    rawDate = row["date"]

    # Reformat date from DD/MM/YYYY to YYYY-MM-DD
    if rawDate not in chartDateCache:
        chartDateCache[rawDate] = datetime.datetime.strptime(
            rawDate, "%d/%m/%Y").strftime("%Y-%m-%d")
    chartDate = chartDateCache[rawDate]

    # Get the track ID by stripping the URL prefix
    trackID = row['uri'].removeprefix("https://open.spotify.com/track/")

    # Get the country code
    if countryName not in countryCodeCache:
        countryCodeCache[countryName] = getCountryCode(
            countryName, countries, altCountries)[0]
    countryCode = countryCodeCache[countryName]

    # Create a unique ID for the chart (one per country and date)
    chartID = "top-{}-{}-{}".format(topNumType, countryCode, chartDate)

    # Add the name of the chart
    chartName = "TOP {} {}".format(topNumType, countryName)

    dataToInsert.append([chartID, chartName, countryCode, chartDate,
                         "top", row["position"], trackID])

    # Progress report
    if index % 50000 == 0:
        print("🎵 [STATUS INFO #{row}]".format(row=index))


In [None]:
# Build the final charts DataFrame from the collected rows
realChartsDF = pd.DataFrame(dataToInsert, columns=realChartsCols)

# DataFrame.info() prints its report itself and returns None, so it is not
# wrapped in print() (which would add a stray "None" line)
realChartsDF.info()

In [None]:
# Write the charts dataset to disk; the row index is kept as the first column
realChartsDF.to_csv(chartsPath, index=True)

## Spotify API Functions

We defined the functions necessary to interact with the ***Spotify Web API*** according to the official documentation.

In [None]:
# Credentials used to obtain an access token for the Spotify API.
# They are read from the local spotifyCredentials module.
SPOTIFY_CLIENT_ID, SPOTIFY_SECRET_ID, SPOTIFY_REFRESH_TOKEN = (
    spotifyCredentials.SPOTIFY_CLIENT_ID,
    spotifyCredentials.SPOTIFY_SECRET_ID,
    spotifyCredentials.SPOTIFY_REFRESH_TOKEN,
)

In [None]:
# Function to get the Spotify API Access Token
def getAccessToken():
    """Request a fresh Spotify API access token using the refresh token.

    Returns:
        The access token string, or None when the token endpoint does not
        answer with HTTP 200.

    Raises:
        requests.exceptions.RequestException: on a network failure or when
            the endpoint does not answer within the timeout.
    """
    # Spotify Basic Authorization Code: base64 of "<client id>:<client secret>"
    authBasic = base64.b64encode(
        "{}:{}".format(SPOTIFY_CLIENT_ID, SPOTIFY_SECRET_ID).encode()).decode()

    # Exchange the refresh token for an access token. Without a timeout,
    # requests waits indefinitely if the server stops responding.
    accessTokenRequest = requests.post(
        "https://accounts.spotify.com/api/token",
        data={
            "grant_type": "refresh_token",
            "refresh_token": SPOTIFY_REFRESH_TOKEN
        },
        headers={
            "Authorization": "Basic " + authBasic
        },
        timeout=30,
    )

    # Any answer other than 200 is reported to the caller as None
    if accessTokenRequest.status_code != 200:
        return None

    return accessTokenRequest.json()["access_token"]

accessToken = getAccessToken()
if accessToken is None:
    # Fail here with a clear message instead of later with a TypeError when
    # the "Bearer " header is built from None
    raise RuntimeError(
        "Could not obtain a Spotify access token; check spotifyCredentials")

# Only a short prefix is printed so the secret token is not stored in the
# notebook output
print("🪙 [INFO] Access Token is: {}…".format(accessToken[:6]))


In [None]:
# Function to get all available Markets from Spotify API
def getAllMarkets(fromError=False):
    """Fetch the list of markets from the Spotify API.

    On failure the request is retried exactly once: after refreshing the
    access token for a 401, or after waiting for any other error.

    Args:
        fromError: True when this call is already the retry of a failed one.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: with the response text, when the retry fails as well.
        requests.exceptions.RequestException: on a network failure or timeout.
    """
    global accessToken

    availableMarkets = requests.get(
        "https://api.spotify.com/v1/markets",
        headers={"Authorization": "Bearer " + accessToken},
        timeout=30,  # never hang forever on an unresponsive server
    )

    if availableMarkets.status_code == 200:
        return availableMarkets.json()

    if fromError is not False:
        # This was already the retry: give up and surface the API answer
        raise Exception(availableMarkets.text)

    if availableMarkets.status_code == 401:
        # The access token has expired
        accessToken = getAccessToken()
    else:
        # Rate limit reached or other errors: wait for as long as the server
        # asks through Retry-After, or 30 seconds when it does not say
        retryAfter = availableMarkets.headers.get("Retry-After", "")
        time.sleep(int(retryAfter) if retryAfter.isdigit() else 30)

    return getAllMarkets(fromError=True)


In [None]:
# Function to get Track info from Spotify API
def getTrackInfo(trackID, fromError=False):
    """Fetch the information of a track from the Spotify API.

    On failure the request is retried exactly once: after refreshing the
    access token for a 401, or after waiting for any other error.

    Args:
        trackID: Spotify ID of the track.
        fromError: True when this call is already the retry of a failed one.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: with the response text, when the retry fails as well.
        requests.exceptions.RequestException: on a network failure or timeout.
    """
    global accessToken

    trackInfoRequest = requests.get(
        "https://api.spotify.com/v1/tracks/{id}".format(id=trackID),
        headers={"Authorization": "Bearer " + accessToken},
        timeout=30,  # never hang forever on an unresponsive server
    )

    if trackInfoRequest.status_code == 200:
        return trackInfoRequest.json()

    if fromError is not False:
        # This was already the retry: give up and surface the API answer
        raise Exception(trackInfoRequest.text)

    if trackInfoRequest.status_code == 401:
        # The access token has expired
        accessToken = getAccessToken()
    else:
        # Rate limit reached or other errors: wait for as long as the server
        # asks through Retry-After, or 30 seconds when it does not say
        retryAfter = trackInfoRequest.headers.get("Retry-After", "")
        time.sleep(int(retryAfter) if retryAfter.isdigit() else 30)

    return getTrackInfo(trackID, fromError=True)


In [None]:
def getAudioFeatures(trackID, fromError=False):
    """Fetch the audio features of a track from the Spotify API.

    On failure the request is retried exactly once: after refreshing the
    access token for a 401, or after waiting for any other error.

    Args:
        trackID: Spotify ID of the track.
        fromError: True when this call is already the retry of a failed one.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: with the response text, when the retry fails as well.
        requests.exceptions.RequestException: on a network failure or timeout.
    """
    global accessToken

    audioFeaturesRequest = requests.get(
        "https://api.spotify.com/v1/audio-features/{id}".format(id=trackID),
        headers={"Authorization": "Bearer " + accessToken},
        timeout=30,  # never hang forever on an unresponsive server
    )

    if audioFeaturesRequest.status_code == 200:
        return audioFeaturesRequest.json()

    if fromError is not False:
        # This was already the retry: give up and surface the API answer
        raise Exception(audioFeaturesRequest.text)

    if audioFeaturesRequest.status_code == 401:
        # The access token has expired
        accessToken = getAccessToken()
    else:
        # Rate limit reached or other errors: wait for as long as the server
        # asks through Retry-After, or 30 seconds when it does not say
        retryAfter = audioFeaturesRequest.headers.get("Retry-After", "")
        time.sleep(int(retryAfter) if retryAfter.isdigit() else 30)

    return getAudioFeatures(trackID, fromError=True)


In [None]:
# Function to get Artist info from Spotify API
def getArtistInfo(artistID, fromError=False):
    """Fetch the information of an artist from the Spotify API.

    On failure the request is retried exactly once: after refreshing the
    access token for a 401, or after waiting for any other error.

    Args:
        artistID: Spotify ID of the artist.
        fromError: True when this call is already the retry of a failed one.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: with the response text, when the retry fails as well.
        requests.exceptions.RequestException: on a network failure or timeout.
    """
    global accessToken

    artistInfoRequest = requests.get(
        "https://api.spotify.com/v1/artists/{id}".format(id=artistID),
        headers={"Authorization": "Bearer " + accessToken},
        timeout=30,  # never hang forever on an unresponsive server
    )

    if artistInfoRequest.status_code == 200:
        return artistInfoRequest.json()

    if fromError is not False:
        # This was already the retry: give up and surface the API answer
        raise Exception(artistInfoRequest.text)

    if artistInfoRequest.status_code == 401:
        # The access token has expired
        accessToken = getAccessToken()
    else:
        # Rate limit reached or other errors: wait for as long as the server
        # asks through Retry-After, or 30 seconds when it does not say
        retryAfter = artistInfoRequest.headers.get("Retry-After", "")
        time.sleep(int(retryAfter) if retryAfter.isdigit() else 30)

    return getArtistInfo(artistID, fromError=True)


## Data Retrieving
### Setup the necessary DataFrames

In [None]:
# Accumulated datasets; they stay None until the first batch is merged in
genresDF = tracksDF = albumsDF = artistsDF = None

# Per-batch buffers of rows waiting to be merged (one independent list each)
genresTmpData, tracksTmpData, albumsTmpData, artistsTmpData = [], [], [], []

### Define functions to retrieve different types of data

In [None]:
def createGenreID(genre):
    """Build an identifier for a genre name.

    Every character that is not alphanumeric (per ``str.isalnum``) is
    replaced with "-"; all other characters are kept as they are.

    Args:
        genre: The genre name.

    Returns:
        The genre ID, with the same length as ``genre``.
    """
    # str.join builds the result in one pass instead of repeated "+="
    return "".join(char if char.isalnum() else "-" for char in genre)

In [None]:
def generateTrackObject(trackInfo, trackAudioFeatures):
    """Flatten a track and its audio features into a single dict.

    Args:
        trackInfo: Track dict as returned by ``getTrackInfo``.
        trackAudioFeatures: Audio-features dict as returned by
            ``getAudioFeatures``.

    Returns:
        A dict with the track metadata, the audio features, the
        comma-separated artist IDs, the album ID and the comma-separated
        available markets.
    """
    # Only the IDs of the artists are kept
    performerIDs = [performer["id"] for performer in trackInfo["artists"]]

    # Metadata coming from the track itself
    trackObject = {
        "id": trackInfo["id"],
        "uri": trackInfo["external_urls"]["spotify"],
        "title": trackInfo["name"],
        "duration": trackInfo["duration_ms"],
        "popularity": trackInfo["popularity"],
        "explicit": trackInfo["explicit"],
    }

    # Audio features are copied under the same key names, in this order
    audioFeatureNames = (
        "key", "tempo", "mode", "time_signature", "acousticness",
        "danceability", "energy", "loudness", "liveness", "valence",
        "speechiness", "instrumentalness",
    )
    for featureName in audioFeatureNames:
        trackObject[featureName] = trackAudioFeatures[featureName]

    # References to other entities, flattened to comma-separated strings
    trackObject["artists"] = ",".join(performerIDs)
    trackObject["album"] = trackInfo["album"]["id"]
    trackObject["available_countries"] = ",".join(trackInfo["available_markets"])

    return trackObject

In [None]:
def generateAlbumObject(albumInfo):
    """Flatten an album dict into the row stored in the albums dataset.

    Args:
        albumInfo: Album dict, e.g. the ``"album"`` entry of a track.

    Returns:
        A dict with the album metadata, the comma-separated artist IDs and
        the comma-separated available markets.
    """
    # Only the IDs of the artists are kept
    performerIDs = [performer["id"] for performer in albumInfo["artists"]]

    # Fields copied from the album: output name -> source key
    copiedFields = {
        "id": "id",
        "title": "name",
        "total_tracks": "total_tracks",
        "release_date": "release_date",
        "release_date_precision": "release_date_precision",
        "album_type": "album_type",
    }

    albumObject = {"id": albumInfo["id"]}
    albumObject["uri"] = albumInfo["external_urls"]["spotify"]
    for outputName, sourceKey in copiedFields.items():
        if outputName != "id":
            albumObject[outputName] = albumInfo[sourceKey]
    albumObject["artists"] = ",".join(performerIDs)
    albumObject["available_countries"] = ",".join(albumInfo["available_markets"])

    return albumObject

In [None]:
def generateArtistObject(artistInfo):
    """Flatten an artist dict into the row stored in the artists dataset.

    Args:
        artistInfo: Artist dict as returned by ``getArtistInfo``.

    Returns:
        A dict with the artist ID, URI, name, popularity and the
        comma-separated IDs of the artist's genres.
    """
    artistObject = {
        "id": artistInfo["id"],
        "uri": artistInfo["external_urls"]["spotify"],
        "name": artistInfo["name"],
        "popularity": artistInfo["popularity"],
    }

    # Genres are stored by ID rather than by name
    genreIDs = []
    for genreName in artistInfo["genres"]:
        genreIDs.append(createGenreID(genreName))
    artistObject["genres"] = ",".join(genreIDs)

    return artistObject

### Get all the markets (countries) in which Spotify is present

In [None]:
# Ask the Spotify API for the list of markets
availableMarkets = getAllMarkets()
marketCodes = availableMarkets["markets"]

# One row per market, in a single "markets" column
marketsDF = pd.DataFrame(marketCodes, columns=["markets"])

# Show a summary of the markets DataFrame
marketsDF.info()

# Write the markets dataset to disk
marketsDF.to_csv(marketsPath)

### Get Data from Spotify Web API
Starting from the reduced CSV file with ***weekly TOP 100***, we retrieved more data about the tracks contained in the file

In [None]:
# Read back the charts dataset written earlier (first column is the index)
# and drop every row that contains a missing value
trackCharts = pd.read_csv(chartsPath, sep=",", index_col=0).dropna()

# Show a summary of the track charts
trackCharts.info()

In [None]:
# Keep one row per distinct track ID, as a single-column DataFrame
spotifyTrackIDs = trackCharts[["trackID"]].drop_duplicates()

# Show a summary of the unique track IDs
spotifyTrackIDs.info()

In [None]:
# Per-batch buffers of rows waiting to be merged into the DataFrames
genresTmpData = []
tracksTmpData = []
albumsTmpData = []
artistsTmpData = []

# IDs of the artists already retrieved in this run. The artists dataset keeps
# one row per ID, so asking the API again for the same artist is wasted work.
fetchedArtistIDs = set()


def _mergeRows(frame, rows, dedupe=True):
    """Append ``rows`` (a list of dicts) to ``frame`` and return the result.

    ``frame`` may be None when nothing has been stored yet. An empty ``rows``
    list leaves ``frame`` untouched, so a frame without an "id" column is
    never created and dropping duplicates on "id" cannot raise a KeyError.
    """
    if rows:
        frame = pd.concat([frame, pd.DataFrame(rows)], ignore_index=True)
    if dedupe and frame is not None:
        frame = frame.drop_duplicates(subset=["id"], ignore_index=True)
    return frame


def _saveFrame(frame, path):
    """Write ``frame`` to ``path`` as CSV, unless there is nothing to write."""
    if frame is not None:
        frame.to_csv(path)


def _printInfo(*frames):
    """Print the summary of every DataFrame that already holds data."""
    for frame in frames:
        if frame is not None:
            # info() prints by itself and returns None
            frame.info()


# Reduce number of tracks
# spotifyTrackIDs = spotifyTrackIDs[:2000]

# Iterate over the unique track IDs; index counts the tracks processed so far
for index, trackID in enumerate(spotifyTrackIDs["trackID"]):

    # Try to get track info and audio features; skip the track on failure
    try:
        trackInfo = getTrackInfo(trackID)
        trackAudioFeatures = getAudioFeatures(trackID)
    except Exception as e:
        print("⛔ [ERROR] Cannot retrieve data with track {id}\n\t🗨️ [RESPONSE] {resp}\n".format(id=trackID, resp=e))
        continue

    # Buffer the track and album rows
    tracksTmpData.append(generateTrackObject(trackInfo, trackAudioFeatures))
    albumsTmpData.append(generateAlbumObject(trackInfo["album"]))

    # Buffer the artist rows, together with the genres of each artist
    for artist in trackInfo["artists"]:
        artistID = artist["id"]
        if artistID in fetchedArtistIDs:
            continue

        try:
            artistInfo = getArtistInfo(artistID)
        except Exception as e:
            print("⛔ [ERROR] Cannot retrieve data with artist {id}\n\t🗨️ [RESPONSE] {resp}\n".format(
                id=artistID, resp=e))
            # Not marked as fetched, so it is tried again if it reappears
            continue

        fetchedArtistIDs.add(artistID)
        artistsTmpData.append(generateArtistObject(artistInfo))

        for genre in artistInfo["genres"]:
            genresTmpData.append({
                "id": createGenreID(genre),
                "name": genre,
            })

    # Save DataFrame to file every 5000 tracks
    if index % 5000 == 0:
        # Merge the buffered rows into the DataFrames. Artists, albums and
        # genres recur across tracks, so their duplicates are dropped.
        tracksDF = _mergeRows(tracksDF, tracksTmpData, dedupe=False)
        albumsDF = _mergeRows(albumsDF, albumsTmpData)
        artistsDF = _mergeRows(artistsDF, artistsTmpData)
        genresDF = _mergeRows(genresDF, genresTmpData)

        genresTmpData = []
        tracksTmpData = []
        albumsTmpData = []
        artistsTmpData = []

        print("💾 [STATUS INFO #{row}] Dataset saved\n".format(row=index))
        _saveFrame(tracksDF, tracksPath)
        _saveFrame(albumsDF, albumsPath)
        _saveFrame(artistsDF, artistsPath)
        _saveFrame(genresDF, genresPath)

    # Print stats every 50000 tracks
    if index % 50000 == 0:
        print("🎵 [STATUS INFO #{row}]".format(row=index))
        _printInfo(tracksDF, albumsDF, artistsDF, genresDF)
        print("\n")

# Print info about the DataFrames (rows still buffered are not included)
_printInfo(tracksDF, albumsDF, artistsDF, genresDF)


In [None]:
def _finalizeFrame(frame, rows):
    """Merge the remaining ``rows`` into ``frame`` and drop duplicate IDs.

    ``frame`` may be None when nothing has been stored yet; it stays None if
    ``rows`` is empty as well. An empty ``rows`` list is never turned into a
    DataFrame, because a frame without an "id" column would make
    ``drop_duplicates(subset=["id"])`` raise a KeyError.
    """
    if rows:
        frame = pd.concat([frame, pd.DataFrame(rows)], ignore_index=True)
    if frame is not None:
        frame = frame.drop_duplicates(subset=["id"], ignore_index=True)
    return frame


# Merge the rows still buffered after the loop into the DataFrames
tracksDF = _finalizeFrame(tracksDF, tracksTmpData)
albumsDF = _finalizeFrame(albumsDF, albumsTmpData)
artistsDF = _finalizeFrame(artistsDF, artistsTmpData)
genresDF = _finalizeFrame(genresDF, genresTmpData)

# The buffers have been merged: empty them
genresTmpData = []
tracksTmpData = []
albumsTmpData = []
artistsTmpData = []

# Save datasets to file; a dataset for which nothing was retrieved is skipped
for finalFrame, finalPath in (
    (tracksDF, tracksPath),
    (albumsDF, albumsPath),
    (artistsDF, artistsPath),
    (genresDF, genresPath),
):
    if finalFrame is not None:
        finalFrame.to_csv(finalPath)
