Permalink
Browse files

Merge branch 'dilate-xfer'

Add an integration test which exercises a full w.dilate connection and the
control endpoint.

Still untested:

* reconnecting after the initial TCP connection is lost
* resending data that wasn't acked before the connection was lost
* (re)sending data that was submitted while no connection was available
* the connect- and listen- endpoints
  • Loading branch information...
warner committed Feb 11, 2019
2 parents be2dc01 + bc863de commit a5e011f7868d2ff248e352852eb67c8b15aa611a
@@ -81,8 +81,8 @@ def _build_workers(self):
self._A.wire(self._RC, self._C)
self._I.wire(self._C, self._L)
self._C.wire(self, self._A, self._N, self._K, self._I)
self._T.wire(self, self._RC, self._N, self._M)
self._D.wire(self._S)
self._T.wire(self, self._RC, self._N, self._M, self._D)
self._D.wire(self._S, self._T)

def _init_other_state(self):
self._did_start_code = False
@@ -4,6 +4,12 @@
class NoiseInvalidMessage(Exception):
pass

try:
from noise.exceptions import NoiseHandshakeError
except ImportError:
class NoiseHandshakeError(Exception):
pass

try:
from noise.connection import NoiseConnection
except ImportError:
@@ -11,8 +11,8 @@
from .._interfaces import IDilationConnector
from ..observer import OneShotObserver
from .encode import to_be4, from_be4
from .roles import FOLLOWER
from ._noise import NoiseInvalidMessage
from .roles import LEADER, FOLLOWER
from ._noise import NoiseInvalidMessage, NoiseHandshakeError

# InboundFraming is given data and returns Frames (Noise wire-side
# bytestrings). It handles the relay handshake and the prologue. The Frames it
@@ -56,6 +56,23 @@ def first(l):
class Disconnect(Exception):
pass

# all connections look like:
# (step 1: only for outbound connections)
# 1: if we're connecting to a transit relay:
# * send "sided relay handshake": "please relay TOKEN for side SIDE\n"
# * the relay will send "ok\n" if/when our peer connects
# * a non-relay will probably send junk
# * wait for "ok\n", hang up if we get anything different
# (all subsequent steps are for both inbound and outbound connections)
# 2: send PROLOGUE_LEADER/FOLLOWER: "Magic-Wormhole Dilation Handshale v1 (l/f)\n\n"
# 3: wait for the opposite PROLOGUE string, else hang up
# (everything past this point is a Frame, with be4 length prefix. Frames are
# either noise handshake or an encrypted message)
# 4: if LEADER, send noise handshake string. if FOLLOWER, wait for it
# 5: if FOLLOWER, send noise response string. if LEADER, wait for it
# 6: ...



RelayOK = namedtuple("RelayOk", [])
Prologue = namedtuple("Prologue", [])
@@ -193,7 +210,7 @@ def _get_expected(self, name, expected):
def add_and_parse(self, data):
# we can't make this an @m.input because we can't change the state
# from within an input. Instead, let the state choose the parser to
# use, and use the parsed token drive a state transition.
# use, then use the parsed token to drive a state transition.
self._buffer += data
while True:
# it'd be nice to use an iterator here, but since self.parse()
@@ -233,7 +250,7 @@ def send_frame(self, frame):
Pong = namedtuple("Pong", ["ping_id"])
Open = namedtuple("Open", ["seqnum", "scid"]) # seqnum is integer
Data = namedtuple("Data", ["seqnum", "scid", "data"])
Close = namedtuple("Close", ["seqnum", "scid"]) # scid is integer
Close = namedtuple("Close", ["seqnum", "scid"]) # scid is arbitrary 4-byte value
Ack = namedtuple("Ack", ["resp_seqnum"]) # resp_seqnum is integer
Records = (KCM, Ping, Pong, Open, Data, Close, Ack)
Handshake_or_Records = (Handshake,) + Records
@@ -258,16 +275,16 @@ def parse_record(plaintext):
ping_id = plaintext[1:5]
return Pong(ping_id)
if msgtype == T_OPEN:
scid = from_be4(plaintext[1:5])
scid = plaintext[1:5]
seqnum = from_be4(plaintext[5:9])
return Open(seqnum, scid)
if msgtype == T_DATA:
scid = from_be4(plaintext[1:5])
scid = plaintext[1:5]
seqnum = from_be4(plaintext[5:9])
data = plaintext[9:]
return Data(seqnum, scid, data)
if msgtype == T_CLOSE:
scid = from_be4(plaintext[1:5])
scid = plaintext[1:5]
seqnum = from_be4(plaintext[5:9])
return Close(seqnum, scid)
if msgtype == T_ACK:
@@ -285,28 +302,36 @@ def encode_record(r):
if isinstance(r, Pong):
return b"\x02" + r.ping_id
if isinstance(r, Open):
assert isinstance(r.scid, six.integer_types)
assert isinstance(r.scid, bytes)
assert len(r.scid) == 4
assert isinstance(r.seqnum, six.integer_types)
return b"\x03" + to_be4(r.scid) + to_be4(r.seqnum)
return b"\x03" + r.scid + to_be4(r.seqnum)
if isinstance(r, Data):
assert isinstance(r.scid, six.integer_types)
assert isinstance(r.scid, bytes)
assert len(r.scid) == 4
assert isinstance(r.seqnum, six.integer_types)
return b"\x04" + to_be4(r.scid) + to_be4(r.seqnum) + r.data
return b"\x04" + r.scid + to_be4(r.seqnum) + r.data
if isinstance(r, Close):
assert isinstance(r.scid, six.integer_types)
assert isinstance(r.scid, bytes)
assert len(r.scid) == 4
assert isinstance(r.seqnum, six.integer_types)
return b"\x05" + to_be4(r.scid) + to_be4(r.seqnum)
return b"\x05" + r.scid + to_be4(r.seqnum)
if isinstance(r, Ack):
assert isinstance(r.resp_seqnum, six.integer_types)
return b"\x06" + to_be4(r.resp_seqnum)
raise TypeError(r)


