In [1]:
import os, json, re, requests, base64, time, datetime

import psycopg2
import sqlalchemy as sa
from urllib.parse import urlparse, parse_qs

import numpy as np
import pandas as pd
# Display every column and never truncate cell text when a frame is rendered.
pd.options.display.max_columns = None
pd.options.display.max_colwidth = None

In [2]:
class Spotify(object):
    """Small wrapper around the Spotify Web API using the authorization-code flow.

    The constructor loads client credentials and, when available, a previously
    stored token from JSON files, and precomputes the endpoint URLs used by the
    request helpers. Every request helper returns the raw ``requests`` response
    so the caller can inspect ``status_code`` and ``json()`` itself.
    """

    def __init__(self, creds_fp:str=None, token_fp:str=None, redirect_uri:str=None, scopes:list=None):
        """Load the credential/token files and build the endpoint URLs.

        Args:
            creds_fp: Path to a JSON file holding ``client_id`` and ``client_secret``.
            token_fp: Path to the JSON token store. It may not exist yet; in that
                case ``token_status`` stays ``None`` until ``oauth2_first_time`` runs.
            redirect_uri: Redirect URI sent with the authorization requests.
            scopes: Iterable of scope names; ``None`` is treated as no scopes.

        Raises:
            FileNotFoundError: If ``creds_fp`` is not given or does not exist.
            ValueError: If the credentials file is not valid JSON.
        """
        self.creds_fp = creds_fp
        self.token_fp = token_fp

        # Nothing below can work without credentials, so fail with a clear error
        # instead of hitting an AttributeError on ``self.creds`` a few lines later.
        if not self.creds_fp or not os.path.exists(self.creds_fp):
            print("Invalid path for credentials!")
            raise FileNotFoundError(f"Credentials file not found: {self.creds_fp!r}")

        try:
            with open(self.creds_fp, "r") as f:
                self.creds = json.load(f)
        except ValueError as err:  # json.JSONDecodeError is a ValueError subclass
            print("Credentials file is not in JSON format!")
            raise ValueError(f"Credentials file is not valid JSON: {self.creds_fp!r}") from err

        if set(self.creds.keys()) != {'client_id', 'client_secret'}:
            print("""Credentials keys are not the expected values. Expected keys are 
            ['client_id', 'client_secret'].""")

        # A missing or unreadable token is not fatal: the first-time OAuth flow can
        # create it. Both attributes are always defined so later checks cannot fail.
        self.token = None
        self.token_status = None

        if not self.token_fp or not os.path.exists(self.token_fp):
            print("Invalid path for token!")
        else:
            try:
                with open(self.token_fp, "r") as f:
                    self.token = json.load(f)
                self.token_status = "ok"
            except ValueError:
                self.token = None
                print("Token file is not in JSON format!")

        if self.token_status is not None:
            if set(self.token.keys()) != {'access_token', 'expires', 'expires_in', 'refresh_token', 'scope', 'token_type'}:
                print("""Token keys are not the expected values. Expected keys are 
                ['access_token', 'expires', 'expires_in', 'refresh_token', 'scope', 'token_type'].""")

        self.redirect_uri = redirect_uri
        self.scopes = " ".join(scopes or [])

        self.url_auth = "https://accounts.spotify.com/authorize"
        self.url_token = "https://accounts.spotify.com/api/token"
        self.url_refresh = "https://accounts.spotify.com/api/token"

        # Kept on the instance because several methods build URLs from it.
        self.api_ver = "v1"

        self.url_base = "https://api.spotify.com"
        self.url_recs = "/".join([self.url_base, f"{self.api_ver}/recommendations"])
        self.url_genre_seeds = "/".join([self.url_recs, "available-genre-seeds"])
        self.url_track = "/".join([self.url_base, f"{self.api_ver}/tracks"])
        self.url_audio_analysis = "/".join([self.url_base, f"{self.api_ver}/audio-analysis"])
        self.url_audio_features = "/".join([self.url_base, f"{self.api_ver}/audio-features"])
        self.url_artists = "/".join([self.url_base, f"{self.api_ver}/artists"])
        self.url_albums = "/".join([self.url_base, f"{self.api_ver}/albums"])

        # "client_id:client_secret", base64-encoded, for the Basic auth header
        # sent with the token and refresh requests.
        raw_pair = ":".join([self.creds["client_id"], self.creds["client_secret"]])
        self.creds_b64 = base64.b64encode(raw_pair.encode("utf-8")).decode("utf-8")

        self.headers_auth = {"Authorization": "Basic " + self.creds_b64}

    def oauth2_first_time(self, token_filepath:str=None):
        """Run the interactive authorization-code flow when no token was loaded.

        Prints the authorization URL, reads the redirect URL (containing
        ``code``) from ``input()``, exchanges the code for a token and stores the
        token as JSON. Does nothing if a token was already loaded.

        Args:
            token_filepath: Where to store the token. When ``None`` the
                ``token_fp`` given to the constructor is used.

        Raises:
            RuntimeError: If the token response contains no ``access_token``.
        """
        if self.token_status is not None:
            return

        params_auth_step1 = {"client_id": self.creds["client_id"],
                             "response_type": "code",
                             "redirect_uri": self.redirect_uri,
                             "scope": self.scopes
                            }

        auth_step1 = requests.get(url=self.url_auth, params=params_auth_step1)

        print(auth_step1.url)
        print("\n")
        print("Visit the above URL and input redirect URL with the embedded code here:")

        code_auth = parse_qs(urlparse(input().strip()).query)["code"][0]

        params_auth_step2 = {"grant_type": "authorization_code",
                             "code": code_auth,
                             "redirect_uri": self.redirect_uri
                            }

        auth_step2 = requests.post(url=self.url_token, data=params_auth_step2, headers=self.headers_auth)

        token = auth_step2.json()
        if "access_token" not in token:
            # Do not persist an error payload as if it were a token.
            raise RuntimeError(f"Token exchange failed: {token}")

        token["expires"] = time.time() + token.get("expires_in", 3600)
        self.token = token

        if token_filepath is not None:
            self.token_fp = token_filepath

        # Persist the token *including* "expires", which the constructor and the
        # refresh wrapper both expect to find in the stored file.
        with open(self.token_fp, "w") as token_store:
            json.dump(self.token, token_store)

        self.token_status = "ok"

    def refresh_token(func):
        """Decorator: refresh the access token if it has expired, then call ``func``."""
        def refresh(self, *args, **kwargs):
            if getattr(self, "token", None) is None:
                raise RuntimeError("No token loaded; run oauth2_first_time() first.")

            # A stored token without "expires" is treated as already expired.
            if time.time() >= self.token.get("expires", 0):
                params_refresh = {"grant_type": "refresh_token",
                                  "refresh_token": self.token["refresh_token"]
                                 }

                response = requests.post(self.url_refresh, data=params_refresh, headers=self.headers_auth)
                payload = response.json()

                self.token["access_token"] = payload["access_token"]
                if "expires_in" in payload:
                    self.token["expires_in"] = payload["expires_in"]
                if payload.get("refresh_token"):
                    self.token["refresh_token"] = payload["refresh_token"]

                # Update the expiry before writing so the stored file is not stale.
                self.token["expires"] = time.time() + self.token.get("expires_in", 3600)

                with open(self.token_fp, "w") as token_store:
                    json.dump(self.token, token_store)

            return func(self, *args, **kwargs)

        return refresh

    @refresh_token
    def headers_bearer(self):
        """Return the request headers carrying the (refreshed if needed) bearer token."""
        headers = {"Authorization": "Bearer " + self.token["access_token"],
                   "Content-Type": "application/json",
                   "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.97 Safari/537.36"
                  }

        return headers

    def get_profile(self, user="me"):
        """GET the profile of ``user``; the default ``"me"`` targets the token owner."""
        if user == "me":
            url_profile = "/".join([self.url_base, f"{self.api_ver}/me"])
        else:
            url_profile = "/".join([self.url_base, f"{self.api_ver}/users", str(user)])

        profile = requests.get(url_profile, headers=self.headers_bearer())

        return profile

    def get_genres(self):
        """GET the available genre seeds for recommendations."""
        genres = requests.get(url=self.url_genre_seeds, headers=self.headers_bearer())

        return genres

    def create_playlist(self, user_id=None, playlist_name=None, public=False):
        """POST a new playlist named ``playlist_name`` for ``user_id``."""
        url_playlist = "/".join([self.url_base, self.api_ver, "users", str(user_id), "playlists"])

        params_playlist = {"name": playlist_name,
                           "public": public
                          }

        create = requests.post(url_playlist, json=params_playlist, headers=self.headers_bearer())

        return create

    def add_to_playlist(self, playlist_id=None, uris=None, position=None):
        """POST ``uris`` to a playlist, optionally at a zero-based ``position``."""
        url_playlist = "/".join([self.url_base, self.api_ver, "playlists", str(playlist_id), "tracks"])

        params_playlist = {"uris": uris}

        if position is not None:
            params_playlist["position"] = position

        add = requests.post(url_playlist, json=params_playlist, headers=self.headers_bearer())

        return add

    def get_recommendations(self, parameters:dict=None):
        """
        Refer to parameters here: https://developer.spotify.com/documentation/web-api/reference/browse/get-recommendations/
        
        Pass parameters as a dictionary in the following format:
        {"QUERY_PARAMETER": value}
        
        Ex. {"limit": 100} will set the number of recommendations to 100.
        """

        recs = requests.get(url=self.url_recs, params=parameters, headers=self.headers_bearer())

        return recs

    def get_track(self, track_id:str):
        """GET a single track by ID."""
        track = requests.get(url="/".join([self.url_track, track_id]), headers=self.headers_bearer())

        return track

    def get_track_many(self, track_ids:list):
        """GET several tracks in one request; IDs are sent comma-joined as ``ids``."""
        _params = {"ids": ",".join(track_ids)}

        tracks = requests.get(url=self.url_track, params=_params, headers=self.headers_bearer())

        return tracks

    def get_audio_analysis(self, track_id:str):
        """GET the audio analysis of a single track."""
        analysis = requests.get(url="/".join([self.url_audio_analysis, track_id]), headers=self.headers_bearer())

        return analysis

    def get_audio_features(self, track_id:str):
        """GET the audio features of a single track."""
        features = requests.get(url="/".join([self.url_audio_features, track_id]), headers=self.headers_bearer())

        return features

    def get_audio_features_many(self, track_ids:list):
        """GET audio features for several tracks; IDs are sent comma-joined as ``ids``."""
        _params = {"ids": ",".join(track_ids)}

        features = requests.get(url=self.url_audio_features, params=_params, headers=self.headers_bearer())

        return features

    def get_artist(self, artist_id:str):
        """GET a single artist by ID."""
        artist = requests.get(url="/".join([self.url_artists, artist_id]), headers=self.headers_bearer())

        return artist

    def get_artist_many(self, artist_ids:list):
        """GET several artists in one request; IDs are sent comma-joined as ``ids``."""
        _params = {"ids": ",".join(artist_ids)}

        artist = requests.get(url=self.url_artists, params=_params, headers=self.headers_bearer())

        return artist

    def get_album(self, album_id:str):
        """GET a single album by ID."""
        albums = requests.get(url="/".join([self.url_albums, album_id]), headers=self.headers_bearer())

        return albums

    def get_albums_many(self, album_ids:list):
        """GET several albums in one request; IDs are sent comma-joined as ``ids``."""
        _params = {"ids": ",".join(album_ids)}

        albums = requests.get(url=self.url_albums, params=_params, headers=self.headers_bearer())

        return albums

