In [1]:
import os
import json
import keyring
import requests
import traceback
import numpy as np
import pandas as pd
from OsOps import Ops
from spotipy import client, util
from collections import namedtuple, Counter

In [2]:
# SUPPLY AUTH CREDS FROM STORED IN WIN CRED MGR
user_id, sp_cid, sp_sec = ( 
    keyring.get_password( service, "" )
    for service in [
        "spottyPie_user_id",
        "spottyPie_clientID", 
        "spottyPie_clientSecret"] )

if None in [user_id, sp_cid, sp_sec]: print( "Credential not set locally" )

In [3]:
ops = Ops()

# standard API authenticate (not used)
def getAuthResponse():
    try: return (
        requests.post( 'https://accounts.spotify.com/api/token', 
            { 'grant_type': 'client_credentials',
                'client_id': sp_cid,
                'client_secret': sp_sec, }) )
    except Exception as exc: print( 
        f"\n[ ERRR ] {exc.__class__}"
        f"\n[ DSTR ] {exc.__doc__}"
        f"\n[ CTXT ] {exc.__context__}"
        f"\n{ '='*79 }"
        f"\n\n{traceback.format_exc()}" )

# get authenticated spotipy object
def auth_SpotPy():
    token = util.prompt_for_user_token(
        username= user_id, 
        scope= " ".join( [
            "playlist-read-private",
            "playlist-modify-private",
            "user-library-modify",
            "user-read-private" ]),
        client_id= sp_cid, 
        client_secret= sp_sec, 
        redirect_uri = "http://example.com/")

    return client.Spotify(token)

spot = auth_SpotPy()

In [9]:
# full albums from tracks in any list, sorted by largest album

def createNewPL( pl_name="New_PL", stamp=True, ):
    try: return spot.user_playlist_create( 
        user= user_id, 
        name= f"{pl_name}_{ops.dtStamp()}" if stamp else pl_name, 
        public=False )
    except Exception as e: return f"EXC createNewPL:\n{type(e).__name__}\n{e}"

def albsFromPList( pl_id ):
    track_list = spot.user_playlist_tracks( user=user_id, playlist_id=pl_id, )
    return { i : {
        "alb_id" : item['track']['album']['id'], 
        "tracks" : item['track']['album']['total_tracks'],
        "dct" : item, } 
        for i, item in enumerate( track_list['items']) }

def getAllTrackIDs( listDict ):
    Alb = namedtuple( "Alb", [ "alb_id", "siz", "dct" ] )
    albsSorted = sorted( [ 
        Alb( d["alb_id"], d['tracks'], d["dct"] )
        for _, d in listDict.items() ],
        key = lambda el: el.siz, reverse=True )
    return [ item['id']
        for alb in albsSorted
        for item in spot.album_tracks( alb.alb_id )['items'] ]

def addTracksByID( idList, destination ):
    def yieldSegments(li, size):
        for i in range(0, len(li), size): yield li[ i:i + size ]
    for seg in yieldSegments(idList, 100):  # max 100
        spot.user_playlist_add_tracks(
            user_id, 
            playlist_id=destination, 
            tracks=seg)
            
created_pl = createNewPL( pl_name="rrad" )
# # created_pl = createNewPL( pl_name="dsco" )
rradDct = albsFromPList( "37i9dQZEVXbuX4MySjIacD" ) # release radar 
# # dscoDct = albsFromPList( "37i9dQZEVXcXssf47BUM1F" ) # dscov weekly 
allTrackIDs = getAllTrackIDs( rradDct )
# # allTrackIDs = getAllTrackIDs( dscoDct )
addTracksByID( allTrackIDs, created_pl['id'] )
# # addTracksByID( allTrackIDs, created_pl['id'] )

In [5]:

# distance-shuffle a list (maximum distance between same-album songs)

def get_all_pl_tracks( user_id, pl_id):
    rq_dct = spot.user_playlist_tracks( user_id, pl_id )
    tracks = rq_dct['items']
    while rq_dct['next']:
        rq_dct = spot.next(rq_dct)
        tracks.extend(rq_dct['items'])
    return tracks

def get_bins(posits, count, is_reverse): return (
    np.array_split( (posits[::-1] if is_reverse else posits ), count) )
    
