In [None]:
# Imports

import os
import time
from pathlib import Path
import pandas as pd
import json
import spotifyCredentials
import requests
import base64

# Setup

In [None]:
# Absolute path of the working directory; every dataset path is derived from it
absPath = os.path.abspath(os.getcwd())
datasetsPath = os.path.join(absPath, "datasets")

# Create the dataset directory if it does not exist yet.
# exist_ok=True replaces the separate "exists?" check, so there is no window
# between the check and the creation and the cell can be re-run safely.
os.makedirs(datasetsPath, exist_ok=True)

# Setup datasets paths
# Input: the (reduced) Spotify Charts file read later in the notebook
spotifyChartsPath = os.path.join(datasetsPath, "reducedSpotifyCharts.csv")
# Outputs: one CSV file per dataset produced by the notebook
genresPath = os.path.join(datasetsPath, "genres.csv")
marketsPath = os.path.join(datasetsPath, "markets.csv")
tracksPath = os.path.join(datasetsPath, "tracks.csv")
albumsPath = os.path.join(datasetsPath, "albums.csv")
artistsPath = os.path.join(datasetsPath, "artists.csv")

# Spotify API Functions

In [None]:
# Spotify credentials to get access token to Spotify API.
# All three values are read from the imported `spotifyCredentials` module.

# Client ID and secret: getAccessToken() joins them as "<id>:<secret>" to build
# the Basic authorization header of the token request
SPOTIFY_CLIENT_ID = spotifyCredentials.SPOTIFY_CLIENT_ID
SPOTIFY_SECRET_ID = spotifyCredentials.SPOTIFY_SECRET_ID
# Refresh token: sent by getAccessToken() as the "refresh_token" form field
SPOTIFY_REFRESH_TOKEN = spotifyCredentials.SPOTIFY_REFRESH_TOKEN

In [None]:
# Function to get the Spotify API Access Token
def getAccessToken():
    """Request a Spotify API access token with the refresh-token grant.

    Posts SPOTIFY_REFRESH_TOKEN to the Spotify token endpoint, authenticating
    with the Basic authorization header built from SPOTIFY_CLIENT_ID and
    SPOTIFY_SECRET_ID.

    Returns:
        The value of the "access_token" field of the response when the
        endpoint answers with status 200, otherwise None. A failure is also
        reported on standard output so that it is not discovered only later,
        when the missing token is used to build a request header.
    """
    # Spotify Basic Authorization Code: base64 of "<client id>:<secret id>"
    credentials = "{}:{}".format(SPOTIFY_CLIENT_ID, SPOTIFY_SECRET_ID)
    authBasic = base64.b64encode(credentials.encode()).decode()

    # Request to get Access Token from Client ID and Secret ID
    accessTokenRequest = requests.post("https://accounts.spotify.com/api/token",
        data={
            "grant_type":"refresh_token",
            "refresh_token":SPOTIFY_REFRESH_TOKEN
        },
        headers={
            "Authorization": "Basic " + authBasic
        }
    )

    if accessTokenRequest.status_code != 200:
        # Keep the "None on failure" contract, but say why the token is missing
        print("⛔ [ERROR] Cannot get the access token (HTTP {status})\n\t🗨️ [RESPONSE] {resp}\n".format(
            status=accessTokenRequest.status_code, resp=accessTokenRequest.text))
        return None

    # Get Access Token
    return accessTokenRequest.json()["access_token"]

accessToken = getAccessToken()

# Report only whether a token was obtained. The token itself is never printed:
# cell outputs are stored inside the notebook file, so echoing it would leak a
# live credential as soon as the notebook is saved, shared or committed.
if accessToken is None:
    print("⛔ [ERROR] Access Token could not be retrieved")
else:
    print("🪙 [INFO] Access Token retrieved")


