Skip to content

Commit

Permalink
Merge pull request #770 from input-output-hk/ETCM-168-discovery-part4
Browse files Browse the repository at this point in the history
ETCM-168: Discovery part4
  • Loading branch information
aakoshh committed Nov 17, 2020
2 parents ac691fd + 02f3469 commit 0f73e7e
Show file tree
Hide file tree
Showing 28 changed files with 807 additions and 800 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object Dependencies {
)

val network: Seq[ModuleID] = {
val scalanetVersion = "0.4-SNAPSHOT"
val scalanetVersion = "0.4.2-SNAPSHOT"
Seq(
"io.iohk" %% "scalanet" % scalanetVersion,
"io.iohk" %% "scalanet-discovery" % scalanetVersion
Expand Down
48 changes: 24 additions & 24 deletions repo.nix
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,37 @@
"nix-public" = "";
};
"artifacts" = {
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-javadoc.jar";
sha256 = "E1BFC529ACE03B29B2D7AB72F8587C55EDEEEE211EC795E328E2F738D67ECE57";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-javadoc.jar";
sha256 = "190A1AB2C6EBEBDAE6E8729005018C6524C79E87447F09F0EC26DA7005D27AE2";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT-sources.jar";
sha256 = "A86A69FB1E4973AFE1FC83FA6F49C6E71221083F437CB75333461024EBC39962";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT-sources.jar";
sha256 = "BE91870FA3F3F1B4D37344563B33CB06E6451788CACF2CB7BAA77C0108D8E2E5";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.jar";
sha256 = "72DB578799E07B3D10612C8A61BFA3D21EAC31312E0BAFECCD203D0D637D7498";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.jar";
sha256 = "2FCBBF064CC95DA4328FB259F4424C1B745826A308249169EE4343C0C962CE94";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4-SNAPSHOT/scalanet-discovery_2.12-0.4-SNAPSHOT.pom";
sha256 = "30830E277F9651F63FA521F111BFF95E00DD3B96FEF481E8B334F6539521B738";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet-discovery_2.12/0.4.2-SNAPSHOT/scalanet-discovery_2.12-0.4.2-SNAPSHOT.pom";
sha256 = "46B60B737421B7E5E4CEB6412DCAAD80221EDC3ACD7E2227CDA01F1A9F57E18E";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-javadoc.jar";
sha256 = "B38F674EEA0E4660D2A39245D9BC129FD9171312C5CCCCE9DB5D1BC140865A86";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-javadoc.jar";
sha256 = "84D56180DC60D6F4F6911D4507622E91DE4E2C96582F97784135DBFDECB0B12B";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT-sources.jar";
sha256 = "DBF79681BAC9A923B0EAB597BE5B849C6F444BB11662BB07DC755BBBD97590EB";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT-sources.jar";
sha256 = "07553109F5461D45AC1B048C134132E6CBB32EC838300093E550B8CFE62C434B";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.jar";
sha256 = "98D7F789C0A80FE7F810B0BD75A0F623E787C0212CEBDB71B2501EA7588C999C";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.jar";
sha256 = "7DF884B172D973459BB7AF6553014B984FE87EC788DD35AEB27F0DEDDD4B215E";
};
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4-SNAPSHOT/scalanet_2.12-0.4-SNAPSHOT.pom";
sha256 = "5EB55ED46D169049D4EB371DB120C6F6EE37AFE89230DFA7CE11298714801481";
"nix-Sonatype OSS Snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom" = {
url = "https://oss.sonatype.org/content/repositories/snapshots/io/iohk/scalanet_2.12/0.4.2-SNAPSHOT/scalanet_2.12-0.4.2-SNAPSHOT.pom";
sha256 = "A27D2641B97A06FDA1B2C81FA96FA1ECA1E563ACA36CEEF3C8C6070C0F1CF731";
};
"nix-public/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar" = {
url = "https://repo1.maven.org/maven2/ch/megard/akka-http-cors_2.12/1.1.0/akka-http-cors_2.12-1.1.0-javadoc.jar";
Expand Down
11 changes: 4 additions & 7 deletions src/it/scala/io/iohk/ethereum/sync/util/CommonFakePeer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import io.iohk.ethereum.mpt.MerklePatriciaTrie
import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo
import io.iohk.ethereum.network.PeerManagerActor.{FastSyncHostConfiguration, PeerConfiguration}
import io.iohk.ethereum.network.discovery.{DiscoveryConfig, Node}
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo, DiscoveryNodeInfo}
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager.{DiscoveredNodesInfo}
import io.iohk.ethereum.network.handshaker.{EtcHandshaker, EtcHandshakerConfiguration, Handshaker}
import io.iohk.ethereum.network.p2p.EthereumMessageDecoder
import io.iohk.ethereum.network.p2p.messages.CommonMessages.NewBlock
Expand Down Expand Up @@ -193,10 +193,7 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
val listenAddress = randomAddress()

lazy val node =
DiscoveryNodeInfo(
Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort),
1
)
Node(ByteString(nodeStatus.nodeId), listenAddress.getAddress, listenAddress.getPort, listenAddress.getPort)

