Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial commit.

  • Loading branch information...
commit 1b96b4d00e8ea27eee36eb733bfc417f217c34a0 0 parents
Johan Rydberg authored
21 LICENSE
@@ -0,0 +1,21 @@
+Based on node-gossip, which has the following license:
+
+Copyright 2010 Bob Potter. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to
+deal in the Software without restriction, including without limitation the
+rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+IN THE SOFTWARE.
129 test.py
@@ -0,0 +1,129 @@
+from txgossip.gossip import Gossiper, _address_to_peer_name
+from twisted.internet import reactor
+import random
+
+
+CNT = 20
+cnt = 0
+
+
+class Participant:
+
+ def __init__(self, name):
+ self.name = name
+
+ def value_changed(self, peer, key, value):
+ if key == '__heartbeat__':
+ return
+ #if peer != self.name:
+ # print self.name, "saw", peer, "change", key, "to", value
+ if key == 'x':
+ global cnt
+ cnt += 1
+ if cnt == CNT:
+ print reactor.seconds()
+ #print "DONE"
+ if key == '/leader-election/vote':
+ try:
+ vote = self.gossiper.get_local_value('/leader-election/vote')
+ # check consensus:
+ for peer in self.gossiper.live_peers():
+ v = self.gossiper.get_peer_value(peer, '/leader-election/vote')
+ if v != vote:
+ #print "no consensus", peer, "voted on", v, "(i like", vote, ")"
+ return
+ except KeyError:
+ #print "key error in vote"
+ return
+ #print "got consensus on votes"
+ vote = self.gossiper.set_local_state(
+ '/leader-election/master', v)
+ elif key == '/leader-election/master':
+ try:
+ vote = self.gossiper.get_local_value('/leader-election/master')
+ # check consensus:
+ for peer in self.gossiper.live_peers():
+ v = self.gossiper.get_peer_value(peer, '/leader-election/master')
+ if v != vote:
+ return
+ except KeyError:
+ return
+ print self.name, "WE GOT A NEW MASTER", vote, reactor.seconds()
+
+ def peer_alive(self, peer):
+ print self.name, "thinks", peer, "is alive", self.gossiper.get_peer_keys(peer)
+ print self.gossiper.get_peer_value(peer, '/leader-election/priority')
+ self._start_election()
+
+ _election_timeout = None
+
+ def _vote(self):
+ self._election_timeout = None
+ suggested_peer = self.gossiper.name
+ arrogance = self.gossiper.get_local_value('/leader-election/priority')
+ for peer in self.gossiper.live_peers():
+ p = self.gossiper.get_peer_value(peer, '/leader-election/priority')
+ #print self.name, "local arrogance is", repr(arrogance), "where", peer, "has", repr(p), p > arrogance
+ if p > arrogance:
+ suggested_peer = peer
+ arrogance = p
+ print self.name, "votes for", suggested_peer
+ try:
+ current_master = self.gossiper.get_local_value(
+ '/leader-election/vote')
+ if current_master == suggested_peer:
+ #print self.name, "no need to update master"
+ return
+ except KeyError:
+ pass
+ self.gossiper.set_local_state('/leader-election/vote', suggested_peer)
+
+ def _start_election(self):
+ if self._election_timeout is not None:
+ self._election_timeout.cancel()
+ self._election_timeout = reactor.callLater(5, self._vote)
+
+ def peer_dead(self, peer):
+ print self.name, "thinks", peer, "is dead"
+ self._start_election()
+
+ def peer_stable(self, peer):
+ print "stable", peer
+
+
+members = []
+
+for i in range(0, CNT):
+ participant = Participant('127.0.0.1:%d' % (9000+i))
+ gossiper = Gossiper(reactor, '127.0.0.1:%d' % (9000+i), participant)
+ gossiper.set_local_state('/leader-election/priority', i)
+ p = reactor.listenUDP(9000+i, gossiper)
+ members.append((gossiper, p, participant))
+
+for i in range(1, CNT):
+ members[i][0].handle_new_peers(['127.0.0.1:9000'])
+
+seed = members[0][0]
+
+def prop_test():
+ print "START PROP TEST"
+ print reactor.seconds()
+ seed.set_local_state('x', 'value')
+
+pending = []
+
+def kill_some():
+ if len(members) > (CNT / 2):
+ i = random.randint(0, len(members) - 1)
+ gossiper, p, participant = members.pop(i)
+ print "killing", p.getHost()
+ p.stopListening()
+ #reactor.callLater(5, kill_some)
+
+def test():
+ seed.set_local_state('test', 'value')
+ reactor.callLater(5, prop_test)
+ reactor.callLater(30, kill_some)
+
+reactor.callWhenRunning(test)
+reactor.run()
0  txgossip/__init__.py
No changes.
51 txgossip/detector.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2011 Johan Rydberg
+# Copyright (C) 2010 Bob Potter
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import math
+
+
+class FailureDetector(object):
+
+ def __init__(self):
+ self.last_time = None
+ self.intervals = []
+
+ def add(self, arrival_time):
+ last_time, self.last_time = self.last_time, arrival_time
+ if last_time is None:
+ i = 0.75
+ else:
+ i = arrival_time - last_time
+ self.intervals.append(i)
+ if len(self.intervals) > 1000:
+ self.intervals.pop(0)
+
+ def phi(self, current_time):
+ if self.last_time is None:
+ return 0
+ current_interval = current_time - self.last_time
+ exp = -1 * current_interval / self.interval_mean()
+ return -1 * (math.log(pow(math.e, exp)) / math.log(10))
+
+ def interval_mean(self):
+ return sum(self.intervals) / float(len(self.intervals))
154 txgossip/gossip.py
@@ -0,0 +1,154 @@
+# Copyright (C) 2011 Johan Rydberg
+# Copyright (C) 2010 Bob Potter
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import random
+import json
+
+from txgossip.state import PeerState
+from txgossip.scuttle import Scuttle
+from twisted.python import log
+from twisted.internet.protocol import DatagramProtocol
+from twisted.internet import task
+
+
+def _address_from_peer_name(name):
+ address, port = name.split(':', 1)
+ return address, int(port)
+
+def _address_to_peer_name(address):
+ return '%s:%d' % (address.host, address.port)
+
+
+class Gossiper(DatagramProtocol):
+
+ def __init__(self, clock, name, participant):
+ self.state = PeerState(clock, name, participant)
+ self.peers = {}
+ self.name = name
+ self.scuttle = Scuttle(self.peers, self.state)
+ self._heart_beat_timer = task.LoopingCall(self.beat_heart)
+ self._heart_beat_timer.clock = clock
+ self._gossip_timer = task.LoopingCall(self.gossip)
+ self._gossip_timer.clock = clock
+ self.clock = clock
+ self.participant = participant
+ self.participant.gossiper = self
+
+ def handle_new_peers(self, seeds):
+ #print self.name, "HANDLE NEW", seeds
+ for peer_name in seeds:
+ if peer_name in self.peers:
+ continue
+ self.peers[peer_name] = PeerState(self.clock,
+ peer_name, self.participant)
+
+ def startProtocol(self):
+ self._heart_beat_timer.start(1, now=True)
+ self._gossip_timer.start(1, now=True)
+ self.peers[self.name] = self.state
+
+ def stopProtocol(self):
+ self._gossip_timer.stop()
+ self._heart_beat_timer.stop()
+
+ def beat_heart(self):
+ self.state.beat_that_heart()
+
+ def datagramReceived(self, data, address):
+ self.handle_message(json.loads(data), address)
+
+ def gossip(self):
+ live_peers = self.live_peers()
+ dead_peers = self.dead_peers()
+ if live_peers:
+ self.gossip_with_peer(random.choice(live_peers))
+
+ prob = len(dead_peers) / float(len(live_peers) + 1)
+ if random.random() < prob:
+ self.gossip_with_peer(random.choice(dead_peers))
+
+ for state in self.peers.values():
+ state.check_suspected()
+
+ def request_message(self):
+ return json.dumps({
+ 'type': 'request', 'digest': self.scuttle.digest()
+ })
+
+ def gossip_with_peer(self, peer):
+ #print self.name, "will gossip with", peer
+ self.transport.write(self.request_message(),
+ _address_from_peer_name(peer))
+
+ def handle_message(self, message, address):
+ """Handle an incoming message."""
+ if message['type'] == 'request':
+ self.handle_request(message, address)
+ elif message['type'] == 'first-response':
+ self.handle_first_response(message, address)
+ elif message['type'] == 'second-response':
+ self.handle_second_response(message, address)
+
+ def handle_request(self, message, address):
+ deltas, requests, new_peers = self.scuttle.scuttle(
+ message['digest'])
+ #print self.name, "got a request", deltas, requests, new_peers
+ self.handle_new_peers(new_peers)
+ # Send a first response to the one requesting our data.
+ response = json.dumps({
+ 'type': 'first-response', 'digest': requests, 'updates': deltas
+ })
+ self.transport.write(response, address)
+
+ def handle_first_response(self, message, address):
+ self.scuttle.update_known_state(message['updates'])
+ response = json.dumps({
+ 'type': 'second-response',
+ 'updates': self.scuttle.fetch_deltas(
+ message['digest'])
+ })
+ self.transport.write(response, address)
+
+ def handle_second_response(self, message, address):
+ self.scuttle.update_known_state(message['updates'])
+
+ def set_local_state(self, key, value):
+ self.state.update_local(key, value)
+
+ def get_local_value(self, key):
+ return self.state.attrs[key][0]
+
+ def get_peer_value(self, peer, key):
+ return self.peers[peer].attrs[key][0]
+
+ def get_peer_keys(self, peer):
+ return self.peers[peer].attrs.keys()
+
+ def live_peers(self):
+ return [n for (n, p) in self.peers.items()
+ if p.alive and n != self.name]
+
+ def dead_peers(self):
+ return [n for (n, p) in self.peers.items()
+ if not p.alive and n != self.name]
+
81 txgossip/scuttle.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2011 Johan Rydberg
+# Copyright (C) 2010 Bob Potter
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+from txgossip.state import PeerState
+
+
+class Scuttle(object):
+
+ def __init__(self, peers, local_peer):
+ self.peers = peers
+ self.local_peer = local_peer
+
+ def digest(self):
+ digest = {}
+ for peer, state in self.peers.items():
+ digest[peer] = state.max_version_seen
+ return digest
+
+ def scuttle(self, digest):
+ deltas_with_peer = []
+ requests = {}
+ new_peers = []
+ for peer, digest_version in digest.items():
+ if not peer in self.peers:
+ requests[peer] = 0
+ new_peers.append(peer)
+ else:
+ state = self.peers[peer]
+ if state.max_version_seen > digest_version:
+ deltas_with_peer.append((
+ peer,
+ self.peers[peer].deltas_after_version(digest_version)
+ ))
+ elif state.max_version_seen < digest_version:
+ requests[peer] = state.max_version_seen
+
+ # Sort by peers with most deltas
+ def sort_metric(a, b):
+ return len(b[1]) - len(a[1])
+ deltas_with_peer.sort(cmp=sort_metric)
+
+ deltas = []
+ for (peer, peer_deltas) in deltas_with_peer:
+ for (key, value, version) in peer_deltas:
+ deltas.append((peer, key, value, version))
+
+ return deltas, requests, new_peers
+
+ def update_known_state(self, deltas):
+ for peer, key, value, version in deltas:
+ self.peers[peer].update_with_delta(
+ key, value, version)
+
+ def fetch_deltas(self, requests):
+ deltas = []
+ for peer, version in requests.items():
+ peer_deltas = self.peers[peer].deltas_after_version(
+ version)
+ for (key, value, version) in peer_deltas:
+ deltas.append((peer, key, value, version))
+ return deltas
97 txgossip/state.py
@@ -0,0 +1,97 @@
+# Copyright (C) 2011 Johan Rydberg
+# Copyright (C) 2010 Bob Potter
+#
+# Permission is hereby granted, free of charge, to any person
+# obtaining a copy of this software and associated documentation files
+# (the "Software"), to deal in the Software without restriction,
+# including without limitation the rights to use, copy, modify, merge,
+# publish, distribute, sublicense, and/or sell copies of the Software,
+# and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+from txgossip.detector import FailureDetector
+
+
+class PeerState(object):
+ PHI = 8
+
+ def __init__(self, clock, name, participant):
+ self.clock = clock
+ self.participant = participant
+ self.max_version_seen = 0
+ self.attrs = {}
+ self.detector = FailureDetector()
+ self.alive = False
+ self.heart_beat_version = 0
+ self.name = name
+
+ def update_with_delta(self, k, v, n):
+ """."""
+ # It's possibly to get the same updates more than once if
+ # we're gossiping with multiple peers at once ignore them
+ if n > self.max_version_seen:
+ self.max_version_seen = n
+ self.set_key(k,v,n)
+ if k == '__heartbeat__':
+ self.detector.add(self.clock.seconds())
+
+ def update_local(self, k, v):
+ # This is used when the peerState is owned by this peer
+ self.max_version_seen += 1
+ self.set_key(k, v, self.max_version_seen)
+
+ def __getitem__(self, key):
+ return self.attrs[key][0]
+
+ def keys(self):
+ return self.attrs.keys()
+
+ def set_key(self, k, v, n):
+ self.attrs[k] = (v, n)
+ self.participant.value_changed(self.name, k, v)
+
+ def beat_that_heart(self):
+ self.heart_beat_version += 1
+ self.update_local('__heartbeat__', self.heart_beat_version);
+
+ def deltas_after_version(self, lowest_version):
+ """
+ Return sorted by version.
+ """
+ deltas = []
+ for key, (value, version) in self.attrs.items():
+ if version > lowest_version:
+ deltas.append((key, value, version))
+ deltas.sort(key=lambda kvv: kvv[2])
+ return deltas
+
+ def check_suspected(self):
+ phi = self.detector.phi(self.clock.seconds())
+ if phi > self.PHI or phi == 0:
+ self.mark_dead()
+ return True
+ else:
+ self.mark_alive()
+ return False
+
+ def mark_alive(self):
+ alive, self.alive = self.alive, True
+ if not alive:
+ self.participant.peer_alive(self.name)
+
+ def mark_dead(self):
+ if self.alive:
+ self.alive = False
+ self.participant.peer_dead(self.name)
Please sign in to comment.
Something went wrong with that request. Please try again.