Spotify doesn't provide an API call to get a user's queue, so we do some creative stuff to get the user's queue, sort it, then add it back. There may be some latency.

Queue can be sorted by any of Spotify's exposed metric, found [here](https://developer.spotify.com/documentation/web-api/reference/#objects-index) in the audio features object

Note: Using this a lot will mess up your spotify metrics, as it technically plays all the songs in your queue everytime you run it

In [2]:
import base64
import datetime
from urllib.parse import urlencode

import requests
import json

#Follows Spotify's "Implicit Grant" Authorization flow
class SpotifyAPI(object):
    access_token = None
    access_token_expires = datetime.datetime.now()
    access_token_did_expire = True
    client_id = None
    client_secret = None
    token_url = "https://accounts.spotify.com/api/token"
    authorization_url = "https://accounts.spotify.com/authorize"
    
    #external functions ("public")
    

    def __init__(self, client_id, client_secret, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client_id = client_id
        self.client_secret = client_secret

    def get_album(self, _id):
        return self.get_resource(_id, resource_type='albums')
    
    def get_artist(self, _id):
        return self.get_resource(_id, resource_type='artists')

    def get_playlist(self, _id):
        return self.get_resource(_id, resource_type='playlists')

    def get_playlist_tracks(self, _id):
        return self.get_resource(_id, resource_type='playlists', addendum='tracks')

    def get_track_features(self, _id):
        return self.get_resource(_id, resource_type='audio-features')

    def get_track_analysis(self, _id):
      return self.get_resource(_id, resource_type='audio-analysis') 

    def search(self, query=None, operator=None, operator_query=None, search_type='artist', limit=20 ):
        if query == None:
            raise Exception("A query is required")
        if isinstance(query, dict):
            query = " ".join([f"{k}:{v}" for k,v in query.items()])
        if operator != None and operator_query != None:
            if operator.lower() == "or" or operator.lower() == "not":
                operator = operator.upper()
                if isinstance(operator_query, str):
                    query = f"{query} {operator} {operator_query}"
        query_params = urlencode({"q": query, "type": search_type.lower(), "limit": str(limit)})
        return self.base_search(query_params)


    #internal functions ("private")


    def get_client_credentials(self):
        """
        Returns a base64 encoded string
        """
        client_id = self.client_id
        client_secret = self.client_secret
        if client_secret == None or client_id == None:
            raise Exception("You must set client_id and client_secret")
        client_creds = f"{client_id}:{client_secret}"
        client_creds_b64 = base64.b64encode(client_creds.encode())
        return client_creds_b64.decode()
    
    def get_token_headers(self):
        client_creds_b64 = self.get_client_credentials()
        return {
            "Authorization": f"Basic {client_creds_b64}"
        }
    
    def get_token_data(self):
        return {
            "grant_type": "client_credentials"
        } 
    
    def perform_auth(self):
        token_url = self.token_url
        token_data = self.get_token_data()
        token_headers = self.get_token_headers()
        r = requests.post(token_url, data=token_data, headers=token_headers)
        if r.status_code not in range(200, 299):
            raise Exception("Could not authenticate client.")
            # return False
        data = r.json()
        now = datetime.datetime.now()
        access_token = data['access_token']
        expires_in = data['expires_in'] # seconds
        expires = now + datetime.timedelta(seconds=expires_in)
        self.access_token = access_token
        self.access_token_expires = expires
        self.access_token_did_expire = expires < now
        return True
    
    def get_access_token(self):
        token = self.access_token
        expires = self.access_token_expires
        now = datetime.datetime.now()
        if expires < now:
            self.perform_auth()
            return self.get_access_token()
        elif token == None:
            self.perform_auth()
            return self.get_access_token() 
        return token
    
    def get_resource_header(self):
        access_token = self.get_access_token()
        headers = {
            "Authorization": f"Bearer {access_token}"
        }
        return headers
        
        
    def get_resource(self, lookup_id, resource_type='albums', version='v1', addendum=''):
        endpoint = f"https://api.spotify.com/{version}/{resource_type}/{lookup_id}/{addendum}"
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            return {}
        return r.json()
    
    
    
    def base_search(self, query_params): # type
        headers = self.get_resource_header()
        endpoint = "https://api.spotify.com/v1/search"
        lookup_url = f"{endpoint}?{query_params}"
        r = requests.get(lookup_url, headers=headers)
        if r.status_code not in range(200, 299):  
            return {}
        return r.json()

    #generic api request, accepts the full endpoint string

    def generic_get(self, endpoint):
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            print(r.status_code)
            return {}
        return r.json()

    def generic_post(self, endpoint):
        headers = self.get_resource_header()
        r = requests.post(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
          print(r.status_code)
          return r.status_code
        return r.status_code
      

    def generic_put(self, endpoint, data={}):
        headers = self.get_resource_header()
        #r = requests.put(endpoint, headers=headers)
        r = requests.put(endpoint, data=json.dumps(data), headers=headers)
        if r.status_code not in range(200, 299):
            print(r.status_code)
            return {}
        # return r.json()

    #new functions for user functions

    def get_current_user(self):
        endpoint = f"https://api.spotify.com/v1/me"
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            return {}
        return r.json()

    def get_current_user_recently_played(self):
        endpoint = f"https://api.spotify.com/v1/me/player/recently-played"
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            return {}
        return r.json()

    def get_current_user_top(self, top_type='tracks', time_range='medium_term'):
        #endpoint = f"https://api.spotify.com/v1/me/top/{top_type}?limit=50&time_range{time_range}"
        endpoint = f"https://api.spotify.com/v1/me/top/{top_type}?time_range={time_range}&limit=50"
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            print(r.status_code)
            return {}
        return r.json()

    def get_user(self, user):
        endpoint = f"https://api.spotify.com/v1/users/{user}"
        headers = self.get_resource_header()
        r = requests.get(endpoint, headers=headers)
        if r.status_code not in range(200, 299):
            return {}
        return r.json()


In [27]:
import webbrowser
import json

import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
import time

In [4]:
client_id = ""
client_secret = ""

spotify =  SpotifyAPI(client_id, client_secret)

In [5]:
scopes = ['user-read-email', 'user-read-private', 'playlist-read-collaborative', 'playlist-modify-public', 'playlist-read-private', 'playlist-modify-private', 'user-library-read', 'user-top-read', 'user-read-recently-played', 'user-read-currently-playing']
scopes_string = ' '.join(scopes)
authorization_url = "https://accounts.spotify.com/authorize"


params = {
            "client_id": client_id,
            "response_type": "token",
            "redirect_uri": "http://localhost:7777/callback",
            "scope": scopes_string
        } 

In [6]:
r = requests.get(authorization_url, params)
print("Successful if this is 200: " + str(r.status_code))
print("URL to paste in browser: " + str(r.url))

Successful if this is 200: 200
URL to paste in browser: https://accounts.spotify.com/login?continue=https%3A%2F%2Faccounts.spotify.com%2Fauthorize%3Fscope%3Duser-read-email%2Buser-read-private%2Bplaylist-read-collaborative%2Bplaylist-modify-public%2Bplaylist-read-private%2Bplaylist-modify-private%2Buser-library-read%2Buser-top-read%2Buser-read-recently-played%2Buser-read-currently-playing%26response_type%3Dtoken%26redirect_uri%3Dhttp%253A%252F%252Flocalhost%253A7777%252Fcallback%26client_id%3D5de0c3326eed4e17a4e2b226fc57a68f


In [10]:
#paste the entire url you are sent to here as a string(in quotes)
response = ''
response_list_raw = response.split('&')

response_list = []
#hard coded extracting token and expires
for item in response_list_raw:
  response_list.append(item.split('='))

response_list

access_token = response_list[0][1]
expires_in = response_list[2][1]
print(access_token)

BQCuULSJ9ihfeU2I0WLOCZkoLROyIBADjLKcn-ENFrhR5wHlVQ6sJwL0DC79ysR1diqTpQbEQ8u64qshHs57_A1kabfsP1obfvXC6_Oue2aylOJMFJ6rdi7gATdrZmYuxqDn2X4qiVMQ_lnbk0FGRzWyAIu9dxekeq1nuzY9MSTEFw5tqDKn2cdu2ejwGufkCvchJrvGXz8k8NOxFGRtvq4DEnSTu0Sx8mtps82Xd3fisbhA7m_GQ9WtenztNyY5lvc


In [11]:
spotify_user = SpotifyAPI(client_id, client_secret)

spotify_user.access_token = access_token
spotify_user.access_token_expires = datetime.datetime.now() + datetime.timedelta(seconds=3600)
spotify_user.access_token_did_expire = False



In [13]:
nu = spotify.search("Khalid", search_type='artist')

u = spotify_user.search("Khalid", search_type='artist')

print(spotify.access_token)
print(nu)
print(spotify_user.access_token)
print(u)

BQARhwI-It_WpXHyskvZ-CbXz_YjxytNeG-I4M1EtvGhMNuql8lApK1kj69cxcD5Bbv2L9bWaLfXduTt2is
{'artists': {'href': 'https://api.spotify.com/v1/search?query=Khalid&type=artist&offset=0&limit=20', 'items': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/6LuN9FCkKOj5PcnpouEgny'}, 'followers': {'href': None, 'total': 14120534}, 'genres': ['pop'], 'href': 'https://api.spotify.com/v1/artists/6LuN9FCkKOj5PcnpouEgny', 'id': '6LuN9FCkKOj5PcnpouEgny', 'images': [{'height': 640, 'url': 'https://i.scdn.co/image/ab6761610000e5eb012b37d6dec8872b18524f78', 'width': 640}, {'height': 320, 'url': 'https://i.scdn.co/image/ab67616100005174012b37d6dec8872b18524f78', 'width': 320}, {'height': 160, 'url': 'https://i.scdn.co/image/ab6761610000f178012b37d6dec8872b18524f78', 'width': 160}], 'name': 'Khalid', 'popularity': 91, 'type': 'artist', 'uri': 'spotify:artist:6LuN9FCkKOj5PcnpouEgny'}, {'external_urls': {'spotify': 'https://open.spotify.com/artist/7w9NGVft7cOEEa3CGGILZM'}, 'followers': {'href': None

End Initialization and test. Begin New stuff

In [14]:
#finding jeopardy theme song ID
res = spotify.search('Jeopardy Theme', search_type='track')
res
# for track in res['items']:
#   print(track['name'] + ' : ' + track['id'])

{'tracks': {'href': 'https://api.spotify.com/v1/search?query=Jeopardy+Theme&type=track&offset=0&limit=20',
  'items': [{'album': {'album_type': 'album',
     'artists': [{'external_urls': {'spotify': 'https://open.spotify.com/artist/54qL6HOA0vyhhnFw5inqXq'},
       'href': 'https://api.spotify.com/v1/artists/54qL6HOA0vyhhnFw5inqXq',
       'id': '54qL6HOA0vyhhnFw5inqXq',
       'name': 'Mount Royal Orchestra',
       'type': 'artist',
       'uri': 'spotify:artist:54qL6HOA0vyhhnFw5inqXq'}],
     'available_markets': ['AD',
      'AE',
      'AG',
      'AL',
      'AM',
      'AO',
      'AR',
      'AT',
      'AU',
      'AZ',
      'BA',
      'BB',
      'BD',
      'BE',
      'BF',
      'BG',
      'BH',
      'BI',
      'BJ',
      'BN',
      'BO',
      'BR',
      'BS',
      'BT',
      'BW',
      'BY',
      'BZ',
      'CA',
      'CH',
      'CI',
      'CL',
      'CM',
      'CO',
      'CR',
      'CV',
      'CY',
      'CZ',
      'DE',
      'DJ',
      'DK',
   

In [15]:
def appendToUserQueue(track_uri):
  spotify_user.generic_post('https://api.spotify.com/v1/me/player/queue' + '?uri=' + track_uri)

In [16]:
def getUserCurrentTrack():
  return spotify_user.generic_get('https://api.spotify.com/v1/me/player/currently-playing')['item']

In [17]:
def getUserCurrentTrackProgress():
  return spotify_user.generic_get('https://api.spotify.com/v1/me/player/currently-playing')['progress_ms']

In [18]:
def skipToNextTrack():
  spotify_user.generic_post('https://api.spotify.com/v1/me/player/next')

In [19]:
def playTrack(track_uri, pos = 0):
  track_json = {}
  track_json['uris'] = [track_uri]
  track_json['position_ms'] = pos
  spotify_user.generic_put('https://api.spotify.com/v1/me/player/play', data = track_json)

In [20]:
def addAudioFeatures(track_list):
  track_id_string = ''
  for track in track_list:
    track_id_string = track_id_string + track['id'] + ','
  track_id_string = track_id_string[:-1] #remove last character of string, as an extra comma was added
  features = spotify.generic_get('https://api.spotify.com/v1/audio-features?ids=' + track_id_string)['audio_features']
  for i in range(len(track_list)):
    track_list[i]['danceability'] = features[i]['danceability']
    track_list[i]['energy'] = features[i]['energy']
    track_list[i]['key'] = features[i]['key']
    track_list[i]['loudness'] = features[i]['loudness']
    track_list[i]['mode'] = features[i]['mode']
    track_list[i]['speechiness'] = features[i]['speechiness']
    track_list[i]['acousticness'] = features[i]['acousticness']
    track_list[i]['instrumentalness'] = features[i]['instrumentalness']
    track_list[i]['liveness'] = features[i]['liveness']
    track_list[i]['valence'] = features[i]['valence']
    track_list[i]['tempo'] = features[i]['tempo']
    track_list[i]['time_signature'] = features[i]['time_signature']

  return track_list
  


In [21]:
def sortTracksByFeatures(track_list, feature = 'energy', ascending=True):
  def getFeature(track):
    return track[feature]

  # sort list with key
  track_list.sort(key=getFeature, reverse= not ascending)

In [22]:
def sweepVolumeDown(speed = 10):
  for volume in range(0, 101, speed):
    spotify_user.generic_put('https://api.spotify.com/v1/me/player/volume?volume_percent=' + str(100 - volume))

In [23]:
def sweepVolumeUp(speed = 10):
  for volume in range(0, 101, speed):
    spotify_user.generic_put('https://api.spotify.com/v1/me/player/volume?volume_percent=' + str(volume))   

In [24]:
#To get the user's queue, we;
#1 - add a song with a known ID (in this case, the jeopardy wait song) to the end of the queue (we could possibly make this the users currently playing song, but what if they have this song in the queue?)
#2 - skip songs until we get to our known ID, adding the ID of each song in the queue to a list
#3 - return the list of tracks 

#Current implementation seems to work, but does is not efficient, need to find a way to wait until posts are complete to do gets
def getUserQueue():
  user_queue = []
  appendToUserQueue('spotify:track:7GyGUC8QFMS9NoHJsgNqh1') #1
  time.sleep(1)
  skipToNextTrack()
  time.sleep(1)
  current_track = getUserCurrentTrack()
  time.sleep(1)
  while current_track['id'] != '7GyGUC8QFMS9NoHJsgNqh1': #2
    user_queue.append(current_track)
    skipToNextTrack()
    time.sleep(1)
    current_track = getUserCurrentTrack()
    time.sleep(1)


  return user_queue #3



In [31]:
#To sort a user's queue, we:
#1 - get the users currently playing track and the time in the track at which we are beginning this
#2 - get the items currently in the User's Queue
#3 - return the user's playback to what they were listening to while we sort their queue
#4 - sort their queue by passed values
#5 - add tracks back to queue in sorted order
def sortUserQueue(feature = 'energy', ascending = True):
  #sweepVolumeDown()
  current_track = getUserCurrentTrack() #1
  progress = getUserCurrentTrackProgress()
  user_queue = getUserQueue() #2
  playTrack(current_track['uri'], pos = progress) #3
  #sweepVolumeUp()
  user_queue_plus_features = addAudioFeatures(user_queue) #4
  sortTracksByFeatures(user_queue_plus_features, feature, ascending)
  for track in user_queue_plus_features: #5
    appendToUserQueue(track['uri'])




In [32]:
sortUserQueue()

In [33]:
queue = getUserQueue()
for track in queue:
  print(track['name'])

Ohio Is For Lovers - Lo-Fi
Circadian Deity
Friends Suck
Hosnian Prime
God Damn
Violet
Greyblood


In [30]:
for track in queue:
  print(track['name'])