lazy val vmConfig = VmConfig(Config.config)

Expand Down Expand Up @@ -259,14 +256,14 @@ abstract class CommonFakePeer(peerName: String, fakePeerCustomConfig: FakePeerCu
} yield ()
}

def connectToPeers(nodes: Set[DiscoveryNodeInfo]): Task[Unit] = {
def connectToPeers(nodes: Set[Node]): Task[Unit] = {
for {
_ <- Task {
peerManager ! DiscoveredNodesInfo(nodes)
}
_ <- retryUntilWithDelay(Task(storagesInstance.storages.knownNodesStorage.getKnownNodes()), 1.second, 5) {
knownNodes =>
val requestedNodes = nodes.map(_.node.id)
val requestedNodes = nodes.map(_.id)
val currentNodes = knownNodes.map(Node.fromUri).map(_.id)
requestedNodes.subsetOf(currentNodes)
}
Expand Down
44 changes: 33 additions & 11 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,51 @@ mantis {
# Turn discovery of/off
discovery-enabled = true

# Externally visible hostname or IP.
host = null

# Listening interface for discovery protocol
interface = "0.0.0.0"

# Listening port for discovery protocol
port = 30303

# Maximum discovered nodes stored (TODO: remove me once full protocol is in place)
nodes-limit = 5000

# Initial delay for discovery scan
scan-initial-delay = 20.seconds
# If true, the node considers the bootstrap and the previously persisted nodes
# as already discovered and uses them as peer candidates to get blocks from.
# Otherwise it enroll with the bootstrap nodes and gradually discover the
# network every time we start, eventually serving candidates.
#
# Useful if discovery has problem, as the node can start syncing with the
# bootstraps straight away.
#
# Note that setting reuse-known-nodes and discovery-enabled to false at the
# same time would mean the node would have no peer candidates at all.
reuse-known-nodes = true

# Scan interval for discovery
scan-interval = 30.seconds
scan-interval = 15.minutes

# Discovery message expiration time
message-expiration = 90.minutes
message-expiration = 1.minute

# Maximum amount a message can be expired by,
# accounting for possible discrepancies between nodes' clocks.
max-clock-drift = 15.seconds

# Maximum number of peers in each k-bucket.
kademlia-bucket-size = 16

# Timeout for individual requests like Ping.
request-timeout = 3.seconds

# Timeout to collect all possible responses for a FindNode request.
kademlia-timeout = 7.seconds

# (TODO: remove me once full protocol is in place)
scan-max-nodes = 20
# Level of concurrency during lookups and enrollment.
kademlia-alpha = 3

# (TODO: remove me once full protocol is in place)
max-sent-neighbours = 10
# Maximum number of messages in the queue associated with a UDP channel.
channel-capacity = 100
}

known-nodes {
Expand Down
2 changes: 2 additions & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
<appender-ref ref="METRICS" />
</root>

<logger name="io.netty" level="WARN"/>
<logger name="io.iohk.scalanet" level="INFO" />
<logger name="io.iohk.ethereum.network.rlpx.RLPxConnectionHandler" level="DEBUG" />
<logger name="io.iohk.ethereum.blockchain.sync.SyncController" level="INFO" />
<logger name="io.iohk.ethereum.network.PeerActor" level="DEBUG" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait SecureRandomBuilder {
self: FaucetConfigBuilder =>
lazy val secureRandom: SecureRandom =
ConfigUtils
.getOptionalValue(rawMantisConfig, "secure-random-algo", config => config.getString("secure-random-algo"))
.getOptionalValue(rawMantisConfig, _.getString, "secure-random-algo")
.flatMap(name => Try { SecureRandom.getInstance(name) }.toOption)
.getOrElse(new SecureRandom())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.json4s.{DefaultFormats, JInt, native}

import scala.util.Try

trait JsonRpcHttpServer extends Json4sSupport {
val jsonRpcController: JsonRpcBaseController
val jsonRpcHealthChecker: JsonRpcHealthChecker
Expand Down Expand Up @@ -134,15 +132,12 @@ object JsonRpcHttpServer extends Logger {

override val corsAllowedOrigins = ConfigUtils.parseCorsAllowedOrigins(rpcHttpConfig, "cors-allowed-origins")

override val certificateKeyStorePath: Option[String] = Try(
rpcHttpConfig.getString("certificate-keystore-path")
).toOption
override val certificateKeyStoreType: Option[String] = Try(
rpcHttpConfig.getString("certificate-keystore-type")
).toOption
override val certificatePasswordFile: Option[String] = Try(
rpcHttpConfig.getString("certificate-password-file")
).toOption
override val certificateKeyStorePath: Option[String] =
ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-keystore-path")
override val certificateKeyStoreType: Option[String] =
ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-keystore-type")
override val certificatePasswordFile: Option[String] =
ConfigUtils.getOptionalValue(rpcHttpConfig, _.getString, "certificate-password-file")
}
}
}
Expand Down
26 changes: 14 additions & 12 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ class PeerManagerActor(
/**
* Maximum number of blacklisted nodes will never be larger than number of peers provided by discovery
* Discovery provides remote nodes from all networks (ETC,ETH, Mordor etc.) only during handshake we learn that some
* of the remote nodes are not compatible that's why we mark them as useless (blacklist them)
* of the remote nodes are not compatible that's why we mark them as useless (blacklist them).
*
* The number of nodes in the current discovery is unlimited, but a guide may be the size of the routing table:
* one bucket for each bit in the hash of the public key, times the bucket size.
*/
override val maxBlacklistedNodes: Int = discoveryConfig.nodesLimit
override val maxBlacklistedNodes: Int = 32 * 8 * discoveryConfig.kademliaBucketSize

import PeerManagerActor._
import akka.pattern.{ask, pipe}
Expand Down Expand Up @@ -96,23 +99,22 @@ class PeerManagerActor(
log.debug("The known nodes list is empty")
}

case PeerDiscoveryManager.DiscoveredNodesInfo(nodesInfo) =>
val nodesToConnect = nodesInfo
.filterNot { discoveryNodeInfo =>
val socketAddress = discoveryNodeInfo.node.tcpSocketAddress
val alreadyConnected = connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(
discoveryNodeInfo.node.id
)
case PeerDiscoveryManager.DiscoveredNodesInfo(nodes) =>
val nodesToConnect = nodes
.filterNot { node =>
val socketAddress = node.tcpSocketAddress
val alreadyConnected =
connectedPeers.isConnectionHandled(socketAddress) || connectedPeers.hasHandshakedWith(node.id)
alreadyConnected || isBlacklisted(PeerAddress(socketAddress.getHostString))
} // not already connected to or blacklisted
.take(peerConfiguration.maxOutgoingPeers - connectedPeers.outgoingPeersCount)

NetworkMetrics.DiscoveredPeersSize.set(nodesInfo.size)
NetworkMetrics.DiscoveredPeersSize.set(nodes.size)
NetworkMetrics.BlacklistedPeersSize.set(blacklistedPeers.size)
NetworkMetrics.PendingPeersSize.set(connectedPeers.pendingPeersCount)

log.info(
s"Discovered ${nodesInfo.size} nodes, " +
s"Discovered ${nodes.size} nodes, " +
s"Blacklisted ${blacklistedPeers.size} nodes, " +
s"handshaked to ${connectedPeers.handshakedPeersCount}/${peerConfiguration.maxOutgoingPeers + peerConfiguration.maxIncomingPeers}, " +
s"pending connection attempts ${connectedPeers.pendingPeersCount}. " +
Expand All @@ -121,7 +123,7 @@ class PeerManagerActor(

if (nodesToConnect.nonEmpty) {
log.debug("Trying to connect to {} nodes", nodesToConnect.size)
nodesToConnect.foreach(n => self ! ConnectToPeer(n.node.toUri))
nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
} else {
log.debug("The nodes list is empty, no new nodes to connect to")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package io.iohk.ethereum.network.discovery

import io.iohk.ethereum.utils.ConfigUtils
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

case class DiscoveryConfig(
discoveryEnabled: Boolean,
host: Option[String],
interface: String,
port: Int,
bootstrapNodes: Set[Node],
nodesLimit: Int /* TODO: remove once proper discovery protocol is in place */,
scanMaxNodes: Int /* TODO: remove once proper discovery protocol is in place */,
maxNeighbours: Int /* TODO: remove once proper discovery protocol is in place */,
scanInitialDelay: FiniteDuration,
reuseKnownNodes: Boolean,
scanInterval: FiniteDuration,
messageExpiration: FiniteDuration
messageExpiration: FiniteDuration,
maxClockDrift: FiniteDuration,
requestTimeout: FiniteDuration,
kademliaTimeout: FiniteDuration,
kademliaBucketSize: Int,
kademliaAlpha: Int,
channelCapacity: Int
)

object DiscoveryConfig {
Expand All @@ -22,15 +27,19 @@ object DiscoveryConfig {

DiscoveryConfig(
discoveryEnabled = discoveryConfig.getBoolean("discovery-enabled"),
host = ConfigUtils.getOptionalValue(discoveryConfig, _.getString, "host"),
interface = discoveryConfig.getString("interface"),
port = discoveryConfig.getInt("port"),
bootstrapNodes = NodeParser.parseNodes(bootstrapNodes),
nodesLimit = discoveryConfig.getInt("nodes-limit"),
scanMaxNodes = discoveryConfig.getInt("scan-max-nodes"),
maxNeighbours = discoveryConfig.getInt("max-sent-neighbours"),
scanInitialDelay = discoveryConfig.getDuration("scan-initial-delay").toMillis.millis,
reuseKnownNodes = discoveryConfig.getBoolean("reuse-known-nodes"),
scanInterval = discoveryConfig.getDuration("scan-interval").toMillis.millis,
messageExpiration = discoveryConfig.getDuration("message-expiration").toMillis.millis
messageExpiration = discoveryConfig.getDuration("message-expiration").toMillis.millis,
maxClockDrift = discoveryConfig.getDuration("max-clock-drift").toMillis.millis,
requestTimeout = discoveryConfig.getDuration("request-timeout").toMillis.millis,
kademliaTimeout = discoveryConfig.getDuration("kademlia-timeout").toMillis.millis,
kademliaBucketSize = discoveryConfig.getInt("kademlia-bucket-size"),
kademliaAlpha = discoveryConfig.getInt("kademlia-alpha"),
channelCapacity = discoveryConfig.getInt("channel-capacity")
)
}

Expand Down

This file was deleted.

0 comments on commit 0f73e7e

Please sign in to comment.