Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Make reconnections re-establish all watches.
Browse files Browse the repository at this point in the history
  • Loading branch information
Brady Catherman committed Nov 9, 2010
1 parent 8c07468 commit 8dc6012
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README
Expand Up @@ -176,6 +176,10 @@ include "retry x times" "fail but keep watch" "fail and stop watching"
7. Change Log
=============

Release: 1.5

2010/11/08: Make connection re-establish all existing watches.

Release: 1.4

2010/10/25: Fix client session expiration recovery.
Expand Down
2 changes: 1 addition & 1 deletion twitcher.spec
@@ -1,4 +1,4 @@
%define VERSION 1.4
%define VERSION 1.5
%define RELEASE 1

Name: twitcher
Expand Down
30 changes: 25 additions & 5 deletions twitcher/zkwrapper.py
Expand Up @@ -54,12 +54,19 @@ def __init__(self, servers):

def _global_watch(self, zh, event, state, path):
"""Called when the connection to zookeeper has a state change."""
logging.debug("Global watch fired: %r %r %r" % (event, state, path))
logging.debug('Global watch fired: %s %s %s' % (event, state, path))
if state == zookeeper.EXPIRED_SESSION_STATE:
self._clientid = None
self._connect()
elif state == zookeeper.CONNECTED_STATE:
self._clientid = zookeeper.client_id(self._zookeeper)
# Re-get all existing watched files.
logging.debug('Registering watches on existing watch objects.')
for path in self._watches.iterkeys():
logging.debug('Registering watch against: %s' % path)
h = self._handler_wrapper(path)
zookeeper.aget(self._zookeeper, path, self._watcher, h)

# Catch up all gets requested before we were able to connect.
while self._pending_gets:
path, w, h = self._pending_gets.pop()
Expand Down Expand Up @@ -99,6 +106,21 @@ def _connect(self):
except Exception, e:
logging.error('Unexpected error: %r', e)

def _handler_wrapper(self, path):
"""Returns a lambda function that wraps the self._handler call.
This returns a lambda function that actually wraps the self._handler
function in order to add path data which is not normally exposed to
the client.
Args:
path: The zookeeper path being watched.
Returns:
A lambda object.
"""
return (lambda z, r, d, s: self._handler(z, r, d, s, path))

def aget(self, path, watcher=None, handler=None):
"""A simple wrapper for zookeeper async get function.
Expand Down Expand Up @@ -138,9 +160,7 @@ def aget(self, path, watcher=None, handler=None):
w = self._watcher
else:
w = None
# We use a lambda here so we can make sure that the path gets appended
# to the args. This allows us to multiplex the call.
h = (lambda zh, rc, data, stat: self._handler(zh, rc, data, stat, path))
h = self._handler_wrapper(path)
logging.debug('Performing a get against %s', path)
zookeeper.aget(self._zookeeper, path, w, h)

Expand Down Expand Up @@ -237,5 +257,5 @@ def _handler(self, zh, rc, data, stat, path):
handler(self, rc, data, path)
elif rc == zookeeper.CONNECTIONLOSS:
logging.info('Watch event triggered for %s: Connection loss.', path)
h = (lambda zh, rc, data, stat: self._handler(zh, rc, data, stat, path))
h = self._handler_wrapper(path)
self._pending_gets.append((path, self._watcher, h))

0 comments on commit 8dc6012

Please sign in to comment.