Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(yamux): set EoF when remote peer half closes the stream in yamux #1086

Merged
merged 19 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions libp2p/muxers/yamux/yamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ type
closedRemotely: Future[void].Raising([])
closedLocally: bool
receivedData: AsyncEvent
returnedEof: bool

proc `$`(channel: YamuxChannel): string =
result = if channel.conn.dir == Out: "=> " else: "<= "
Expand Down Expand Up @@ -204,8 +203,8 @@ proc remoteClosed(channel: YamuxChannel) {.async: (raises: []).} =

method closeImpl*(channel: YamuxChannel) {.async: (raises: []).} =
if not channel.closedLocally:
trace "Closing yamux channel locally", streamId = channel.id, conn = channel.conn
channel.closedLocally = true
channel.isEof = true

if not channel.isReset and channel.sendQueue.len == 0:
try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin}))
Expand Down Expand Up @@ -273,17 +272,16 @@ method readOnce*(
newLPStreamClosedError()
else:
newLPStreamConnDownError()
if channel.returnedEof:
if channel.isEof:
lchenut marked this conversation as resolved.
Show resolved Hide resolved
raise newLPStreamRemoteClosedError()
if channel.recvQueue.len == 0:
channel.receivedData.clear()
try: # https://github.com/status-im/nim-chronos/issues/516
discard await race(channel.closedRemotely, channel.receivedData.wait())
except ValueError: raiseAssert("Futures list is not empty")
if channel.closedRemotely.completed() and channel.recvQueue.len == 0:
channel.returnedEof = true
channel.isEof = true
return 0
return 0 # we return 0 to indicate that the channel is closed for reading from now on

let toRead = min(channel.recvQueue.len, nbytes)

Expand Down
2 changes: 1 addition & 1 deletion libp2p/protocols/ping.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ method init*(p: Ping) =
trace "handling ping", conn
var buf: array[PingSize, byte]
await conn.readExactly(addr buf[0], PingSize)
trace "echoing ping", conn
trace "echoing ping", conn, pingData = @buf
await conn.write(@buf)
if not isNil(p.pingHandler):
await p.pingHandler(conn.peerId)
Expand Down
1 change: 0 additions & 1 deletion tests/testrelayv2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,6 @@ suite "Circuit Relay V2":
await sleepAsync(chronos.timer.seconds(ttl + 1))

expect(DialFailedError):
check: conn.atEof()
await conn.close()
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[ addrs ], customProtoCodec)
Expand Down
21 changes: 21 additions & 0 deletions tests/testyamux.nim
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,24 @@ suite "Yamux":
expect LPStreamClosedError: discard await streamA.readLp(100)
blocker.complete()
await streamA.close()

asyncTest "Peer must be able to read from stream after closing it for writing":
mSetup()

yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
try:
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
await conn.close()

let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]

await streamA.writeLp(fromHex("1234"))
await streamA.close()
check (await streamA.readLp(100)) == fromHex("5678")
2 changes: 1 addition & 1 deletion tests/transport-interop/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ COPY . nim-libp2p/

RUN \
cd nim-libp2p && \
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN --threads:off ./tests/transport-interop/main.nim
nim c --skipProjCfg --skipParentCfg --NimblePath:./nimbledeps/pkgs -p:nim-libp2p -d:chronicles_log_level=WARN -d:chronicles_default_output_device=stderr --threads:off ./tests/transport-interop/main.nim
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how it should be according to the test spec in test-plans. This is a bit unrelated to the current scope of the PR, but the PR itself was born from debugging this test and this change was necessary, otherwise, the test fails when increasing the log level. Ideally, it should be done in a different PR, but it is fine in this case.


ENTRYPOINT ["/app/nim-libp2p/tests/transport-interop/main"]
Loading