Permalink
Browse files

Add ZooParty recipe: pool of ephemeral nodes

Used to track "alive" processes
  • Loading branch information...
labisso committed Feb 28, 2012
1 parent f3b9406 commit 410324aaa2924ca40215c51f3502d6ca8c986920
Showing with 141 additions and 1 deletion.
  1. +94 −0 kazoo/recipe/party.py
  2. +1 −1 kazoo/recipe/test/test_lock.py
  3. +46 −0 kazoo/recipe/test/test_party.py
View
@@ -0,0 +1,94 @@
+import uuid
+
+from kazoo.exceptions import NodeExistsException, NoNodeException
+
+class ZooParty(object):
+ """Simple pool of participating processes
+ """
+
+ _NODE_NAME = "__party__"
+
+ def __init__(self, client, path, data=None):
+ """
+ @type client KazooClient
+ """
+ self.client = client
+ self.path = path
+
+ self.data = str(data or "")
+
+ self.node = uuid.uuid4().hex + self._NODE_NAME
+ self.create_path = self.path + "/" + self.node
+
+ self.ensured_path = False
+ self.participating = False
+
+ def join(self):
+ """Join the party
+ """
+ return self.client.retry(self._inner_join)
+
+ def _inner_join(self):
+ if not self.ensured_path:
+ # make sure our election parent node exists
+ self.client.ensure_path(self.path)
+ self.ensured_path = True
+
+ try:
+ self.client.create(self.create_path, self.data, ephemeral=True)
+ self.participating = True
+ except NodeExistsException:
+ # node was already created, perhaps we are recovering from a
+ # suspended connection
+ self.participating = True
+
+ def leave(self):
+ """Leave the party
+ """
+ return self.client.retry(self._inner_leave)
+
+ def _inner_leave(self):
+ try:
+ self.client.delete(self.create_path)
+ except NoNodeException:
+ return False
+
+ return True
+
+ def get_participants(self):
+ """
+ Get a list of participating clients' data values
+ """
+ if not self.ensured_path:
+ # make sure our election parent node exists
+ self.client.ensure_path(self.path)
+ self.ensured_path = True
+
+ children = self._get_children()
+ participants = []
+ for child in children:
+ try:
+ d, _ = self.client.retry(self.client.get, self.path + "/" + child)
+ participants.append(d)
+ except NoNodeException:
+ pass
+ return participants
+
+ def get_participant_count(self):
+ """Return a count of participating clients
+ """
+ if not self.ensured_path:
+ # make sure our election parent node exists
+ self.client.ensure_path(self.path)
+ self.ensured_path = True
+ return len(self._get_children())
+
+ def _get_children(self):
+ children = self.client.retry(self.client.get_children, self.path)
+ return filter(lambda child: self._NODE_NAME in child, children)
+
+
+
+
+
+
@@ -21,7 +21,7 @@ def setUp(self):
def tearDown(self):
if self.lockpath:
try:
- self._c.delete(self.lockpath)
+ self._c.recursive_delete(self.lockpath)
except Exception:
pass
@@ -0,0 +1,46 @@
+import unittest
+import uuid
+
+from kazoo.recipe.party import ZooParty
+from kazoo.test import get_client_or_skip
+
+class ZooPartyTests(unittest.TestCase):
+ def setUp(self):
+ self._c = get_client_or_skip()
+ self._c.connect()
+ self.path = "/" + uuid.uuid4().hex
+
+ def tearDown(self):
+ if self.path:
+ try:
+ self._c.recursive_delete(self.path)
+ except Exception:
+ pass
+ if self._c:
+ self._c.close()
+
+ def test_party(self):
+ parties = [ZooParty(self._c, self.path, "p%s" % i)
+ for i in range(5)]
+
+ one_party = parties[0]
+
+ self.assertEqual(one_party.get_participants(), [])
+ self.assertEqual(one_party.get_participant_count(), 0)
+
+ participants = set()
+ for party in parties:
+ party.join()
+ participants.add(party.data)
+
+ self.assertEqual(set(party.get_participants()), participants)
+ self.assertEqual(party.get_participant_count(), len(participants))
+
+ for party in parties:
+ party.leave()
+ participants.remove(party.data)
+
+ self.assertEqual(set(party.get_participants()), participants)
+ self.assertEqual(party.get_participant_count(), len(participants))
+
+

0 comments on commit 410324a

Please sign in to comment.