Skip to content

Commit

Permalink
[ETCM-178] Identify handshaked peers by their node id
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolas Tallar committed Oct 28, 2020
1 parent f0befc1 commit 9420771
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 94 deletions.
Expand Up @@ -140,13 +140,13 @@ class DumpChainActor(
val account = n.value.toArray[Byte].toAccount

if (account.codeHash != DumpChainActor.emptyEvm) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
evmTorequest = evmTorequest :+ account.codeHash
evmCodeHashes = evmCodeHashes + account.codeHash
}
}
if (account.storageRoot != DumpChainActor.emptyStorage) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
contractChildren = contractChildren :+ account.storageRoot
contractNodesHashes = contractNodesHashes + account.storageRoot
}
Expand Down
19 changes: 10 additions & 9 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Expand Up @@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult](

def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler

val peerId: PeerId = PeerId(self.path.name)

val peer: Peer = Peer(peerAddress, self, incomingConnection)
val peerId: PeerId = PeerId.fromRef(self)

override def receive: Receive = waitingForInitialCommand

Expand Down Expand Up @@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult](
case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) =>
val newUri =
rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)
processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries)

case RLPxConnectionHandler.ConnectionFailed =>
log.debug("Failed to establish RLPx connection")
Expand All @@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult](

def processingHandshaking(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
timeout: Cancellable,
numRetries: Int
Expand All @@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult](
// handles the received message
handshaker.applyMessage(msg).foreach { newHandshaker =>
timeout.cancel()
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
}
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))

case ResponseTimeout =>
timeout.cancel()
val newHandshaker = handshaker.processTimeout
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)

case GetStatus => sender() ! StatusResponse(Handshaking(numRetries))

Expand All @@ -145,18 +144,19 @@ class PeerActor[R <: HandshakeResult](
*/
private def processHandshakerNextMessage(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
numRetries: Int
): Unit =
handshaker.nextMessage match {
case Right(NextMessage(msgToSend, timeoutTime)) =>
rlpxConnection.sendMessage(msgToSend)
val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout)
context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)
context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries)

case Left(HandshakeSuccess(handshakeResult)) =>
rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) }
context become new HandshakedPeer(rlpxConnection, handshakeResult).receive
context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive
unstashAll()

case Left(HandshakeFailure(reason)) =>
Expand Down Expand Up @@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult](
stash()
}

class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) {
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {

val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))

/**
Expand Down
Expand Up @@ -39,7 +39,8 @@ object PeerEventBusActor {
object PeerEvent {
case class MessageFromPeer(message: Message, peerId: PeerId) extends PeerEvent
case class PeerDisconnected(peerId: PeerId) extends PeerEvent
case class PeerHandshakeSuccessful[R <: HandshakeResult](peer: Peer, handshakeResult: R) extends PeerEvent
case class PeerHandshakeSuccessful[R <: HandshakeResult](peer: Peer, handshakeResult: R)
extends PeerEvent
}

case class Subscription(subscriber: ActorRef, classifier: SubscriptionClassifier)
Expand Down

0 comments on commit 9420771

Please sign in to comment.