Permalink
Browse files

making leadership more interchangeable with cluster coordinator

  • Loading branch information...
1 parent 4e211f6 commit 127529c0caf25731470930f1070aa9e160d2f428 @progrium committed Mar 8, 2012
Showing with 36 additions and 16 deletions.
  1. +1 −0 config/gateway.conf.py
  2. +1 −1 config/numbers.conf.py
  3. +8 −0 gtutorial/cluster.py
  4. +21 −14 gtutorial/coordination.py
  5. +5 −1 gtutorial/gateway.py
View
@@ -2,6 +2,7 @@
identity = os.environ.get('IDENTITY', '127.0.0.1')
leader = os.environ.get('LEADER')
+cluster = os.environ.get('CLUSTER', '127.0.0.1').split(',')
def service():
from gtutorial.gateway import NumberGateway
View
@@ -1,4 +1,4 @@
-rate_per_minute = 320
+rate_per_minute = 60
def service():
from gtutorial.numbers import NumberServer
View
@@ -42,6 +42,14 @@ def __init__(self, identity, leader=None, cluster=None):
def wait_for_promotion(self):
self.promoted.wait()
+ @property
+ def leader(self):
+ return self.client.leader
+
+ @property
+ def identity(self):
+ return self.client.identity
+
class PeerServer(Service):
def __init__(self, coordinator, identity):
self.c = coordinator
View
@@ -1,6 +1,7 @@
import time
import gevent
from gevent import Timeout
+from gevent.event import Event
from gevent_zeromq import zmq
from ginkgo.core import Service, autospawn
@@ -10,9 +11,11 @@ class Leadership(Service):
port = Setting('leader_port', default=12345)
heartbeat_interval = Setting('leader_heartbeat_interval_secs', default=3)
- def __init__(self, identity, cluster, zmq_):
- self._identity = identity
- self._leader = None
+ def __init__(self, identity, cluster, zmq_=None):
+ zmq_ = zmq_ or zmq.Context()
+ self.identity = identity
+ self.leader = None
+ self.set = cluster
self._candidates = sorted(list(cluster))
self._promoted = Event()
self._broadcaster = zmq_.socket(zmq.PUB)
@@ -21,57 +24,61 @@ def __init__(self, identity, cluster, zmq_):
@property
def is_leader(self):
- return self._identity == self._leader
+ return self.identity == self.leader
def wait_for_promotion(self):
self._promoted.wait()
def do_start(self):
- self._broadcaster.bind("tcp://{}:{}".format(self._identity, self.port))
+ self._broadcaster.bind("tcp://{}:{}".format(self.identity, self.port))
self._broadcast_when_promoted()
self._listen_for_heartbeats()
self._next_leader()
def _next_leader(self):
- self._leader = self._candidates.pop(0)
+ self.leader = self._candidates.pop(0)
if self.is_leader:
self._promoted.set()
else:
- self._listener.connect("tcp://{}:{}".format(self._leader, self.port))
+ self._listener.connect("tcp://{}:{}".format(self.leader, self.port))
@autospawn
def _broadcast_when_promoted(self):
self.wait_for_promotion()
while self.is_leader:
- self._broadcaster.send(self._identity)
+ self._broadcaster.send(self.identity)
gevent.sleep(self.heartbeat_interval)
@autospawn
def _listen_for_heartbeats(self):
while not self.is_leader:
+ leader = None
with Timeout(self.heartbeat_interval * 2, False) as timeout:
leader = self._listener.recv()
if leader is None:
self._next_leader()
class Announcer(Service):
- def __init__(self, hub, cluster, identity):
+ def __init__(self, hub, cluster):
self.hub = hub
self.cluster = cluster
- self.identity = identity
def do_start(self):
self._announce()
@autospawn
def _announce(self):
while True:
- if self.identity in self.cluster:
- cluster_snapshot = sorted(list(self.cluster))
- identity_index = cluster_snapshot.index(self.identity)
+ if self.cluster.identity in self.cluster.set:
+ cluster_snapshot = sorted(list(self.cluster.set))
+ identity_index = cluster_snapshot.index(self.cluster.identity)
announcer_index = int(time.time()) % len(cluster_snapshot)
if announcer_index is identity_index:
- self.hub.publish("/announce", self.identity)
+ if self.cluster.is_leader:
+ announcement = "{}*".format(self.cluster.identity)
+ else:
+ announcement = self.cluster.identity
+ self.hub.publish("/announce", announcement)
gevent.sleep(1)
View
@@ -8,19 +8,23 @@
from .numbers import NumberClient
from .messaging.hub import MessageHub
from .coordination import Announcer
+from .coordination import Leadership
from .cluster import ClusterCoordinator
+from .util import ObservableSet
logger = logging.getLogger(__name__)
class NumberGateway(Service):
identity = Setting('identity', default='127.0.0.1')
leader = Setting('leader', default=None)
+ cluster_ = Setting('cluster', default=['127.0.0.1'])
def __init__(self):
self.client = NumberClient(('127.0.0.1', 7776))
self.cluster = ClusterCoordinator(self.identity, self.leader)
+ #self.cluster = Leadership(self.identity, ObservableSet(self.cluster_))
self.hub = MessageHub(self.cluster.set, self.identity)
- self.announcer = Announcer(self.hub, self.cluster.set, self.identity)
+ self.announcer = Announcer(self.hub, self.cluster)
self.add_service(self.cluster)
self.add_service(self.hub)

0 comments on commit 127529c

Please sign in to comment.