In [None]:
import requests
import base64
import datetime
import pandas as pd
import json
from urllib.parse import urlencode

In [None]:
client_id = "e974cca9b4ab4f79bccfe29164c97041"
client_secret = "dd9841ff8d194dfba74563d8978e7899"

In [7]:
class SpotifyAPI(object):
    access_token = None
    access_token_expires = datetime.datetime.now()
    access_token_did_expire = True
    client_id = None
    client_secret = None 
    token_url = "https://accounts.spotify.com/api/token"
    
    def __init__(self, client_id, client_secret, *args, **kwargs):
        super().__init__(*args, *kwargs)
        self.client_id = client_id 
        self.client_secret = client_secret
    
    def get_client_cred(self):
        """returns a base64 string """
        client_id = self.client_id
        client_secret = self.client_secret
        if client_id == None or client_secret == None:
            raise Exception("you must set client id and client secret ")
        client_cred= f"{client_id}:{client_secret}"
        client_cred_base64 = base64.b64encode(client_cred.encode())
        return client_cred_base64.decode()

        
    def get_token_data(self):
        return {
            "grant_type" : "client_credentials"
        }
    
    def get_token_header(self):
        return {
    "Authorization": f"Basic {self.get_client_cred()}"  # base64 encoded <client_id>:<client_secret>
        }
    
    def perform_auth(self):
        token_url = self.token_url
        token_data = self.get_token_data()
        token_header = self.get_token_header()
        r = requests.post(token_url, data = token_data, headers = token_header)
        if r.status_code in range(200, 299):
            token_response_data = r.json()
            self.access_token = token_response_data['access_token']
            now = datetime.datetime.now()
            expires_in = token_response_data['expires_in']
            expires = now + datetime.timedelta(seconds = expires_in)
            self.access_token_expires = expires
            self.access_token_did_expire = expires < now
            return True
        else:
            raise Exception("Couldn't Authenticate client")
    
    def get_access_token(self):
        token = self.access_token
        expires = self.access_token_expires 
        now = datetime.datetime.now()
        if expires < now:
            self.perform_auth()
            return self.get_access_token()
        elif token == None:
            self.perform_auth()
            return self.get_access_token()
        return token
    
    def get_resource_headers(self, ):
        access_token = self.get_access_token() 
        headers = {
            "Authorization" : f"Bearer {access_token}"
        }
        return headers
    
    
    def get_resource(self, lookup_id, resource_type = "albums", version = 'v1'):
        endpoint_url = f"https://api.spotify.com/{version}/{resource_type}/{lookup_id}"
        headers = self.get_resource_headers()
        r = requests.get(endpoint_url, headers = headers)
        if r.status_code not in range(200, 299):
            print(r.status_code)
            return {}
        else:
            return r.json()
    
    def get_album(self, _id):
        return self.get_resource(_id,resource_type = "albums" )
   
    def get_artist(self, _id):
        return self.get_resource(_id,resource_type = "artists" )
    
    
    def base_search(self, query_params, search_type = 'artist'):
        headers = self.get_resource_headers()
        # search endpoint
        # look at an example response format 
        endpoint = "https://api.spotify.com/v1/search"
        lookup_url = f"{endpoint}?{query_params}"
        print(lookup_url)
        r = requests.get(lookup_url, headers = headers)
        if r.status_code not in range(200,299):
            print(f"No Response object returned, Status code: {str(r.status_code)}")
            return {}
        else:
            return r.json()
        
    def search(self, query=None, operator= None, operator_query = None, search_type = 'artist',
              limit = 20 ):
        
        if query == None:
            raise Exception(" Require a query to perform search!")
        
        if isinstance(query, str):
            response_file_name = f"{query}.csv"
                    
        if isinstance(query, dict):
            response_file_name = "outputs/" + "_".join([str(v) for k,v in query.items()]) + ".csv"
            query = " ".join([f"{k}:{v}" for k,v in query.items()])
     
        
        if operator != None and operator_query != None:
            if operator.lower() == "or" or operator.lower() == 'not':
                operator = operator.upper()
                if isinstance(operator_query, str):
                    query = f"{query} {operator} {operator_query  }"
        query_params  = urlencode({'q':query, 'type' : search_type.lower(), 'limit': limit})
        print(query_params)
        response =  self.base_search(query_params, search_type = 'artist')
        with open('response.json', 'w') as f:
            f.write(json.dumps(response))
        if response:
            response_df = self.parse_search_2_df(response, search_type)
            if response_df.empty:
                raise Exception("Response object empty. please check your query!")
            else:
                print(f"writing output to {response_file_name} ...")
                response_df.to_csv(response_file_name, index=False)
                print("Done!")

    def parse_search_2_df(self, response, search_type = "tracks"):
        col_list = []
        row_list= []
        if search_type.lower() == 'track':
            items_list = response['tracks']['items']
        if search_type.lower() == 'album':
            items_list = response['albums']['items']
        
        if len(items_list) > 0:

            for item in items_list:
                row = {}
                if isinstance(item, dict):
                    for key in item:
                        if key == 'album':
                            album_data = item['album']
                            row['album_id']  = album_data['id']
                            row['album_type']= album_data['album_type']
                            row['album_name'] = album_data['name']
                            row['release_date'] = album_data['release_date']
                            row['total_tracks'] = album_data['total_tracks']
                            row['release_date_precision'] = album_data['release_date_precision']
                            row['album_links'] = album_data['external_urls']['spotify']
                        if key == 'artists':
                            artist_data = item['artists'][0]
                            row['artist_id'] = artist_data['id']
                            row['artist_name'] = artist_data['name']
                            row['artist_bio_link'] = artist_data['external_urls']['spotify']
                        row['track_id'] = item['id']
                        row['track_name'] = item['name']
                        row['track_number'] = item['track_number']
                        row['disc_number'] = item['disc_number']
                        row['duration_ms'] = item['duration_ms']
                        row['type'] = item['type']
                        row['popularity'] = item['popularity']
                        row['is_explicit'] = item['explicit']
                        row['available_markets'] = item['available_markets']
                    col_list = list(row.keys())
                    row_list.append([val for val in row.values()])

            return pd.DataFrame(row_list, columns=col_list)
        else:
            return pd.DataFrame()
        
    

In [8]:
spotify = SpotifyAPI(client_id, client_secret)

query = { "artist" : "Michael Jackson" }
spotify.search(query, search_type = "track",limit = 10)

q=artist%3AMichael+Jackson&type=track&limit=10
https://api.spotify.com/v1/search?q=artist%3AMichael+Jackson&type=track&limit=10
writing output to outputs/Michael Jackson.csv ...


OSError: ignored