### Import Packages

In [3]:
import psycopg2
import pandas as pd
from spotipy.oauth2 import SpotifyClientCredentials, SpotifyOAuth
from spotipy import util
from configparser import ConfigParser
import spotipy
import json
import requests
import datetime
from sqlalchemy import create_engine

### Functions

In [4]:
def config(section:str, filename='secrets.ini'):
    # create a parser
    parser = ConfigParser()
    # read config file
    parser.read(filename)

    # get section, default to postgresql
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))
    return db 
#def initialize_sp(scope, client_id, client_secret, redirect_uri):
    sp = spotipy.Spotify(auth_manager=SpotifyOAuth(scope, client_id, client_secret,redirect_uri))
    return sp
def getplayListDict(user_id):
    playlists = sp.user_playlists(user_id)
    names = [x['name'] for x in playlists['items']]
    ids = [x['id'] for x in playlists['items']]
    return dict(zip(names, ids))
def getTrackInfoFromPlaylist(user_id, playlist_id):
    offset = 0
    songs = []
    while True:
        content = sp.user_playlist_tracks(user_id, playlist_id, limit = 50, offset=offset)
        songs += content['items']
        if content['next'] is not None:
            offset+=50
        else:
            break
    names = [x['track']['name'] for x in songs]
    ids = [x['track']['id'] for x in songs]
    artist_ids = [x['track']['artists'][0]['id'] for x in songs]
    artist_names = [x['track']['artists'][0]['name'] for x in songs]
    popularity = [x['track']['popularity'] for x in songs]
    release_date = [x['track']['album']['release_date'] for x in songs]
    added_date = [x['added_at'] for x in songs]
    df = pd.DataFrame({'Track Name': names, 'Track ID': ids, 'Artist ID': artist_ids, 'Artist Name': artist_names,'Track popularity': popularity, 'Release date': release_date, 'Added date': added_date})
    df['Playlist ID'] = playlist_id
    df['Release date'] = pd.to_datetime(df['Release date'])
    df['Added date'] = pd.to_datetime(df['Added date'])
    df['Song-Artist'] = df['Track Name'] + ' - ' + df['Artist Name']
    return df
def getArtistInfoFromPlaylist(user_id, playlist_id):
    offset = 0
    songs = []
    genres = []
    while True:
        content = sp.user_playlist_tracks(user_id, playlist_id, limit = 50, offset=offset)
        songs += content['items']
        if content['next'] is not None:
            offset+=50
        else:
            break
            
    artist_ids = [x['track']['artists'][0]['id'] for x in songs]
    artist_ids = filter(None, artist_ids)
    
    n_calls = 0
    artists = []
    for artist_id in artist_ids:
        try:
            content = sp.artist(artist_id)
            artists.append(content)
        except Exception as e:
            print(artist_id)
            print(e)
    try:
        followers = [x['followers']['total'] for x in artists]
        genres = [x['genres'] for x in artists]
        popularity = [x['popularity'] for x in artists] 
        name = [x['name'] for x in artists]
        ids = [x['id'] for x in artists]
    except Exception as e:
        print(e)
    df = pd.DataFrame({'Artist ID':ids, 'Artist': name,'Artist Followers':followers, 'Artist genres': genres, 'Artist popularity':popularity})
    return df
def getAudioFeatures(track_id_list, limit=50):
    features = []
    index = 0
    while index < len(track_id_list):
        features += sp.audio_features(track_id_list[index:index+limit])
        index += limit
    df = pd.DataFrame(features)
    df['Track ID'] = track_id_list
    return df
def getCurUserTopArtists(token, limit=50, offset=0, time_range='medium_term'):
    offset = 0
    artists = []
    while True:
        content = spotipy.Spotify(auth = token).current_user_top_artists(limit=50,time_range=time_range)
        artists += content['items']
        if content['next'] is not None:
            offset+=50
        else:
            break
    name = [x['name'] for x in artists]
    ids = [x['id'] for x in artists]
    df = pd.DataFrame({'Artist ID': ids, 'Artist': name})
    df['User ID'] = user_id
    return df
