Skip to content

Commit

Permalink
Add lease recipe.
Browse files Browse the repository at this point in the history
  • Loading branch information
Lars Albertsson committed Jun 12, 2014
1 parent c8c3140 commit 8d39d26
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 0 deletions.
7 changes: 7 additions & 0 deletions CHANGES.rst
Expand Up @@ -4,6 +4,13 @@ Changelog
2.0 (unreleased)
----------------

Features
********

- Added a NonBlockingLease recipe. The recipe allows e.g. cron jobs scheduled
on multiple machines to ensure that at most N instances will run a particular
job, with lease timeout for graceful handover in case of node failures.


2.0b1 (2014-04-24)
------------------
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Expand Up @@ -18,6 +18,7 @@ organized alphabetically by module name.
api/recipe/barrier
api/recipe/counter
api/recipe/election
api/recipe/lease
api/recipe/lock
api/recipe/partitioner
api/recipe/party
Expand Down
19 changes: 19 additions & 0 deletions docs/api/recipe/lease.rst
@@ -0,0 +1,19 @@
.. _lease_module:

:mod:`kazoo.recipe.lease`
----------------------------

.. automodule:: kazoo.recipe.lease

Public API
++++++++++

.. autoclass:: NonBlockingLease
:members:

.. automethod:: __init__

.. autoclass:: MultiNonBlockingLease
:members:

.. automethod:: __init__
4 changes: 4 additions & 0 deletions kazoo/client.py
Expand Up @@ -52,6 +52,8 @@
from kazoo.recipe.barrier import DoubleBarrier
from kazoo.recipe.counter import Counter
from kazoo.recipe.election import Election
from kazoo.recipe.lease import NonBlockingLease
from kazoo.recipe.lease import MultiNonBlockingLease
from kazoo.recipe.lock import Lock
from kazoo.recipe.lock import Semaphore
from kazoo.recipe.partitioner import SetPartitioner
Expand Down Expand Up @@ -273,6 +275,8 @@ def _retry(*args, **kwargs):
self.ChildrenWatch = partial(ChildrenWatch, self)
self.DataWatch = partial(DataWatch, self)
self.Election = partial(Election, self)
self.NonBlockingLease = partial(NonBlockingLease, self)
self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self)
self.Lock = partial(Lock, self)
self.Party = partial(Party, self)
self.Queue = partial(Queue, self)
Expand Down
130 changes: 130 additions & 0 deletions kazoo/recipe/lease.py
@@ -0,0 +1,130 @@
"""Zookeeper lease implementations
:Maintainer: Lars Albertsson <lars.albertsson@gmail.com>
:Maintainer: Jyrki Pulliainen <jyrki@spotify.com>
:Status: Beta
"""

import json
import socket
import datetime
from kazoo.exceptions import CancelledError


class NonBlockingLease(object):
    """Exclusive lease that does not wait for a foreign lease to expire.

    At most one identity owns the lease at any time. The owner renews it by
    creating a new lease object with the same path and the same identifier.
    The lease object is truthy when the lease was obtained (or renewed) and
    falsy otherwise.

    A common use case is a task that is scheduled on several hosts but should
    only run on one of them: hosts that do not obtain the lease simply skip
    the protected task.

    Expiry times are produced by the clients' own clocks and stored with
    second resolution, so client clocks must be roughly synchronised. The
    default clock is UTC, which makes the lease independent of time zones and
    daylight saving.

    Example usage: with a :class:`~kazoo.client.KazooClient` instance::

        zk = KazooClient()
        # Hold lease over an hour in order to keep job on same machine,
        # with failover if it dies.
        lease = zk.NonBlockingLease(
            "/db_leases/hourly_cleanup", datetime.timedelta(minutes=70),
            identifier="DB hourly cleanup on " + socket.gethostname())
        if lease:
            do_hourly_database_cleanup()

    """

    # Bump when storage format changes
    _version = 1
    _date_format = "%Y-%m-%dT%H:%M:%S"
    _byte_encoding = 'utf-8'

    def __init__(self, client, path, duration, identifier=None,
                 utcnow=datetime.datetime.utcnow):
        """Create a non-blocking lease.

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The lease path to use.
        :param duration: Duration during which the lease is reserved. A
                         :class:`~datetime.timedelta` instance.
        :param identifier: Unique name to use for this lease holder. Reuse in
                           order to renew the lease. Defaults to
                           :meth:`socket.gethostname()`.
        :param utcnow: Clock function, by default returning
                       :meth:`datetime.datetime.utcnow()`. Used for testing.

        """
        ident = identifier if identifier else socket.gethostname()
        self.obtained = False
        self._attempt_obtaining(client, path, duration, ident, utcnow)

    def _attempt_obtaining(self, client, path, duration, ident, utcnow):
        """Try once to take or renew the lease; set ``obtained`` on success.

        The holder record lives in the ``lease_holder`` child of ``path`` and
        is only read or rewritten while ``client.Lock(path, ident)`` is held.
        """
        client.ensure_path(path)
        holder_path = path + "/lease_holder"
        lease_lock = client.Lock(path, ident)
        try:
            with lease_lock:
                timestamp = utcnow()
                if client.exists(holder_path):
                    payload, _stat = client.get(holder_path)
                    record = self._decode(payload)
                    if record["version"] != self._version:
                        # We need an upgrade, let someone else take the lease
                        return
                    expiry = datetime.datetime.strptime(
                        record['end'], self._date_format)
                    foreign_holder = record['holder'] != ident
                    if foreign_holder and timestamp < expiry:
                        # Another client is still holding the lease
                        return
                    # Expired, or our own lease being renewed: replace it.
                    client.delete(holder_path)
                expires_at = (timestamp + duration).strftime(
                    self._date_format)
                record = {
                    'version': self._version,
                    'holder': ident,
                    'end': expires_at,
                }
                client.create(holder_path, self._encode(record))
                self.obtained = True
        except CancelledError:
            # A cancelled lock attempt just means "lease not obtained".
            pass

    def _encode(self, data_dict):
        """Serialise the holder record to JSON bytes."""
        text = json.dumps(data_dict)
        return text.encode(self._byte_encoding)

    def _decode(self, raw):
        """Parse JSON bytes read from the holder node back into a dict."""
        text = raw.decode(self._byte_encoding)
        return json.loads(text)

    # Python 3.x
    def __bool__(self):
        return self.obtained

    # Python 2.x
    __nonzero__ = __bool__