In [None]:
# Function to get all available Markets from Spotify API
def getAllMarkets(fromError=False):
    """Fetch the available markets from the Spotify API.

    A failed request is retried exactly once: after refreshing the global
    access token when the status is 401, after a pause for any other status.

    Args:
        fromError: True when this call is the retry of a failed request; a
            second failure then raises instead of retrying again.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: carrying the response text, when the retry fails as well or
            when the access token cannot be refreshed.
    """
    global accessToken

    availableMarkets = requests.get("https://api.spotify.com/v1/markets",
                                    headers={
                                        "Authorization": "Bearer " + accessToken
                                    })

    if availableMarkets.status_code == 200:
        return availableMarkets.json()

    # This call already was the retry: give up
    if fromError:
        raise Exception(availableMarkets.text)

    if availableMarkets.status_code == 401:  # The access token has expired
        newAccessToken = getAccessToken()
        # Keep the previous token when the refresh fails: storing None would
        # make every later request fail while building its header
        if newAccessToken is None:
            raise Exception(availableMarkets.text)
        accessToken = newAccessToken
    else:  # The api rate limit has reached or other errors
        retryDelay = 30
        if availableMarkets.status_code == 429:
            # Wait for as long as the API asks to; fall back to the default
            # pause when the header is missing or not a number of seconds
            try:
                retryDelay = max(int(availableMarkets.headers.get("Retry-After", retryDelay)), 0)
            except (TypeError, ValueError):
                pass
        time.sleep(retryDelay)

    return getAllMarkets(fromError=True)


In [None]:
# Function to get Track info from Spotify API
def getTrackInfo(trackID, fromError=False):
    """Fetch the information of a track from the Spotify API.

    A failed request is retried exactly once: after refreshing the global
    access token when the status is 401, after a pause for any other status.

    Args:
        trackID: id of the track, inserted in the request url.
        fromError: True when this call is the retry of a failed request; a
            second failure then raises instead of retrying again.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: carrying the response text, when the retry fails as well or
            when the access token cannot be refreshed.
    """
    global accessToken

    trackInfoRequest = requests.get("https://api.spotify.com/v1/tracks/{id}".format(id=trackID),
        headers={
            "Authorization": "Bearer " + accessToken
        })

    if trackInfoRequest.status_code == 200:
        return trackInfoRequest.json()

    # This call already was the retry: give up
    if fromError:
        raise Exception(trackInfoRequest.text)

    if trackInfoRequest.status_code == 401:  # The access token has expired
        newAccessToken = getAccessToken()
        # Keep the previous token when the refresh fails: storing None would
        # make every later request fail while building its header
        if newAccessToken is None:
            raise Exception(trackInfoRequest.text)
        accessToken = newAccessToken
    else:  # The api rate limit has reached or other errors
        retryDelay = 30
        if trackInfoRequest.status_code == 429:
            # Wait for as long as the API asks to; fall back to the default
            # pause when the header is missing or not a number of seconds
            try:
                retryDelay = max(int(trackInfoRequest.headers.get("Retry-After", retryDelay)), 0)
            except (TypeError, ValueError):
                pass
        time.sleep(retryDelay)

    return getTrackInfo(trackID, fromError=True)


In [None]:
def getAudioFeatures(trackID, fromError=False):
    """Fetch the audio features of a track from the Spotify API.

    A failed request is retried exactly once: after refreshing the global
    access token when the status is 401, after a pause for any other status.

    Args:
        trackID: id of the track, inserted in the request url.
        fromError: True when this call is the retry of a failed request; a
            second failure then raises instead of retrying again.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: carrying the response text, when the retry fails as well or
            when the access token cannot be refreshed.
    """
    global accessToken

    audioFeaturesRequest = requests.get("https://api.spotify.com/v1/audio-features/{id}".format(id=trackID),
                                    headers={
        "Authorization": "Bearer " + accessToken
    })

    if audioFeaturesRequest.status_code == 200:
        return audioFeaturesRequest.json()

    # This call already was the retry: give up
    if fromError:
        raise Exception(audioFeaturesRequest.text)

    if audioFeaturesRequest.status_code == 401:  # The access token has expired
        newAccessToken = getAccessToken()
        # Keep the previous token when the refresh fails: storing None would
        # make every later request fail while building its header
        if newAccessToken is None:
            raise Exception(audioFeaturesRequest.text)
        accessToken = newAccessToken
    else:  # The api rate limit has reached or other errors
        retryDelay = 30
        if audioFeaturesRequest.status_code == 429:
            # Wait for as long as the API asks to; fall back to the default
            # pause when the header is missing or not a number of seconds
            try:
                retryDelay = max(int(audioFeaturesRequest.headers.get("Retry-After", retryDelay)), 0)
            except (TypeError, ValueError):
                pass
        time.sleep(retryDelay)

    return getAudioFeatures(trackID, fromError=True)


