In [1]:
from urllib import parse
import config
import secrets
import db
from bson import ObjectId, json_util
import requests
import time

In [2]:
def request_user_authentication(user_id):
    url = "https://accounts.spotify.com/authorize?"
    params = {}
    params["client_id"] = config.spotify_client_id
    params["response_type"] = "code"
    params["redirect_uri"] = config.redirect_uri
    params["state"] = user_id
    params["scope"] = "user-top-read user-read-currently-playing user-library-read"
    return url+parse.urlencode(params)

In [3]:
print(request_user_authentication("6309be8bef6052b04d363855"))

https://accounts.spotify.com/authorize?client_id=cac02362fc1b4686ae9373824e1f60a4&response_type=code&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fspotify-callback&state=6309be8bef6052b04d363855&scope=user-top-read+user-read-currently-playing+user-library-read


In [4]:
def refresh_token(user):
    refresh_token = user["refresh_token"]
    body = {}
    body["grant_type"] = "refresh_token"
    body["refresh_token"] = refresh_token
    body["client_id"] = config.spotify_client_id
    body["client_secret"] = secrets.spotify_client_secret
    url = "https://accounts.spotify.com/api/token"
    response = requests.post(url, data=body)
    response_json = response.json()
    access_token = response_json["access_token"]
    user["access_token"] = access_token
    db.user_collection.update_one({"_id": user["_id"]}, {"$set": user})
    print(response.content)

In [5]:
def get_currently_playing(user):
    try:
        access_token = user["access_token"]
    except KeyError:
        return None
    url = "https://api.spotify.com/v1/me/player/currently-playing"
    response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    if (response.status_code == 401):
        refresh_token(user)
        response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    return response

In [6]:
user = db.user_collection.find_one({"_id": ObjectId("6309b33c7cacc487a2739214")})

In [7]:
def update_user_playing(user):
    response = get_currently_playing(user)
    if (response is not None):
        if (len(response.content) == 0):
            user["currently_listening"] = None
        else:
            response_json = response.json()
            if (response_json["is_playing"]):
                user["currently_listening"] = response_json["item"]["external_urls"]["spotify"]
            else:
                user["currently_listening"] = None
    db.user_collection.update_one({"_id": user["_id"]}, {"$set": user})

In [8]:
update_user_playing(user)

In [9]:
def get_user_favourites(user):
    try:
        access_token = user["access_token"]
    except KeyError:
        return None
    url = f"https://api.spotify.com/v1/me/top/tracks?offset=0&limit=50"
    response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    if (response.status_code == 401):
        refresh_token(user)
        response = requests.get(url, headers={"Authorization": f"Bearer {access_token}"})
    return response

In [10]:
response = get_user_favourites(user)
response_json = response.json()
print(response_json["items"][0]["external_urls"])
top_uris = [item["external_urls"]["spotify"] for item in response_json["items"]]

{'spotify': 'https://open.spotify.com/track/6WeCNrVIIGnmWH9LX5NpeH'}


In [11]:
def update_user_favourites(user):
    response = get_user_favourites(user)
    if (response is not None):
        response_json = response.json()
        top_uris = [item["external_urls"]["spotify"] for item in response_json["items"]]
        user["top_tracks"] = top_uris
        db.user_collection.update_one({"_id": user["_id"]}, {"$set": user})

In [12]:
update_user_favourites(user)

In [13]:
def spotify_worker():
    while(True):
        all_users = db.user_collection.find()
        for user in all_users:
            get_currently_playing(user)
            if not "top_tracks" in user.keys():
                update_user_favourites(user)
        time.sleep(10)

In [None]:
spotify_worker()

b'{"access_token":"BQCFTb9YHyR447TPpHilu0jlgDaJg7Pupug3rGm-MAsaGc_qcjV4_cDqYYoK_YmfK3t3q-FTzWRdun8UHHZL6Fuf7LK07PK7WEwqUyPwp7UHL-OZIMCB_iw2IRrV3aPIZeHIuzDWXE4UNhGOQiq91uXigq3NilaBxbeNCsGgoA37TrxnvCZ78O9k1vHQ4Yy-JP7JDZu8-KRpRmo4j2FoTIHW-mIFS4r2BMeEpZJNTXpYgFY","token_type":"Bearer","expires_in":3600,"scope":"playlist-read-private user-library-read user-read-currently-playing user-read-recently-played user-read-private user-top-read"}'
