Skip to content

Commit

Permalink
Merge pull request #33 from meejah/fixups
Browse files Browse the repository at this point in the history
Bug-fixes
  • Loading branch information
meejah committed Mar 8, 2024
2 parents 1c5470b + ed1e342 commit 00f7bfc
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 81 deletions.
3 changes: 3 additions & 0 deletions NEWS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Unreleased
----------

* (Put new changelog items here)
* Fix up some fallout from refactoring
* Enable "remote" command in --interactive
* Proper error-message rendering


24.3.1: March 1, 2024
Expand Down
6 changes: 5 additions & 1 deletion integration/test_happy.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ async def test_happy_remote(reactor, request, wormhole):

# f1 send a remote-listen request, so f0 should receive it
msg = await f0.protocol.next_message("listening")
assert msg == {'kind': 'listening', 'endpoint': 'tcp:1111:interface=localhost'}
assert msg == {
'kind': 'listening',
'listen': 'tcp:1111:interface=localhost',
'connect': 'tcp:localhost:8888',
}

ep0 = serverFromString(reactor, "tcp:8888:interface=localhost")
ep1 = clientFromString(reactor, "tcp:localhost:1111")
Expand Down
125 changes: 47 additions & 78 deletions src/fowl/_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@ def forward_bytes(self, data):
enter=evaluating,
outputs=[check_message]
)
await_confirmation.upon(
subchannel_closed,
enter=finished,
outputs=[],
)

