Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: features/docke…
Fetching contributors…

Cannot retrieve contributors at this time

executable file 182 lines (146 sloc) 6.3 kb
#!/usr/bin/env python
import os
import sys
import select
import time
from groundstation import logger
log = logger.getLogger("stationd")
from groundstation import settings
from groundstation.station import Station
from groundstation.sockets.broadcast_socket import BroadcastUnrouteable
from groundstation.broadcast_discoverer import BroadcastDiscoverer
from groundstation.broadcast_announcer import BroadcastAnnouncer
from groundstation.stream_listener import StreamListener
from groundstation.stream_client import StreamClient
from groundstation.node import Node
from groundstation.peer_socket import PeerSocket
from groundstation.sockets.socket_closed_exception import SocketClosedException
from groundstation.peer_socket_pool import PeerSocketPool
from groundstation.broadcast_events import new_broadcast_event, UnknownBroadcastEvent
from groundstation.broadcast_events.broadcast_ping import BroadcastPing
from groundstation.gizmo_factory import InvalidGizmoError
import groundstation.fs_watcher as fs_watcher
from groundstation.utils import path2id
import groundstation.events.tcpnetwork_event as tcpnetwork_event
import groundstation.deferred
# Network configuration, taken from the groundstation settings module.
PORT = settings.PORT
BEACON_TIMEOUT = settings.BEACON_TIMEOUT
# This node's identity and the station built for it from the environment.
myself = Node()
station = Station.from_env(myself)
# Discovery, announcement and stream-listening endpoints, all created with PORT.
discoverer = BroadcastDiscoverer(PORT)
announcer = BroadcastAnnouncer(PORT)
listener = StreamListener(PORT)
# The announcer carries this node's name.
announcer.name = myself.name
# Fixed sockets that are always offered to select() for reading.
sockets = [discoverer, listener]
# Pool of peer connections; entries are appended and removed at runtime.
peer_sockets = PeerSocketPool()
last_beacon = time.time() - BEACON_TIMEOUT # Guarantee that we'll announce on the first run
def _read_sockets():
    """Return a fresh list of everything select() should watch for reads.

    The fixed module-level ``sockets`` come first, followed by every entry
    currently held in ``peer_sockets``.
    """
    readable = [fixed for fixed in sockets]
    readable.extend(pooled for pooled in peer_sockets)
    return readable
def _write_sockets():
    """Return the pooled peer sockets that report data ready to be sent.

    Only entries of ``peer_sockets`` whose ``has_data_ready()`` is truthy are
    included, in pool order.
    """
    return [pooled for pooled in peer_sockets if pooled.has_data_ready()]
def handle_discoverer_event(sock):
    """Read one broadcast datagram and react to it.

    A ``BroadcastPing`` from a peer whose uuid compares greater than ours
    triggers a handshake with that peer (reusing a pooled connection when one
    exists for the address), unless the station reports the peer as recently
    queried. Pings from ourselves and from peers with a smaller uuid are only
    logged. Events of any other type are ignored.

    :param sock: the discoverer socket; ``recv`` must return ``(data, peer)``
                 where ``peer[0]`` is the sender's address.
    """
    # TODO Return Event objects instead of raw data
    data, peer = sock.recv(settings.DEFAULT_BUFSIZE)
    try:
        event = new_broadcast_event(data)
    except UnknownBroadcastEvent:
        log.warning("Ooops got some weird event: %s" % (repr(data)))
        # Without this return `event` is unbound and the isinstance check
        # below raises NameError.
        return

    if not isinstance(event, BroadcastPing):
        return

    if event.payload == myself.uuid:
        log.info("Discarding PING from myself")
        return
    #elif event.addr == me!

    log.info("Ping from %s" % str(peer))
    if not event.payload > myself.uuid:
        # Peer's uuid is smaller, we should do nothing and await connection
        log.info("Peer's uuid is smaller, awaiting connection")
        return

    # Peer's uuid is larger, we should connect and initiate sync
    peer_addr = peer[0]
    already_pooled = peer_addr in peer_sockets
    # Ensure that they're in our object cache- we're updating it now
    if station.recently_queried(event.payload):
        return

    if already_pooled:
        # XXX This should check if we have open transactions.
        client = peer_sockets[peer_addr]
    else:
        client = StreamClient(peer_addr)
        peer_sockets.append(client)
    client.begin_handshake(station)
def handle_listener_event(sock):
    """Accept a pending connection on ``sock`` and add it to the peer pool.

    :param sock: the listening socket; its ``accept`` is called with the
                 ``PeerSocket`` class.
    """
    peer_sockets.append(sock.accept(PeerSocket))
def handle_tcpnetwork_event(sock):
    """Hydrate and process every payload yielded for a peer connection.

    Each payload from ``tcpnetwork_event.payloads(sock)`` is turned into a
    gizmo by the station's gizmo factory and then processed. If the socket
    turns out to be closed it is removed from the peer pool; an invalid
    gizmo is logged and the remaining payloads of this call are abandoned.

    :param sock: the peer connection that became readable.
    """
    try:
        for raw in tcpnetwork_event.payloads(sock):
            hydrated = station.gizmo_factory.hydrate(raw, sock)
            assert hydrated is not None, "gizmo_factory returned None"
            hydrated.process()
    except SocketClosedException:
        # The peer went away while we were reading from it.
        peer_sockets.remove(sock)
    except InvalidGizmoError:
        log.warn("Recieved invalid gizmo!")