class MultiNonBlockingLease(object):
    """Exclusive lease for multiple clients.

    Useful when a limited set of hosts should run a particular task. The
    numbered lease paths ``<path>/0`` .. ``<path>/<count - 1>`` are tried in
    order, stopping at the first one that can be obtained. The object is
    truthy when one of them was obtained.

    :param client: A :class:`~kazoo.client.KazooClient` instance.
    :param count: Number of host leases allowed.
    :param path: ZooKeeper path under which lease files are stored.
    :param duration: Duration during which the lease is reserved. A
                     :class:`~datetime.timedelta` instance.
    :param identifier: Unique name to use for this lease holder. Reuse in
                       order to renew the lease. Defaults to
                       :meth:`socket.gethostname()`.
    :param utcnow: Clock function, by default returning
                   :meth:`datetime.datetime.utcnow()`. Used for testing.

    """

    def __init__(self, client, count, path, duration, identifier=None,
                 utcnow=datetime.datetime.utcnow):
        # The generator is lazy and any() short-circuits, so no further
        # slots are attempted once one lease has been obtained.
        attempts = (
            NonBlockingLease(client, '%s/%d' % (path, slot), duration,
                             identifier=identifier, utcnow=utcnow)
            for slot in range(count)
        )
        self.obtained = any(attempts)

    # Python 3.x
    def __bool__(self):
        return self.obtained

    # Python 2.x
    __nonzero__ = __bool__
198 changes: 198 additions & 0 deletions kazoo/tests/test_lease.py
@@ -0,0 +1,198 @@
import datetime
import uuid

from kazoo.recipe.lease import NonBlockingLease
from kazoo.recipe.lease import MultiNonBlockingLease

from kazoo.testing import KazooTestCase


class MockClock(object):
    """Manually advanced clock, passed to the leases as ``utcnow``."""

    def __init__(self, epoch=0):
        # Current reading, in seconds since the Unix epoch.
        self.epoch = epoch

    def forward(self, seconds):
        """Move the clock ``seconds`` ahead."""
        self.epoch = self.epoch + seconds

    def __call__(self):
        """Return the current reading as a naive UTC datetime."""
        to_utc = datetime.datetime.utcfromtimestamp
        return to_utc(self.epoch)


class KazooLeaseTests(KazooTestCase):
    """Shared fixture for the lease tests.

    Starts two extra clients (``client2`` and ``client3``) in addition to the
    ``self.client`` the tests expect from the base class, so that a test can
    act as several independent lease contenders. Each test also gets a fresh
    random lease path and a manually driven :class:`MockClock`.
    """

    def setUp(self):
        super(KazooLeaseTests, self).setUp()
        self.client2 = self._get_client(timeout=0.8)
        self.client2.start()
        self.client3 = self._get_client(timeout=0.8)
        self.client3.start()
        # Random root so that tests never see each other's lease nodes.
        self.path = "/" + uuid.uuid4().hex
        self.clock = MockClock(10)

    def tearDown(self):
        try:
            for cl in [self.client2, self.client3]:
                if cl.connected:
                    cl.stop()
                    cl.close()
            del self.client2
            del self.client3
        finally:
            # Always chain up so the base test case can run its own
            # clean-up, even if stopping one of the extra clients failed.
            super(KazooLeaseTests, self).tearDown()