In [3]:
# Local locations of the project folder and the Spotify credential/token stores.
main_directory = r"D:\data_projects\spotify"
spot_creds_file = r"D:\personal\creds\spotify\apikey.json"
spot_token_file = r"D:\personal\creds\spotify\token.json"

# OAuth scopes requested for the Spotify client.
my_scopes = [
    "playlist-read-collaborative",
    "playlist-modify-public",
    "playlist-read-private",
    "playlist-modify-private",
    "user-library-modify",
    "user-library-read",
]

spot = Spotify(
    creds_fp=spot_creds_file,
    token_fp=spot_token_file,
    redirect_uri="https://example.com/callback",
    scopes=my_scopes,
)

# Database credentials are read from a JSON file with "uid" and "password" keys.
db_creds_file = r"D:\personal\creds\psql\db_creds.json"
with open(db_creds_file) as f:
    db_creds = json.load(f)

uid, pw = db_creds["uid"], db_creds["password"]
host, port, db = "localhost", 5432, "spotify"

# One engine and one open connection are shared by the rest of the notebook.
engine = sa.create_engine(f"postgresql+psycopg2://{uid}:{pw}@{host}:{port}/{db}")
conn = engine.connect()

In [4]:
# Runs the interactive OAuth flow; the method returns immediately if a token was loaded.
spot.oauth2_first_time(token_filepath=spot_token_file)