In [None]:
# Function to get Artist info from Spotify API
def getArtistInfo(artistID, fromError=False):
    """Fetch the information of an artist from the Spotify API.

    A failed request is retried exactly once: after refreshing the global
    access token when the status is 401, after a pause for any other status.

    Args:
        artistID: id of the artist, inserted in the request url.
        fromError: True when this call is the retry of a failed request; a
            second failure then raises instead of retrying again.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: carrying the response text, when the retry fails as well or
            when the access token cannot be refreshed.
    """
    global accessToken

    artistInfoRequest = requests.get("https://api.spotify.com/v1/artists/{id}".format(id=artistID),
                                    headers={
        "Authorization": "Bearer " + accessToken
    })

    if artistInfoRequest.status_code == 200:
        return artistInfoRequest.json()

    # This call already was the retry: give up
    if fromError:
        raise Exception(artistInfoRequest.text)

    if artistInfoRequest.status_code == 401:  # The access token has expired
        newAccessToken = getAccessToken()
        # Keep the previous token when the refresh fails: storing None would
        # make every later request fail while building its header
        if newAccessToken is None:
            raise Exception(artistInfoRequest.text)
        accessToken = newAccessToken
    else:  # The api rate limit has reached or other errors
        retryDelay = 30
        if artistInfoRequest.status_code == 429:
            # Wait for as long as the API asks to; fall back to the default
            # pause when the header is missing or not a number of seconds
            try:
                retryDelay = max(int(artistInfoRequest.headers.get("Retry-After", retryDelay)), 0)
            except (TypeError, ValueError):
                pass
        time.sleep(retryDelay)

    return getArtistInfo(artistID, fromError=True)


# Parse Data

In [None]:
# Column layout of each dataset; the names mirror the keys produced by the
# generate*Object functions

genresCols = ["genre"]

tracksCols = [
    "id", "uri", "title", "duration", "popularity", "explicit",
    "key", "tempo", "mode", "time_signature", "acousticness", "danceability",
    "energy", "loudness", "liveness", "valence", "speechiness", "instrumentalness",
    "artists", "album", "available_countries",
]

albumsCols = [
    "id", "uri", "title", "total_tracks", "release_date", "release_date_precision",
    "album_type", "artists", "available_countries",
]

artistsCols = ["id", "uri", "name", "popularity", "genres"]

# The DataFrames themselves are created later, right before the data is fetched
genresDF = tracksDF = albumsDF = artistsDF = None


## Get markets

In [None]:
# Ask the Spotify API for the available markets
availableMarkets = getAllMarkets()

# Build a one-column DataFrame from the "markets" list of the response
marketsDF = pd.DataFrame(data=availableMarkets["markets"], columns=["markets"])

# Show a summary of the markets DataFrame
marketsDF.info()

# Write the markets dataset to its CSV file
marketsDF.to_csv(path_or_buf=marketsPath)

## Get Data from Spotify API

In [None]:
# Read the Spotify Charts file and discard every row that contains a missing
# value (dropna() with its defaults works on rows, not on columns)
trackCharts = pd.read_csv(spotifyChartsPath, sep=",").dropna()

# Show a summary of the loaded charts
trackCharts.info()