class NonBlockingLeaseTests(KazooLeaseTests):
    """Behaviour of a single :class:`NonBlockingLease` path.

    All leases in these tests last three seconds; ``self.clock`` is moved
    forward by hand to land before or after the expiry.
    """

    def test_renew(self):
        # Use client convenience method here to test it at least once. Use
        # class directly in other tests in order to get better IDE support.
        lease = self.client.NonBlockingLease(
            self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
        self.assertTrue(lease)
        self.assertTrue(lease.obtained)

        # Same (default) identifier before expiry: the lease is renewed.
        self.clock.forward(2)
        renewed_lease = self.client.NonBlockingLease(
            self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
        self.assertTrue(renewed_lease)

    def test_busy(self):
        lease = NonBlockingLease(
            self.client, self.path, datetime.timedelta(seconds=3),
            utcnow=self.clock)
        self.assertTrue(lease)

        # A different identifier before expiry is refused.
        self.clock.forward(2)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertFalse(foreigner_lease)
        self.assertFalse(foreigner_lease.obtained)

    def test_overtake(self):
        lease = NonBlockingLease(
            self.client, self.path, datetime.timedelta(seconds=3),
            utcnow=self.clock)
        self.assertTrue(lease)

        # A different identifier after expiry takes the lease over.
        self.clock.forward(4)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertTrue(foreigner_lease)

    def test_renew_no_overtake(self):
        lease = self.client.NonBlockingLease(
            self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
        self.assertTrue(lease)
        self.assertTrue(lease.obtained)

        self.clock.forward(2)
        renewed_lease = self.client.NonBlockingLease(
            self.path, datetime.timedelta(seconds=3), utcnow=self.clock)
        self.assertTrue(renewed_lease)

        # Four seconds after the first grant, but only two after the
        # renewal: the renewed lease is still valid.
        self.clock.forward(2)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertFalse(foreigner_lease)

    def test_overtaker_renews(self):
        lease = NonBlockingLease(
            self.client, self.path, datetime.timedelta(seconds=3),
            utcnow=self.clock)
        self.assertTrue(lease)

        self.clock.forward(4)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertTrue(foreigner_lease)

        # The new holder can renew just like the first one could.
        self.clock.forward(2)
        foreigner_renew = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertTrue(foreigner_renew)

    def test_overtake_refuse_first(self):
        lease = NonBlockingLease(
            self.client, self.path, datetime.timedelta(seconds=3),
            utcnow=self.clock)
        self.assertTrue(lease)

        self.clock.forward(4)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        self.assertTrue(foreigner_lease)

        # The first holder lost the lease and may not take it back while
        # the new holder's lease is still running.
        self.clock.forward(2)
        first_again_lease = NonBlockingLease(
            self.client, self.path, datetime.timedelta(seconds=3),
            utcnow=self.clock)
        self.assertFalse(first_again_lease)

    def test_old_version(self):
        # Skip to a future version
        NonBlockingLease._version += 1
        try:
            lease = NonBlockingLease(
                self.client, self.path, datetime.timedelta(seconds=3),
                utcnow=self.clock)
            self.assertTrue(lease)
        finally:
            # Then back to today. _version is a class attribute shared by
            # every test, so it must be restored even if the step above
            # fails.
            NonBlockingLease._version -= 1
        self.clock.forward(4)
        foreigner_lease = NonBlockingLease(
            self.client2, self.path, datetime.timedelta(seconds=3),
            identifier="some.other.host", utcnow=self.clock)
        # Since a newer version wrote the lease file, the lease is not taken.
        self.assertFalse(foreigner_lease)


class MultiNonBlockingLeaseTest(KazooLeaseTests):
    """Behaviour of :class:`MultiNonBlockingLease` with one and two slots."""

    def _request(self, client, count, seconds, identifier=None):
        # Ask for one of `count` leases under the shared test path, using
        # the mock clock. identifier=None selects the default identifier.
        return MultiNonBlockingLease(
            client, count, self.path, datetime.timedelta(seconds=seconds),
            identifier=identifier, utcnow=self.clock)

    def test_1_renew(self):
        # Go through the client convenience attribute once.
        granted = self.client.MultiNonBlockingLease(
            1, self.path, datetime.timedelta(seconds=4), utcnow=self.clock)
        self.assertTrue(granted)
        self.clock.forward(2)
        self.assertTrue(self._request(self.client, 1, 4))

    def test_1_reject(self):
        self.assertTrue(self._request(self.client, 1, 4))
        self.clock.forward(2)
        self.assertFalse(
            self._request(self.client2, 1, 4, identifier="some.other.host"))

    def test_2_renew(self):
        # Two holders alternate renewals; each keeps getting a lease.
        self.assertTrue(self._request(self.client, 2, 7))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client2, 2, 7, identifier="host2"))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client, 2, 7))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client2, 2, 7, identifier="host2"))

    def test_2_reject(self):
        # Both slots are taken and still valid, so a third host is refused.
        self.assertTrue(self._request(self.client, 2, 7))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client2, 2, 7, identifier="host2"))
        self.clock.forward(2)
        self.assertFalse(self._request(self.client3, 2, 7, identifier="host3"))

    def test_2_handover(self):
        # Slots are handed over to new hosts as the older leases expire.
        self.assertTrue(self._request(self.client, 2, 4))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client2, 2, 4, identifier="host2"))
        self.clock.forward(3)
        self.assertTrue(self._request(self.client3, 2, 4, identifier="host3"))
        self.clock.forward(2)
        self.assertTrue(self._request(self.client, 2, 4))

0 comments on commit 8d39d26

Please sign in to comment.