evaluating.upon(
remote_connected,
enter=forwarding_bytes,
Expand Down Expand Up @@ -565,15 +571,11 @@ def got_proto(proto):
d.addCallback(got_proto)

def err(f):
# XXX fixme use a OutputMessage
print(
json.dumps({
"kind": "error",
"id": self._conn_id,
"message": str(f.value),
}),
file=self.factory.config.stdout,
flush=True,
self.factory.message_out(
WormholeError(
str(f.value),
# extra={"id": self._conn_id}
)
)
d.addErrback(err)
return d
Expand All @@ -590,25 +592,10 @@ def connectionLost(self, reason):
self.remote.transport.loseConnection()

def dataReceived(self, data):
# XXX FIXME if len(data) >= 65535 must split "because noise"
# -- handle in Dilation code? (yes)

self.factory.message_out(
BytesIn(self._conn_id, len(data))
)
if False:
max_noise = 65000
while len(data):
d = data[:max_noise]
data = data[max_noise:]

if self.queue is not None:
self.queue.append(d)
else:
self.remote.transport.write(d)
else:
## if Dilation handling of "more than noise packet" works
self.remote.transport.write(data)
self.remote.transport.write(data)


class Incoming(Protocol):
Expand Down Expand Up @@ -760,7 +747,7 @@ def send_positive_reply(self):
"""
Reply to the other side that we've connected properly
"""
# XXX another section like this: pack_netstring() or something
# XXX there's other sections like this; factor to pack_netstring() or something
msg = msgpack.packb({
"connected": True,
})
Expand All @@ -787,15 +774,11 @@ def establish_local_connection(self, msg):
d = ep.connect(factory)

def bad(fail):
# XXXX use an OutputMessage
print(
json.dumps({
"kind": "error",
"id": self._conn_id,
"message": fail.getErrorMessage(),
}),
file=self.factory.config.stdout,
flush=True,
self.factory.message_out(
WormholeError(
fail.getErrorMessage(),
# extra={"id": self._conn_id}
)
)
reactor.callLater(0, lambda: self.connection_failed())
return None
Expand Down Expand Up @@ -882,14 +865,8 @@ def connectionLost(self, reason):
"""
Twisted API
"""
# XXX use an OutputMessage
print(
json.dumps({
"kind": "incoming-lost",
"id": self._conn_id,
}),
file=self.factory.config.stdout,
flush=True,
self.factory.message_out(
IncomingLost(self._conn_id)
)
self.subchannel_closed()

Expand Down Expand Up @@ -1000,8 +977,10 @@ def was_closed(why):
reason = why
elif isinstance(why.value, wormhole_errors.LonelyError):
reason = "lonely"
else:
elif why.value.args:
reason = why.value.args[0]
else:
reason = str(why)
self._done.trigger(self._reactor, reason)
self._daemon.shutdown(reason)
ensureDeferred(self._wormhole._closed_observer.when_fired()).addBoth(was_closed)
Expand Down Expand Up @@ -1100,9 +1079,7 @@ def when_connected(self):
"""
return self._connected.when_triggered()

#XXX
def do_dilate(self):
##XXX wait for do_dilate + _post_dilation_setup() to run before emitting the "code-allocated" message.....it's currently not working (the control connection)
self.control_ep, self.connect_ep, self.listen_ep = self._wormhole.dilate(
transit_relay_location="tcp:magic-wormhole-transit.debian.net:4001",
)
Expand Down Expand Up @@ -1131,14 +1108,8 @@ def _handle_error(self, f):
self._report_error(f.value)

def _report_error(self, e):
#XXX use an OutputMessage
print(
json.dumps({
"kind": "error",
"message": str(e),
}),
file=self._config.stdout,
flush=True,
self._daemon._message_out(
WormholeError(str(e))
)

async def _post_dilation_setup(self):
Expand Down Expand Up @@ -1500,8 +1471,10 @@ def fowld_output_to_json(msg: FowlOutputMessage) -> dict:
Listening: "listening",
LocalConnection: "local-connection",
IncomingConnection: "incoming-connection",
IncomingLost: "incoming-lost",
BytesIn: "bytes-in",
BytesOut: "bytes-out",
WormholeError: "error",
}[type(msg)]
return js

Expand Down Expand Up @@ -1554,7 +1527,6 @@ async def _forward_loop(reactor, config, w):

def output_fowl_message(msg):
js = fowld_output_to_json(msg)
#XXX use an outputmessage
print(
json.dumps(js),
file=config.stdout,
Expand Down Expand Up @@ -1656,30 +1628,32 @@ def dataReceived(self, data):
d = listen_ep.listen(factory)

def got_port(port):
#XXX use a Message
print(
json.dumps({
"kind": "listening",
"endpoint": msg["listen-endpoint"],
}),
file=self.factory.config.stdout,
flush=True,
self.factory.message_out(
Listening(
msg["listen-endpoint"],
msg["connect-endpoint"],
)
)

self._ports.append(port)
return port

def error(f):
self.factory.message_out(
WormholeError(
'Failed to listen on "{}": {}'.format(
msg["listen-endpoint"],
f.value,
)
)
)
d.addCallback(got_port)
d.addErrback(error)
# XXX should await port.stopListening() somewhere...at the appropriate time
else:
# XXX use a Message
print(
json.dumps({
"kind": "error",
f"message": "Unknown control command: {msg[kind]}",
"endpoint": msg["listen-endpoint"],
}),
file=self.factory.config.stdout,
flush=True,
self.factory.message_out(
WormholeError(
"Unknown control command: {msg[kind]}",
)
)

async def _unregister_ports(self):
Expand Down Expand Up @@ -1714,11 +1688,6 @@ def connectionMade(self):
pass

def lineReceived(self, line):
# XXX FIXME since we don't have req/resp IDs etc we should
# only allow ONE command to be run at a time, and then its
# answer printed (so e.g. even if our controller gets
# ahead and issues 3 commands without waiting for the
# answer, we need to do them in order)
try:
cmd = parse_fowld_command(line)
self.fowl.command(cmd)
Expand Down
42 changes: 40 additions & 2 deletions src/fowl/_tui.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
BytesIn,
BytesOut,
IncomingConnection,
IncomingLost,
LocalConnection,
)

Expand Down Expand Up @@ -78,6 +79,12 @@ def _(msg):
conn[msg.id] = Connection(0, 0)
replace_state(attr.evolve(state[0], connections=conn))

@output_message.register(IncomingLost)
def _(msg):
conn = state[0].connections
del conn[msg.id]
replace_state(attr.evolve(state[0], connections=conn))

@output_message.register(LocalConnection)
def _(msg):
conn = state[0].connections
Expand Down Expand Up @@ -204,7 +211,7 @@ async def _cmd_help(reactor, wh, state, *args):
for fn, aliases in funs.items():
name = sorted(aliases)[-1]
rest = " ".join(sorted(aliases)[:-1])
helptext = fn.__doc__
helptext = textwrap.dedent(fn.__doc__)
if helptext:
print(f"{name} ({rest})")
print(textwrap.fill(helptext.strip(), 80, initial_indent=" ", subsequent_indent=" "))
Expand Down Expand Up @@ -239,6 +246,10 @@ async def _cmd_accept(reactor, wh, state, *args):


async def _cmd_listen_local(reactor, wh, state, *args):
"""
Listen locally on the given port; connect to the remote side on
the same port (or a custom one if two ports are passed)
"""
try:
port = int(args[0])
except (ValueError, IndexError):
Expand Down Expand Up @@ -266,7 +277,34 @@ async def _cmd_listen_local(reactor, wh, state, *args):


async def _cmd_listen_remote(reactor, wh, state, *args):
pass
"""
Listen on the remote side on the given port; connect back to this
side on the same port (or a custom one if two ports are passed)
"""
try:
remote_port = int(args[0])
except (ValueError, IndexError):
print("Requires a TCP port, as an integer.")
print("We will listen on this TCP port on the remote side and connect to the same")
print("localhost port on this side. Optionally, a second port may be specified")
print("to use a different local port")
return

if len(args) > 1:
try:
local_port = int(args[1])
except ValueError:
print(f"Not port-number: {args[1]}")
return
else:
local_port = remote_port

wh.command(
RemoteListener(
listen=f"tcp:{remote_port}:interface=localhost",
connect=f"tcp:localhost:{local_port}",
)
)


class CommandReader(LineReceiver):
Expand Down
13 changes: 13 additions & 0 deletions src/fowl/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ class IncomingConnection(FowlOutputMessage):
endpoint: str


@frozen
class IncomingLost(FowlOutputMessage):
"""
We have lost one of our connections
"""
id: int


@frozen
class BytesIn(FowlOutputMessage):
id: int
Expand All @@ -129,3 +137,8 @@ class BytesIn(FowlOutputMessage):
class BytesOut(FowlOutputMessage):
id: int
bytes: int


@frozen
class WormholeError(FowlOutputMessage):
message: str

0 comments on commit 00f7bfc

Please sign in to comment.