Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
149 lines (119 sloc) 4.91 KB
#!/usr/bin/env python
import os
import sys
import select
import time
from groundstation import logger
log = logger.getLogger("stationd")
from groundstation import settings
from groundstation.station import Station
from groundstation.sockets.broadcast_socket import BroadcastUnrouteable
from groundstation.broadcast_discoverer import BroadcastDiscoverer
from groundstation.broadcast_announcer import BroadcastAnnouncer
from groundstation.stream_listener import StreamListener
from groundstation.stream_client import StreamClient
from groundstation.node import Node
from groundstation.peer_socket import PeerSocket
from groundstation.sockets.socket_closed_exception import SocketClosedException
from groundstation.peer_socket_pool import PeerSocketPool
from groundstation.broadcast_events import new_broadcast_event, UnknownBroadcastEvent
from groundstation.broadcast_events.broadcast_ping import BroadcastPing
from groundstation.gizmo_factory import InvalidGizmoError
from os.path import expanduser
PORT = settings.PORT  # single port passed to the discoverer, announcer and listener below

# This node's identity, and the Station built from the ~/.groundstation path.
myself = Node()
station_path = expanduser("~/.groundstation")
station = Station(station_path, myself)

discoverer = BroadcastDiscoverer(PORT)
announcer = BroadcastAnnouncer(PORT)
listener = StreamListener(PORT)

# Fixed sockets that are always included in the read set; per-peer
# connections live in the pool instead.
sockets = [discoverer, listener]
peer_sockets = PeerSocketPool()

# NOTE(review): BEACON_TIMEOUT is not defined anywhere in the visible source;
# confirm where it is meant to come from before running this.
last_beacon = time.time() - BEACON_TIMEOUT  # Guarantee that we'll announce on the first run
def _read_sockets():
    """Return a fresh list of every object to poll for reads.

    The entries of the module-level ``sockets`` list come first, followed by
    each member of ``peer_sockets``, in iteration order.
    """
    return list(sockets) + list(peer_sockets)
def _write_sockets():
    """Return the members of ``peer_sockets`` whose ``has_data_ready()`` is true.

    The original body had lost the statement under the ``if``, so no socket
    was ever added to the result; the filter is restored here, mirroring the
    append-based construction used by ``_read_sockets``.

    Returns:
        A new list, in pool iteration order; empty when no peer reports
        ready data.
    """
    return [peer for peer in peer_sockets if peer.has_data_ready()]
def handle_discoverer_event():
# Handles an event on the broadcast discoverer: receives (data, peer) from it,
# builds a broadcast event from the data, and for BroadcastPing events compares
# the ping's payload with this node's uuid to decide who initiates a connection.
# NOTE(review): this copy of the function looks damaged by extraction -- all
# indentation is gone, there is an `except` with no `try`, several call
# expressions have lost their callee, and `client` is assigned but never used.
# Restore the body from version control before changing behaviour.
# TODO Return Event objects instead of raw data
data, peer = discoverer.recv(settings.DEFAULT_BUFSIZE)
event = new_broadcast_event(data)
# NOTE(review): the `try:` that this handler belongs to is missing; presumably
# it wrapped the new_broadcast_event() call above -- confirm.
except UnknownBroadcastEvent:
log.warning("Ooops got some weird event: %s" % (repr(data)))
# NOTE(review): nothing visible leaves the function after the warning, so
# `event` would be unbound below; a statement may have been lost here.
if isinstance(event, BroadcastPing):
# Payload equal to our own uuid means the ping originated from this node.
# NOTE(review): the call receiving the message string is truncated on the next
# line and on the `else:` line below (presumably a log call -- confirm).
if event.payload == myself.uuid:"Discarding PING from myself")
#elif event.addr == me!
else:"Ping from %s" % str(peer))
if event.payload > myself.uuid:
# Peer's uuid is larger, we should connect and initiate sync
# peer[0] is what gets tested against the pool and handed to StreamClient.
if peer[0] not in peer_sockets:
# Ensure that they're in our object cache- we're updating it now
if not station.recently_queried(event.payload):
# NOTE(review): the new client is not used after this line in the visible
# code; follow-up statements appear to be missing.
client = StreamClient(peer[0])
else: # XXX This should check if we have open transactions.
if not station.recently_queried(event.payload):
# Looks up the existing connection by calling the pool with peer[0].
# NOTE(review): as above, nothing visible uses `client` afterwards.
client = peer_sockets(peer[0])
# NOTE(review): the line below fuses a comment with the tail of a truncated
# call; the `else:` it presumably belonged to is not visible.
# Peer's uuid is smaller, we should do nothing and await connection"Peer's uuid is smaller, awaiting connection")
def handle_listener_event():
    """Accept a connection from the stream listener.

    ``PeerSocket`` is passed to ``listener.accept()``; presumably it is the
    class used to wrap the accepted connection -- confirm against
    StreamListener.
    """
    # NOTE(review): the accepted object is only bound to a local and nothing
    # visible registers it anywhere; a follow-up statement may be missing.
    accepted_peer = listener.accept(PeerSocket)
