Skip to content

Commit

Permalink
Initial implementation of pluggable caches - see PolicyStat#37
Browse files Browse the repository at this point in the history
  • Loading branch information
rhunwicks committed Jul 12, 2016
1 parent cce2e36 commit 8fe5133
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 65 deletions.
55 changes: 55 additions & 0 deletions jobtastic/cache/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from celery.backends import get_backend_by_url
from celery.backends.cache import CacheBackend
from celery.backends.redis import RedisBackend
from .base import BaseCache, WrappedCache


def get_cache(app):
    """
    Locate a usable Jobtastic cache from the Celery app configuration.

    Resolution order:
      1. If ``JOBTASTIC_CACHE`` is already a ``BaseCache`` instance, use it
         directly.
      2. If Django is importable: use the named Django cache, or the
         ``default`` Django cache when the setting is empty.
      3. If Werkzeug is importable: build a backend from the setting URL, or
         reuse the app's configured result backend, and wrap it when it is a
         Memcache or Redis backend.
      4. Otherwise raise ``RuntimeError``.
    """
    configured = app.conf.get('JOBTASTIC_CACHE')
    if isinstance(configured, BaseCache):
        # Already a ready-to-use Jobtastic cache; nothing to resolve.
        return configured

    # Try Django
    try:
        from django.core.cache import caches, InvalidCacheBackendError
        if not configured:
            return WrappedCache(caches['default'])
        try:
            return WrappedCache(caches[configured])
        except InvalidCacheBackendError:
            # Not a Django cache alias; fall through to Werkzeug.
            pass
    except ImportError:
        pass

    # Try Werkzeug
    try:
        from werkzeug.contrib.cache import MemcachedCache, RedisCache
        if configured:
            backend_cls, url = get_backend_by_url(configured)
            backend = backend_cls(app=app, url=url)
        else:
            # No explicit setting: reuse the Celery result backend.
            backend = app.backend
        if isinstance(backend, CacheBackend):
            return WrappedCache(MemcachedCache(backend.client))
        if isinstance(backend, RedisBackend):
            return WrappedCache(RedisCache(backend.client))
    except ImportError:
        pass

    # Give up
    raise RuntimeError('Cannot find a suitable cache for Jobtastic')
96 changes: 96 additions & 0 deletions jobtastic/cache/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from contextlib import contextmanager


class BaseCache(object):
    """
    A base class that defines the interface required for a JobTastic cache.

    The cache for a JobTastic Task must be shared across all Celery Workers
    running the Task and application servers submitting the Task.
    It is used for scheduling, thundering herd protection and exclusive locks.
    Subclasses must implement all four methods.
    """

    def get(self, key):
        """Return the value stored at ``key`` (or the backend's miss value)."""
        raise NotImplementedError('Must implement the get method.')

    def set(self, key, value, timeout):
        """Store ``value`` under ``key``, expiring after ``timeout`` seconds."""
        raise NotImplementedError('Must implement the set method.')

    def delete(self, key):
        """Remove ``key`` from the cache."""
        raise NotImplementedError('Must implement the delete method.')

    def lock(self, lock_name, timeout=900):
        """Context manager holding an exclusive lock named ``lock_name``."""
        raise NotImplementedError('Must implement a lock context manager.')


