Bugfix: use lock.extend instead of pexpire #128

Merged 5 commits — Jul 29, 2019
14 changes: 14 additions & 0 deletions docs/design.rst
@@ -9,6 +9,8 @@ The schedule set contains the task keys sorted by the next scheduled run time.

For each tick of Beat

#. check that it still owns the lock; if not, exit with ``LockNotOwnedError``

#. get list of due keys and due next tick

#. retrieve definitions and metadata for all keys from previous step
@@ -49,3 +51,15 @@ The meta key contains a JSON blob as follows::

For instance, by default ``last_run_at`` corresponds to when Beat dispatched the task. Depending on queue latency the task might not run immediately,
but the application could update the metadata with the actual run time, allowing intervals to be relative to the last execution rather than the last dispatch.
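The effect of rewriting ``last_run_at`` can be sketched in a few lines. This is only an illustration: the field name comes from the blob above, but the ISO-8601 serialization and the ``next_due`` helper are assumptions, not RedBeat's actual API.

```python
import json
from datetime import datetime, timedelta

def next_due(meta_json: str, interval: timedelta) -> datetime:
    """Compute the next due time from a meta blob's last_run_at."""
    meta = json.loads(meta_json)
    last_run_at = datetime.fromisoformat(meta["last_run_at"])
    return last_run_at + interval

# Beat dispatched at 12:00:00, but the worker only started the task at
# 12:00:30; rewriting last_run_at shifts the next due time with it.
dispatched = json.dumps({"last_run_at": "2019-07-29T12:00:00"})
executed = json.dumps({"last_run_at": "2019-07-29T12:00:30"})

every_minute = timedelta(minutes=1)
print(next_due(dispatched, every_minute))  # 2019-07-29 12:01:00
print(next_due(executed, every_minute))    # 2019-07-29 12:01:30
```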

High Availability
~~~~~~~~~~~~~~~~~

RedBeat uses a lock in Redis to prevent multiple nodes from running at once.
You can safely start multiple nodes as backups: when the running node fails or
experiences network problems, another node will acquire the lock after
``redbeat_lock_timeout`` seconds and start running.

When the previous node comes back online, it will notice that it no longer
holds the lock and will exit with an exception (ideally you would use systemd
or supervisord to restart it as a backup node).
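The failover behavior described above can be illustrated with a toy in-memory lock. This is not RedBeat's implementation (the real lock lives in Redis); it only models the acquire/extend/expire life cycle:

```python
import time

class ToyLock:
    """In-memory stand-in for the Redis lock, for illustration only."""

    def __init__(self, timeout: float):
        self.timeout = timeout      # stands in for redbeat_lock_timeout
        self.owner = None
        self.expires_at = 0.0

    def acquire(self, node: str) -> bool:
        # A node may take the lock only if it is unheld or has expired.
        if self.owner is None or time.monotonic() >= self.expires_at:
            self.owner = node
            self.expires_at = time.monotonic() + self.timeout
            return True
        return False

    def extend(self, node: str) -> bool:
        # A node may extend only a lock it still owns and that is live.
        if self.owner != node or time.monotonic() >= self.expires_at:
            return False
        self.expires_at = time.monotonic() + self.timeout
        return True

lock = ToyLock(timeout=0.1)
assert lock.acquire("node-a")      # node-a runs; node-b is a hot standby
assert not lock.acquire("node-b")  # standby cannot take a live lock
time.sleep(0.15)                   # node-a "dies" and stops extending
assert lock.acquire("node-b")      # standby takes over after the timeout
assert not lock.extend("node-a")   # node-a comes back: it must exit
```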
35 changes: 33 additions & 2 deletions redbeat/schedulers.py
@@ -48,6 +48,32 @@
CELERY_4_OR_GREATER = CELERY_VERSION[0] >= 4
REDIS_3_OR_GREATER = REDIS_VERSION[0] >= 3

# Copied from:
# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
# Changes:
# The second line from the bottom: the original Lua script extends the
# TTL to (remaining lock time + additional time), while this script sets
# the TTL to the expected expiration time directly.
# KEYS[1] - lock name
# ARGV[1] - token
# ARGV[2] - additional milliseconds
# return 1 if the lock's time was extended, otherwise 0
LUA_EXTEND_TO_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""


class RetryingConnection(object):
"""A proxy for the Redis connection that delegates all the calls to
@@ -445,7 +471,7 @@ def maybe_due(self, entry, **kwargs):
def tick(self, min=min, **kwargs):
if self.lock:
logger.debug('beat: Extending lock...')
-            get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))
+            self.lock.extend(int(self.lock_timeout))

remaining_times = []
try:
@@ -489,11 +515,16 @@ def acquire_distributed_beat_lock(sender=None, **kwargs):
return

logger.debug('beat: Acquiring lock...')
redis_client = get_redis(scheduler.app)

-    lock = get_redis(scheduler.app).lock(
+    lock = redis_client.lock(
scheduler.lock_key,
timeout=scheduler.lock_timeout,
sleep=scheduler.max_interval,
)
    # Overwrite redis-py's extend script, which adds the additional timeout
    # to the remaining TTL instead of extending to a new timeout.
lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT)
lock.acquire()

scheduler.lock = lock
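The ownership check that the Lua script performs before the ``PEXPIRE`` can be transliterated to Python against a dict-based store. Everything here is illustrative: the store maps key names to ``(value, ttl_ms)`` pairs, and the ``redbeat::lock`` key name and tokens are hypothetical.

```python
def extend_to(store: dict, name: str, token: str, additional_ms: int) -> int:
    """Python transliteration of LUA_EXTEND_TO_SCRIPT, for illustration."""
    if name not in store:
        return 0              # lock key is gone
    value, ttl_ms = store[name]
    if value != token:
        return 0              # another node holds the lock now
    if ttl_ms < 0:
        return 0              # key exists but has no expiry: refuse
    store[name] = (value, additional_ms)   # PEXPIRE to the new timeout
    return 1

store = {"redbeat::lock": ("node-a-token", 3000)}
print(extend_to(store, "redbeat::lock", "node-a-token", 5000))  # 1
print(extend_to(store, "redbeat::lock", "node-b-token", 5000))  # 0
```

Because the token is checked first, a node that has lost the lock cannot silently refresh it, which is the bug the bare ``pexpire`` call in ``tick`` allowed.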