From e4fff1abf81525ef2d1fff6edb1c0b0aee2dc81d Mon Sep 17 00:00:00 2001 From: Camille <142921874+BeeStag@users.noreply.github.com> Date: Wed, 1 Nov 2023 17:13:48 -0400 Subject: [PATCH 1/5] make setup tasks singleton tasks, move backend dev requirements to backend directory --- .../dev_requirements.txt | 0 backend/spotihue/oauth.py | 18 +++- backend/spotihue/tasks.py | 91 +++++++++++++++++-- 3 files changed, 99 insertions(+), 10 deletions(-) rename requirements.txt => backend/dev_requirements.txt (100%) diff --git a/requirements.txt b/backend/dev_requirements.txt similarity index 100% rename from requirements.txt rename to backend/dev_requirements.txt diff --git a/backend/spotihue/oauth.py b/backend/spotihue/oauth.py index e4bc118..c7cfd16 100644 --- a/backend/spotihue/oauth.py +++ b/backend/spotihue/oauth.py @@ -1,16 +1,22 @@ import logging import webbrowser -from urllib import parse from typing import Optional +from urllib import parse from spotipy import oauth2, util logger = logging.getLogger(__name__) +SPOTIFY_OAUTH_TIMEOUT = 60 * 2 # 2 minutes + + +class HTTPServer(oauth2.HTTPServer): + timeout = SPOTIFY_OAUTH_TIMEOUT + def start_local_http_server(port, handler=oauth2.RequestHandler): - server = oauth2.HTTPServer(("0.0.0.0", port), handler) + server = HTTPServer(("0.0.0.0", port), handler) server.allow_reuse_address = True server.auth_code = None server.auth_token_form = None @@ -18,6 +24,10 @@ def start_local_http_server(port, handler=oauth2.RequestHandler): return server +class SpotifyOauthSocketTimeout(oauth2.SpotifyOauthError): + pass + + class SpotifyOauth(oauth2.SpotifyOAuth): def __init__(self, **kwargs): super().__init__(**kwargs) @@ -68,8 +78,8 @@ def _get_auth_response_local_server(self, redirect_port: int, open_browser: Opti "Received error from OAuth server: {}".format(server.error) ) else: - raise oauth2.SpotifyOauthError( - "Server listening on localhost has not been accessed" + raise SpotifyOauthSocketTimeout( + "Server listening on localhost was not accessed" ) def get_auth_response(self, open_browser: Optional[bool] = None) -> str: diff --git a/backend/spotihue/tasks.py b/backend/spotihue/tasks.py index 3b1a25f..3b0a6c1 100644 --- a/backend/spotihue/tasks.py +++ b/backend/spotihue/tasks.py @@ -1,16 +1,93 @@ +from contextlib import contextmanager +from functools import wraps import logging import random import time -from typing import List +from typing import Generator, List +from celery.app import task from phue import PhueException +from redis import Redis from spotipy.oauth2 import SpotifyOauthError -from . import celery_app, constants, redis_client, spotihue +from . import celery_app, constants, oauth, redis_client, spotihue logger = logging.getLogger(__name__) +class SingletonTask(task.Task): + + @staticmethod + def run_singleton_task(bound_task_function): + """Meant to decorate a bound celery task function such that there can only be 1 instance of the + task running at any given point. + Ex: + + @celery_app.task(base=SingletonTask, bind=True) + @run_singleton_task + def my_celery_task(self, other_param): + pass + + Args: + bound_task_function (Callable): a celery task function which is bound (ie. receives itself + as its 1st argument). + + Returns: + + """ + @wraps(bound_task_function) + def wrapper(*args, **kwargs): + myself = args[0] + my_task_name = myself.name + my_task_id = myself.request.id + + my_task_lock = SingletonTaskLock(lock_id=myself.name, redis=redis_client) + + with my_task_lock.acquire_for(my_task_id) as acquired: + if acquired is True: + logger.info(f'Acquired lock for {my_task_name}; running task {my_task_id}') + return bound_task_function(*args, **kwargs) + else: + logger.info(f'Another {my_task_name} task is already being invoked; exiting task {my_task_id}') + + return wrapper + + +class SingletonTaskLock: + """ + lock_id (key) = task name. + lock values = task IDs. + """ + LOCK_MAX_DURATION_SECONDS = 60 * 5 # 5 minutes + + def __init__(self, lock_id: str, redis: Redis): + self.lock_id = lock_id + self.redis = redis + + @contextmanager + def acquire_for(self, task_id: str) -> Generator[str, None, None]: + """Acquires simple Redis lock for a "singleton" celery task with ID task_id. + + Args: + task_id (str): ID of a celery task that worker is trying to run. + + Returns: + Generator[str, None, None]: context manager to get lock acquisition status for task, + then subsequently relinquish that acquired lock (assuming the initial acquisition + was successful). + """ + lock = self.redis.get(self.lock_id) + lock_acquired = bool(lock is None or (isinstance(lock, bytes) and lock.decode('utf-8') == task_id)) + + try: + if lock_acquired: + self.redis.setex(self.lock_id, self.LOCK_MAX_DURATION_SECONDS, task_id) + yield lock_acquired + finally: + if lock_acquired: + self.redis.delete(self.lock_id) + + def is_spotihue_running() -> bool: """Queries for the SpotiHue task by its cached spotihue_task_id. There should only be one run_spotihue task running at any given time. @@ -83,8 +160,9 @@ def run_spotihue(lights: List[str], current_track_retries: int = 0) -> None: time.sleep(sleep_duration) -@celery_app.task -def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None: +@celery_app.task(base=SingletonTask, bind=True) +@SingletonTask.run_singleton_task +def setup_hue(self, backoff_seconds: int = 5, retries: int = 5) -> None: logger.info('Attempting to connect to Hue bridge...') for attempt in range(retries): @@ -103,8 +181,9 @@ def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None: raise -@celery_app.task -def listen_for_spotify_redirect() -> None: +@celery_app.task(base=SingletonTask, bind=True, throws=(oauth.SpotifyOauthSocketTimeout,)) +@SingletonTask.run_singleton_task +def listen_for_spotify_redirect(self) -> None: logger.info(f'Waiting to receive user authorization from Spotify...') spotify_oauth = spotihue.spotify_oauth From 6a6d2d0217c934d4800bbbb40233ad20ce754d5f Mon Sep 17 00:00:00 2001 From: Camille <142921874+BeeStag@users.noreply.github.com> Date: Thu, 2 Nov 2023 12:52:47 -0400 Subject: [PATCH 2/5] tidy up a little, make singleton task locks' TTL an adjustable param --- backend/spotihue/tasks.py | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/backend/spotihue/tasks.py b/backend/spotihue/tasks.py index 3b0a6c1..6170ab0 100644 --- a/backend/spotihue/tasks.py +++ b/backend/spotihue/tasks.py @@ -3,7 +3,7 @@ import logging import random import time -from typing import Generator, List +from typing import Generator, List, Optional from celery.app import task from phue import PhueException @@ -32,16 +32,16 @@ def my_celery_task(self, other_param): bound_task_function (Callable): a celery task function which is bound (ie. receives itself as its 1st argument). - Returns: + Returns: decorator for a celery @task-decorated function/Task subclass .run() implementation. """ @wraps(bound_task_function) def wrapper(*args, **kwargs): - myself = args[0] - my_task_name = myself.name - my_task_id = myself.request.id + me = args[0] + my_task_name = me.name + my_task_id = me.request.id - my_task_lock = SingletonTaskLock(lock_id=myself.name, redis=redis_client) + my_task_lock = SingletonTaskLock(lock_id=my_task_name, redis=redis_client) with my_task_lock.acquire_for(my_task_id) as acquired: if acquired is True: @@ -55,25 +55,26 @@ def wrapper(*args, **kwargs): class SingletonTaskLock: """ - lock_id (key) = task name. + lock_id (key) = a task's name. lock values = task IDs. """ - LOCK_MAX_DURATION_SECONDS = 60 * 5 # 5 minutes + DEFAULT_MAX_TTL = 60 * 5 # 5 minutes - def __init__(self, lock_id: str, redis: Redis): + def __init__(self, lock_id: str, redis: Redis, lock_max_ttl: Optional[int] = DEFAULT_MAX_TTL): self.lock_id = lock_id self.redis = redis + self.lock_max_ttl = lock_max_ttl @contextmanager def acquire_for(self, task_id: str) -> Generator[str, None, None]: - """Acquires simple Redis lock for a "singleton" celery task with ID task_id. + """Attempts to acquire a simple Redis lock for a "singleton" celery task with ID task_id. Args: task_id (str): ID of a celery task that worker is trying to run. Returns: Generator[str, None, None]: context manager to get lock acquisition status for task, - then subsequently relinquish that acquired lock (assuming the initial acquisition + then subsequently relinquish that acquired lock (if the initial acquisition was successful). """ lock = self.redis.get(self.lock_id) @@ -81,7 +82,7 @@ def acquire_for(self, task_id: str) -> Generator[str, None, None]: try: if lock_acquired: - self.redis.setex(self.lock_id, self.LOCK_MAX_DURATION_SECONDS, task_id) + self.redis.setex(self.lock_id, self.lock_max_ttl, task_id) yield lock_acquired finally: if lock_acquired: From ae9d1b0463871c77094244ba3c344a4412cdfdb7 Mon Sep 17 00:00:00 2001 From: Camille <142921874+BeeStag@users.noreply.github.com> Date: Thu, 2 Nov 2023 13:35:26 -0400 Subject: [PATCH 3/5] lengthen Spotify oauth timeout --- backend/spotihue/oauth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/spotihue/oauth.py b/backend/spotihue/oauth.py index c7cfd16..9244142 100644 --- a/backend/spotihue/oauth.py +++ b/backend/spotihue/oauth.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -SPOTIFY_OAUTH_TIMEOUT = 60 * 2 # 2 minutes +SPOTIFY_OAUTH_TIMEOUT = 60 * 3 # 3 minutes class HTTPServer(oauth2.HTTPServer): From 43d2d7faaf92c99d75bb8bf07c4824dc3dfc3d1e Mon Sep 17 00:00:00 2001 From: Camille <142921874+BeeStag@users.noreply.github.com> Date: Fri, 3 Nov 2023 20:02:56 -0400 Subject: [PATCH 4/5] battling React dev mode --- backend/spotihue/tasks.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/backend/spotihue/tasks.py b/backend/spotihue/tasks.py index 6170ab0..a86d226 100644 --- a/backend/spotihue/tasks.py +++ b/backend/spotihue/tasks.py @@ -78,6 +78,13 @@ def acquire_for(self, task_id: str) -> Generator[str, None, None]: was successful). """ lock = self.redis.get(self.lock_id) + + if lock is None: + # if there are 2 dueling tasks (queued within a few hundredths of a second + # apart), let one of them win. + time.sleep(random.uniform(.01, .1)) + lock = self.redis.get(self.lock_id) + lock_acquired = bool(lock is None or (isinstance(lock, bytes) and lock.decode('utf-8') == task_id)) try: From d229c7ea00c6d47929398c6443b07623a4ded470 Mon Sep 17 00:00:00 2001 From: Camille <142921874+BeeStag@users.noreply.github.com> Date: Wed, 3 Jan 2024 13:46:06 -0500 Subject: [PATCH 5/5] add clarifying comment to decorator wrapper, make SPOTIFY_OAUTH_TIMEOUT configurable --- backend/.example-env | 3 ++- backend/spotihue/oauth.py | 3 ++- backend/spotihue/tasks.py | 4 ++++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/backend/.example-env b/backend/.example-env index 2e11821..b6363d1 100644 --- a/backend/.example-env +++ b/backend/.example-env @@ -2,4 +2,5 @@ HUE_BRIDGE_IP_ADDRESS=address SPOTIFY_SCOPE="user-read-currently-playing" SPOTIFY_CLIENT_ID=client_id SPOTIFY_CLIENT_SECRET=client_secret -SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/" \ No newline at end of file +SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/" +SPOTIFY_OAUTH_TIMEOUT=180 # 3 minutes diff --git a/backend/spotihue/oauth.py b/backend/spotihue/oauth.py index 9244142..e39c7f8 100644 --- a/backend/spotihue/oauth.py +++ b/backend/spotihue/oauth.py @@ -1,4 +1,5 @@ import logging +import os import webbrowser from typing import Optional from urllib import parse @@ -8,7 +9,7 @@ logger = logging.getLogger(__name__) -SPOTIFY_OAUTH_TIMEOUT = 60 * 3 # 3 minutes +SPOTIFY_OAUTH_TIMEOUT = os.getenv('SPOTIFY_OAUTH_TIMEOUT', 60 * 3) # default 3 minutes class HTTPServer(oauth2.HTTPServer): diff --git a/backend/spotihue/tasks.py b/backend/spotihue/tasks.py index ac45531..3a8a262 100644 --- a/backend/spotihue/tasks.py +++ b/backend/spotihue/tasks.py @@ -37,7 +37,11 @@ def my_celery_task(self, other_param): """ @wraps(bound_task_function) def wrapper(*args, **kwargs): + # Because this decorator is invoked statically above task declarations, we access + # the Task object's self via the following derpy line... me = args[0] + + # ...so that we can collect the task's name and ID. my_task_name = me.name my_task_id = me.request.id