Skip to content

Commit

Permalink
Merge branch 'master' into features/incremental_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
richo committed May 7, 2013
2 parents 9d7fbe5 + cff5617 commit a584cb5
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 21 deletions.
4 changes: 4 additions & 0 deletions groundstation/events/tcpnetwork_event.py
@@ -0,0 +1,4 @@
def payloads(sock):
sock.recv()
while sock.packet_queue:
yield sock.packet_queue.pop()
34 changes: 13 additions & 21 deletions stationd
Expand Up @@ -23,6 +23,8 @@ from groundstation.broadcast_events import new_broadcast_event, UnknownBroadcast
from groundstation.broadcast_events.broadcast_ping import BroadcastPing
from groundstation.gizmo_factory import InvalidGizmoError

import groundstation.events.tcpnetwork_event as tcpnetwork_event

import groundstation.deferred

PORT = settings.PORT
Expand Down Expand Up @@ -55,9 +57,9 @@ def _write_sockets():
write_sockets.append(i)
return write_sockets

def handle_discoverer_event():
def handle_discoverer_event(sock):
# TODO Return Event objects instead of raw data
data, peer = discoverer.recv(settings.DEFAULT_BUFSIZE)
data, peer = sock.recv(settings.DEFAULT_BUFSIZE)
try:
event = new_broadcast_event(data)
except UnknownBroadcastEvent:
Expand Down Expand Up @@ -86,33 +88,23 @@ def handle_discoverer_event():
log.info("Peer's uuid is smaller, awaiting connection")


def handle_listener_event():
peer = listener.accept(PeerSocket)
def handle_listener_event(sock):
peer = sock.accept(PeerSocket)
peer_sockets.append(peer)


def handle_tcpnetwork_event():
def handle_tcpnetwork_event(sock):
try:
i.recv()
while i.packet_queue:
data = i.packet_queue.pop()
gizmo = station.gizmo_factory.hydrate(data, i)
for payload in tcpnetwork_event.payloads(sock):
gizmo = station.gizmo_factory.hydrate(payload, sock)
assert gizmo is not None, "gizmo_factory returned None"
gizmo.process()
except SocketClosedException:
peer_sockets.remove(i)
peer_sockets.remove(sock)
except InvalidGizmoError:
log.warn("Recieved invalid gizmo!")


def handle_iterators():
station.handle_iters()


def handle_deferreds():
station.handle_deferreds()


def handler_for(thing):
if thing == discoverer:
return handle_discoverer_event
Expand Down Expand Up @@ -142,7 +134,7 @@ while True:
log.info("Got %i fds. read: %s write: %i" % (len(sread) + len(swrite), len(sread), len(swrite)))

for i in sread:
handler_for(i)()
handler_for(i)(i)

for i in swrite:
if not (isinstance(i, PeerSocket) or isinstance(i, StreamClient)):
Expand All @@ -164,9 +156,9 @@ while True:

# Deal with iterators last, this way if they queue up network stuff it'll be processed immediately.
if station.has_ready_iterators():
handle_iterators()
station.handle_iters()

if station.has_ready_deferreds():
handle_deferreds()
station.handle_deferreds()

# else: # Right now that's the only socket

0 comments on commit a584cb5

Please sign in to comment.