In [None]:
import requests 
from dotenv import load_dotenv
import os 
from urllib.parse import urlencode
import base64 
import webbrowser
import datetime as dt
from datetime import timedelta
import time

In [7]:
load_dotenv()

True

In [36]:
def fetch_auth_code(scope): 
    """ Fetch Authorization Code for initial authentication. Requires human interaction as Spotify doesn't allow for headless auth. """
    try: 
        auth_headers = {"client_id": os.getenv("SPOTIPY_CLIENT_ID"), 
                    "response_type": "code", 
                    "redirect_uri": os.getenv("SPOTIPY_REDIRECT_URI"), 
                    "scope": scope}
        webbrowser.open("https://accounts.spotify.com/authorize?" + urlencode(auth_headers))
        auth_code = input("Enter authorizaton code copied from browser: ")
        return auth_code
    except Exception as e: 
        print(f"Error occurred during code retrieval. {e}")

def fetch_token(auth_code): 
    """ After fetching the code, use this to fetch the access token that can be exchanged for user data."""
    try: 
        encoded_creds = base64.b64encode(os.getenv("SPOTIPY_CLIENT_ID").encode() + b":" + os.getenv("SPOTIPY_CLIENT_SECRET").encode()).decode("utf-8")
        token_headers = {"Authorization": "Basic " + encoded_creds, 
                    "Content-Type": "application/x-www-form-urlencoded"}
        token_data = {"grant_type": "authorization_code", 
                    "code": auth_code, 
                    "redirect_uri": os.getenv("SPOTIPY_REDIRECT_URI")}
        
        obtained_at = dt.datetime.now()
        expires_at = obtained_at + timedelta(seconds=3600)
        req = requests.post("https://accounts.spotify.com/api/token", data=token_data, headers=token_headers)
        access_token = req.json()["access_token"]
        refresh_token = req.json()['refresh_token']
        os.environ["SP_ACCESS_TOKEN"] = access_token
        os.environ["SP_REFRESH_TOKEN"] = refresh_token
        os.environ["SP_EXPIRES_AT"] = str(expires_at)
        os.environ["SP_OBTAINED_AT"] = str(obtained_at)

        resp = {"access_token":access_token, "obtained_at": obtained_at, "expires_at": expires_at, "refresh_token": refresh_token}

        return resp
    except requests.exceptions.RequestException as Exception: 
        print(f"Error occured during token retrieval: {Exception.args}")

def refresh_token(refresh_token): 
    """ Using the refresh token to generate a new access token."""
    try: 
        encoded_creds = base64.b64encode(os.getenv("SPOTIPY_CLIENT_ID").encode() + b":" + os.getenv("SPOTIPY_CLIENT_SECRET").encode()).decode("utf-8")
        header = {"Authorization": "Basic " + encoded_creds, 
                "Content-Type": "application/x-www-form-urlencoded"}
        params = {"grant_type": "refresh_token",
                  "refresh_token": os.getenv("SP_REFRESH_TOKEN")}
        resp = requests.post(url="https://accounts.spotify.com/api/token", data=params, headers=header)
        return resp.json()
    except requests.exceptions.RequestException as e: 
        print(f"Error occured during token retrieval: {e.args}")
        return None

def validate_token(): 
    """Checking the current access token for validity, updating if invalid """

    if dt.datetime.strptime(os.getenv("SP_EXPIRES_AT"), "%Y-%m-%d %H:%M:%S.%f") <= dt.datetime.now(): 
        token_details = refresh_token(os.getenv("SP_REFRESH_TOKEN"))
        return token_details
    else: 
        token_details = os.getenv("SP_ACCESS_TOKEN")
        return token_details
    
    

def fetch_song_page(link, retries): 
    """ Used to fetch an individual page of a user's song list, based on the link provided (offset and limit)"""
    tk = validate_token()
    numer = 0
    del tk
    while retries > 0 :
        try: 
            user_header = {"Authorization": "Bearer " + os.getenv("SP_ACCESS_TOKEN"), "Content-Type": "application/json"}
            user_params = {"limit":50}
            user_tracks_response = requests.get(link, params=user_params, headers=user_header)
            t = user_tracks_response.json()["items"]
            n = [t[idx]['track'] for idx, val in enumerate(t)]
        except requests.exceptions.RequestException as e: 
            print(f"Error encountered during retrieval of song: {e.args}")

            if e.errno == 429: 
                retries_copy = retries
                slp = 0.5
                print(f"Retrying link. Attempt {numer+1}/{retries_copy}")
                numer = numer +1 
                retries = retries - 1 
                time.sleep(slp)
                user_header = {"Authorization": "Bearer " + os.getenv("SP_ACCESS_TOKEN"), "Content-Type": "application/json"}
                user_params = {"limit":50}
                user_tracks_response = requests.get(link, params=user_params, headers=user_header)
                t = user_tracks_response.json()["items"]
                n = [t[idx]['track'] for idx, val in enumerate(t)]
            else: 
                n = None
        return n 
            

        



