Skip to content
This repository

Reset watchers on lost connection #63

Merged
merged 1 commit into from about 1 year ago

2 participants

Doug MacEachern Ben Bangert
Doug MacEachern
dougm commented

This likely isn't the correct fix, but I found this issue while testing DataWatch.
Against a single zk server, the entries in client._data_watchers accumulate with each restart
of the zk server. After restarting the zk server once, a DataWatch callback for a given
path will be called twice. Restart again and it is called three times, and so on.

Without the client.py change that adds _reset_watchers, you should see the following test failure:
$ ZOOKEEPER_PATH=$PWD/bin/zookeeper ./bin/nosetests kazoo.tests.test_watchers:KazooDataWatcherRestartTests.test_data_watcher

File "/.../kazoo/kazoo/tests/test_watchers.py", line 507, in test_data_watcher
eq_(len(events), 2)
AssertionError: 3 != 2

Doug MacEachern
dougm commented

Any thoughts on this one? Here's a standalone example of the issue: https://gist.github.com/dougm/5399180
Against a brew-installed ZooKeeper, the number of watcher callbacks increases with each server restart:

$ python kz-watcher.py

kz> Version: 2, data: foo

$ zkCli set /some/path foo

kz> Version: 3, data: foo

$ zkServer restart

kz> Version: 4, data: foo
kz> Version: 4, data: foo

$ zkServer restart
$ zkCli set /some/path foo

kz> Version: 4, data: foo
kz> Version: 4, data: foo
kz> Version: 4, data: foo

Ben Bangert
Owner

Yea, I'm wondering how this was in there so long before it was caught. THANKS!

Ben Bangert bbangert merged commit 385e632 into from
Ben Bangert bbangert closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 1 unique commit by 1 author.

Mar 22, 2013
Doug MacEachern dougm Reset watchers on lost connection 5fcd738
This page is out of date. Refresh to see the latest.

Showing 2 changed files with 53 additions and 2 deletions. Show diff stats Hide diff stats

  1. +6 2 kazoo/client.py
  2. +47 0 kazoo/tests/test_watchers.py
8 kazoo/client.py
@@ -223,12 +223,15 @@ def _reset(self):
223 223 """Resets a variety of client states for a new connection."""
224 224 self._queue = deque()
225 225 self._pending = deque()
226   - self._child_watchers = defaultdict(list)
227   - self._data_watchers = defaultdict(list)
228 226
  227 + self._reset_watchers()
229 228 self._reset_session()
230 229 self.last_zxid = 0
231 230
  231 + def _reset_watchers(self):
  232 + self._child_watchers = defaultdict(list)
  233 + self._data_watchers = defaultdict(list)
  234 +
232 235 def _reset_session(self):
233 236 self._session_id = None
234 237 self._session_passwd = b'\x00' * 16
@@ -335,6 +338,7 @@ def _session_callback(self, state):
335 338 self._live.clear()
336 339 self._notify_pending(state)
337 340 self._make_state_change(KazooState.SUSPENDED)
  341 + self._reset_watchers()
338 342
339 343 def _notify_pending(self, state):
340 344 """Used to clear a pending response queue and request queue
47 kazoo/tests/test_watchers.py
@@ -5,6 +5,7 @@
5 5 from nose.tools import eq_
6 6 from nose.tools import raises
7 7
  8 +from kazoo.protocol.states import KazooState
8 9 from kazoo.testing import KazooTestCase
9 10
10 11
@@ -458,3 +459,49 @@ def test_watch_iterations(self):
458 459
459 460 children, asy = result.get()
460 461 eq_(len(children), 2)
  462 +
  463 +
  464 +class KazooDataWatcherRestartTests(KazooTestCase):
  465 + def setUp(self):
  466 + self.setup_zookeeper(randomize_hosts=False)
  467 + self.path = "/" + uuid.uuid4().hex
  468 + self.client.ensure_path(self.path)
  469 +
  470 + def test_data_watcher(self):
  471 + update = threading.Event()
  472 + ev = threading.Event()
  473 + events = []
  474 +
  475 + # Make it a non-existent path
  476 + self.path += 'f'
  477 +
  478 + @self.client.DataWatch(self.path, allow_missing_node=True)
  479 + def changed(data, stat):
  480 + if data is not None:
  481 + events.append(stat)
  482 + update.set()
  483 +
  484 + @self.client.add_listener
  485 + def listen(state):
  486 + if state != KazooState.LOST:
  487 + ev.set()
  488 +
  489 + self.client.create(self.path, b'fred')
  490 + update.wait()
  491 + update.clear()
  492 + eq_(len(events), 1)
  493 +
  494 + self.cluster[0].stop()
  495 + ev.wait(30)
  496 +
  497 + time.sleep(3)
  498 +
  499 + self.cluster[0].run()
  500 + ev.wait(30)
  501 +
  502 + self.client.set(self.path, b'said')
  503 + update.wait()
  504 + update.clear()
  505 +
  506 + time.sleep(1)
  507 + eq_(len(events), 2)

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.