In [1]:
import requests
import base64
import datetime
from urllib.parse import urlencode

import os
from dotenv import load_dotenv, find_dotenv

In [2]:
load_dotenv(find_dotenv())
client_id = os.getenv('SPOTIFY_CLIENT_ID')
client_secret = os.getenv('SPOTIFY_CLIENT_SECRET')

In [3]:
#Declare the new class with the added search method
class SpotifyAPI(object):
    access_token = None
    access_token_expires = datetime.datetime.now() #Alternatively, could just be None
    access_token_did_expire = True
    client_id = None
    client_secret = None
    token_url = 'https://accounts.spotify.com/api/token'
    method = 'POST'
    valid_filters = {'album','artist','track','year','upc','tag:hipster','tag:new','isrc','genre'}
    
    #Define the class with client id and client secret
    def __init__(self, client_id, client_secret): 
        self.client_id = client_id
        self.client_secret = client_secret
        
    #Get the token headers
    def get_token_headers(self):
        client_id = self.client_id
        client_secret = self.client_secret
        
        if client_secret == None or client_id == None:
            raise Exception('You must set client_id and client_secret')
            
        client_creds = f'{client_id}:{client_secret}'
        #Encode creds as bytes, then b64 bytes, then from b64 bytes back to a b64 string
        client_creds_b64 = base64.b64encode(client_creds.encode()).decode()
        
        return {
            'Authorization': f'Basic {client_creds_b64}'
        }
    
    #Get the grant type
    def get_token_data(self):
        return {
            'grant_type': 'client_credentials'
        }
    
    #Authenticate the session
    def perform_auth(self):
        token_url = self.token_url
        token_data = self.get_token_data()
        token_headers = self.get_token_headers()
        
        r = requests.post(token_url, data=token_data, headers=token_headers)
        
        if r.status_code not in range(200,299): #Checks if the request status code is valid
            raise Exception('Could not authenticate')
        
        #Extract information regarding when the token will expire
        token_response_data = r.json()
        now = datetime.datetime.now()
        access_token = token_response_data['access_token']
        expires_in = token_response_data['expires_in'] #seconds
        expires = now + datetime.timedelta(seconds = expires_in)
            
        self.access_token = access_token
        self.access_token_expires = expires
        self.access_token_did_expire = expires < now
        
        return True
    
    #Create a method that will get the access token
    def get_access_token(self):
        access_token = self.access_token
        expires = self.access_token_expires
        now = datetime.datetime.now()
        
        #Check if the access token is expired --> if so, get a new one
        if expires < now:
            self.perform_auth()
            return self.get_access_token()
        elif access_token == None:
            self.perform(auth)
            return self.get_access_token()
        return access_token
    
    # his will allow you to perform a search given the ID and type of search
    def get_resources(self, search_id, search_type = 'tracks'):
        lookup_url = f'https://api.spotify.com/v1/{search_type}/{search_id}'
        headers = self.get_resource_headers()
        r = requests.get(lookup_url, headers=headers)
        
        if r.status_code not in range(200,299):
            return r.status_code
        return r.json()
    
    #Get the header to access data (either in searches or get requests)
    def get_resource_headers(self):
        access_token = self.get_access_token()

        headers = {
            'Authorization' : f'Bearer {access_token}'  #Header must follow this 
        }
        return headers
    
    def response_error(self, response):
        if response == 429:
            raise Exception('Error: Rate limit exceeded')
    
    #This will allow you to perform a general search (no ID)
    def search(self, query, search_type = 'track'): #set track as the default search type
        headers = self.get_resource_headers()
        url_code = urlencode({'q': query, 'type':search_type.lower()}) #encodes the query/search_type into url form

        lookup_url = f'https://api.spotify.com/v1/search?{url_code}'

        r = requests.get(lookup_url, headers=headers)
        
        if r.status_code not in range(200,299):
            return r.status_code
        return r.json()
        
    
    ### Create a method of your choice
    def get_artists(self, search_id):
        return self.get_resources(search_id, search_type = 'artists')
    
    def get_tracks(self, search_id):
        return self.get_resources(search_id, search_type = 'tracks')
    
    def get_albums(self, search_id):
        return self.get_resources(search_id, search_type = 'albums')
        
    def get_genres(self, search_id):
        r_artist = self.get_artists(search_id, search_type = 'artists')
        return r_artist['genres']
    
