Skip to content

Commit

Permalink
Merge pull request #63 from dougm/reset_watchers
Browse files Browse the repository at this point in the history
Reset watchers on lost connection
  • Loading branch information
bbangert committed Apr 28, 2013
2 parents f997465 + 5fcd738 commit 385e632
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
8 changes: 6 additions & 2 deletions kazoo/client.py
Expand Up @@ -224,12 +224,15 @@ def _reset(self):
"""Resets a variety of client states for a new connection."""
self._queue = deque()
self._pending = deque()
self._child_watchers = defaultdict(list)
self._data_watchers = defaultdict(list)

self._reset_watchers()
self._reset_session()
self.last_zxid = 0

def _reset_watchers(self):
self._child_watchers = defaultdict(list)
self._data_watchers = defaultdict(list)

def _reset_session(self):
self._session_id = None
self._session_passwd = b'\x00' * 16
Expand Down Expand Up @@ -336,6 +339,7 @@ def _session_callback(self, state):
self._live.clear()
self._notify_pending(state)
self._make_state_change(KazooState.SUSPENDED)
self._reset_watchers()

def _notify_pending(self, state):
"""Used to clear a pending response queue and request queue
Expand Down
47 changes: 47 additions & 0 deletions kazoo/tests/test_watchers.py
Expand Up @@ -5,6 +5,7 @@
from nose.tools import eq_
from nose.tools import raises

from kazoo.protocol.states import KazooState
from kazoo.testing import KazooTestCase


Expand Down Expand Up @@ -458,3 +459,49 @@ def test_watch_iterations(self):

children, asy = result.get()
eq_(len(children), 2)


class KazooDataWatcherRestartTests(KazooTestCase):
def setUp(self):
self.setup_zookeeper(randomize_hosts=False)
self.path = "/" + uuid.uuid4().hex
self.client.ensure_path(self.path)

def test_data_watcher(self):
update = threading.Event()
ev = threading.Event()
events = []

# Make it a non-existent path
self.path += 'f'

@self.client.DataWatch(self.path, allow_missing_node=True)
def changed(data, stat):
if data is not None:
events.append(stat)
update.set()

@self.client.add_listener
def listen(state):
if state != KazooState.LOST:
ev.set()

self.client.create(self.path, b'fred')
update.wait()
update.clear()
eq_(len(events), 1)

self.cluster[0].stop()
ev.wait(30)

time.sleep(3)

self.cluster[0].run()
ev.wait(30)

self.client.set(self.path, b'said')
update.wait()
update.clear()

time.sleep(1)
eq_(len(events), 2)

0 comments on commit 385e632

Please sign in to comment.