Skip to content

Commit

Permalink
Fix improper yield usage in rlpx and refine exception handling (#679)
Browse files Browse the repository at this point in the history
* Fix improper yield usage in rlpx and refine exception handling

* Handle post hello step error
  • Loading branch information
jangko committed Feb 19, 2024
1 parent efe610e commit d8209f6
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 39 deletions.
8 changes: 7 additions & 1 deletion eth/p2p/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ proc send(d: DiscoveryProtocol, n: Node, data: seq[byte]) =
let f = d.transp.sendTo(ta, data)
let cb = proc(data: pointer) {.gcsafe.} =
if f.failed:
debug "Discovery send failed", msg = f.readError.msg
when defined(chronicles_log_level):
try:
# readError will raise FutureError
debug "Discovery send failed", msg = f.readError.msg
except FutureError as exc:
error "Failed to get discovery send future error", msg=exc.msg

f.addCallback cb

proc sendPing*(d: DiscoveryProtocol, n: Node): seq[byte] =
Expand Down
40 changes: 31 additions & 9 deletions eth/p2p/p2p_protocol_dsl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,12 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =

userHandlerProc.addPragma ident"gcsafe"

# we only take the pragma
let dummy = quote do:
proc dummy(): Future[void] {.async: (raises: [EthP2PError]).}

if p.isRlpx:
userHandlerProc.addPragma ident"async"
userHandlerProc.addPragma dummy.pragma[0]

var
getState = ident"getState"
Expand Down Expand Up @@ -375,7 +379,17 @@ proc augmentUserHandler(p: P2PProtocol, userHandlerProc: NimNode, msgId = -1) =
template networkState(`peerVar`: `PeerType`): `NetworkStateType` {.used.} =
`NetworkStateType`(`getNetworkState`(`peerVar`.network, `protocolInfo`))

proc addPreludeDefs*(userHandlerProc: NimNode, definitions: NimNode) =
proc addExceptionHandler(userHandlerProc: NimNode) =
let bodyTemp = userHandlerProc.body
userHandlerProc.body = quote do:
try:
`bodyTemp`
except CancelledError as exc:
raise newException(EthP2PError, exc.msg)
except CatchableError as exc:
raise newException(EthP2PError, exc.msg)

proc addPreludeDefs(userHandlerProc: NimNode, definitions: NimNode) =
userHandlerProc.body[0].add definitions

proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string): NimNode =
Expand All @@ -385,6 +399,7 @@ proc eventHandlerToProc(p: P2PProtocol, doBlock: NimNode, handlerName: string):
doBlock.copyChildrenTo(result)
result.name = ident(p.name & handlerName) # genSym(nskProc, p.name & handlerName)
p.augmentUserHandler result
result.addExceptionHandler()

proc addTimeoutParam(procDef: NimNode, defaultValue: int64) =
var
Expand Down Expand Up @@ -470,7 +485,8 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
if protocol.useRequestIds:
initResponderCall.add reqIdVar

userHandler.addPreludeDefs newVarStmt(responseVar, initResponderCall)
userHandler.addPreludeDefs quote do:
var `responseVar` {.used.} = `initResponderCall`

result.initResponderCall = initResponderCall

