Skip to content

Commit

Permalink
Make Hue and Spotify setup tasks singletons (#16)
Browse files Browse the repository at this point in the history
* make setup tasks singleton tasks, move backend dev requirements to backend directory

* tidy up a little, make singleton task locks' TTL an adjustable param

* lengthen Spotify oauth timeout

* battling React dev mode

* add clarifying comment to decorator wrapper,  make SPOTIFY_OAUTH_TIMEOUT configurable
  • Loading branch information
BeeStag committed Jan 5, 2024
1 parent 79fbb87 commit 5b2ef16
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 13 deletions.
3 changes: 2 additions & 1 deletion backend/.example-env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ HUE_BRIDGE_IP_ADDRESS=address
SPOTIFY_SCOPE="user-read-currently-playing"
SPOTIFY_CLIENT_ID=client_id
SPOTIFY_CLIENT_SECRET=client_secret
SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/"
SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/"
SPOTIFY_OAUTH_TIMEOUT=180 # 3 minutes
File renamed without changes.
19 changes: 15 additions & 4 deletions backend/spotihue/oauth.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
import logging
import os
import webbrowser
from urllib import parse
from typing import Optional
from urllib import parse

from spotipy import oauth2, util


logger = logging.getLogger(__name__)

SPOTIFY_OAUTH_TIMEOUT = os.getenv('SPOTIFY_OAUTH_TIMEOUT', 60 * 3) # default 3 minutes


class HTTPServer(oauth2.HTTPServer):
timeout = SPOTIFY_OAUTH_TIMEOUT


def start_local_http_server(port, handler=oauth2.RequestHandler):
server = oauth2.HTTPServer(("0.0.0.0", port), handler)
server = HTTPServer(("0.0.0.0", port), handler)
server.allow_reuse_address = True
server.auth_code = None
server.auth_token_form = None
server.error = None
return server


class SpotifyOauthSocketTimeout(oauth2.SpotifyOauthError):
pass


class SpotifyOauth(oauth2.SpotifyOAuth):
def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -68,8 +79,8 @@ def _get_auth_response_local_server(self, redirect_port: int, open_browser: Opti
"Received error from OAuth server: {}".format(server.error)
)
else:
raise oauth2.SpotifyOauthError(
"Server listening on localhost has not been accessed"
raise SpotifyOauthSocketTimeout(
"Server listening on localhost was not accessed"
)

def get_auth_response(self, open_browser: Optional[bool] = None) -> str:
Expand Down
107 changes: 99 additions & 8 deletions backend/spotihue/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,105 @@
from contextlib import contextmanager
from functools import wraps
import logging
import random
import time
from typing import List
from typing import Generator, List, Optional

from celery.app import task
from phue import PhueException
from redis import Redis
from spotipy.oauth2 import SpotifyOauthError

from . import celery_app, constants, redis_client, spotihue
from . import celery_app, constants, oauth, redis_client, spotihue

logger = logging.getLogger(__name__)


class SingletonTask(task.Task):

@staticmethod
def run_singleton_task(bound_task_function):
"""Meant to decorate a bound celery task function such that there can only be 1 instance of the
task running at any given point.
Ex:
@celery_app.task(base=SingletonTask, bind=True)
@run_singleton_task
def my_celery_task(self, other_param):
pass
Args:
bound_task_function (Callable): a celery task function which is bound (ie. receives itself
as its 1st argument).
Returns: decorator for a celery @task-decorated function/Task subclass .run() implementation.
"""
@wraps(bound_task_function)
def wrapper(*args, **kwargs):
# Because this decorator is invoked statically above task declarations, we access
# the Task object's self via the following derpy line...
me = args[0]

# ...so that we can collect the task's name and ID.
my_task_name = me.name
my_task_id = me.request.id

my_task_lock = SingletonTaskLock(lock_id=my_task_name, redis=redis_client)

with my_task_lock.acquire_for(my_task_id) as acquired:
if acquired is True:
logger.info(f'Acquired lock for {my_task_name}; running task {my_task_id}')
return bound_task_function(*args, **kwargs)
else:
logger.info(f'Another {my_task_name} task is already being invoked; exiting task {my_task_id}')

return wrapper


class SingletonTaskLock:
"""
lock_id (key) = a task's name.
lock values = task IDs.
"""
DEFAULT_MAX_TTL = 60 * 5 # 5 minutes

def __init__(self, lock_id: str, redis: Redis, lock_max_ttl: Optional[int] = DEFAULT_MAX_TTL):
self.lock_id = lock_id
self.redis = redis
self.lock_max_ttl = lock_max_ttl

@contextmanager
def acquire_for(self, task_id: str) -> Generator[str, None, None]:
"""Attempts to acquire a simple Redis lock for a "singleton" celery task with ID task_id.
Args:
task_id (str): ID of a celery task that worker is trying to run.
Returns:
Generator[str, None, None]: context manager to get lock acquisition status for task,
then subsequently relinquish that acquired lock (if the initial acquisition
was successful).
"""
lock = self.redis.get(self.lock_id)

if lock is None:
# if there are 2 dueling tasks (queued within a few hundredths of a second
# apart), let one of them win.
time.sleep(random.uniform(.01, .1))
lock = self.redis.get(self.lock_id)

lock_acquired = bool(lock is None or (isinstance(lock, bytes) and lock.decode('utf-8') == task_id))

try:
if lock_acquired:
self.redis.setex(self.lock_id, self.lock_max_ttl, task_id)
yield lock_acquired
finally:
if lock_acquired:
self.redis.delete(self.lock_id)


def is_spotihue_running() -> bool:
"""Queries for the SpotiHue task by its cached spotihue_task_id. There should only be one
run_spotihue task running at any given time.
Expand Down Expand Up @@ -89,9 +178,10 @@ def run_spotihue(lights: List[str], current_track_retries: int = 0) -> None:
time.sleep(sleep_duration)


@celery_app.task
def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None:
logger.info("Attempting to connect to Hue bridge...")
@celery_app.task(base=SingletonTask, bind=True)
@SingletonTask.run_singleton_task
def setup_hue(self, backoff_seconds: int = 5, retries: int = 5) -> None:
logger.info('Attempting to connect to Hue bridge...')

for attempt in range(retries):
try:
Expand All @@ -109,9 +199,10 @@ def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None:
raise


@celery_app.task
def listen_for_spotify_redirect() -> None:
logger.info(f"Waiting to receive user authorization from Spotify...")
@celery_app.task(base=SingletonTask, bind=True, throws=(oauth.SpotifyOauthSocketTimeout,))
@SingletonTask.run_singleton_task
def listen_for_spotify_redirect(self) -> None:
logger.info('Waiting to receive user authorization from Spotify...')
spotify_oauth = spotihue.spotify_oauth

try:
Expand Down

0 comments on commit 5b2ef16

Please sign in to comment.