Skip to content

Commit

Permalink
[ETCM-178] Disallow repeated connections: identify handshaked peers b…
Browse files Browse the repository at this point in the history
…y their node id
  • Loading branch information
Nicolas Tallar committed Oct 28, 2020
1 parent 635f002 commit d60cd85
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 136 deletions.
Expand Up @@ -140,13 +140,13 @@ class DumpChainActor(
val account = n.value.toArray[Byte].toAccount

if (account.codeHash != DumpChainActor.emptyEvm) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
evmTorequest = evmTorequest :+ account.codeHash
evmCodeHashes = evmCodeHashes + account.codeHash
}
}
if (account.storageRoot != DumpChainActor.emptyStorage) {
peers.headOption.foreach { case Peer(_, _, _) =>
peers.headOption.foreach { _ =>
contractChildren = contractChildren :+ account.storageRoot
contractNodesHashes = contractNodesHashes + account.storageRoot
}
Expand Down
Expand Up @@ -3,11 +3,21 @@ package io.iohk.ethereum.network
import java.net.InetSocketAddress

import akka.actor.ActorRef
import akka.util.ByteString
import io.iohk.ethereum.blockchain.sync.BlacklistSupport.BlackListId

case class PeerId(value: String) extends BlackListId

case class Peer(remoteAddress: InetSocketAddress, ref: ActorRef, incomingConnection: Boolean) {
object PeerId {
def fromRef(ref: ActorRef): PeerId = PeerId(ref.path.name)
}

case class Peer(
remoteAddress: InetSocketAddress,
ref: ActorRef,
incomingConnection: Boolean,
nodeId: Option[ByteString] = None
) {
// FIXME PeerId should be actual peerId i.e id derived form node public key
def id: PeerId = PeerId(ref.path.name)
def id: PeerId = PeerId.fromRef(ref)
}
19 changes: 10 additions & 9 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Expand Up @@ -49,9 +49,7 @@ class PeerActor[R <: HandshakeResult](

def scheduler: Scheduler = externalSchedulerOpt getOrElse system.scheduler

val peerId: PeerId = PeerId(self.path.name)

val peer: Peer = Peer(peerAddress, self, incomingConnection)
val peerId: PeerId = PeerId.fromRef(self)

override def receive: Receive = waitingForInitialCommand

Expand Down Expand Up @@ -87,7 +85,7 @@ class PeerActor[R <: HandshakeResult](
case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) =>
val newUri =
rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)
processHandshakerNextMessage(initHandshaker, remoteNodeId, rlpxConnection.copy(uriOpt = newUri), numRetries)

case RLPxConnectionHandler.ConnectionFailed =>
log.debug("Failed to establish RLPx connection")
Expand All @@ -109,6 +107,7 @@ class PeerActor[R <: HandshakeResult](

def processingHandshaking(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
timeout: Cancellable,
numRetries: Int
Expand All @@ -122,14 +121,14 @@ class PeerActor[R <: HandshakeResult](
// handles the received message
handshaker.applyMessage(msg).foreach { newHandshaker =>
timeout.cancel()
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)
}
handshaker.respondToRequest(msg).foreach(msgToSend => rlpxConnection.sendMessage(msgToSend))

case ResponseTimeout =>
timeout.cancel()
val newHandshaker = handshaker.processTimeout
processHandshakerNextMessage(newHandshaker, rlpxConnection, numRetries)
processHandshakerNextMessage(newHandshaker, remoteNodeId, rlpxConnection, numRetries)

case GetStatus => sender() ! StatusResponse(Handshaking(numRetries))

Expand All @@ -145,18 +144,19 @@ class PeerActor[R <: HandshakeResult](
*/
private def processHandshakerNextMessage(
handshaker: Handshaker[R],
remoteNodeId: ByteString,
rlpxConnection: RLPxConnection,
numRetries: Int
): Unit =
handshaker.nextMessage match {
case Right(NextMessage(msgToSend, timeoutTime)) =>
rlpxConnection.sendMessage(msgToSend)
val newTimeout = scheduler.scheduleOnce(timeoutTime, self, ResponseTimeout)
context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)
context become processingHandshaking(handshaker, remoteNodeId, rlpxConnection, newTimeout, numRetries)

case Left(HandshakeSuccess(handshakeResult)) =>
rlpxConnection.uriOpt.foreach { uri => knownNodesManager ! KnownNodesManager.AddKnownNode(uri) }
context become new HandshakedPeer(rlpxConnection, handshakeResult).receive
context become new HandshakedPeer(remoteNodeId, rlpxConnection, handshakeResult).receive
unstashAll()

case Left(HandshakeFailure(reason)) =>
Expand Down Expand Up @@ -244,8 +244,9 @@ class PeerActor[R <: HandshakeResult](
stash()
}

class HandshakedPeer(rlpxConnection: RLPxConnection, handshakeResult: R) {
class HandshakedPeer(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, handshakeResult: R) {

val peer: Peer = Peer(peerAddress, self, incomingConnection, Some(remoteNodeId))
peerEventBus ! Publish(PeerHandshakeSuccessful(peer, handshakeResult))

/**
Expand Down

0 comments on commit d60cd85

Please sign in to comment.