Expand All @@ -479,6 +495,7 @@ proc newMsg(protocol: P2PProtocol, kind: MessageKind, id: int,
of msgResponse: userHandler.applyDecorator protocol.incomingResponseDecorator
else: discard

userHandler.addExceptionHandler()
result.userHandler = userHandler
protocol.outRecvProcs.add result.userHandler

Expand Down Expand Up @@ -796,12 +813,17 @@ proc createHandshakeTemplate*(msg: Message,

let peerVar = genSym(nskLet ,"peer")
handshakeExchanger.setBody quote do:
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`(`peerVar`,
sendingFuture,
`nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
try:
let `peerVar` = `peerValue`
let sendingFuture = `forwardCall`
`handshakeImpl`(`peerVar`,
sendingFuture,
`nextMsg`(`peerVar`, `msgRecName`),
`timeoutVar`)
except PeerDisconnected as exc:
raise newException(EthP2PError, exc.msg)
except P2PInternalError as exc:
raise newException(EthP2PError, exc.msg)

return handshakeExchanger

Expand Down
20 changes: 12 additions & 8 deletions eth/p2p/private/p2p_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,21 @@ type
name*: string
version*: int

UnsupportedProtocol* = object of Defect
EthP2PError* = object of CatchableError

UnsupportedProtocol* = object of EthP2PError
# This is raised when you attempt to send a message from a particular
# protocol to a peer that doesn't support the protocol.

MalformedMessageError* = object of CatchableError
UnsupportedMessageError* = object of CatchableError
MalformedMessageError* = object of EthP2PError
UnsupportedMessageError* = object of EthP2PError

PeerDisconnected* = object of CatchableError
PeerDisconnected* = object of EthP2PError
reason*: DisconnectionReason

UselessPeerError* = object of CatchableError
UselessPeerError* = object of EthP2PError

P2PInternalError* = object of EthP2PError

##
## Quasy-private types. Use at your own risk.
Expand Down Expand Up @@ -155,7 +159,7 @@ type
MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode

ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void]
{.gcsafe, async: (raises: [RlpError, CatchableError]).}
{.gcsafe, async: (raises: [RlpError, EthP2PError]).}

MessageContentPrinter* = proc(msg: pointer): string
{.gcsafe, raises: [].}
Expand All @@ -173,10 +177,10 @@ type
{.gcsafe, raises: [].}

HandshakeStep* = proc(peer: Peer): Future[void]
{.gcsafe, raises: [].}
{.gcsafe, async: (raises: [EthP2PError]).}

DisconnectionHandler* = proc(peer: Peer, reason: DisconnectionReason):
Future[void] {.gcsafe, raises: [].}
Future[void] {.gcsafe, async: (raises: [EthP2PError]).}

ConnectionState* = enum
None,
Expand Down
49 changes: 29 additions & 20 deletions eth/p2p/rlpx.nim
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
# result = $(cast[ptr MsgType](msg)[])

proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async: (raises:[CatchableError]).}
notifyOtherPeer = false) {.async: (raises:[]).}

template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
var e = newException(PeerDisconnected, msg)
Expand All @@ -185,32 +185,35 @@ template raisePeerDisconnected(msg: string, r: DisconnectionReason) =

proc disconnectAndRaise(peer: Peer,
reason: DisconnectionReason,
msg: string) {.async.} =
msg: string) {.async:
(raises: [PeerDisconnected]).} =
let r = reason
await peer.disconnect(r)
raisePeerDisconnected(msg, r)

proc handshakeImpl[T](peer: Peer,
sendFut: Future[void],
responseFut: Future[T],
timeout: Duration): Future[T] {.async.} =
timeout: Duration): Future[T] {.async:
(raises: [PeerDisconnected, P2PInternalError]).} =
sendFut.addCallback do (arg: pointer) {.gcsafe.}:
if sendFut.failed:
debug "Handshake message not delivered", peer

doAssert timeout.milliseconds > 0
yield responseFut or sleepAsync(timeout)
if not responseFut.finished:

try:
let res = await responseFut.wait(timeout)
return res
except AsyncTimeoutError:
# TODO: Really shouldn't disconnect and raise everywhere. In order to avoid
# understanding what error occured where.
# And also, incoming and outgoing disconnect errors should be seperated,
# probably by seperating the actual disconnect call to begin with.
await disconnectAndRaise(peer, HandshakeTimeout,
"Protocol handshake was not received in time.")
elif responseFut.failed:
raise responseFut.error
else:
return responseFut.read
except CatchableError as exc:
raise newException(P2PInternalError, exc.msg)

# Dispatcher
#
Expand Down Expand Up @@ -363,7 +366,7 @@ template perPeerMsgId(peer: Peer, MsgType: type): int =
perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)