def _is_role(_record, _attr, value):
if value not in [LEADER, FOLLOWER]:
raise ValueError("role must be LEADER or FOLLOWER")

@attrs
@implementer(IRecord)
class _Record(object):
_framer = attrib(validator=provides(IFramer))
_noise = attrib()
_role = attrib(default="unspecified", validator=_is_role) # for debugging

n = MethodicalMachine()
# TODO: set_trace
@@ -321,17 +346,37 @@ def __attrs_post_init__(self):
# states: want_prologue, want_handshake, want_record

@n.state(initial=True)
def want_prologue(self):
def no_role_set(self):
pass # pragma: no cover

@n.state()
def want_prologue_leader(self):
pass # pragma: no cover

@n.state()
def want_prologue_follower(self):
pass # pragma: no cover

@n.state()
def want_handshake_leader(self):
pass # pragma: no cover

@n.state()
def want_handshake(self):
def want_handshake_follower(self):
pass # pragma: no cover

@n.state()
def want_message(self):
pass # pragma: no cover

@n.input()
def set_role_leader(self):
pass

@n.input()
def set_role_follower(self):
pass

@n.input()
def got_prologue(self):
pass
@@ -340,9 +385,20 @@ def got_prologue(self):
def got_frame(self, frame):
pass

@n.output()
def ignore_and_send_handshake(self, frame):
self._send_handshake()

@n.output()
def send_handshake(self):
handshake = self._noise.write_message() # generate the ephemeral key
self._send_handshake()

def _send_handshake(self):
try:
handshake = self._noise.write_message() # generate the ephemeral key
except NoiseHandshakeError as e:
log.err(e, "noise error during handshake")
raise
self._framer.send_frame(handshake)

@n.output()
@@ -367,10 +423,19 @@ def decrypt_message(self, frame):
raise Disconnect()
return parse_record(message)

want_prologue.upon(got_prologue, outputs=[send_handshake],
enter=want_handshake)
want_handshake.upon(got_frame, outputs=[process_handshake],
collector=first, enter=want_message)
no_role_set.upon(set_role_leader, outputs=[], enter=want_prologue_leader)
want_prologue_leader.upon(got_prologue, outputs=[send_handshake],
enter=want_handshake_leader)
want_handshake_leader.upon(got_frame, outputs=[process_handshake],
collector=first, enter=want_message)

no_role_set.upon(set_role_follower, outputs=[], enter=want_prologue_follower)
want_prologue_follower.upon(got_prologue, outputs=[],
enter=want_handshake_follower)
want_handshake_follower.upon(got_frame, outputs=[process_handshake,
ignore_and_send_handshake],
collector=first, enter=want_message)

want_message.upon(got_frame, outputs=[decrypt_message],
collector=first, enter=want_message)

@@ -393,7 +458,7 @@ def send_record(self, r):
self._framer.send_frame(frame)


@attrs
@attrs(cmp=False)
class DilatedConnectionProtocol(Protocol, object):
"""I manage an L2 connection.
@@ -408,12 +473,13 @@ class DilatedConnectionProtocol(Protocol, object):
At any given time, there is at most one active L2 connection.
"""

_eventual_queue = attrib()
_eventual_queue = attrib(repr=False)
_role = attrib()
_connector = attrib(validator=provides(IDilationConnector))
_noise = attrib()
_outbound_prologue = attrib(validator=instance_of(bytes))
_inbound_prologue = attrib(validator=instance_of(bytes))
_description = attrib()
_connector = attrib(validator=provides(IDilationConnector), repr=False)
_noise = attrib(repr=False)
_outbound_prologue = attrib(validator=instance_of(bytes), repr=False)
_inbound_prologue = attrib(validator=instance_of(bytes), repr=False)

_use_relay = False
_relay_handshake = None
@@ -457,6 +523,8 @@ def add_candidate(self):
@m.output()
def set_manager(self, manager):
self._manager = manager
self.when_disconnected().addCallback(lambda c:
manager.connector_connection_lost())

@m.output()
def can_send_records(self, manager):
@@ -493,12 +561,20 @@ def send_record(self, record):
# IProtocol methods

def connectionMade(self):
framer = _Framer(self.transport,
self._outbound_prologue, self._inbound_prologue)
if self._use_relay:
framer.use_relay(self._relay_handshake)
self._record = _Record(framer, self._noise)
self._record.connectionMade()
try:
framer = _Framer(self.transport,
self._outbound_prologue, self._inbound_prologue)
if self._use_relay:
framer.use_relay(self._relay_handshake)
self._record = _Record(framer, self._noise, self._role)
if self._role is LEADER:
self._record.set_role_leader()
else:
self._record.set_role_follower()
self._record.connectionMade()
except:
log.err()
raise

def dataReceived(self, data):
try:
Oops, something went wrong.

0 comments on commit a5e011f

Please sign in to comment.