#    def filtered_search(self, query = None, search_type = 'track'):
#        #If no query, raise exception
#        if query == None:
#            raise Exception('Error: Query is required')
#            
#        #If query is a dict, format the query into one line
#        if isinstance(query, dict):
#            querylist = [f'{k}:{v}' for k,v in query.items()]
#            query = ' '.join(querylist)
#
#        return self.search(query, search_type)
    
    def to_query_list(self, df):
        '''
        Creates a list of strings from a data frame
        keys = column names
        values = column values
        Returns a list of strings where all key/value pairs are concatenated
        '''
        valid_filters = self.valid_filters
        
        #Check if the filters are valid
        if not set(df.columns).issubset(valid_filters):
            raise Exception('Invalid filter type included')
        
        #search filters --> track_data.columns 
        #search values --> track_data.values

        #Initialize output list
        query_list = []
        
        #Loop through each row
        for row in df.values:
            row_dict = {} #Initialized dict

            #Loop through each column, add {col_name:value} to the dictionary
            for i,s_filter in enumerate(df.columns):
                row_dict.update({s_filter:row[i]})

            #Append the dictionary to the output list
            query_list.append(row_dict)

        #Loop through each dictionary and convert to a list of strings
        for i,row in enumerate(query_list):
            filter_value = [f'{k}:{v}' for k,v in row.items()]
            query_list[i] = ' '.join(filter_value)
            
        return query_list

        
         

In [4]:
sp = SpotifyAPI(client_id, client_secret)

In [5]:
with open('filenames.txt') as fid:
    file_list = fid.read()
    
file_list = file_list.split('\n')

file_list

['JStreamingHistory_2021_1.json',
 'JStreamingHistory_2021_2.json',
 'JStreamingHistory_2021_3.json']

In [6]:
#Load files and combine them into a master file

import json
import pandas as pd

for i,file in enumerate(file_list):
    temp = pd.read_json(file)
    temp.to_csv()
    
    if i == 0:
        d = temp.copy()
    else:
        d = pd.concat([d,temp], ignore_index = True)
        
d['artistName'] = d['artistName'].str.lower()
d['both'] = d['artistName'] + ' ' + d['trackName']


In [7]:
#Extract some items
#artists = list(d['artistName'].drop_duplicates())
#artists

track_df = d.drop_duplicates(subset='both') \
            .drop(columns={'endTime','msPlayed'}) \
            .rename(columns={'artistName':'artist','trackName':'track'}) \
            .reset_index(drop=True)

track_query = track_df[['artist','track']]

q = sp.to_query_list(track_query) #list of queries for the search


In [8]:
track_df

Unnamed: 0,artist,track,both
0,jeebanoff,Then We,jeebanoff Then We
1,nct 127,Chain - Korean Version,nct 127 Chain - Korean Version
2,nct u,90's Love,nct u 90's Love
3,seventeen,AH! LOVE,seventeen AH! LOVE
4,loote,tomorrow tonight,loote tomorrow tonight
...,...,...,...
1877,bazzi,I.F.L.Y.,bazzi I.F.L.Y.
1878,bazzi,Beautiful (feat. Camila Cabello),bazzi Beautiful (feat. Camila Cabello)
1879,lil nas x,THATS WHAT I WANT,lil nas x THATS WHAT I WANT
1880,bazzi,Paradise,bazzi Paradise


In [10]:
for i,track_query in enumerate(q):
    result = sp.search(track_query, search_type = 'track')   
    artist = track_df.loc[i, 'artist']
    artist_check = 0
    
    #If the search fails, try an unfiltered query with only alphanumeric characters and spaces
    #Do an extra check to make sure that the artist matches
    if isinstance(result, int):
        result = sp.search(track_df.loc[i, 'both'])
        
        #If it still fails, output ? for all fields except artist_id
        #Do a separate search for the artist id information
        if isinstance(result, int):
            track_df.loc[i,'track_id'] = '?'
            track_df.loc[i,'duration'] = '?'
            track_df.loc[i,'artist_id'] = '?'
            track_df.loc[i,'explicit'] = '?'
            continue
            
    #Ensure that the correct track and artist are chosen from results
    for num,items in enumerate(result['tracks']['items']):
        for artistnum, artistval in enumerate(result['tracks']['items'][num]['artists']):
            if artist == artistval['name'].lower():
                artist_check = 1
                break
                
        if artist_check > 0:
            break
        
                
    if artist_check > 0:
        track_df.loc[i,'track_id'] = result['tracks']['items'][num]['id']
        track_df.loc[i,'duration'] = result['tracks']['items'][num]['duration_ms']
        track_df.loc[i,'artist_id'] = result['tracks']['items'][num]['artists'][artistnum]['id']
        track_df.loc[i,'explicit'] = result['tracks']['items'][num]['explicit']
    else:
        track_df.loc[i,'track_id'] = '?'
        track_df.loc[i,'duration'] = '?'
        track_df.loc[i,'artist_id'] = '?'
        track_df.loc[i,'explicit'] = '?'
    
    
    
    
    



In [11]:
track_df