In [None]:
# Keep a single row per distinct uri, as a one-column DataFrame
spotifyTrackLinks = trackCharts["uri"].drop_duplicates().to_frame()

# Show a summary of the distinct uris
spotifyTrackLinks.info()


In [None]:
def generateTrackObject(trackInfo, trackAudioFeatures):
    """Build the flat track record stored in the tracks dataset.

    Args:
        trackInfo: mapping with the track information; the keys read here are
            "id", "external_urls", "name", "duration_ms", "popularity",
            "explicit", "artists", "album" and "available_markets".
        trackAudioFeatures: mapping with the audio features of the same track.

    Returns:
        A dict whose keys, in insertion order, are the tracks dataset columns.
        The artist ids and the available markets are joined with commas.
    """
    # Only the ids of the artists are kept
    artistsIDs = [artist["id"] for artist in trackInfo["artists"]]

    # Audio features copied as they are, under the same name
    audioFeatureNames = (
        "key", "tempo", "mode", "time_signature", "acousticness", "danceability",
        "energy", "loudness", "liveness", "valence", "speechiness", "instrumentalness",
    )

    # General information first, then the audio features, then the references
    trackObject = {
        "id": trackInfo["id"],
        "uri": trackInfo["external_urls"]["spotify"],
        "title": trackInfo["name"],
        "duration": trackInfo["duration_ms"],
        "popularity": trackInfo["popularity"],
        "explicit": trackInfo["explicit"],
    }
    for featureName in audioFeatureNames:
        trackObject[featureName] = trackAudioFeatures[featureName]

    trackObject["artists"] = ",".join(artistsIDs)
    trackObject["album"] = trackInfo["album"]["id"]
    trackObject["available_countries"] = ",".join(trackInfo["available_markets"])

    return trackObject


In [None]:
def generateAlbumObject(albumInfo):
    """Build the flat album record stored in the albums dataset.

    Args:
        albumInfo: mapping with the album information; the keys read here are
            "id", "external_urls", "name", "total_tracks", "release_date",
            "release_date_precision", "album_type", "artists" and
            "available_markets".

    Returns:
        A dict whose keys, in insertion order, are the albums dataset columns.
        The artist ids and the available markets are joined with commas.
    """
    # Only the ids of the artists are kept
    artistsIDs = [artist["id"] for artist in albumInfo["artists"]]

    # Renamed fields first
    albumObject = {
        "id": albumInfo["id"],
        "uri": albumInfo["external_urls"]["spotify"],
        "title": albumInfo["name"],
    }

    # Fields copied as they are, under the same name
    for fieldName in ("total_tracks", "release_date", "release_date_precision", "album_type"):
        albumObject[fieldName] = albumInfo[fieldName]

    # Lists flattened into comma separated strings
    albumObject["artists"] = ",".join(artistsIDs)
    albumObject["available_countries"] = ",".join(albumInfo["available_markets"])

    return albumObject


In [None]:
def generateArtistObject(artistInfo):
    """Build the flat artist record stored in the artists dataset.

    Args:
        artistInfo: mapping with the artist information; the keys read here
            are "id", "external_urls", "name", "popularity" and "genres".

    Returns:
        A dict whose keys, in insertion order, are the artists dataset
        columns. The genres are joined with commas.
    """
    profileUrl = artistInfo["external_urls"]["spotify"]

    return dict(
        id=artistInfo["id"],
        uri=profileUrl,
        name=artistInfo["name"],
        popularity=artistInfo["popularity"],
        genres=",".join(artistInfo["genres"]),
    )


In [None]:
# Number of processed uris between two status reports / two saves to file
STATS_EVERY = 5000
SAVE_EVERY = 100

# Records fetched so far. They are kept in plain Python containers and turned
# into DataFrames only when needed: growing a DataFrame with pd.concat at every
# iteration copies all the rows collected so far each time.
trackRecords = []   # one track object per successfully fetched uri
albumsByID = {}     # album id -> album object, first occurrence kept
artistsByID = {}    # artist id -> artist object, first occurrence kept
genresSeen = {}     # genre -> None, used as an insertion-ordered set