def handle_tcpnetwork_event():
# Drains the packet queue of a peer connection, hydrating each queued item into
# a gizmo through station.gizmo_factory.
# NOTE(review): `i` is neither a parameter nor a local; presumably it is the
# module-level loop variable of the main loop -- confirm.
# NOTE(review): this copy looks damaged by extraction -- indentation is gone,
# both `except` clauses below have no `try`, and SocketClosedException has no
# handler body. Restore from version control before changing behaviour.
while i.packet_queue:
data = i.packet_queue.pop()
gizmo = station.gizmo_factory.hydrate(data, i)
# Fail loudly if the factory hands back nothing (stripped under `python -O`).
assert gizmo is not None, "gizmo_factory returned None"
# NOTE(review): nothing visible is done with `gizmo` after the assert.
except SocketClosedException:
except InvalidGizmoError:
log.warn("Recieved invalid gizmo!")
def handle_iterators():
# NOTE(review): no body is visible for this function in this copy of the file;
# it appears to have been lost in extraction. Restore it from version control.
def handler_for(thing):
    """Return the handler function responsible for a given socket object.

    Args:
        thing: The socket object to dispatch on.

    Returns:
        ``handle_discoverer_event`` for the discoverer,
        ``handle_listener_event`` for the listener, and
        ``handle_tcpnetwork_event`` for any ``PeerSocket`` or ``StreamClient``.

    Raises:
        KeyError: If ``thing`` is none of the above.
    """
    if thing == discoverer:
        return handle_discoverer_event
    if thing == listener:
        return handle_listener_event
    # Check the argument itself. The previous code tested the unrelated name
    # ``i`` for the StreamClient case, so the result depended on whatever a
    # global loop variable happened to hold (or raised NameError without one).
    if isinstance(thing, (PeerSocket, StreamClient)):
        return handle_tcpnetwork_event
    raise KeyError(thing)
while True:
# Main event loop: periodically refreshes the beacon timestamp, waits on the
# read/write socket sets with BEACON_TIMEOUT as the timeout, then services
# whatever became ready.
# NOTE(review): this loop looks damaged by extraction -- indentation is gone,
# the `except` below has no `try`, the select call and the log line after it
# are fused and truncated, and both `for` loops have lost their bodies.
# Restore it from version control before changing behaviour.
# NOTE(review): BEACON_TIMEOUT is not defined in the visible source -- confirm.
if time.time() > (last_beacon + BEACON_TIMEOUT):
last_beacon = time.time()
# NOTE(review): the guarded statement is missing; since BroadcastUnrouteable is
# caught here, it was presumably the announcer sending its beacon -- confirm.
except BroadcastUnrouteable as e:
log.fatal("broadcast was unrouteable")
# NOTE(review): the callee and first argument of the call on the next line are
# missing (the `select` import and _read_sockets() suggest select.select), and
# the tail of a separate logging call is fused onto the same line.
(sread, swrite, sexc) =, _write_sockets(), [], BEACON_TIMEOUT)"Got %i fds. read: %s write: %i" % (len(sread) + len(swrite), len(sread), len(swrite)))
# NOTE(review): the body of the read loop is missing.
for i in sread:
for i in swrite:
# NOTE(review): the statement under this `if` and the `else:` that the warning
# below presumably belongs to are missing.
if isinstance(i, PeerSocket) or isinstance(i, StreamClient):
log.warn("Something that isn't PeerSocket or StreamClient wants to do IO")
# Deal with iterators last, this way if they queue up network stuff it'll be processed immediately.
# NOTE(review): the statement under this `if` is missing.
if station.has_ready_iterators():
# else: # Right now that's the only socket
Jump to Line
Something went wrong with that request. Please try again.