Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Hue and Spotify setup tasks singletons #16

Merged
merged 6 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/.example-env
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ HUE_BRIDGE_IP_ADDRESS=address
SPOTIFY_SCOPE="user-read-currently-playing"
SPOTIFY_CLIENT_ID=client_id
SPOTIFY_CLIENT_SECRET=client_secret
SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/"
SPOTIFY_REDIRECT_URI="http://localhost:8888/callback/"
SPOTIFY_OAUTH_TIMEOUT=180 # 3 minutes
File renamed without changes.
19 changes: 15 additions & 4 deletions backend/spotihue/oauth.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
import logging
import os
import webbrowser
from urllib import parse
from typing import Optional
from urllib import parse

from spotipy import oauth2, util


logger = logging.getLogger(__name__)

SPOTIFY_OAUTH_TIMEOUT = os.getenv('SPOTIFY_OAUTH_TIMEOUT', 60 * 3) # default 3 minutes


class HTTPServer(oauth2.HTTPServer):
timeout = SPOTIFY_OAUTH_TIMEOUT


def start_local_http_server(port, handler=oauth2.RequestHandler):
server = oauth2.HTTPServer(("0.0.0.0", port), handler)
server = HTTPServer(("0.0.0.0", port), handler)
server.allow_reuse_address = True
server.auth_code = None
server.auth_token_form = None
server.error = None
return server


class SpotifyOauthSocketTimeout(oauth2.SpotifyOauthError):
pass


class SpotifyOauth(oauth2.SpotifyOAuth):
def __init__(self, **kwargs):
super().__init__(**kwargs)
Expand Down Expand Up @@ -68,8 +79,8 @@ def _get_auth_response_local_server(self, redirect_port: int, open_browser: Opti
"Received error from OAuth server: {}".format(server.error)
)
else:
raise oauth2.SpotifyOauthError(
"Server listening on localhost has not been accessed"
raise SpotifyOauthSocketTimeout(
"Server listening on localhost was not accessed"
)

def get_auth_response(self, open_browser: Optional[bool] = None) -> str:
Expand Down
107 changes: 99 additions & 8 deletions backend/spotihue/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,105 @@
from contextlib import contextmanager
from functools import wraps
import logging
import random
import time
from typing import List
from typing import Generator, List, Optional

from celery.app import task
from phue import PhueException
from redis import Redis
from spotipy.oauth2 import SpotifyOauthError

from . import celery_app, constants, redis_client, spotihue
from . import celery_app, constants, oauth, redis_client, spotihue

logger = logging.getLogger(__name__)


class SingletonTask(task.Task):

@staticmethod
def run_singleton_task(bound_task_function):
"""Meant to decorate a bound celery task function such that there can only be 1 instance of the
task running at any given point.
Ex:

@celery_app.task(base=SingletonTask, bind=True)
@run_singleton_task
def my_celery_task(self, other_param):
pass

Args:
bound_task_function (Callable): a celery task function which is bound (ie. receives itself
as its 1st argument).

Returns: decorator for a celery @task-decorated function/Task subclass .run() implementation.

"""
@wraps(bound_task_function)
def wrapper(*args, **kwargs):
# Because this decorator is invoked statically above task declarations, we access
# the Task object's self via the following derpy line...
me = args[0]
Copy link
Owner

@mgibbs1259 mgibbs1259 Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here explaining this line based on our convo?


# ...so that we can collect the task's name and ID.
my_task_name = me.name
my_task_id = me.request.id

my_task_lock = SingletonTaskLock(lock_id=my_task_name, redis=redis_client)

with my_task_lock.acquire_for(my_task_id) as acquired:
if acquired is True:
logger.info(f'Acquired lock for {my_task_name}; running task {my_task_id}')
return bound_task_function(*args, **kwargs)
else:
logger.info(f'Another {my_task_name} task is already being invoked; exiting task {my_task_id}')

return wrapper


class SingletonTaskLock:
"""
lock_id (key) = a task's name.
lock values = task IDs.
"""
DEFAULT_MAX_TTL = 60 * 5 # 5 minutes

def __init__(self, lock_id: str, redis: Redis, lock_max_ttl: Optional[int] = DEFAULT_MAX_TTL):
self.lock_id = lock_id
self.redis = redis
self.lock_max_ttl = lock_max_ttl

@contextmanager
def acquire_for(self, task_id: str) -> Generator[str, None, None]:
"""Attempts to acquire a simple Redis lock for a "singleton" celery task with ID task_id.

Args:
task_id (str): ID of a celery task that worker is trying to run.

Returns:
Generator[str, None, None]: context manager to get lock acquisition status for task,
then subsequently relinquish that acquired lock (if the initial acquisition
was successful).
"""
lock = self.redis.get(self.lock_id)

if lock is None:
# if there are 2 dueling tasks (queued within a few hundredths of a second
# apart), let one of them win.
time.sleep(random.uniform(.01, .1))
lock = self.redis.get(self.lock_id)

lock_acquired = bool(lock is None or (isinstance(lock, bytes) and lock.decode('utf-8') == task_id))

try:
if lock_acquired:
self.redis.setex(self.lock_id, self.lock_max_ttl, task_id)
yield lock_acquired
finally:
if lock_acquired:
self.redis.delete(self.lock_id)


def is_spotihue_running() -> bool:
"""Queries for the SpotiHue task by its cached spotihue_task_id. There should only be one
run_spotihue task running at any given time.
Expand Down Expand Up @@ -89,9 +178,10 @@ def run_spotihue(lights: List[str], current_track_retries: int = 0) -> None:
time.sleep(sleep_duration)


@celery_app.task
def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None:
logger.info("Attempting to connect to Hue bridge...")
@celery_app.task(base=SingletonTask, bind=True)
@SingletonTask.run_singleton_task
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_singleton_task is static method

def setup_hue(self, backoff_seconds: int = 5, retries: int = 5) -> None:
logger.info('Attempting to connect to Hue bridge...')

for attempt in range(retries):
try:
Expand All @@ -109,9 +199,10 @@ def setup_hue(backoff_seconds: int = 5, retries: int = 5) -> None:
raise


@celery_app.task
def listen_for_spotify_redirect() -> None:
logger.info(f"Waiting to receive user authorization from Spotify...")
@celery_app.task(base=SingletonTask, bind=True, throws=(oauth.SpotifyOauthSocketTimeout,))
@SingletonTask.run_singleton_task
def listen_for_spotify_redirect(self) -> None:
logger.info('Waiting to receive user authorization from Spotify...')
spotify_oauth = spotihue.spotify_oauth

try:
Expand Down