def _buildDataFrames():
    """Return the (tracks, albums, artists, genres) DataFrames of the records collected so far."""
    return (
        pd.DataFrame(trackRecords, columns=tracksCols),
        pd.DataFrame(list(albumsByID.values()), columns=albumsCols),
        pd.DataFrame(list(artistsByID.values()), columns=artistsCols),
        pd.DataFrame(list(genresSeen), columns=genresCols),
    )


# Setup DataFrames
tracksDF, albumsDF, artistsDF, genresDF = _buildDataFrames()

# Reduce number of tracks
# spotifyTrackLinks = spotifyTrackLinks[:200]

# Iterate over spotify uris DataFrame; index counts every uri, failed ones included
for index, spotifyUri in enumerate(spotifyTrackLinks["uri"]):
    # Get the track id from the uri
    trackID = spotifyUri.removeprefix("https://open.spotify.com/track/")

    # Try to get track info and audio features
    try:
        trackInfo = getTrackInfo(trackID)
        trackAudioFeatures = getAudioFeatures(trackID)
    except Exception as e:
        print("⛔ [ERROR] Cannot retrieve data with track {id}\n\t🗨️ [RESPONSE] {resp}\n".format(id=trackID, resp=e))
        continue

    # Create the track and album objects
    trackRecords.append(generateTrackObject(trackInfo, trackAudioFeatures))
    albumObject = generateAlbumObject(trackInfo["album"])
    albumsByID.setdefault(albumObject["id"], albumObject)

    # Create the artists objects
    for artist in trackInfo["artists"]:
        artistID = artist["id"]

        # An artist already stored needs no new request: only its first
        # occurrence is kept and its genres are already recorded
        if artistID in artistsByID:
            continue

        try:
            artistInfo = getArtistInfo(artistID)
        except Exception as e:
            print("⛔ [ERROR] Cannot retrieve data with artist {id}\n\t🗨️ [RESPONSE] {resp}\n".format(
                id=artistID, resp=e))
            # Skip only this artist: there is no information to build its
            # object from, and the rest of the track must still be stored
            continue

        artistObject = generateArtistObject(artistInfo)
        artistsByID.setdefault(artistObject["id"], artistObject)
        for genre in artistInfo["genres"]:
            genresSeen.setdefault(genre, None)

    showStats = index % STATS_EVERY == 0
    saveDatasets = index % SAVE_EVERY == 0

    # Refresh the DataFrames only when they are about to be shown or saved
    if showStats or saveDatasets:
        tracksDF, albumsDF, artistsDF, genresDF = _buildDataFrames()

    # Print stats every 5000 tracks
    if showStats:
        print("🎵 [STATUS INFO #{row}]".format(row=index))
        # info() prints the summary itself and returns None, so it is not wrapped in print()
        tracksDF.info()
        albumsDF.info()
        artistsDF.info()
        genresDF.info()
        print("\n")

    # Save DataFrame to file every 100 tracks
    if saveDatasets:
        print("💾 [STATUS INFO #{row}] Dataset saved\n".format(row=index))
        tracksDF.to_csv(tracksPath)
        albumsDF.to_csv(albumsPath)
        artistsDF.to_csv(artistsPath)
        genresDF.to_csv(genresPath)

# Final DataFrames, including the records fetched after the last save
tracksDF, albumsDF, artistsDF, genresDF = _buildDataFrames()

# Print info about the DataFrames
tracksDF.info()
albumsDF.info()
artistsDF.info()
genresDF.info()


In [None]:
# Write every dataset to its own CSV file
for datasetDF, datasetPath in (
    (tracksDF, tracksPath),
    (albumsDF, albumsPath),
    (artistsDF, artistsPath),
    (genresDF, genresPath),
):
    datasetDF.to_csv(datasetPath)