def handle_fsevent(watcher=None):
    """Announce a newly written object to every pooled peer.

    The main loop dispatches handlers as ``handler_for(x)(x)``, so this
    handler must accept the readable object; the original zero-argument
    signature raised ``TypeError`` when dispatched that way.

    :param watcher: the filesystem watcher that became readable; its
                    ``read()`` yields the name of the new object. When
                    omitted, the module-level loop variable ``i`` is used,
                    which is what the zero-argument version read.
    :raises AssertionError: if the reported object is not in the station's
                            store.
    """
    if watcher is None:
        # Legacy zero-argument call: fall back to the main loop's variable.
        watcher = i
    obj_name = watcher.read()
    assert obj_name in station.store
    # NOTE(review): the watcher itself is appended to peer_sockets at
    # startup, so it is visited here too -- confirm it supports
    # notify_new_object.
    for sock in peer_sockets:
        sock.notify_new_object(station, obj_name)
def handler_for(thing):
    """Return the handler function for a readable object.

    :param thing: an object returned as readable by select().
    :returns: one of the ``handle_*`` functions in this module.
    :raises KeyError: if ``thing`` is of no recognised kind.
    """
    if thing == discoverer:
        return handle_discoverer_event
    elif thing == listener:
        return handle_listener_event
    # The original tested the module-level loop variable `i` for the
    # StreamClient case instead of the `thing` argument.
    elif isinstance(thing, (PeerSocket, StreamClient)):
        return handle_tcpnetwork_event
    # NOTE(review): startup constructs fs_watcher.FSWatcher; confirm that it
    # is (a subclass of) fs_watcher.Watcher, otherwise this never matches.
    elif isinstance(thing, fs_watcher.Watcher):
        return handle_fsevent
    else:
        raise KeyError(thing)
# Optional filesystem watcher on the store's object root, enabled by flag.
if "--watch-fs" in sys.argv[1:]:
    watcher = fs_watcher.FSWatcher(station.store.object_root)
    peer_sockets.append(watcher)

# Every remaining command-line argument is a host to connect to immediately.
for host in sys.argv[1:]:
    if host == "--watch-fs":
        # The flag is not a hostname; the original loop tried to open a
        # StreamClient to it.
        continue
    client = StreamClient(host)
    peer_sockets.append(client)
    client.begin_handshake(station)
def _register_purge(peer):
    """Register a deferred that purges ``peer`` from the pool after CLOSEWAIT_TIMEOUT.

    ``peer`` is bound as a function argument so the deferred keeps the right
    value. The original closure read the exception variable ``e`` at call
    time; that name is module-global here, can be rebound by a later
    exception, and in Python 3 is deleted when the ``except`` block ends.
    """
    @groundstation.deferred.defer_until(time.time() + settings.CLOSEWAIT_TIMEOUT)
    def purge():
        log.info("Purging %s from clients" % (str(peer)))
        peer_sockets.purge(peer)
    log.info("Made %s" % repr(purge))
    station.register_deferred(purge)


# Main event loop: beacon periodically, then service readable and writable
# sockets, then run any ready iterators and deferreds.
while True:
    log.info("Current status :: Iterators: %i Deferreds: %i" %
             (len(station.iterators), len(station.deferreds)))

    if time.time() > (last_beacon + BEACON_TIMEOUT):
        last_beacon = time.time()
        try:
            announcer.ping()
        except BroadcastUnrouteable:
            # Logged only; the loop keeps running without announcing.
            log.fatal("broadcast was unrouteable")

    # Wake up at least once per beacon interval so the next ping is sent.
    (sread, swrite, sexc) = select.select(_read_sockets(), _write_sockets(), [], BEACON_TIMEOUT)
    log.info("Got %i fds. read: %s write: %i" % (len(sread) + len(swrite), len(sread), len(swrite)))

    # The loop variable stays named `i`: other code in this module may read
    # it as a global.
    for i in sread:
        handler_for(i)(i)

    for i in swrite:
        if not (isinstance(i, PeerSocket) or isinstance(i, StreamClient)):
            log.warn("%s wants to do IO" % (type(i)))
            continue
        try:
            i.send()
        except SocketClosedException as e:
            log.info("Removing %s from clients" % (str(e.peer)))
            # NOTE(review): the exception object is passed to remove() here,
            # while handle_tcpnetwork_event passes the socket -- confirm
            # which one PeerSocketPool.remove expects.
            if peer_sockets.remove(e):
                log.info("Registering purged deferred")
                _register_purge(e.peer)

    # Deal with iterators last, this way if they queue up network stuff it'll be processed immediately.
    if station.has_ready_iterators():
        station.handle_iters()
    if station.has_ready_deferreds():
        station.handle_deferreds()
    # else: # Right now that's the only socket
Jump to Line
Something went wrong with that request. Please try again.