Unnamed: 0,artist,track,both,track_id,duration,artist_id,explicit
0,jeebanoff,Then We,jeebanoff Then We,2TIrMrDbvfGiOzzBv7Xg8k,232235,6FK6uP46ntwU9gaQQxTlDV,False
1,nct 127,Chain - Korean Version,nct 127 Chain - Korean Version,4yzHOucbzNABkX4yb5USVl,223600,7f4ignuCJhLXfZ9giKT7rH,False
2,nct u,90's Love,nct u 90's Love,4UlJjrQM5woly29xrGQpe8,161454,3paGCCtX1Xr4Gx53mSeZuQ,False
3,seventeen,AH! LOVE,seventeen AH! LOVE,5TgfB8nBnuKaytBbjmrzO7,194626,7nqOGRxlXj7N2JYbgNEjYH,False
4,loote,tomorrow tonight,loote tomorrow tonight,4vsAUbPb6q9FpC4AGBFdVh,202600,00TKPo9MxwZ0j4ooveIxWZ,False
...,...,...,...,...,...,...,...
1877,bazzi,I.F.L.Y.,bazzi I.F.L.Y.,4a6q8CR2hzLk2plDkSxkfD,165714,4GvEc3ANtPPjt1ZJllr5Zl,True
1878,bazzi,Beautiful (feat. Camila Cabello),bazzi Beautiful (feat. Camila Cabello),4VUwkH455At9kENOfzTqmF,180000,4GvEc3ANtPPjt1ZJllr5Zl,False
1879,lil nas x,THATS WHAT I WANT,lil nas x THATS WHAT I WANT,0e8nrvls4Qqv5Rfa2UhqmO,143901,7jVv8c5Fj3E9VhNjxT4snq,True
1880,bazzi,Paradise,bazzi Paradise,0Rx0DJI556Ix5gBny6EWmn,169038,4GvEc3ANtPPjt1ZJllr5Zl,True


In [12]:
artist_df = track_df[track_df['track_id'] != '?'].drop_duplicates(subset='artist').reset_index(drop=True)


In [13]:
no_genre = 0
for i,artist_id in enumerate(artist_df['artist_id']):
    result = sp.get_artists(artist_id)
    artist_df.loc[i,'popularity'] = result['popularity']
    artist_df.loc[i, 'followers'] = result['followers']['total']
    artist_df.loc[i, 'num_genres'] = len(result['genres'])
    
    if len(result['genres']) == 0: 
        artist_df.loc[i,'genres'] = 'unknown'
        no_genre += 1
    else: artist_df.loc[i,'genres'] = result['genres'][0] #Take only the first genre
    

    


In [14]:
artist_df = artist_df.drop(columns = {'duration','explicit','track','track_id'})

artist_df

Unnamed: 0,artist,both,artist_id,popularity,followers,num_genres,genres
0,jeebanoff,jeebanoff Then We,6FK6uP46ntwU9gaQQxTlDV,44.0,119376.0,4.0,chill r&b
1,nct 127,nct 127 Chain - Korean Version,7f4ignuCJhLXfZ9giKT7rH,68.0,5424745.0,2.0,k-pop
2,nct u,nct u 90's Love,3paGCCtX1Xr4Gx53mSeZuQ,64.0,3577991.0,2.0,k-pop
3,seventeen,seventeen AH! LOVE,7nqOGRxlXj7N2JYbgNEjYH,77.0,7001926.0,2.0,k-pop
4,loote,loote tomorrow tonight,00TKPo9MxwZ0j4ooveIxWZ,62.0,189652.0,12.0,alt z
...,...,...,...,...,...,...,...
514,h.i.n.p (hot issue of ntl. producers),h.i.n.p (hot issue of ntl. producers) Rumor,2eRjqXdTsssjpimRlz1i41,42.0,14492.0,2.0,k-pop
515,lightsum,lightsum Vanilla,57HNdw2ObRmfwWHG8Xhs8t,45.0,78906.0,2.0,k-pop
516,ive,ive ELEVEN,6RHTUrRF63xao58xh9FXYJ,70.0,756124.0,2.0,k-pop
517,bazzi,bazzi I.F.L.Y.,4GvEc3ANtPPjt1ZJllr5Zl,72.0,5056317.0,3.0,dance pop


In [15]:
genre_df = artist_df['genres'].value_counts()

In [16]:
###Add the main genre to the track df and combined df

#Extract a series where the artist in the index and the genres are the values, then convert this to a dictionary
maingenre = artist_df.copy().set_index('artist')['genres'].reset_index()
artist_genre_dict = {}

for (artist,genre) in zip(maingenre['artist'],maingenre['genres']):
    artist_genre_dict.update({artist:genre})
    
#Map the dictionary to the track_df
track_df['genres'] = track_df['artist'].map(artist_genre_dict)
track_df_final = track_df.fillna('?')

#Similarly, map the dictionary to the combined df
d['genres'] = d['artistName'].map(artist_genre_dict)
d_final = d.fillna('?')


In [17]:
#Save dataframes

d_final.to_csv('combined_df.csv', index = False)
track_df_final.to_csv('track_info.csv', index = False)
artist_df.to_csv('artist_info.csv', index = False)
genre_df.to_csv('genre_counts.csv')

In [20]:
#Extras
n_badSearch = track_df_final[ track_df_final['artist_id'] == '?'].shape[0]


print(f'Number of failed searches: {n_badSearch}/{track_df_final.shape[0]}')
print(f'Number of artists with no genres: {no_genre}/{artist_df.shape[0]}')

Number of failed searches: 100/1882
Number of artists with no genres: 51/519