def getCurUserTopTracks(token, limit=50, offset=0, time_range='medium_term'):
    offset = 0
    songs = []
    while True:
        content =  spotipy.Spotify(auth = token).current_user_top_tracks(limit=50,time_range=time_range)
        songs += content['items']
        if content['next'] is not None:
            offset+=50
        else:
            break
    names = [x['name'] for x in songs]
    ids = [x['id'] for x in songs]
    artists_ids = [x['artists'][0]['id'] for x in songs]
    artist_names = [x['artists'][0]['name'] for x in songs]
    df = pd.DataFrame({'Track Name': names, 'Track ID': ids, 'Artist ID': artists_ids, 'Artist Name': artist_names})
    df['User ID'] = user_id
    df['Song-Artist'] = df['Track Name'] + ' - ' + df['Artist Name']
    return df
def getCurrPlaying(token, user_id):
    #https://developer.spotify.com/console/get-users-currently-playing-track/
    params={"market":"CA", "Authorization": f"Bearer {token}"}
    resp = requests.get('https://api.spotify.com/v1/me/player/currently-playing', headers = params)
    if resp.status_code == 200:
        resp = resp.json()
        timestamp = datetime.datetime.fromtimestamp(resp['timestamp']/1000.0)
        is_playing = resp['is_playing']
        track_name = resp['item']['name']
        track_id = resp['item']['id']
        artist_name = resp['item']['artists'][0]['name']
        artist_id = resp['item']['artists'][0]['id']
        df = pd.DataFrame({'Timestamp': [timestamp], 'Track Name': [track_name], 'Track ID': [track_id],'Artist Name': [artist_name], 'Artist ID': [artist_id]})
        df['User ID'] = user_id
        df['Song-Artist'] = df['Track Name'] + ' - ' + df['Artist Name']
    else:
        df = pd.DataFrame(columns=['Timestamp', 'Track Name', 'Track ID','Artist Name', 'Artist ID', 'Song-Artist'])
    return df
def getUserRecentPlayed(token, user_id, limit=50):
    #https://developer.spotify.com/console/get-recently-played/
    params = {"limit": str(limit), "Authorization": f"Bearer {token}"}
    resp = requests.get('https://api.spotify.com/v1/me/player/recently-played', headers = params)
    if resp.status_code == 200:
        items = resp.json()['items']
        track_ids = [x['track']['id'] for x in items]
        track_names = [x['track']['name'] for x in items]
        artist_names = [x['track']['artists'][0]['name'] for x in items]
        artist_ids = [x['track']['artists'][0]['id'] for x in items]
        df = pd.DataFrame({'Track Name': track_names, 'Track ID': track_ids,'Artist Name': artist_names, 'Artist ID': artist_ids})
        df['User ID'] = user_id
        df['Song-Artist'] = df['Track Name'] + ' - ' + df['Artist Name']
    else:
        df = pd.DataFrame(columns=['Track Name', 'Track ID','Artist Name', 'Artist ID', 'User ID', 'Song-Artist'])
    return df
def getUserInfo(token, user_id):
    content = spotipy.Spotify(auth = token).current_user()
    df = pd.DataFrame()
    df['Playlist Name'] = list(getplayListDict(user_id).keys())
    df['Playlist ID'] = list(getplayListDict(user_id).values())
    df['Followers'] = content['followers']['total']
    df['Display Name'] = content['display_name']
    df['User ID'] = user_id

    return df

### Main

In [5]:
spotify_secrets = config('spotify')
cid = spotify_secrets['cid']
secret = spotify_secrets['secret'] 
user_id = spotify_secrets['user_id']
recent_token = input('Get token from https://developer.spotify.com/console/get-recently-played/')
client_credentials_manager = SpotifyClientCredentials(client_id=cid, client_secret=secret) 
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
refresh_auth = True
URI = r"http://localhost:8888"