class WrappedCache(BaseCache):
    """
    A thin wrapper around an existing cache object that supports get/set/delete
    and either atomic add or lock
    """

    # The wrapped cache object (Django cache, Werkzeug cache, ...)
    cache = None

    def __init__(self, cache):
        self.cache = cache

    def get(self, key):
        """Delegate straight to the wrapped cache."""
        return self.cache.get(key)

    def set(self, key, value, timeout=None):
        """
        Delegate to the wrapped cache, passing ``timeout`` only when given so
        backends with a non-None default expiry keep their own default.
        """
        if timeout:
            return self.cache.set(key, value, timeout)
        else:
            return self.cache.set(key, value)

    def delete(self, key):
        """Delegate straight to the wrapped cache."""
        return self.cache.delete(key)

    @contextmanager
    def lock(self, lock_name, timeout=900):
        """
        Attempt to use lock and unlock, which will work if the Cache is Redis,
        but fall back to a memcached-compliant add/delete approach.
        If the Jobtastic Cache isn't Redis or Memcache, or another product
        with a compatible lock or add/delete API, then a custom locking function
        will be required. However, Redis and Memcache are expected to account for
        the vast majority of installations.
        See:
        - http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
        - http://celery.readthedocs.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time # NOQA
        """
        # Look for a Redis-style ``lock`` factory, checking the places the
        # various cache wrappers keep their raw client. Only the attribute
        # lookups are probed here: previously the whole Redis branch
        # (including the ``yield``) sat inside ``except AttributeError``, so
        # an AttributeError raised by the code running under the ``with``
        # block was mistaken for "no lock support", the memcached fallback
        # ran, and the generator yielded a second time, raising
        # ``RuntimeError: generator didn't stop after throw``.
        lock_factory = getattr(self.cache, 'lock', None)
        if lock_factory is None:
            # Possibly using old Django-Redis
            lock_factory = getattr(
                getattr(self.cache, 'client', None), 'lock', None)
        if lock_factory is None:
            # Possibly using Werkzeug + Redis
            lock_factory = getattr(
                getattr(self.cache, '_client', None), 'lock', None)

        if lock_factory is not None:
            # Redis-style native lock
            have_lock = False
            lock = lock_factory(lock_name, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=True)
                if have_lock:
                    yield
            finally:
                if have_lock:
                    lock.release()
        else:
            # No lock method on the cache, so fall back to an atomic add.
            # NOTE(review): this spins without sleeping between attempts;
            # contention will busy-wait until the holder deletes the key or
            # the ``timeout`` expiry frees it.
            have_lock = False
            try:
                while not have_lock:
                    have_lock = self.cache.add(lock_name, 'locked', timeout)
                if have_lock:
                    yield
            finally:
                if have_lock:
                    self.cache.delete(lock_name)
98 changes: 33 additions & 65 deletions jobtastic/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,24 @@
import os
import sys
import warnings
from contextlib import contextmanager
from hashlib import md5

import psutil

from celery.datastructures import ExceptionInfo
from celery.five import class_property
from celery.states import PENDING, SUCCESS
from celery.task import Task
from celery.utils import gen_unique_id
from jobtastic.cache import get_cache
from jobtastic.states import PROGRESS # NOQA

get_task_logger = None
try:
from celery.utils.log import get_task_logger
except ImportError:
pass # get_task_logger is new in Celery 3.X

cache = None
try:
# For now, let's just say that if Django exists, we should use it.
# Otherwise, try Flask. This definitely needs an actual configuration
# variable so folks can make an explicit decision.
from django.core.cache import cache
HAS_DJANGO = True
except ImportError:
try:
# We should really have an explicitly-defined way of doing this, but
# for now, let's just use werkzeug Memcached if it exists
from werkzeug.contrib.cache import MemcachedCache

from celery import conf
if conf.CELERY_RESULT_BACKEND == 'cache':
uri_str = conf.CELERY_CACHE_BACKEND.strip('memcached://')
uris = uri_str.split(';')
cache = MemcachedCache(uris)
HAS_WERKZEUG = True
except ImportError:
pass

if cache is None:
raise Exception(
"Jobtastic requires either Django or Flask + Memcached result backend")


from jobtastic.states import PROGRESS # NOQA


