-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make Hue and Spotify setup tasks singletons #16
Changes from 4 commits
e4fff1a
6a6d2d0
ae9d1b0
43d2d7f
3657f57
d229c7e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,33 @@ | ||
import logging | ||
import webbrowser | ||
from urllib import parse | ||
from typing import Optional | ||
from urllib import parse | ||
|
||
from spotipy import oauth2, util | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SPOTIFY_OAUTH_TIMEOUT = 60 * 3 # 3 minutes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we create a .env for this to make it configurable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good call! will do |
||
|
||
|
||
class HTTPServer(oauth2.HTTPServer): | ||
timeout = SPOTIFY_OAUTH_TIMEOUT | ||
|
||
|
||
def start_local_http_server(port, handler=oauth2.RequestHandler): | ||
server = oauth2.HTTPServer(("0.0.0.0", port), handler) | ||
server = HTTPServer(("0.0.0.0", port), handler) | ||
server.allow_reuse_address = True | ||
server.auth_code = None | ||
server.auth_token_form = None | ||
server.error = None | ||
return server | ||
|
||
|
||
class SpotifyOauthSocketTimeout(oauth2.SpotifyOauthError): | ||
pass | ||
|
||
|
||
class SpotifyOauth(oauth2.SpotifyOAuth): | ||
def __init__(self, **kwargs): | ||
super().__init__(**kwargs) | ||
|
@@ -68,8 +78,8 @@ def _get_auth_response_local_server(self, redirect_port: int, open_browser: Opti | |
"Received error from OAuth server: {}".format(server.error) | ||
) | ||
else: | ||
raise oauth2.SpotifyOauthError( | ||
"Server listening on localhost has not been accessed" | ||
raise SpotifyOauthSocketTimeout( | ||
"Server listening on localhost was not accessed" | ||
) | ||
|
||
def get_auth_response(self, open_browser: Optional[bool] = None) -> str: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,101 @@ | ||
from contextlib import contextmanager | ||
from functools import wraps | ||
import logging | ||
import random | ||
import time | ||
from typing import List | ||
from typing import Generator, List, Optional | ||
|
||
from celery.app import task | ||
from phue import PhueException | ||
from redis import Redis | ||
from spotipy.oauth2 import SpotifyOauthError | ||
|
||
from . import celery_app, constants, redis_client, spotihue | ||
from . import celery_app, constants, oauth, redis_client, spotihue | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class SingletonTask(task.Task): | ||
|
||
@staticmethod | ||
def run_singleton_task(bound_task_function): | ||
"""Meant to decorate a bound celery task function such that there can only be 1 instance of the | ||
task running at any given point. | ||
Ex: | ||
|
||
@celery_app.task(base=SingletonTask, bind=True) | ||
@run_singleton_task | ||
def my_celery_task(self, other_param): | ||
pass | ||
|
||
Args: | ||
bound_task_function (Callable): a celery task function which is bound (ie. receives itself | ||
as its 1st argument). | ||
|
||
Returns: decorator for a celery @task-decorated function/Task subclass .run() implementation. | ||
|
||
""" | ||
@wraps(bound_task_function) | ||
def wrapper(*args, **kwargs): | ||
me = args[0] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a comment here explaining this line based on our convo? |
||
my_task_name = me.name | ||
my_task_id = me.request.id | ||
|
||
my_task_lock = SingletonTaskLock(lock_id=my_task_name, redis=redis_client) | ||
|
||
with my_task_lock.acquire_for(my_task_id) as acquired: | ||
if acquired is True: | ||
logger.info(f'Acquired lock for {my_task_name}; running task {my_task_id}') | ||
return bound_task_function(*args, **kwargs) | ||
else: | ||
logger.info(f'Another {my_task_name} task is already being invoked; exiting task {my_task_id}') | ||
|
||
return wrapper | ||
|
||
|
||
class SingletonTaskLock: | ||
""" | ||
lock_id (key) = a task's name. | ||
lock values = task IDs. | ||
""" | ||
DEFAULT_MAX_TTL = 60 * 5 # 5 minutes | ||
|
||
def __init__(self, lock_id: str, redis: Redis, lock_max_ttl: Optional[int] = DEFAULT_MAX_TTL): | ||
self.lock_id = lock_id | ||
self.redis = redis | ||
self.lock_max_ttl = lock_max_ttl | ||
|
||
@contextmanager | ||
def acquire_for(self, task_id: str) -> Generator[str, None, None]: | ||
"""Attempts to acquire a simple Redis lock for a "singleton" celery task with ID task_id. | ||
|
||
Args: | ||
task_id (str): ID of a celery task that worker is trying to run. | ||
|
||
Returns: | ||
Generator[str, None, None]: context manager to get lock acquisition status for task, | ||
then subsequently relinquish that acquired lock (if the initial acquisition | ||
was successful). | ||
""" | ||
lock = self.redis.get(self.lock_id) | ||
|
||
if lock is None: | ||
# if there are 2 dueling tasks (queued within a few hundredths of a second | ||
# apart), let one of them win. | ||
time.sleep(random.uniform(.01, .1)) | ||
lock = self.redis.get(self.lock_id) | ||
|
||
lock_acquired = bool(lock is None or (isinstance(lock, bytes) and lock.decode('utf-8') == task_id)) | ||
|
||
try: | ||
if lock_acquired: | ||
self.redis.setex(self.lock_id, self.lock_max_ttl, task_id) | ||
yield lock_acquired | ||
finally: | ||
if lock_acquired: | ||
self.redis.delete(self.lock_id) | ||
|
||
|
||
def is_spotihue_running() -> bool: | ||
"""Queries for the SpotiHue task by its cached spotihue_task_id. There should only be one | ||
run_spotihue task running at any given time. | ||
|
@@ -83,8 +168,9 @@ def run_spotihue(lights: List[str], current_track_retries: int = 0) -> None: | |
time.sleep(sleep_duration) | ||
|
||
|
||
@celery_app.task | ||
def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None: | ||
@celery_app.task(base=SingletonTask, bind=True) | ||
@SingletonTask.run_singleton_task | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. run_singleton_task is static method |
||
def setup_hue(self, backoff_seconds: int = 5, retries: int = 5) -> None: | ||
logger.info('Attempting to connect to Hue bridge...') | ||
|
||
for attempt in range(retries): | ||
|
@@ -103,8 +189,9 @@ def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None: | |
raise | ||
|
||
|
||
@celery_app.task | ||
def listen_for_spotify_redirect() -> None: | ||
@celery_app.task(base=SingletonTask, bind=True, throws=(oauth.SpotifyOauthSocketTimeout,)) | ||
@SingletonTask.run_singleton_task | ||
def listen_for_spotify_redirect(self) -> None: | ||
logger.info(f'Waiting to receive user authorization from Spotify...') | ||
spotify_oauth = spotihue.spotify_oauth | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the name change here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since it's been a while and I can't remember exactly where we left this - I have an upcoming MR that will add a docker compose entry for the frontend service, and I thought it best to put requirement specifications for the frontend and backend in their own respective directories, for the sake of organization. Since these 2 requirements.txt files are both strictly Python module requirements for the backend, I moved this one into the backend directory. But I changed its name so as not to interfere with the backend's dockerfile, and for clarity (since this just-moved file is for dev requirements, and not the bare-bones requirements that the backend's dockerfile cares about).