Get token from https://developer.spotify.com/console/get-recently-played/BQDahymmzVrvnMeJKCL1QPXU-ztTRxn5jXZMvL6FJB3NwIKBozaHUc_78Dedpjwtae01y8uLFYZh0F1m4r4YdaK1tPqtXMauuqr4C0rfB7vN8fVGCl7HtI-saE9opkZnqbH8Q88vHR6m2TVlqdejJBWMs5FX2Bn8kUgteKs93KaZ


In [6]:
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
get_token = 'top'
try:
    if get_token=='top':
        token_top = util.prompt_for_user_token(scope='user-top-read', client_id=cid, client_secret=secret, redirect_uri=URI, username=user_id)
    if get_token=='recent':
        token_recplayed = util.prompt_for_user_token(scope='user-read-recently-played', client_id=cid, client_secret=secret, redirect_uri=URI, username=user_id)
    if get_token=='current':
        token_currplaying = util.prompt_for_user_token(scope='user-read-currently-playing', client_id=cid, client_secret=secret, redirect_uri=URI, username=user_id)
except Exception as e:
    print(e)

In [7]:
#Generate Artist and Track DataFrames
my_playlist = getplayListDict(user_id)
concat_df_tracks = []
concat_df_artists = []
for playlist in my_playlist.keys():
    playlist_df_tracks = getTrackInfoFromPlaylist(user_id, my_playlist[playlist])
    playlist_df_artists = getArtistInfoFromPlaylist(user_id, my_playlist[playlist])
    
    playlist_df_tracks['Playlist Name'] = playlist
    playlist_df_artists['Playlist Name'] = playlist

    concat_df_tracks.append(playlist_df_tracks)
    concat_df_artists.append(playlist_df_artists)
    
tracks_df = pd.concat(concat_df_tracks)
artists_df = pd.concat(concat_df_artists)

#Tracks df
tracks_df.dropna(inplace=True)
#Artists df
artists_df.dropna(inplace=True)
artists_df.drop_duplicates(inplace=True, subset='Artist ID')

#Audio features
audio_features = getAudioFeatures(tracks_df['Track ID'].tolist())
audio_features.drop_duplicates(inplace=True, subset='Track ID')
#Currently playing
#curr_playing_df = getCurrPlaying(current_token, user_id)
#Recent played
recent_playing_df = getUserRecentPlayed(recent_token, user_id)
#top artists and tracks
top_tracks_df_med = getCurUserTopTracks(token_top, limit=50, time_range='medium_term')
top_tracks_df_long = getCurUserTopTracks(token_top, limit=50, time_range='long_term')
top_artists_df_med = getCurUserTopArtists(token_top, limit=20, time_range='medium_term')
top_artists_df_long = getCurUserTopArtists(token_top, limit=20, time_range='long_term')
#user df
user_df = getUserInfo(token_top, user_id)

### Export to PostgreSQL DB

In [8]:
postgresql_secrets = config('postgresql')
host=postgresql_secrets['host']
database=postgresql_secrets['database']
user=postgresql_secrets['user']
password=postgresql_secrets['password']
port=postgresql_secrets['port']

In [9]:
engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}')

In [10]:
user_df.to_sql('user_info', engine, if_exists='replace',index=False)
tracks_df.to_sql('tracks', engine, if_exists='replace',index=False)
artists_df.to_sql('artists', engine, if_exists='replace',index=False)
audio_features.to_sql('audio_features', engine, if_exists='replace',index=False)
recent_playing_df.to_sql('recent_playing', engine, if_exists='replace',index=False)
top_tracks_df_med.to_sql('top_tracks_med', engine, if_exists='replace',index=False)
top_tracks_df_long.to_sql('top_tracks_long', engine, if_exists='replace',index=False)
top_artists_df_med.to_sql('top_artists_med', engine, if_exists='replace',index=False)
top_artists_df_long.to_sql('top_artists_long', engine, if_exists='replace',index=False)