@contextmanager
def acquire_lock(lock_name):
    """
    A contextmanager to wait until an exclusive lock is available,
    hold the lock and then release it when the code under context
    is complete.
    TODO: This code doesn't work like it should. It doesn't
    wait indefinitely for the lock and in fact cycles through
    very quickly.
    """
    # Make up to 10 attempts to become the sole incrementer of the counter.
    for _ in range(10):
        try:
            # incr is atomic on memcached/redis-style caches.
            value = cache.incr(lock_name)
        except ValueError:
            # Key did not exist yet: initialise it, then take our increment.
            # NOTE(review): two processes can race between set() and incr().
            cache.set(lock_name, 0)
            value = cache.incr(lock_name)
        if value == 1:
            # We were first: we hold the lock.
            break
        else:
            # Someone else holds it; undo our increment and retry at once
            # (no sleep, hence the fast cycling noted in the TODO above).
            cache.decr(lock_name)
    else:
        # All attempts exhausted without acquiring: run the body anyway and
        # forcibly reset the counter afterwards.
        yield
        cache.set(lock_name, 0)
        return
    # Lock held: run the body, then release by decrementing back to 0.
    yield
    cache.decr(lock_name)


class JobtasticTask(Task):
"""
Expand Down Expand Up @@ -136,6 +80,9 @@ class JobtasticTask(Task):
"""
abstract = True

#: The shared cache used for locking and thundering herd protection
_cache = None

@classmethod
def delay_or_eager(self, *args, **kwargs):
"""
Expand Down Expand Up @@ -230,7 +177,7 @@ def apply_async(self, args, kwargs, **options):
cache_key = self._get_cache_key(**kwargs)

# Check for an already-computed and cached result
task_id = cache.get(cache_key) # Check for the cached result
task_id = self.cache.get(cache_key) # Check for the cached result
if task_id:
# We've already built this result, just latch on to the task that
# did the work
Expand All @@ -239,23 +186,23 @@ def apply_async(self, args, kwargs, **options):
return self.AsyncResult(task_id)

# Check for an in-progress equivalent task to avoid duplicating work
task_id = cache.get('herd:%s' % cache_key)
task_id = self.cache.get('herd:%s' % cache_key)
if task_id:
logging.info('Found existing in-progress task: %s', task_id)
return self.AsyncResult(task_id)

# It's not cached and it's not already running. Use an atomic lock to
# start the task, ensuring there isn't a race condition that could
# result in multiple identical tasks being fired at once.
with acquire_lock('lock:%s' % cache_key):
with self.cache.lock('lock:%s' % cache_key):
task_meta = super(JobtasticTask, self).apply_async(
args,
kwargs,
**options
)
logging.info('Current status: %s', task_meta.status)
if task_meta.status in (PROGRESS, PENDING):
cache.set(
self.cache.set(
'herd:%s' % cache_key,
task_meta.task_id,
timeout=self.herd_avoidance_timeout)
Expand Down Expand Up @@ -372,7 +319,7 @@ def run(self, *args, **kwargs):
cache_duration = -1 # By default, don't cache
if cache_duration >= 0:
# If we're configured to cache this result, do so.
cache.set(self.cache_key, self.request.id, cache_duration)
self.cache.set(self.cache_key, self.request.id, cache_duration)

# Now that the task is finished, we can stop all of the thundering herd
# avoidance
Expand Down Expand Up @@ -419,7 +366,28 @@ def on_success(self, retval, task_id, args, kwargs):
self.update_state(task_id, SUCCESS, retval)

def _break_thundering_herd_cache(self):
cache.delete('herd:%s' % self.cache_key)
self.cache.delete('herd:%s' % self.cache_key)

@classmethod
def _get_cache(self):
    """
    Return the shared Jobtastic cache, resolving it lazily on first use.

    The resolved cache is memoized on the class in ``_cache`` so the
    discovery performed by ``get_cache`` only happens once.
    """
    if self._cache:
        return self._cache
    self._cache = get_cache(self.app)
    return self._cache

@classmethod
def _set_cache(self, cache):
    """
    Set the Jobtastic Cache for the Task
    The cache must support get/set (with timeout)/delete/lock (as a context
    manager).
    """
    # Stored on the class so every invocation of this Task shares the cache.
    self._cache = cache

cache = class_property(_get_cache, _set_cache)

@classmethod
def _get_cache_key(self, **kwargs):
Expand Down

0 comments on commit 8fe5133

Please sign in to comment.