Skip to content

Commit

Permalink
[ETCM-841] Add a mechanism for notifying PeerActor about 'negotiated …
Browse files Browse the repository at this point in the history
…protocol'
  • Loading branch information
dzajkowski committed Jun 10, 2021
1 parent d6abf0c commit 99dc3b4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 48 deletions.
13 changes: 13 additions & 0 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,19 @@ class PeerActor[R <: HandshakeResult](
handleDisconnectMsg(rlpxConnection, Handshaking(numRetries)) orElse
handlePingMsg(rlpxConnection) orElse stashMessages orElse {

case RLPxConnectionHandler.InitialHelloReceived(msg, negotiatedProtocol) =>
// Processes the InitialHelloReceived, cancels the timeout and processes a new message but only if the handshaker
// handles the received message
// TODO pass capability to 'EtcHelloExchangeState'
// to pass negotiatedProtocol the PeerActor should be in a "post auth handshake" state
// since ConnectionEstablished(remotePubKey) was already received
// an ETCAwaitingInitialHelloHandshaker would be an approach
handshaker.applyMessage(msg).foreach { newHandshaker =>
timeout.cancel()
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
}
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))

case RLPxConnectionHandler.MessageReceived(msg) =>
// Processes the received message, cancels the timeout and processes a new message but only if the handshaker
// handles the received message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,70 +157,89 @@ class RLPxConnectionHandler(
context.parent ! ConnectionEstablished(remotePubKey)
if (remainingData.nonEmpty)
context.self ! Received(remainingData)
context become awaitHello(extractor(secrets))
// following the specification at https://github.com/ethereum/devp2p/blob/master/rlpx.md#initial-handshake
// point 6 indicates that the next messages needs to be initial 'Hello'
context become awaitInitialHello(extractor(secrets))

case AuthHandshakeError =>
log.debug(s"[Stopping Connection] Auth handshake failed for peer $peerId")
context.parent ! ConnectionFailed
context stop self
}

