Skip to content

Commit

Permalink
initial simple leader promotion
Browse files Browse the repository at this point in the history
  • Loading branch information
progrium committed Mar 6, 2012
1 parent 6ab360a commit 4bce960
Showing 1 changed file with 49 additions and 0 deletions.
49 changes: 49 additions & 0 deletions gtutorial/leader.py
@@ -0,0 +1,49 @@

class Leadership(Service):
cluster = Setting('cluster', default=[])
port = Setting('leader_port', default=12345)
heartbeat_interval = Setting('leader_heartbeat_interval_secs', default=5)

def __init__(self, context, identity):
self._identity = identity
self._candidates = sorted(self.cluster)
self._promoted = Event()
self._broadcaster = context.socket(zmq.PUB)
self._listener = context.socket(zmq.SUB)
self._listener.setsockopt(zmq.SUBSCRIBE, '')

@property
def is_leader(self):
return self._identity is self._leader

def wait_for_promotion(self):
self._promoted.wait()

def do_start(self):
self._broadcaster.bind("tcp://0.0.0.0:{}".format(self.port))
self._broadcast_when_promoted()
self._listen_for_heartbeats()
self._next_leader()

def _next_leader(self):
self._leader = self._candidates.pop(0)
if self.is_leader:
self._promoted.set()
else:
self._listener.connect("tcp://{}:{}".format(self._leader, self.port))

@autospawn
def _broadcast_when_promoted(self):
self.wait_for_promotion()
while self.is_leader:
self._broadcaster.send(self._identity)
gevent.sleep(self.heartbeat_interval)

@autospawn
def _listen_for_heartbeats(self):
while not self.is_leader:
with Timeout(self.heartbeat_interval * 2, False) as timeout:
leader = self._listener.recv()
if leader is None:
self._next_leader()

0 comments on commit 4bce960

Please sign in to comment.