Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add {.async: (raises).} to libp2p/stream modules #1050

Merged
merged 9 commits into from
Mar 5, 2024

Conversation

etan-status
Copy link
Contributor

Annotate stream modules with {.async: (raises).} to simplify exception handling.

  • close: {.async: (raises: []).}
  • read/write: {.async: (raises: [CancelledError, LPStreamError]).}

All modules need to be updated atomically, otherwise the method mechanism does not find overrides anymore. Also, because close functions have a tendency to start writing RST packets and reading EOF packets, close and read/write need to be done together.

Higher modules have not been touched unless necessary, e.g., to remain compatible when plugging in callbacks.

Annotate `stream` modules with `{.async: (raises).}` to simplify
exception handling.

- `close`: `{.async: (raises: []).}`
- `read`/`write`: `{.async: (raises: [CancelledError, LPStreamError]).}`

All modules need to be updated atomically, otherwise the `method`
mechanism does not find overrides anymore. Also, because `close`
functions have a tendency to start writing RST packets and reading
EOF packets, `close` and `read`/`write` need to be done together.

Higher modules have not been touched unless necessary, e.g., to remain
compatible when plugging in callbacks.
Copy link

codecov bot commented Mar 2, 2024

Codecov Report

Attention: Patch coverage is 83.92371% with 59 lines in your changes are missing coverage. Please review.

Project coverage is 82.76%. Comparing base (8294d5b) to head (393ae49).

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##           unstable    #1050      +/-   ##
============================================
+ Coverage     82.73%   82.76%   +0.03%     
============================================
  Files            91       91              
  Lines         15586    15666      +80     
============================================
+ Hits          12895    12966      +71     
- Misses         2691     2700       +9     
Files Coverage Δ
libp2p/muxers/mplex/coder.nim 92.68% <100.00%> (+2.93%) ⬆️
libp2p/protocols/connectivity/relay/rconn.nim 97.36% <100.00%> (-2.64%) ⬇️
libp2p/transports/wstransport.nim 84.97% <93.33%> (+1.06%) ⬆️
libp2p/upgrademngrs/muxedupgrade.nim 85.24% <88.88%> (+1.37%) ⬆️
libp2p/muxers/mplex/lpchannel.nim 82.60% <93.33%> (+0.74%) ⬆️
libp2p/protocols/secure/noise.nim 90.09% <83.33%> (+0.43%) ⬆️
libp2p/stream/bufferstream.nim 93.84% <88.88%> (+4.69%) ⬆️
libp2p/stream/connection.nim 91.13% <90.90%> (+1.95%) ⬆️
libp2p/stream/chronosstream.nim 84.46% <86.95%> (+1.13%) ⬆️
libp2p/protocols/secure/secure.nim 72.13% <80.00%> (-0.60%) ⬇️
... and 5 more

... and 14 files with indirect coverage changes

@etan-status etan-status marked this pull request as ready for review March 3, 2024 23:30
@etan-status
Copy link
Contributor Author

tests/helpers.nim Outdated Show resolved Hide resolved
arnetheduck
arnetheduck previously approved these changes Mar 4, 2024
Copy link
Contributor

@arnetheduck arnetheduck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this broadly LGTM - I think it's useful to merge this PR as a starting point since most of it is semi-mechanical - then we can deal with polish separately

@etan-status etan-status enabled auto-merge (squash) March 4, 2024 18:26
Co-authored-by: Jacek Sieka <jacek@status.im>
@etan-status etan-status enabled auto-merge (squash) March 4, 2024 23:32
Copy link
Contributor

@dryajov dryajov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, this looks good enough right now.

@dryajov
Copy link
Contributor

dryajov commented Mar 5, 2024

Bumping to latest master with the GitHub update seems to break it.

@etan-status etan-status merged commit 2860959 into unstable Mar 5, 2024
11 checks passed
@etan-status etan-status deleted the dev/etan/ex-stream branch March 5, 2024 07:06
@arnetheduck arnetheduck mentioned this pull request Mar 11, 2024
elif s.name != $s.oid and s.name.len > 0:
&"{shortLog(s.conn.peerId)}:{s.oid}:{s.name}"
else: &"{shortLog(s.conn.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@etan-status should we be raising a Defect here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raiseAssert raises a Defect :)

@@ -68,19 +68,19 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
libp2p_mplex_channels.set(
m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerId])
except CatchableError as exc:
except CancelledError as exc:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't m.channels[chann.initiator] raise KeyError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, m.channels is an array[bool, ...], not a Table, so both false and true always exist.

T: typedesc[MuxerProvider],
creator: MuxerConstructor,
codec: string): T {.gcsafe.} =
method handle*(m: Muxer): Future[void] {.base, async: (raises: []).} = discard
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use raiseAssert("Not implemented!") here too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't done before, so rather didn't want to change behaviour. Maybe the base is sometimes called for default behaviour

