[ETCM-540] Improve peer discovery algorithm
AnastasiiaL committed Jan 26, 2021
1 parent 5ea1b6f commit 567450f
Showing 3 changed files with 67 additions and 10 deletions.
6 changes: 3 additions & 3 deletions src/main/resources/application.conf
@@ -165,12 +165,12 @@ mantis {
# Newly discovered nodes connect attempt interval
update-nodes-interval = 30.seconds

- # Peer which disconnect during tcp connection becouse of too many peers will not be retried for this short duration
+ # Peer which disconnect during tcp connection because of too many peers will not be retried for this short duration
short-blacklist-duration = 6.minutes

- # Peer which disconnect during tcp connection becouse of other reasons will not be retried for this long duration
+ # Peer which disconnect during tcp connection because of other reasons will not be retried for this long duration
# other reasons include: timeout during connection, wrong protocol, incompatible network
- long-blacklist-duration = 30.minutes
+ long-blacklist-duration = 600.minutes

# Resolution of moving window of peer statistics.
# Will be multiplied by `stat-slot-count` to give the overall length of peer statistics availability.
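Note: the two blacklist windows configured above are plain durations, 6 minutes for the short case and 600 minutes (10 hours) for the long case. A minimal sketch of how such values are typically held with scala.concurrent.duration; the case class below is hypothetical, not the actual mantis configuration type:

import scala.concurrent.duration._

// Hypothetical holder for the two blacklist windows configured above.
final case class BlacklistDurations(
    shortBlacklistDuration: FiniteDuration = 6.minutes,
    longBlacklistDuration: FiniteDuration = 600.minutes
)

object BlacklistDurationsSketch extends App {
  val d = BlacklistDurations()
  // Prints: short: 6 minutes, long: 10 hours
  println(s"short: ${d.shortBlacklistDuration}, long: ${d.longBlacklistDuration.toHours} hours")
}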
28 changes: 23 additions & 5 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
@@ -22,7 +22,10 @@ import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import monix.eval.Task
import monix.execution.{Scheduler => MonixScheduler}
import org.bouncycastle.util.encoders.Hex

+ import scala.collection.mutable
import scala.concurrent.duration._
+ import scala.jdk.CollectionConverters._

class PeerManagerActor(
peerEventBus: ActorRef,
@@ -48,6 +51,12 @@ class PeerManagerActor(
*/
override val maxBlacklistedNodes: Int = 32 * 8 * discoveryConfig.kademliaBucketSize

+ def lruCache[A, B](maxEntries: Int): mutable.Map[A, B] = new java.util.LinkedHashMap[A, B]() {
+ override def removeEldestEntry(eldest: java.util.Map.Entry[A, B]): Boolean = size > maxEntries
+ }.asScala

+ val triedNodes = lruCache[ByteString, Node](maxBlacklistedNodes)

import PeerManagerActor._
import akka.pattern.pipe

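A note on the cache added above: overriding removeEldestEntry on java.util.LinkedHashMap caps the map at maxEntries, so each insertion beyond the cap silently drops the oldest-inserted entry (the default constructor keeps insertion order; the three-argument constructor with accessOrder = true would give a true access-ordered LRU), and .asScala exposes it as a scala.collection.mutable.Map. A small self-contained sketch of the same idiom outside the actor, with illustrative names only:

import scala.collection.mutable
import scala.jdk.CollectionConverters._

object LruCacheSketch extends App {
  // Bounded map: once size exceeds maxEntries, the eldest entry is evicted
  // automatically on the next put.
  def lruCache[A, B](maxEntries: Int): mutable.Map[A, B] =
    new java.util.LinkedHashMap[A, B]() {
      override def removeEldestEntry(eldest: java.util.Map.Entry[A, B]): Boolean = size > maxEntries
    }.asScala

  val cache = lruCache[String, Int](2)
  cache.put("a", 1)
  cache.put("b", 2)
  cache.put("c", 3)     // "a" is evicted here
  println(cache.keySet) // the two most recently inserted keys: b and c
}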
@@ -134,9 +143,15 @@
}

private def maybeConnectToDiscoveredNodes(connectedPeers: ConnectedPeers, nodes: Set[Node]): Unit = {
- val nodesToConnect = nodes
+ val discoveredNodes = nodes
.filter(connectedPeers.canConnectTo)
- .take(connectedPeers.outgoingConnectionDemand)

+ val nodesToConnect = discoveredNodes
+ .filterNot(n => triedNodes.contains(n.id)) match {
+ case seq if seq.size > connectedPeers.outgoingConnectionDemand =>
+ seq.take(connectedPeers.outgoingConnectionDemand)
+ case _ => discoveredNodes.take(connectedPeers.outgoingConnectionDemand)
+ }

NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
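The selection above first drops candidates whose id is already in triedNodes, and only when the remaining fresh nodes cannot exceed the outgoing connection demand does it fall back to the full filtered set, which may re-include previously tried peers. A standalone sketch of that rule with the actor state replaced by plain parameters (Node reduced to a String id here, names hypothetical):

object NodeSelectionSketch extends App {
  def selectNodes(discovered: Set[String], tried: Set[String], demand: Int): Set[String] = {
    val fresh = discovered.filterNot(tried.contains)
    // Prefer nodes that were never tried; fall back to the whole discovered
    // set (possibly including already-tried nodes) when fresh ones are scarce.
    if (fresh.size > demand) fresh.take(demand)
    else discovered.take(demand)
  }

  val discovered = Set("n1", "n2", "n3")
  println(selectNodes(discovered, tried = Set("n1"), demand = 1))             // one of the untried nodes
  println(selectNodes(discovered, tried = Set("n1", "n2", "n3"), demand = 2)) // falls back to already-tried nodes
}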
@@ -152,14 +167,17 @@

if (nodesToConnect.nonEmpty) {
log.debug("Trying to connect to {} nodes", nodesToConnect.size)
- nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
+ nodesToConnect.foreach(n => {
+ triedNodes.put(n.id, n)
+ self ! ConnectToPeer(n.toUri)
+ })
} else {
log.debug("The nodes list is empty, no new nodes to connect to")
}

// Make sure the background lookups keep going and we don't get stuck with 0
// nodes to connect to until the next discovery scan loop. Only sending 1
- // request so we don't rack up too many pending futures, just trigger a a
+ // request so we don't rack up too many pending futures, just trigger a
// search if needed.
if (connectedPeers.outgoingConnectionDemand > nodesToConnect.size) {
peerDiscoveryManager ! PeerDiscoveryManager.GetRandomNodeInfo
@@ -184,7 +202,7 @@
private def getBlacklistDuration(reason: Long): FiniteDuration = {
import Disconnect.Reasons._
reason match {
- case TooManyPeers => peerConfiguration.shortBlacklistDuration
+ case TooManyPeers | AlreadyConnected | ClientQuitting => peerConfiguration.shortBlacklistDuration
case _ => peerConfiguration.longBlacklistDuration
}
}
43 changes: 41 additions & 2 deletions src/test/scala/io/iohk/ethereum/network/PeerManagerSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
- import org.scalacheck.{Arbitrary, Gen, Shrink}, Arbitrary.arbitrary
+ import org.scalacheck.{Arbitrary, Gen}, Arbitrary.arbitrary
import scala.concurrent.duration._

// scalastyle:off magic.number
@@ -274,7 +274,7 @@ class PeerManagerSpec

behavior of "outgoingConnectionDemand"

it should "try to connect to at least min-outgoing-peers but no longer than max-outgoing-peers" in new ConnectedPeersFixture {
it should "try to connect to at least min-outgoing-peers but no more than max-outgoing-peers" in new ConnectedPeersFixture {
forAll { (connectedPeers: ConnectedPeers) =>
val demand = PeerManagerActor.outgoingConnectionDemand(connectedPeers, peerConfiguration)
demand shouldBe >=(0)
@@ -286,6 +286,45 @@
}
}

it should "try to connect to discovered nodes if there's an outgoing demand: new nodes first, retried last" in new TestSetup {
start()
val discoveredNodes: Set[Node] = Set(
"enode://111bd28d5b2c1378d748383fd83ff59572967c317c3063a9f475a26ad3f1517642a164338fb5268d4e32ea1cc48e663bd627dec572f1d201c7198518e5a506b1@88.99.216.30:45834?discport=45834",
"enode://2b69a3926f36a7748c9021c34050be5e0b64346225e477fe7377070f6289bd363b2be73a06010fd516e6ea3ee90778dd0399bc007bb1281923a79374f842675a@51.15.116.226:30303?discport=30303"
).map(new java.net.URI(_)).map(Node.fromUri)

peerManager ! PeerDiscoveryManager.DiscoveredNodesInfo(discoveredNodes)

peerDiscoveryManager.expectMsg(PeerDiscoveryManager.GetRandomNodeInfo)

val probe: TestProbe = createdPeers(0).probe
probe.expectMsgClass(classOf[PeerActor.ConnectTo])

val probe2: TestProbe = createdPeers(1).probe
probe2.expectMsgClass(classOf[PeerActor.ConnectTo])

peerManager ! PeerClosedConnection(discoveredNodes.head.addr.getHostAddress, Disconnect.Reasons.TooManyPeers)

peerManager.underlyingActor.blacklistedPeers.size shouldEqual 1
peerManager.underlyingActor.triedNodes.size shouldEqual 2

time.advance(360000) // wait till the peer is out of the blacklist

val newRoundDiscoveredNodes = discoveredNodes + Node.fromUri(new java.net.URI(
"enode://a59e33ccd2b3e52d578f1fbd70c6f9babda2650f0760d6ff3b37742fdcdfdb3defba5d56d315b40c46b70198c7621e63ffa3f987389c7118634b0fefbbdfa7fd@51.158.191.43:38556?discport=38556"))

peerManager ! PeerDiscoveryManager.DiscoveredNodesInfo(newRoundDiscoveredNodes)

probe.expectNoMessage()
probe2.expectNoMessage()

val probe3: TestProbe = createdPeers(2).probe
probe3.expectMsgClass(classOf[PeerActor.ConnectTo])

peerManager.underlyingActor.blacklistedPeers.size shouldEqual 0
peerManager.underlyingActor.triedNodes.size shouldEqual 3
}

behavior of "numberOfIncomingConnectionsToPrune"

it should "try to prune incoming connections down to the minimum allowed number" in new ConnectedPeersFixture {