def awaitHello(
def awaitInitialHello(
extractor: HelloExtractor,
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
): Receive =
handleWriteFailed orElse handleConnectionClosed orElse {
case SendMessage(h: HelloEnc) =>
val out = extractor.writeHello(h)
connection ! Write(out, Ack)
val timeout =
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
context become awaitHello(
extractor,
Some(CancellableAckTimeout(seqNumber, timeout)),
increaseSeqNumber(seqNumber)
)
handleWriteFailed orElse handleConnectionClosed orElse handleSendHello(
extractor,
cancellableAckTimeout,
seqNumber
) orElse handleReceiveHello(extractor, cancellableAckTimeout, seqNumber)

case Ack if cancellableAckTimeout.nonEmpty =>
//Cancel pending message timeout
cancellableAckTimeout.foreach(_.cancellable.cancel())
context become awaitHello(extractor, None, seqNumber)
private def handleSendHello(
extractor: HelloExtractor,
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
): Receive = {
// TODO when cancellableAckTimeout is Some
case SendMessage(h: HelloEnc) =>
val out = extractor.writeHello(h)
connection ! Write(out, Ack)
val timeout =
system.scheduler.scheduleOnce(rlpxConfiguration.waitForTcpAckTimeout, self, AckTimeout(seqNumber))
context become awaitInitialHello(
extractor,
Some(CancellableAckTimeout(seqNumber, timeout)),
increaseSeqNumber(seqNumber)
)
case Ack if cancellableAckTimeout.nonEmpty =>
//Cancel pending message timeout
cancellableAckTimeout.foreach(_.cancellable.cancel())
context become awaitInitialHello(extractor, None, seqNumber)

case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
cancellableAckTimeout.foreach(_.cancellable.cancel())
log.error(s"[Stopping Connection] Sending 'Hello' to $peerId failed")
context stop self

case AckTimeout(ackSeqNumber) if cancellableAckTimeout.exists(_.seqNumber == ackSeqNumber) =>
cancellableAckTimeout.foreach(_.cancellable.cancel())
log.error(s"[Stopping Connection] Sending 'Hello' to $peerId failed")
context stop self
}

case Received(data) =>
extractor.readHello(data) match {
case Some((hello, restFrames)) =>
val messageCodecOpt = for {
messageCodec <- negotiateCodec(hello, extractor)
_ = context.parent ! MessageReceived(hello)
_ = processFrames(restFrames, messageCodec)
} yield messageCodec
messageCodecOpt match {
case Some(messageCodec) =>
context become handshaked(
messageCodec,
cancellableAckTimeout = cancellableAckTimeout,
seqNumber = seqNumber
)
case None =>
log.debug(s"[Stopping Connection] Unable to negotiate protocol with $peerId")
context.parent ! ConnectionFailed
context stop self
}
private def handleReceiveHello(
extractor: HelloExtractor,
cancellableAckTimeout: Option[CancellableAckTimeout] = None,
seqNumber: Int = 0
): Receive = { case Received(data) =>
extractor.readHello(data) match {
case Some((hello, restFrames)) =>
val messageCodecOpt = for {
opt <- negotiateCodec(hello, extractor)
(messageCodec, negotiated) = opt
_ = context.parent ! InitialHelloReceived(hello, negotiated)
_ = processFrames(restFrames, messageCodec)
} yield messageCodec
messageCodecOpt match {
case Some(messageCodec) =>
context become handshaked(
messageCodec,
cancellableAckTimeout = cancellableAckTimeout,
seqNumber = seqNumber
)
case None =>
log.debug(s"[Stopping Connection] Did not find 'Hello' in message from $peerId")
context become awaitHello(extractor, cancellableAckTimeout, seqNumber)
log.debug(s"[Stopping Connection] Unable to negotiate protocol with $peerId")
context.parent ! ConnectionFailed
context stop self
}
case None =>
log.debug(s"[Stopping Connection] Did not find 'Hello' in message from $peerId")
context become awaitInitialHello(extractor, cancellableAckTimeout, seqNumber)
}
}

private def negotiateCodec(hello: Hello, extractor: HelloExtractor): Option[MessageCodec] =
private def negotiateCodec(hello: Hello, extractor: HelloExtractor): Option[(MessageCodec, Capability)] =
Capability.negotiate(hello.capabilities.toList, capabilities).map { negotiated =>
messageCodecFactory(extractor.frameCodec, messageDecoder, negotiated, hello.p2pVersion)
(messageCodecFactory(extractor.frameCodec, messageDecoder, negotiated, hello.p2pVersion), negotiated)
}

private def processFrames(frames: Seq[Frame], messageCodec: MessageCodec): Unit =
Expand Down Expand Up @@ -382,6 +401,8 @@ object RLPxConnectionHandler {

case class MessageReceived(message: Message)

case class InitialHelloReceived(message: Hello, capability: Capability)

case class SendMessage(serializable: MessageSerializable)

private case object AuthHandshakeTimeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import io.iohk.ethereum.{Timeouts, WithActorSystemShutDown}
import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable}
import io.iohk.ethereum.network.p2p.messages.{Capability, ProtocolVersions}
import io.iohk.ethereum.network.p2p.messages.WireProtocol.{Hello, Ping}
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.{HelloExtractor, MessageReceived, RLPxConfiguration}
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.{HelloExtractor, InitialHelloReceived, MessageReceived, RLPxConfiguration}
import io.iohk.ethereum.security.SecureRandomBuilder
import org.scalamock.scalatest.MockFactory

Expand Down Expand Up @@ -97,7 +97,7 @@ class RLPxConnectionHandlerSpec
rlpxConnection ! RLPxConnectionHandler.SendMessage(Ping())
connection.expectMsg(Tcp.Write(ByteString("ping encoded"), RLPxConnectionHandler.Ack))

val expectedHello = rlpxConnectionParent.expectMsgType[MessageReceived]
val expectedHello = rlpxConnectionParent.expectMsgType[InitialHelloReceived]
expectedHello.message shouldBe a[Hello]

//The rlpx connection is closed after a timeout happens (after rlpxConfiguration.waitForTcpAckTimeout) and it is processed
Expand Down Expand Up @@ -224,12 +224,13 @@ class RLPxConnectionHandlerSpec

//AuthHandshaker handles initial message
val data = ByteString((0 until AuthHandshaker.InitiatePacketLength).map(_.toByte).toArray)
val hello = ByteString((1 until AuthHandshaker.InitiatePacketLength).map(_.toByte).toArray)
val response = ByteString("response data")
(mockHandshaker.handleInitialMessage _)
.expects(data)
.returning((response, AuthHandshakeSuccess(mock[Secrets], ByteString())))
(mockHelloExtractor.readHello _)
.expects(ByteString.empty)
.expects(hello)
.returning(Some(Hello(5, "", Capabilities.Eth63Capability::Nil, 30303, ByteString("abc")), Seq.empty))
(mockMessageCodec.readMessages _)
.expects(ByteString.empty)
Expand All @@ -238,6 +239,8 @@ class RLPxConnectionHandlerSpec
rlpxConnection ! Tcp.Received(data)
connection.expectMsg(Tcp.Write(response))

rlpxConnection ! Tcp.Received(hello)

//Connection fully established
rlpxConnectionParent.expectMsgClass(classOf[RLPxConnectionHandler.ConnectionEstablished])
}
Expand Down

0 comments on commit 99dc3b4

Please sign in to comment.