Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Branch: features/reque…
Fetching contributors…

Cannot retrieve contributors at this time

140 lines (111 sloc) 4.593 kB
#!/usr/bin/env python
import os
import sys
import select
import time
from groundstation import logger
log = logger.getLogger("stationd")
from groundstation import settings
from groundstation.station import Station
from groundstation.broadcast_discoverer import BroadcastDiscoverer
from groundstation.broadcast_announcer import BroadcastAnnouncer
from groundstation.stream_listener import StreamListener
from groundstation.stream_client import StreamClient
from groundstation.node import Node
from groundstation.peer_socket import PeerSocket
from groundstation.sockets.socket_closed_exception import SocketClosedException
from groundstation.peer_socket_pool import PeerSocketPool
from groundstation.broadcast_events import new_broadcast_event, UnknownBroadcastEvent
from groundstation.broadcast_events.broadcast_ping import BroadcastPing
from groundstation.gizmo_factory import InvalidGizmoError
from os.path import expanduser
# Module-level daemon state shared by the handlers and the main loop below.

# Port handed to the discoverer, announcer and stream listener alike.
PORT = settings.PORT
# Seconds between announcer pings; also used as the select() timeout.
BEACON_TIMEOUT = settings.BEACON_TIMEOUT
# Local node; its uuid is compared against incoming ping payloads.
myself = Node()
station_path = expanduser("~/.groundstation")
station = Station(station_path, myself)
discoverer = BroadcastDiscoverer(PORT)
announcer = BroadcastAnnouncer(PORT)
listener = StreamListener(PORT)
announcer.name = myself.name
# Sockets that are always part of the read set (see _read_sockets).
sockets = [discoverer, listener]
# Peer connections; grows as peers are accepted or connected to.
peer_sockets = PeerSocketPool()
last_beacon = time.time() - BEACON_TIMEOUT # Guarantee that we'll announce on the first run
def _read_sockets():
    """Return a fresh list of every socket to poll for readability.

    The fixed sockets come first, followed by all current peer sockets.
    """
    return list(sockets) + list(peer_sockets)
def _write_sockets():
    """Return the peer sockets that report having data ready to send."""
    return [peer for peer in peer_sockets if peer.has_data_ready()]
def handle_discoverer_event():
    """Read one datagram from the discoverer and react to it.

    Unparseable datagrams are logged and dropped.  Only BroadcastPing
    events are acted upon:

    * a ping carrying our own uuid is discarded;
    * a ping whose payload compares greater than our uuid makes us open a
      StreamClient to the sender and begin a handshake, unless the
      sender's address is already in ``peer_sockets`` or the station
      reports the payload as recently queried;
    * for any other ping we only log and wait for the peer to connect.
    """
    # TODO Return Event objects instead of raw data
    data, peer = discoverer.recv(settings.DEFAULT_BUFSIZE)
    try:
        event = new_broadcast_event(data)
    except UnknownBroadcastEvent:
        log.warning("Ooops got some weird event: %s" % (repr(data)))
        # No event was built; falling through would hit an unbound name.
        return

    if not isinstance(event, BroadcastPing):
        return

    if event.payload == myself.uuid:
        log.info("Discarding PING from myself")
        return
    # elif event.addr == me!

    log.info("Ping from %s" % str(peer))
    if event.payload > myself.uuid:
        # Peer's uuid is larger, we should connect and initiate sync
        if peer[0] not in peer_sockets:
            # Ensure that they're in our object cache- we're updating it now
            if not station.recently_queried(event.payload):
                client = StreamClient(peer[0])
                peer_sockets.append(client)
                client.begin_handshake(station)
    else:
        # Peer's uuid is smaller, we should do nothing and await connection
        log.info("Peer's uuid is smaller, awaiting connection")
def handle_listener_event():
    """Accept a pending connection on the listener and add it to the pool."""
    peer_sockets.append(listener.accept(PeerSocket))
def handle_tcpnetwork_event(peer=None):
    """Receive from a peer socket and process every packet it has queued.

    ``peer`` is the socket to service.  When omitted, the module-level
    loop variable ``i`` is used, which is how the main loop invokes this
    handler (``handler_for(i)()``).

    A SocketClosedException removes the socket from ``peer_sockets``.  An
    InvalidGizmoError is logged and ends processing for this call.

    Raises:
        AssertionError: if the gizmo factory returns None for a packet.
    """
    if peer is None:
        # Legacy calling convention: the main loop's current socket.
        peer = i
    try:
        peer.recv()
        while peer.packet_queue:
            data = peer.packet_queue.pop()
            gizmo = station.gizmo_factory.hydrate(data, peer)
            # Explicit check rather than `assert` so it still fires under -O.
            if gizmo is None:
                raise AssertionError("gizmo_factory returned None")
            gizmo.process()
    except SocketClosedException:
        peer_sockets.remove(peer)
    except InvalidGizmoError:
        log.warning("Recieved invalid gizmo!")
def handle_iterators():
    """Delegate to the station to run its iterator handling."""
    station.handle_iters()
def handler_for(thing):
    """Return the zero-argument handler responsible for ``thing``.

    ``thing`` is a socket returned as readable by select(): the
    discoverer, the listener, or a PeerSocket / StreamClient.

    Raises:
        KeyError: if ``thing`` is none of the above.
    """
    if thing == discoverer:
        return handle_discoverer_event
    if thing == listener:
        return handle_listener_event
    # Check the argument itself, not whatever global `i` happens to hold.
    if isinstance(thing, (PeerSocket, StreamClient)):
        return handle_tcpnetwork_event
    raise KeyError(thing)
# Main event loop: announce, wait for socket activity, dispatch handlers,
# then let the station run its iterators.
while True:
    # Send a beacon whenever the beacon interval has elapsed.
    if (last_beacon + BEACON_TIMEOUT) < time.time():
        last_beacon = time.time()
        announcer.ping()

    readable, writable, _errored = select.select(
        _read_sockets(), _write_sockets(), [], BEACON_TIMEOUT)
    log.info("Got %i fds. read: %s write: %i"
             % (len(readable) + len(writable), len(readable), len(writable)))

    # NOTE: the loop variable must stay named `i`; the handlers read it as
    # a module-level global.
    for i in readable:
        dispatch = handler_for(i)
        dispatch()

    for i in writable:
        if not isinstance(i, (PeerSocket, StreamClient)):
            log.warn("Something that isn't PeerSocket or StreamClient wants to do IO")
            continue
        i.send()

    # Deal with iterators last, this way if they queue up network stuff
    # it'll be processed immediately.
    if station.has_ready_iterators():
        handle_iterators()
Jump to Line
Something went wrong with that request. Please try again.