def distanceShuffle( input ):
    
    counted = { val : count for val, count in Counter( input ).most_common() }
    length = len(input)
    posits = np.arange( 1, length+1 )
    output = { i : None for i in posits }
    
    is_reverse = False
    for val, count in counted.items():
        for bin in get_bins(posits, count, is_reverse):
            for pos in bin:
                if output[pos]: continue 
                else:
                    output[pos] = val
                    break
                
        is_reverse = not is_reverse # changes direction of bin split-add
    
    return output

In [6]:
# add: group same-artist tracks under pseudo-alb_id prior to alb segment

def shuffleMain(pl_id):
    pl = spot.playlist(pl_id)
    track_dct = get_all_pl_tracks( user_id, pl_id)

    in_list = [ item["track"]["album"]["id"] for item in track_dct ]
    group_dct = { group_id : [] for group_id in in_list }

    for group_id, group_list in group_dct.items():
        group_list.extend( item["track"]["id"] for item in track_dct 
            if item["track"]["album"]["id"] == group_id )
            
    shuffle = distanceShuffle( in_list )

    out = [ 0 for i in range(len(track_dct)) ]
    for pos, group_id in shuffle.items(): out[pos-1] = group_dct[group_id].pop()

    if ( set([item["track"]["id"] for item in track_dct ]) - set(out) 
        ) or ( len(in_list) != len(out) ): print("somethin ain't right")

    created_pl = createNewPL( pl_name=f"{pl['name']}_shfl" )
    return addTracksByID( out, created_pl['id'] )

# https://open.spotify.com/playlist/552cWxXtbdxXHpYOUxn9os?si=b3303c6be2154c61
# shuffleMain("552cWxXtbdxXHpYOUxn9os")

In [7]:
# get library as dict / dataframe ( all playlists, all tracks ) ~10min
def getPlaylists( asDict=False ):
    '''folder structure not available through API as at 221224'''
    incrmt = 50  # max 50
    offset = 0
    plistsDct = {}
    while True:
        newItemDcts = spot.current_user_playlists( incrmt, offset)["items"]
        plistsDct.update( { iDct["id"]: iDct for iDct in newItemDcts } )
        if len(newItemDcts) < incrmt: 
            return pd.DataFrame( plistsDct ).T if not asDict else plistsDct
        else: offset += incrmt

def getTracks( id ):
    incrmt = 100  # max 100
    offset = 0
    tracks = {}
    while True:
        items = spot.user_playlist_tracks( 
            user=user_id, playlist_id=id, limit=incrmt, offset=offset )["items"]
        tracks.update( { i["track"]["id"]: i for i in items } )
        if len(items) < incrmt: return tracks
        else: offset += incrmt

# as pd DF
# df_pLists = getPlaylists()
# df_pLists["tracklist"] = df_pLists["id"].apply( lambda id: getAllTracks( id ) )
# ops.storePKL( df_pLists, "df_pLists", os.getcwd() )

# # as dict
# plDct = getPlaylists( asDict=True )
# for id, dct in plDct.items(): dct["tracks"].update({ "list": getTracks( id )})
# ops.storePKL( plDct, "plDct", os.getcwd() )

In [8]:
# PODCASTS: Latest [n] eps from shows in playlist of episodes
def latestNEps( pl_id, n=3 ):
    tracks = spot.user_playlist_tracks( user=user_id, 
        playlist_id=pl_id, )

    showIDs = set( artist['id']
        for item in tracks["items"]
        for artist in item["track"]["artists"] 
        if artist["type"]=="show" )  # skip non-podcast tracks

    epIDs = []
    for showID in showIDs:
        show_items = spot.show_episodes(showID)['items']
        eps_recent = sorted( [ ( item["release_date"], item["id"] )
            for item in show_items ], key= lambda i: i[0], reverse=True )
        epIDs.extend( f'spotify:episode:{id}' for _, id in eps_recent[:n] )
        
    return epIDs

# new_pCasts_PL = createNewPL( pl_name="PCST" )
# epIDs = latestNEps( "2PFeIO0B0DtenFmGKbzYvg", n=3 )
# addTracksByID( epIDs, new_pCasts_PL['id'] )