In [5]:
# Fetch the genre seeds and keep the list stored under the "genres" key.
genres_resp = spot.get_genres()
genres = genres_resp.json()["genres"]

In [6]:
# Create <main_directory>/dump/<genre> for every genre.
# The folders are built from `main_directory` (not the current working directory) so
# they land where the existence check looked; `exist_ok=True` makes the cell re-runnable.
dump_root = os.path.join(main_directory, "dump")

for g in genres:
    os.makedirs(os.path.join(dump_root, g), exist_ok=True)

In [7]:
def get_all_album_tracks(get_album_resp):
    """Collect the IDs of every track on an album, following pagination.

    Uses the notebook-level ``spot`` client for the bearer headers.

    Args:
        get_album_resp: Parsed album JSON (dict) whose ``tracks`` entry holds
            ``items`` (each with an ``id``) and a ``next`` page URL or ``None``.

    Returns:
        list: Track IDs of the first page followed by those of each further
        page, each exactly once. If a follow-up page answers 204, 404 or 502,
        paging stops and the IDs gathered so far are returned. Any other
        non-200 status is retried after a 15 second pause.
    """
    first_page = get_album_resp["tracks"]

    # The first page is added exactly once, before any follow-up request.
    tracks_list = [t["id"] for t in first_page["items"]]
    next_url = first_page["next"]

    while next_url is not None:
        r = requests.get(url=next_url, headers=spot.headers_bearer())
        status_code = r.status_code

        if status_code == 200:
            page = r.json()
            tracks_list.extend(t["id"] for t in page["items"])
            next_url = page["next"]
        elif status_code in [204, 404]:
            # The page does not exist; retrying cannot help.
            break
        elif status_code == 502:
            print(f"Attempted to fetch additional albums. Error code {status_code}.")
            print(f"ERROR CODE 502.")
            break
        else:
            print(f"Attempted to fetch additional albums. Error code {status_code}.")
            time.sleep(15)

    return tracks_list