proc invokeThunk*(peer: Peer, msgId: int, msgData: Rlp): Future[void]
{.async: (raises: [CatchableError, rlp.RlpError]).} =
{.async: (raises: [rlp.RlpError, EthP2PError]).} =
template invalidIdError: untyped =
raise newException(UnsupportedMessageError,
"RLPx message with an invalid id " & $msgId &
Expand Down Expand Up @@ -393,6 +396,7 @@ proc sendMsg*(peer: Peer, data: seq[byte]) {.async.} =
if res != len(cipherText):
# This is ECONNRESET or EPIPE case when remote peer disconnected.
await peer.disconnect(TcpError)
discard
except CatchableError as e:
await peer.disconnect(TcpError)
raise e
Expand Down Expand Up @@ -948,7 +952,7 @@ proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
proc `thunkName`(`peerVar`: `Peer`, _: int, data: Rlp)
# Fun error if you just use `RlpError` instead of `rlp.RlpError`:
# "Error: type expected, but got symbol 'RlpError' of kind 'EnumField'"
{.async: (raises: [rlp.RlpError, CatchableError]).} =
{.async: (raises: [rlp.RlpError, EthP2PError]).} =
var `receivedRlp` = data
var `receivedMsg` {.noinit.}: `msgRecName`
`readParamsPrelude`
Expand Down Expand Up @@ -1067,22 +1071,22 @@ proc removePeer(network: EthereumNode, peer: Peer) =
observer.onPeerDisconnected(peer)

proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason):
Future[void] {.async.} =
Future[void] {.async: (raises: []).} =
var futures = newSeqOfCap[Future[void]](protocolCount())

for protocol in peer.dispatcher.activeProtocols:
if protocol.disconnectHandler != nil:
futures.add((protocol.disconnectHandler)(peer, reason))

await allFutures(futures)
await noCancel allFutures(futures)

for f in futures:
doAssert(f.finished())
if f.failed():
trace "Disconnection handler ended with an error", err = f.error.msg

proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async: (raises: [CatchableError]).} =
notifyOtherPeer = false) {.async: (raises: []).} =
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
# Do this first so sub-protocols have time to clean up and stop sending
Expand All @@ -1094,15 +1098,16 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
await callDisconnectHandlers(peer, reason)

if notifyOtherPeer and not peer.transport.closed:
var fut = peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
yield fut
if fut.failed:
debug "Failed to deliver disconnect message", peer

proc waitAndClose(peer: Peer, time: Duration) {.async.} =
await sleepAsync(time)
await peer.transport.closeWait()

try:
await peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
except CatchableError as exc:
debug "Failed to deliver disconnect message", peer, msg=exc.msg

# Give the peer a chance to disconnect
traceAsyncErrors peer.waitAndClose(2.seconds)
elif not peer.transport.closed:
Expand Down Expand Up @@ -1336,7 +1341,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
10.seconds)
except RlpError:
return err(ProtocolError)
except PeerDisconnected as e:
except PeerDisconnected:
return err(PeerDisconnectedError)
# TODO: Strange compiler error
# case e.reason:
Expand All @@ -1350,6 +1355,8 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
# return err(PeerDisconnectedError)
except TransportError:
return err(P2PTransportError)
except P2PInternalError:
return err(P2PHandshakeError)
except CatchableError as e:
raiseAssert($e.name & " " & $e.msg)

Expand All @@ -1374,6 +1381,8 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):
return err(UselessRlpxPeerError)
except TransportError:
return err(P2PTransportError)
except EthP2PError:
return err(ProtocolError)
except CatchableError as e:
raiseAssert($e.name & " " & $e.msg)

Expand All @@ -1385,7 +1394,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node):

# TODO: rework rlpxAccept similar to rlpxConnect.
proc rlpxAccept*(
node: EthereumNode, transport: StreamTransport): Future[Peer] {.async: (raises: [CatchableError]).} =
node: EthereumNode, transport: StreamTransport): Future[Peer] {.async: (raises: []).} =
initTracing(devp2pInfo, node.protocols)

let peer = Peer(transport: transport, network: node)
Expand Down
5 changes: 4 additions & 1 deletion tests/p2p/eth_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ p2pProtocol eth(version = 63,
genesisHash: KeccakHash)

requestResponse:
proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} = discard
proc getBlockHeaders(peer: Peer, request: openArray[KeccakHash]) {.gcsafe.} =
var headers: seq[BlockHeader]
await response.send(headers)

proc blockHeaders(p: Peer, headers: openArray[BlockHeader])

requestResponse:
Expand Down

0 comments on commit d8209f6

Please sign in to comment.