Features/incremental sync #28

Open
wants to merge 15 commits into from

1 participant

Commits on May 7, 2013
  1. Tweak begin_handshake to incrementally fetch DBHASHs

    committed
    This will still grow exponentially, but will be very cheap (16 requests)
    for DBs that are in sync, regardless of size (see the sketch after this
    commit list).
    
    The drawback is that with the current implementation, each of these
    requests will trigger a full DB scan at both ends.
  2. ! Implement a handler for describehash

    committed
    This breaks the existing object caching API
  3. Fix calling and style

    committed
  4. Fixup for tick function

    committed
  5. More of the DBHASH handshake

    committed
  6. Missing events include

    committed
  7. Handle our hash request

    committed
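
A rough sketch of the request pattern the first commit above is aiming for, assuming each side can already produce a digest per hex prefix (the names below are illustrative, not code from this PR): an in-sync pair exchanges exactly 16 DBHASH round-trips regardless of DB size, and only prefixes whose digests disagree cost a LISTALLOBJECTS scoped to that prefix.

    HEX_PREFIXES = "0123456789abcdef"

    def handshake_requests(our_hashes, their_hashes):
        # Illustrative only: one LISTDBHASH per hex prefix (16 total), then a
        # LISTALLOBJECTS scoped to each prefix whose digests disagree. For
        # stations that are already in sync the second list is empty.
        requests = [("LISTDBHASH", prefix) for prefix in HEX_PREFIXES]
        requests += [("LISTALLOBJECTS", prefix) for prefix in HEX_PREFIXES
                     if our_hashes[prefix] != their_hashes[prefix]]
        return requests

The INCREMENTAL protocol in this PR chains these requests through terminate handlers rather than issuing them all up front.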
Commits on May 27, 2013
  1. Refactor handshake lookup

    committed
0 groundstation/events/__init__.py
No changes.
2 groundstation/settings.py
@@ -4,6 +4,8 @@
DEFAULT_CACHE_LIFETIME=30 # 30 Seconds
STORAGE_BACKEND="git"
+HANDSHAKE_PROTOCOL="NAIVE"
+
LISTALLOBJECTS_CHUNK_THRESHOLD=1024
LISTALLCHANNELS_RETRY_TIMEOUT=5
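
NAIVE stays the default, so existing deployments keep the old LISTALLOBJECTS handshake. A minimal sketch of opting into the incremental one, assuming a reachable peer and writable station path (both made up here):

    from groundstation import settings
    from groundstation.node import Node
    from groundstation.station import Station
    from groundstation.stream_client import StreamClient

    settings.HANDSHAKE_PROTOCOL = "INCREMENTAL"         # process-wide default

    station = Station("/tmp/demo-station", Node())      # hypothetical path
    client = StreamClient("peer.example.com")           # hypothetical peer
    client.begin_handshake(station)                     # picks up the setting
    # ...or override it for a single handshake:
    # client.begin_handshake(station, protocol="NAIVE")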
39 groundstation/stream_client.py
@@ -5,6 +5,38 @@
import groundstation.logger
log = groundstation.logger.getLogger(__name__)
+HANDSHAKE_PROTOCOLS = {}
+def register_handshake(name):
+    def _(func):
+        HANDSHAKE_PROTOCOLS[name] = func
+    return _
+
+
+@register_handshake("NAIVE")
+def _(self, station):
+    request = Request("LISTDBHASH", payload="", station=station, stream=self)
+    station.register_request(request)
+    self.enqueue(request)
+
+
+@register_handshake("INCREMENTAL")
+def _(self, station):
+    prefixes = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
+                'a', 'b', 'c', 'd', 'e', 'f']
+
+    def terminate(self):
+        if not prefixes:
+            return
+        prefix = prefixes.pop(0)
+        request = Request("LISTDBHASH", payload=prefix, station=station, stream=self)
+        request.terminate = terminate
+        station.register_request(request)
+        self.enqueue(request)
+
+    # Our terminate handler kicks off a fetch of the next DB hash prefix. We
+    # bump it to start the handshake.
+    terminate(self)
+
class StreamClient(StreamSocket):
def __init__(self, addr):
@@ -14,7 +46,6 @@ def __init__(self, addr):
        self.socket.connect((addr, settings.PORT))
        self.socket.setblocking(False)
-    def begin_handshake(self, station):
-        request = Request("LISTALLOBJECTS", station=station, stream=self)
-        station.register_request(request)
-        self.enqueue(request)
+    def begin_handshake(self, station, protocol=None):
+        protocol = protocol or settings.HANDSHAKE_PROTOCOL
+        HANDSHAKE_PROTOCOLS[protocol](self, station)
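
Since begin_handshake now just dispatches through the registry, additional strategies can be plugged in without touching it. A rough sketch of registering one (the "EAGER" name and its body are invented for illustration, not part of this PR):

    @register_handshake("EAGER")
    def _(self, station):
        # Hypothetical strategy: issue all sixteen prefix requests up front
        # instead of chaining them through terminate handlers as INCREMENTAL does.
        for prefix in "0123456789abcdef":
            request = Request("LISTDBHASH", payload=prefix, station=station, stream=self)
            station.register_request(request)
            self.enqueue(request)

A caller would then select it with begin_handshake(station, protocol="EAGER") or by pointing HANDSHAKE_PROTOCOL at it.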
9 groundstation/transfer/request_handlers/listallobjects.py
@@ -9,6 +9,7 @@
def handle_listallobjects(self):
+    # XXX This is going to be obsolete soon
    if not self.station.recently_queried(self.origin):
        log.info("%s not up to date, issuing LISTALLOBJECTS" % (self.origin))
        # Pass in the station for gizmo_factory in the constructor
@@ -18,7 +19,13 @@ def handle_listallobjects(self):
    else:
        log.info("object cache for %s still valid" % (self.origin))
    log.info("Handling LISTALLOBJECTS")
-    payload = [groundstation.utils.oid2hex(i) for i in self.station.objects()]
+    prefix = self.payload
+    payload = []
+    for i in self.station.objects():
+        name = groundstation.utils.oid2hex(i)
+        if name.startswith(prefix):
+            payload.append(name)
+
    if len(payload) > settings.LISTALLOBJECTS_CHUNK_THRESHOLD:
        log.info("Lots of objects to send, registering an iterator")
1 groundstation/transfer/response.py
@@ -51,6 +51,7 @@ def process(self):
"TRANSFER": response_handlers.handle_transfer,
"DESCRIBEOBJECTS": response_handlers.handle_describeobjects,
"DESCRIBECHANNELS": response_handlers.handle_describechannels,
+ "DESCRIBEHASH": response_handlers.handle_describehash,
"TERMINATE": response_handlers.handle_terminate,
}
1 groundstation/transfer/response_handlers/__init__.py
@@ -1,4 +1,5 @@
from describeobjects import handle_describeobjects
from describechannels import handle_describechannels
+from describehash import handle_describehash
from transfer import handle_transfer
from terminate import handle_terminate
13 groundstation/transfer/response_handlers/describehash.py
@@ -0,0 +1,13 @@
+import groundstation.proto.db_hash_pb2
+
+
+def handle_describehash(self):
+    db_hash = groundstation.proto.db_hash_pb2.DBHash()
+
+    db_hash.ParseFromString(self.payload)
+    if self.station.get_hash(db_hash.prefix) != db_hash.hash:
+        # Objects are missing. Work out what they are.
+        request = self._Request("LISTALLOBJECTS", payload=db_hash.prefix)
+        self.stream.enqueue(request)
+        # We also need to work out how to tell them what we've got. Request-request type?
+    self.TERMINATE()
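
get_hash itself isn't shown in this diff. As an assumption about its shape only (not the actual implementation), a digest over the sorted object ids under one hex prefix would match what this handler expects, and would also explain the "full DB scan at both ends" caveat in the first commit message:

    import hashlib
    import groundstation.utils

    def get_hash(station, prefix):
        # Assumed shape of Station.get_hash(prefix): a stable digest over the
        # sorted object ids under one hex prefix. The real method may differ.
        names = [groundstation.utils.oid2hex(i) for i in station.objects()]
        bucket = sorted(n for n in names if n.startswith(prefix))
        return hashlib.sha1("\n".join(bucket)).hexdigest()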
28 test/test_listallobjects_handler.py
@@ -15,6 +15,7 @@ def test_handle_listallobjects_returns_stream_for_few_objects(self):
        for i in xrange(64):
            oids.append(self.station.station.write("test_%i" % (i)))
+        self.station.payload = ""
        handle_listallobjects(self.station)
        resp = self.station.stream.pop()
        self.assertIsInstance(resp, response.Response)
@@ -30,6 +31,8 @@ def test_follows_up_on_channels(self):
        self.station.set_real_terminate(True)
        self.station.set_real_id(True)
        self.station.set_real_register(True)
+
+        self.station.payload = ""
        handle_listallobjects(self.station)
        req1 = self.station.stream.pop(0)
        self.assertEqual(req1.verb, "LISTALLOBJECTS")
@@ -51,6 +54,7 @@ def test_follows_up_on_channels(self):
class TestHandlerListAllObjectsCached(StationHandlerTestCase):
    def test_has_cache(self):
+        self.station.payload = ""
        handle_listallobjects(self.station)
        req1 = self.station.stream.pop(0)
        self.assertEqual(req1.verb, "LISTALLOBJECTS")
@@ -72,7 +76,31 @@ def test_queues_retry(self):
        self.assertFalse(self.station.station.has_ready_deferreds())
        self.assertEqual(len(self.station.station.deferreds), 0)
+        self.station.payload = ""
        handle_listallobjects(self.station)
        req1 = self.station.stream.pop(0)
        handle_terminate(req1)
        self.assertEqual(len(self.station.station.deferreds), 1)
+
+
+class TestListAllObjectsWithPrefix(StationHandlerTestCase):
+    def test_only_fetches_for_prefix(self):
+        # Make ourselves cached
+        self.station.station.mark_queried(self.station.origin)
+        oids = list()
+        for i in xrange(64):
+            oid = self.station.station.write("test_%i" % (i))
+            if oid.startswith("0"):
+                oids.append(oid)
+
+        self.station.payload = "0"
+        handle_listallobjects(self.station)
+        resp = self.station.stream.pop()
+        self.assertIsInstance(resp, response.Response)
+        objects = ObjectList()
+        objects.ParseFromString(resp.payload)
+
+        self.assertEqual(len(objects.objectname), len(oids))
+
+        for i in objects.objectname:
+            self.assertIn(i, oids)
64 test/test_station_integration.py
@@ -6,6 +6,8 @@
import tempfile
import shutil
+from collections import defaultdict
+
from groundstation.node import Node
from groundstation.station import Station
@@ -14,6 +16,10 @@
from groundstation.peer_socket import PeerSocket
+import groundstation.events.tcpnetwork_event as tcpnetwork_event
+
+from groundstation.transfer.request_handlers import handle_listdbhash
+
class TestListener(StreamListener):
    def __init__(self, path):
@@ -36,7 +42,13 @@ class StationIntegrationFixture(unittest.TestCase):
    def setUp(self):
        self.dir = tempfile.mkdtemp()
        self.node = Node()
-        self.station = Station(os.path.join(self.dir, "station"), self.node)
+        current_station = [-1]
+
+        def new_station():
+            current_station[0] += 1
+            return Station(os.path.join(self.dir, "station%d" % current_station[0]), self.node)
+
+        self.stations = defaultdict(new_station)
    def tearDown(self):
        shutil.rmtree(self.dir)
@@ -72,8 +84,56 @@ def tick():
        self.assertEqual(len(swrite), 1)
-        client.begin_handshake(self.station)
+        client.begin_handshake(self.stations[0])
        client.send()
        (sread, swrite, _) = tick()
        self.assertEqual(sread, [peer])
+
+class StationHandshakeTestCase(StationIntegrationFixture):
+    def test_handshake(self):
+        active = self.stations["active"]
+        passive = self.stations["passive"]
+
+        # Put some unique objects in each station
+
+        for i in xrange(100):
+            active.write("active%d" % i)
+            passive.write("passive%d" % i)
+
+        read_sockets = []
+        write_sockets = []
+        def tick():
+            for i in write_sockets:
+                while i.has_data_ready():
+                    i.send()
+
+            sread, swrite, sexc = select.select(read_sockets, [], [], 1)
+            if sexc:
+                assert False, "Sockets kerploded"
+
+            return sread
+
+        addr = os.path.join(self.dir, "listener")
+        listener = TestListener(addr)
+
+        client = TestClient(addr)
+        write_sockets.append(client)
+
+        # Get our client
+        peer = listener.accept(PeerSocket)
+        read_sockets.append(peer)
+        write_sockets.append(peer)
+
+        # Start our handshake
+        client.begin_handshake(passive)
+        client.send()
+        sread = tick()
+
+        self.assertEqual(len(sread), 1)
+        self.assertIsInstance(sread[0], PeerSocket)
+        for payload in tcpnetwork_event.payloads(sread[0]):
+            gizmo = active.gizmo_factory.hydrate(payload, peer)
+            self.assertEqual(gizmo.verb, "LISTDBHASH")
+            self.assertEqual(gizmo.payload, "")
+            handle_listdbhash(gizmo)