def get_total_songs(link):
    """ Determine the total number of tracks in a user's library. """
    token_details = validate_token()
    try: 
        user_header = {"Authorization": "Bearer " + os.getenv("SP_ACCESS_TOKEN"), "Content-Type": "application/json"}
        user_params = {"limit":1} # shrink the limit because we do not need an entire response
        user_tracks_response = requests.get("https://api.spotify.com/v1/me/tracks", params=user_params, headers=user_header)
        t = user_tracks_response.json()["total"]
        return t
    except requests.exceptions.RequestException as e: 
        print(f"Error encountered during request: {e.args}")
        
def generate_link_list(offset, limit, library_size): 
    """Make a list of urls to senq a request to the server for. Captures user's entire library"""
    # Logic to figure out total number of links needed in request 
    
    library_size = {"total_songs": library_size,
                    "total_pages_int": library_size // 50, 
                    "total_songs_rem":library_size % 50, 
                    "total_songs_int": library_size-(library_size % 50)} # number of pages to be fetched based on limit, remainder, to be fetched after first value is hit (?) 
    offset = 50 
    limit = 50
    track_links = []


    for pagenum in range(1, library_size["total_pages_int"]+1):

        if library_size["total_songs_rem"] > 0: 
            next_track_set = f'https://api.spotify.com/v1/me/tracks?offset={offset}&limit={limit}' 
            track_links.append(next_track_set)
            offset = offset + 50
            if offset == library_size["total_songs_int"]: 
                track_links.append(f"https://api.spotify.com/v1/me/tracks?offset={offset+library_size["total_songs_rem"]}&limit={limit}")
        else: 
            next_track_set = f'https://api.spotify.com/v1/me/tracks?offset={offset}&limit={limit}' 
            track_links.append(next_track_set)
            offset = offset + 50
    return track_links
    
    



In [37]:
scope = "user-library-read playlist-read-private playlist-read-collaborative"

In [38]:
auth_code = fetch_auth_code(scope)

In [39]:
token_details = fetch_token(auth_code)

In [40]:
library_size = get_total_songs("https://api.spotify.com/v1/me/tracks")

In [41]:
link_list = generate_link_list(0, 50, library_size)

In [43]:
fetch_song_page(link_list[1], 2)

[{'album': {'album_type': 'album',
   'artists': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/3e7awlrlDSwF3iM0WBjGMp'},
     'href': 'https://api.spotify.com/v1/artists/3e7awlrlDSwF3iM0WBjGMp',
     'id': '3e7awlrlDSwF3iM0WBjGMp',
     'name': 'Little Mix',
     'type': 'artist',
     'uri': 'spotify:artist:3e7awlrlDSwF3iM0WBjGMp'}],
   'available_markets': ['AR',
    'AU',
    'AT',
    'BE',
    'BO',
    'BR',
    'BG',
    'CA',
    'CL',
    'CO',
    'CR',
    'CY',
    'CZ',
    'DK',
    'DO',
    'DE',
    'EC',
    'EE',
    'SV',
    'FI',
    'FR',
    'GR',
    'GT',
    'HN',
    'HK',
    'HU',
    'IS',
    'IE',
    'IT',
    'LV',
    'LT',
    'LU',
    'MY',
    'MT',
    'MX',
    'NL',
    'NZ',
    'NI',
    'NO',
    'PA',
    'PY',
    'PE',
    'PH',
    'PL',
    'PT',
    'SG',
    'SK',
    'ES',
    'SE',
    'CH',
    'TW',
    'TR',
    'UY',
    'US',
    'GB',
    'AD',
    'LI',
    'MC',
    'ID',
    'JP',
    'TH',
    'VN',
    

In [None]:
from dask.delayed import delayed 
output = []

for link in link_list