method handle*(m: Yamux) {.async.} =
await m.streamHandler(channel)
trace "finished handling stream"
doAssert(channel.isClosed, "connection not closed by handler!")
Copy link
Collaborator

@diegomrsantos diegomrsantos Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to raise a Defect here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not a new Defect, it's the same code from before, including the assert, except with CatchableError handling removed because m.streamHandler cannot raise errors.

Comment on lines +602 to +605
try:
await m.connection.write(YamuxHeader.goAway(ProtocolError))
except CancelledError, LPStreamError:
discard
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can extract it to a proc and avoid the repetition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, but it's only 2 occurrences. the proc has about the same overhead.

else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we raise a Defect here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raiseAssert is very similar to raise newException(Defect)

else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before.

else: &"{shortLog(s.peerId)}:{s.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before.

s: BufferStream,
pbytes: pointer,
nbytes: int
): Future[int] {.async: (raises: [CancelledError, LPStreamError]).} =
doAssert(nbytes > 0, "nbytes must be positive integer")
doAssert(not s.reading,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be raising Defects here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defects can always be raised, regardless of {.async: (raises).} annotation. they are not newly introduced

@@ -209,8 +208,8 @@ method closeImpl*(s: BufferStream): Future[void] =
if not s.readQueue.empty():
discard s.readQueue.popFirstNoWait()
except AsyncQueueFullError, AsyncQueueEmptyError:
raise newException(Defect, getCurrentExceptionMsg())
raiseAssert(getCurrentExceptionMsg())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before.

else: &"{shortLog(conn.peerId)}:{conn.oid}"
except ValueError as exc:
raise newException(Defect, exc.msg)
raiseAssert(exc.msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as before.

@@ -58,23 +58,28 @@ method initStream*(s: Connection) =

procCall LPStream(s).initStream()

doAssert(isNil(s.timerTaskFut))
doAssert(s.timerTaskFut == nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous comments about Defects.

s.timeoutHandler = proc(): Future[void] =
trace "Idle timeout expired, closing connection", s
s.close()
if s.timeoutHandler == nil:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why == nil instead of isNil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug "Timeout handler cancelled", s
except CatchableError as exc: # Shouldn't happen
warn "exception in timeout handler", s, exc = exc.msg
await s.timeoutHandler()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why has the try been removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeoutHandler has {.async: (raises: []).} so is statically checked at compiletime to never raise. so, the exception handlers are unreachable.

Comment on lines +291 to +293
let fut = newFuture[void]()
fut.complete()
fut
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this necessary?

Copy link
Contributor Author

@etan-status etan-status Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with raw: true, yes. but one can also use {.async.} without raw: true to have this be implicitly introduced as part of the async transformation. either should be fine (skipping the transformation is a bit more efficient -- we are not doing anything async in this function)

@@ -304,9 +340,9 @@ proc closeWithEOF*(s: LPStream): Future[void] {.async, public.} =
var buf: array[8, byte]
if (await readOnce(s, addr buf[0], buf.len)) != 0:
debug "Unexpected bytes while waiting for EOF", s
except CancelledError:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has this been changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for CancelledError: the callers just end up discarding it. discarding it earlier allows {.async: (raises: []).}

for CatchableError, it cannot be raised by readOnce, catching LPStreamError is enough, and the compiler guarantees exhaustiveness because the proc itself is tagged as {.async: (raises: [].}, so it would alert us if an exception is forget to be caught.

method closeImpl*(s: WsStream): Future[void] {.async: (raises: []).} =
try:
await s.session.close()
except CatchableError:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it better to swallow the error here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, if the session close fails for whatever reason, we should still proceed to close the underlying connection.

trace "Starting stream handler", conn
try:
await upgrader.ms.handle(conn) # handle incoming connection
except CancelledError as exc:
raise exc
return
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why has this been changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

streamHandler is launched using asyncSpawn, and raising an error from asyncSpawn crashes the entire app (defect).

Comment on lines +24 to +29
proc allFuturesThrowing*[T](futs: varargs[Future[T]]): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))

proc allFuturesThrowing*[T, E](
futs: varargs[InternalRaisesFuture[T, E]]): Future[void] =
allFuturesThrowing(futs.mapIt(FutureBase(it)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it better than the previous code? Mentioning InternalRaisesFuture doesn't seem like a good idea.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed to support calling allFuturesThrowing with Future[T].Raising(...) futures. otherwise, only those without Raising restrictions can be passed.

And yes, it's ugly, I agree. It's the same logic as allFutures and race use, though, so seems like the best way right now.

It's solely used in test code, so should be good enough.

@etan-status
Copy link
Contributor Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: done
Development

Successfully merging this pull request may close these issues.

None yet

4 participants