/
stream_client.py
51 lines (40 loc) · 1.52 KB
/
stream_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from sockets.stream_socket import StreamSocket
from transfer.request import Request
import settings
import groundstation.logger
# Module-level logger, named after this module.
log = groundstation.logger.getLogger(__name__)
# Registry of handshake implementations, keyed by protocol name. Entries are
# added by the ``register_handshake`` decorator.
HANDSHAKE_PROTOCOLS = {}


def register_handshake(name):
    """Return a decorator that registers a handshake function under ``name``.

    The decorated function is stored in ``HANDSHAKE_PROTOCOLS[name]``,
    replacing any function previously registered under the same name.

    The function is also returned unchanged, so the decorated name stays
    bound to the function instead of being rebound to ``None``.
    """
    def decorator(func):
        HANDSHAKE_PROTOCOLS[name] = func
        return func
    return decorator
@register_handshake("NAIVE")
def _(self, station):
    """NAIVE handshake: send a single LISTDBHASH request with an empty payload.

    The request is first registered with ``station`` and then enqueued on
    the stream (``self``).
    """
    listing_request = Request(
        "LISTDBHASH",
        payload="",
        station=station,
        stream=self,
    )
    # Register before enqueueing, in that order.
    station.register_request(listing_request)
    self.enqueue(listing_request)
@register_handshake("INCREMENTAL")
def _(self, station):
    """INCREMENTAL handshake: one LISTDBHASH request per hex-digit prefix.

    Requests are issued one at a time for the prefixes '0'..'f'. Each
    request gets ``terminate`` attached as its ``terminate`` attribute, so
    invoking that handler sends the request for the next prefix. Once every
    prefix has been requested, the handler does nothing.
    """
    prefixes = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                'a', 'b', 'c', 'd', 'e', 'f']

    def terminate(self):
        # ``self`` here shadows the outer ``self``: whatever the caller
        # passes is used as the stream for the next request.
        # NOTE(review): this is assigned as a plain instance attribute on the
        # request, so it is not bound automatically - confirm what Request
        # passes when it invokes ``terminate``.
        if not prefixes:
            # Every prefix has been requested. Stop here rather than falling
            # through to pop() on an empty list, which raises IndexError.
            return
        prefix = prefixes.pop(0)
        request = Request("LISTDBHASH", payload=prefix, station=station, stream=self)
        request.terminate = terminate
        station.register_request(request)
        self.enqueue(request)

    # Our terminate handler kicks off a fetch of the next DB hash prefix. We
    # bump it to start the handshake.
    terminate(self)
class StreamClient(StreamSocket):
    """Stream socket that connects out to a peer and starts a handshake."""

    def __init__(self, addr):
        """Connect to ``addr`` on ``settings.PORT``, then go non-blocking.

        ``addr`` is also stored on the instance as ``peer``.
        """
        super(StreamClient, self).__init__()
        # TODO Pretty sure this should be a struct sockaddr
        self.peer = addr
        endpoint = (addr, settings.PORT)
        # Connect first; the socket is switched to non-blocking mode only
        # after the connection has been made.
        self.socket.connect(endpoint)
        self.socket.setblocking(False)

    def begin_handshake(self, station, protocol=None):
        """Run the registered handshake for ``protocol`` against ``station``.

        When ``protocol`` is falsy, ``settings.HANDSHAKE_PROTOCOL`` is used.
        The name is looked up in ``HANDSHAKE_PROTOCOLS``; an unregistered
        name raises ``KeyError``.
        """
        if not protocol:
            protocol = settings.HANDSHAKE_PROTOCOL
        handshake = HANDSHAKE_PROTOCOLS[protocol]
        handshake(self, station)