In [8]:
# Start position within `genres`; the harvesting loop iterates over genres[idx:].
idx = 0

In [None]:
# Main harvesting loop.
# For each genre in genres[idx:] it requests recommendation batches and stages new
# tracks, their audio features, audio analysis, albums and artists in the `temp`
# schema. When a genre is finished, each temp table listed in `tables` is copied
# into the `audio` schema and truncated.
tables = ["albums","artists","track_analysis","track_bars",
          "track_beats","track_features","track_info","track_sections",
          "track_segments","track_tatums"]

for g in genres[idx:]:
    recs_params = {"limit": 100,
                   "seed_genres": g,
                   "max_liveness": 0.85
                  }
    
    # `attempts` counts batches that contained no new tracks; more than 5 ends the genre.
    attempts = 0
    track_count = conn.execute("select count(*) from temp.track_info").fetchone()[0]
    
    # Keep pulling batches until temp.track_info holds more than 500 rows.
    while track_count <= 500:
        print(f"Starting pull for {g}.")
        status_code = 0
        
        # Retry the recommendations request every 90 seconds until it returns 200.
        while status_code != 200:
            recs_request = spot.get_recommendations(recs_params)
            status_code = recs_request.status_code
            if status_code != 200:                
                print("Recommendations request error:", recs_request.status_code)
                time.sleep(90)
        print("Recommendations request success!")

        ###################
        # Recommendations #
        ###################
        
        recs = recs_request.json()
        recs_request.close()
        
        tracks = pd.DataFrame(recs["tracks"])
                
        # For every recommended track: write a small marker JSON file in the genre's
        # dump folder (if absent), then drop the track from `tracks` when it is
        # already present in temp.track_info or audio.track_info.
        # NOTE(review): the SQL is built by f-string interpolation of the track ID;
        # consider bound parameters.
        # NOTE(review): `tracks` is modified in place while iterating tracks["id"] —
        # confirm that iteration still visits every original ID.
        for t in tracks["id"]:
            if t+".json" not in os.listdir(f"D:\\data_projects\\spotify\\dump\\{g}"):
                with open(os.path.join(f"D:\\data_projects\\spotify\\dump\\{g}", f"{t}.json"), "w") as f:
                    json.dump(obj={"id": t}, fp=f)

            try:
                exists_check_temp = conn.execute(f"select count(*) from temp.track_info where id = '{t}'").fetchone()[0]
            except IndexError:
                exists_check_temp = 0
            
            try:
                exists_check_main = conn.execute(f"select count(*) from audio.track_info where id = '{t}'").fetchone()[0]
            except IndexError:
                exists_check_main = 0
                
            # NOTE(review): compares with == 1, so a count above 1 would not be treated as existing.
            if exists_check_temp == 1 or exists_check_main == 1:
                tracks.drop(tracks[tracks["id"]==t].index, inplace=True)
                tracks.reset_index(drop=True, inplace=True)
                    
        album_ids = []
        
        # Everything down to the matching `else` (attempts += 1) runs only when the
        # batch still contains tracks after the filtering above.
        if tracks.shape[0] > 0:
            tracks["album_id"] = tracks["album"].apply(lambda x: x["id"])
            tracks["artist_ids"] = tracks["artists"].apply(lambda x: [y["id"] for y in x])
            tracks["upload_dt"] = datetime.datetime.utcnow()
            
            # Unique album IDs of this batch (set comprehension removes duplicates).
            album_ids = list({t["id"] for t in tracks["album"]})

            tracks.drop(labels=["album", "artists", "external_ids", "external_urls", "href", "preview_url"], axis=1, inplace=True)
            tracks.to_sql(name="track_info", con=conn, schema="temp", if_exists="append", index=False)
        
            track_count = conn.execute("select count(*) from temp.track_info").fetchone()[0]
        
        ##################
        # Audio Features #
        ##################
        
            status_code = 0

            # One bulk request for the audio features of all tracks in the batch.
            # NOTE(review): the 204/404 branch neither breaks nor sleeps, so this loop
            # would retry immediately and indefinitely; `t` in its message is whatever
            # the earlier `for t in ...` loop left behind.
            # NOTE(review): the 504 message says 180 seconds but the sleep is 90.
            while status_code != 200:

                features_req = spot.get_audio_features_many(tracks["id"])
                status_code = features_req.status_code
                
                if status_code in [204, 404]:
                    print(f"Features not found for track {t}.")
                    features_req.close()
                elif status_code == 504:
                    print("Error 504. Sleeping for 180 seconds.")
                    features_req.close()
                    time.sleep(90)
                elif status_code != 200:
                    print("Features request error:", features_req.json())
                    features_req.close()
                    time.sleep(90)

            features = pd.DataFrame(features_req.json()["audio_features"])
            features_req.close()
            
            features.drop(labels=["type", "track_href", "analysis_url"], axis=1, inplace=True)
            features.to_sql(name="track_features", con=conn, schema="temp", if_exists="append", index=False)
            
            time.sleep(10)
        
            # Audio analysis is requested one track at a time. 204/404 and 502 skip the
            # track; 504 and other errors are retried after 90 seconds.
            # NOTE(review): the 504 message says 180 seconds but the sleep is 90.
            for t in tracks["id"]:

                status_code = 0

                while status_code != 200:
                    analysis_req = spot.get_audio_analysis(t)
                    status_code = analysis_req.status_code
                    
                    if status_code in [204, 404]:
                        print(f"Analysis not found for track {t}.")
                        break
                    elif status_code == 504:
                        print("Error 504. Sleeping for 180 seconds.")
                        time.sleep(90)
                    elif status_code == 502:
                        print(f"Error 502 for track {t}. Moving to next track...")
                        break
                    elif status_code != 200:
                        print("Analysis request error:", analysis_req.json())
                        time.sleep(90)
        
        ##################
        # Audio Analysis #
        ##################
        
                # Still inside the per-track loop: store the analysis only on success.
                if status_code == 200:
                    analysis = analysis_req.json()
                    analysis_req.close()
                    
                    # Copy the "track" summary and remove the listed keys before storing it.
                    _track = analysis["track"].copy()
                    keys_to_drop = ["sample_md5", "codestring", "code_version", "echoprintstring", "echoprint_version", "synchstring", "synch_version", "rhythmstring", "rhythm_version"]
                    [_track.pop(k, None) for k in keys_to_drop]

                    a_track = pd.DataFrame(_track, index=[0])
                    a_bars = pd.DataFrame(analysis["bars"])
                    a_beats = pd.DataFrame(analysis["beats"])
                    a_sections = pd.DataFrame(analysis["sections"])
                    a_segments = pd.DataFrame(analysis["segments"])
                    a_tatums = pd.DataFrame(analysis["tatums"])

                    # Tag every analysis frame with the track ID and an upload timestamp.
                    for table in [a_track, a_bars, a_beats, a_sections, a_segments, a_tatums]:
                        table["id"] = t
                        table["upload_dt"] = datetime.datetime.utcnow()

                    a_track.to_sql(name="track_analysis", con=conn, schema="temp", if_exists="append", index=False)
                    a_bars.to_sql(name="track_bars", con=conn, schema="temp", if_exists="append", index=False)
                    a_beats.to_sql(name="track_beats", con=conn, schema="temp", if_exists="append", index=False)
                    a_sections.to_sql(name="track_sections", con=conn, schema="temp", if_exists="append", index=False)
                    a_segments.to_sql(name="track_segments", con=conn, schema="temp", if_exists="append", index=False)
                    a_tatums.to_sql(name="track_tatums", con=conn, schema="temp", if_exists="append", index=False)
                    
                    print(f"Analysis for track {t} complete!")
                    time.sleep(5)

        ##################
        #  Albums Data   #
        ##################
        
            if len(album_ids) > 0:
                # Remove album IDs already present in temp.albums or audio.albums.
                # `album_ids` is rebound to a new list inside the loop; the loop itself
                # keeps iterating over the list object it started with.
                for a in album_ids:
                    try:
                        exists_check_temp = conn.execute(f"select count(*) from temp.albums where id = '{a}'").fetchone()[0]
                    except IndexError:
                        exists_check_temp = 0
                    try:
                        exists_check_main = conn.execute(f"select count(*) from audio.albums where id = '{a}'").fetchone()[0]
                    except IndexError:
                        exists_check_main = 0

                    if exists_check_temp ==1 or exists_check_main == 1:
                        album_ids = [_id for _id in album_ids if _id != a]
                        
                albums_data = []

                # More than 20 albums: request them in slices of 20; a slice holding a
                # single ID uses the single-album endpoint with album_ids[-1].
                if len(album_ids) > 20:
                    for i in range(0, len(album_ids), 20):                
                        status_code = 0

                        while status_code != 200:
                            if len(album_ids[i:i+20]) > 1:
                                album_req = spot.get_albums_many(album_ids[i:i+20])
                                album_req.close()
                            else:
                                album_req = spot.get_album(album_ids[-1])
                                album_req.close()

                            status_code = album_req.status_code

                            if status_code in [204, 404]:
                                print("Album not found.")
                                break
                            elif status_code != 200:
                                print("Albums request error:", album_req.json())
                                time.sleep(90)

                        # NOTE(review): this runs even after a 204/404 break, so the
                        # response body of a failed request is parsed here as well.
                        # NOTE(review): `track_ids` is set on `albums_pull`, but
                        # `albums_data` is extended from a second `album_req.json()`
                        # call; if json() parses the body anew each time, the track IDs
                        # never reach `albums_data` — confirm.
                        if "albums" in album_req.json().keys():
                            albums_pull = album_req.json()["albums"]
                            
                            for album in albums_pull:
                                album["track_ids"] = get_all_album_tracks(album)                            
                            
                            albums_data.extend(album_req.json()["albums"])
                        else:
                            albums_data.append(album_req.json())

                        time.sleep(5)

                # 2 to 20 albums: a single bulk request.
                elif len(album_ids) > 1 and len(album_ids) <= 20:
                    status_code = 0

                    while status_code != 200:
                        album_req = spot.get_albums_many(album_ids)
                        status_code = album_req.status_code

                        if status_code in [204, 404]:
                            print("Album not found.")
                            album_req.close()
                            break
                        elif status_code != 200:
                            print("Albums request error:", album_req.json())
                            album_req.close()
                            time.sleep(90)

                    # NOTE(review): reached after a 204/404 break too; and, as above, the
                    # `track_ids` set on `album_pull` may not be part of what the second
                    # json() call returns — confirm.
                    album_pull = album_req.json()["albums"]
                            
                    for album in album_pull:
                        album["track_ids"] = get_all_album_tracks(album) 

                    albums_data.extend(album_req.json()["albums"])
                
                # Exactly one album: single-album endpoint.
                elif len(album_ids) == 1:
                    status_code = 0
                    
                    while status_code != 200:
                        album_req = spot.get_album(album_ids[0])
                        status_code = album_req.status_code
                        
                        if status_code in [204, 404]:
                            print("Album not found.")
                            break
                        elif status_code != 200:
                            print("Albums request error:", album_req.json())
                            time.sleep(90)
                    
                    # NOTE(review): same pattern — `track_ids` is set on `album_pull`
                    # while a second json() result is appended.
                    album_pull = album_req.json()
                    album_pull["track_ids"] = get_all_album_tracks(album_pull) 

                    albums_data.append(album_req.json())
                    
                    time.sleep(5)
                
                if len(albums_data) > 0:
                    print("Albums request success!")
                    
                    albums = pd.DataFrame(albums_data)
                    albums["artist_ids"] = albums["artists"].apply(lambda x: [y["id"] for y in x])
                    
                    # Hard-coded release date/precision overrides for three specific album
                    # IDs, applied before the to_datetime conversion below.
                    # NOTE(review): presumably these albums carry dates that cannot be
                    # parsed — the reason is not visible here; confirm.
                    albums["release_date"] = np.where(albums["id"]=="7DGc2xnFSAVP1FKpETRu10", "2010-05-04", albums["release_date"])
                    albums["release_date_precision"] = np.where(albums["id"]=="7DGc2xnFSAVP1FKpETRu10", "day", albums["release_date_precision"])
                    
                    albums["release_date"] = np.where(albums["id"]=="4jBLVlDhs6qWLwc4XIKI97", "2007-05-21", albums["release_date"])
                    albums["release_date_precision"] = np.where(albums["id"]=="4jBLVlDhs6qWLwc4XIKI97", "day", albums["release_date_precision"])
                    
                    albums["release_date"] = np.where(albums["id"]=="2XcN0r42RDvvK8YV3DxZLa", "2010-05-11", albums["release_date"])
                    albums["release_date_precision"] = np.where(albums["id"]=="2XcN0r42RDvvK8YV3DxZLa", "day", albums["release_date_precision"])
                    
                    albums["release_date"] = pd.to_datetime(albums["release_date"])
                    albums["upload_dt"] = datetime.datetime.utcnow()
                    
                    albums.drop(labels=["copyrights", "external_ids", "tracks", "artists", "external_urls", "href", "images"], axis=1, inplace=True)
                    albums.to_sql(name="albums", con=conn, schema="temp", if_exists="append", index=False)
                    
                else:
                    print("No albums to request, moving on...")
                
                time.sleep(5)

        ##################
        #  Artists Data  #
        ##################
        
            # Collect each artist ID of the batch once, skipping artists that already
            # exist in temp.artists or audio.artists.
            artist_ids = []

            for artists in tracks["artist_ids"]:
                for a in artists:
                    try:
                        exists_check_temp = conn.execute(f"select count(*) from temp.artists where id = '{a}'").fetchone()[0]
                    except IndexError:
                        exists_check_temp = 0
                        
                    try:
                        exists_check_main = conn.execute(f"select count(*) from audio.artists where id = '{a}'").fetchone()[0]
                    except IndexError:
                        exists_check_main = 0
                    
                    exists_check = exists_check_temp + exists_check_main
                    
                    if exists_check == 0 and a not in artist_ids:
                        artist_ids.append(a)

            if len(artist_ids) >= 1:
                artists_data = []

                # More than 50 artists: request them in slices of 50; a slice holding a
                # single ID uses the single-artist endpoint with artist_ids[-1].
                if len(artist_ids) > 50:
                    for i in range(0, len(artist_ids), 50):
                        status_code = 0

                        while status_code != 200:
                            if len(artist_ids[i:i+50]) > 1:
                                artist_req = spot.get_artist_many(artist_ids[i:i+50])
                                artist_req.close()
                            else:
                                artist_req = spot.get_artist(artist_ids[-1])
                                artist_req.close()

                            status_code = artist_req.status_code

                            if status_code in [204, 404]:
                                print("Artist not found")
                                break
                            elif status_code != 200:
                                print("Artists request error:", artist_req.json())
                                artist_req.close()
                                time.sleep(90)

                        if status_code == 200:
                            if "artists" in artist_req.json().keys():
                                artists_data.extend(artist_req.json()["artists"])
                            else:
                                artists_data.append(artist_req.json())
                                
                            time.sleep(5)

                # 50 or fewer: one request (bulk endpoint for several IDs, single
                # endpoint for exactly one).
                else:
                    status_code = 0

                    while status_code != 200:
                        if len(artist_ids) > 1:
                            artist_req = spot.get_artist_many(artist_ids)
                            artist_req.close()
                        else:
                            artist_req  = spot.get_artist(artist_ids[0])
                            artist_req.close()
                        status_code = artist_req.status_code

                        if status_code in [204, 404]:
                            print("Artist not found.")
                            break
                        elif status_code != 200:
                            print("Artists request error:", artist_req.json())
                            time.sleep(90)

                    if status_code == 200:
                        if len(artist_ids) > 1:
                            artists_data.extend(artist_req.json()["artists"])
                        else:
                            artists_data.append(artist_req.json())
                            
                        time.sleep(5)

                # NOTE(review): printed unconditionally, including after a 204/404 break.
                print("Artists request success!")

                if len(artists_data) > 0:
                    artists = pd.DataFrame(artists_data)
                    # Keep only the follower total from the nested "followers" object.
                    artists["followers"] = artists["followers"].apply(lambda x: x["total"])
                    artists["upload_dt"] = datetime.datetime.utcnow()
                    artists.drop(labels=["external_urls", "href", "images"], axis=1, inplace=True)
                    artists.to_sql(name="artists", con=conn, schema="temp", if_exists="append", index=False)
        else:
            # The batch held no new tracks.
            attempts += 1
            
        if attempts > 5:
            break
    print(f"Completed pull for {g}.\n")
    
    # Move everything staged for this genre from `temp` into `audio`, then empty `temp`.
    # NOTE(review): `SELECT *` relies on both schemas having identical column order — confirm.
    for tbl in tables:
        conn.execute(f"INSERT INTO audio.{tbl} SELECT * FROM temp.{tbl}")
        conn.execute(f"TRUNCATE temp.{tbl}")
        print(f"Table {tbl} complete.")
    
    print("\n")
    time.sleep(15)

In [22]:
# Set the resume position to the genre currently held in `g` (the harvesting loop's
# variable), so a re-run of that loop starts at genres[idx:].
idx = genres.index(g)

In [25]:
# Display the genre currently held in `g` together with its position in `genres`.
g, genres.index(g)

('world-music', 125)

In [21]:
# Empty the track-level staging tables in the `temp` schema
# (temp.albums and temp.artists are not part of this list).
for t in (
    "track_features",
    "track_analysis",
    "track_bars",
    "track_beats",
    "track_sections",
    "track_segments",
    "track_tatums",
    "track_info",
):
    conn.execute(f"TRUNCATE temp.{t}")

In [24]:
# Close the database connection opened in the configuration cell.
conn.close()