Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support domain names in known peers list #3947

Merged
merged 3 commits into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion node-it/src/test/scala/com/wavesplatform/it/Docker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ class Docker(

private def getNodeInfo(containerId: String, settings: WavesSettings): NodeInfo = {
val restApiPort = settings.restAPISettings.port
val networkPort = settings.networkSettings.bindAddress.getPort
// assume test nodes always have an open port
val networkPort = settings.networkSettings.bindAddress.get.getPort

val containerInfo = inspectContainer(containerId)
val wavesIpAddress = containerInfo.networkSettings().networks().get(wavesNetwork.name()).ipAddress()
Expand Down
10 changes: 5 additions & 5 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,18 @@ waves {
# Peers and blacklist storage file
file = ${waves.directory}"/peers.dat"

# If defined, the node will bind to this address and accept the incoming connections. When commented out, the node
# will not accept any incoming connections and will only establish outgoing ones. If you're using UPnP for port
# mapping, make sure to specify the correct address here.
bind-address = "0.0.0.0"

# String with IP address and port to send as external address during handshake. Could be set automatically if UPnP
# is enabled.
#
# If `declared-address` is set, which is the common scenario for nodes running in the cloud, the node will just
# listen to incoming connections on `bind-address:port` and broadcast its `declared-address` to its peers. UPnP
# is supposed to be disabled in this scenario.
#
# If declared address is not set and UPnP is not enabled, the node will not listen to incoming connections at all.
#
# If declared address is not set and UPnP is enabled, the node will attempt to connect to an IGD, retrieve its
# external IP address and configure the gateway to allow traffic through. If the node succeeds, the IGD's external
# IP address becomes the node's declared address.
Expand All @@ -79,9 +82,6 @@ waves {
# to `bind-address:port`. Please note, however, that this setup is not recommended.
# declared-address = "1.2.3.4:6863"

# Network address
bind-address = "0.0.0.0"

# Port number
port = 6863

Expand Down
17 changes: 3 additions & 14 deletions node/src/main/resources/network-defaults.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ waves.defaults {
port = 6863

known-peers = [
"159.69.126.149:6868"
"94.130.105.239:6868"
"159.69.126.153:6868"
"94.130.172.201:6868"
"35.157.247.122:6868"
"peers-testnet.wavesnodes.com:6868"
]
}
}
Expand Down Expand Up @@ -140,10 +136,7 @@ waves.defaults {
# node-name = "My MAINNET node"

known-peers = [
"168.119.116.189:6868"
"135.181.87.72:6868"
"162.55.39.115:6868"
"168.119.155.201:6868"
"peers.wavesnodes.com:6868"
]
}

Expand All @@ -160,11 +153,7 @@ waves.defaults {
port = 6862

known-peers = [
"88.99.185.128:6868"
"95.216.205.3:6868"
"49.12.15.166:6868"
"88.198.179.16:6868"
"52.58.254.101:6868"
"peers-stagenet.wavesnodes.com:6868"
]
}
}
Expand Down
72 changes: 40 additions & 32 deletions node/src/main/scala/com/wavesplatform/network/NetworkServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.wavesplatform.Version
import com.wavesplatform.metrics.Metrics
import com.wavesplatform.network.MessageObserver.Messages
import com.wavesplatform.settings.*
import com.wavesplatform.state.Cast
import com.wavesplatform.transaction.*
import com.wavesplatform.utils.ScorexLogging
import io.netty.bootstrap.{Bootstrap, ServerBootstrap}
Expand All @@ -16,7 +17,7 @@ import io.netty.util.concurrent.DefaultThreadFactory
import monix.reactive.Observable
import org.influxdb.dto.Point

import java.net.{InetSocketAddress, NetworkInterface}
import java.net.{InetSocketAddress, NetworkInterface, SocketAddress}
import java.nio.channels.ClosedChannelException
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration.*
Expand Down Expand Up @@ -59,17 +60,17 @@ object NetworkServer extends ScorexLogging {
val trafficLogger = new TrafficLogger(settings.networkSettings.trafficLogger)
val messageCodec = new MessageCodec(peerDatabase)

val excludedAddresses: Set[InetSocketAddress] = {
val bindAddress = settings.networkSettings.bindAddress
val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress)
val localAddresses = if (isLocal) {
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort)))
.toSet
} else Set(bindAddress)
val excludedAddresses: Set[InetSocketAddress] =
settings.networkSettings.bindAddress.fold(Set.empty[InetSocketAddress]) { bindAddress =>
val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress)
val localAddresses = if (isLocal) {
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort)))
.toSet
} else Set(bindAddress)

localAddresses ++ settings.networkSettings.declaredAddress.toSet
}
localAddresses ++ settings.networkSettings.declaredAddress.toSet
}

val lengthFieldPrepender = new LengthFieldPrepender(4)

Expand Down Expand Up @@ -113,7 +114,7 @@ object NetworkServer extends ScorexLogging {
fatalErrorHandler
)

val serverChannel = settings.networkSettings.declaredAddress.map { _ =>
val serverChannel = settings.networkSettings.bindAddress.map { bindAddress =>
new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(classOf[NioServerSocketChannel])
Expand All @@ -128,7 +129,7 @@ object NetworkServer extends ScorexLogging {
) ++ pipelineTail
)
)
.bind(settings.networkSettings.bindAddress)
.bind(bindAddress)
.channel()
}

Expand Down Expand Up @@ -166,6 +167,8 @@ object NetworkServer extends ScorexLogging {
s"Channel closed: ${Option(closeFuture.cause()).map(_.getMessage).getOrElse("no message")}"
)
)

logConnections()
}

def handleConnectionAttempt(remoteAddress: InetSocketAddress)(thisConnFuture: ChannelFuture): Unit = {
Expand All @@ -188,6 +191,7 @@ object NetworkServer extends ScorexLogging {
case other => log.debug(formatOutgoingChannelEvent(thisConnFuture.channel(), other.getMessage))
}
}
logConnections()
}

def doConnect(remoteAddress: InetSocketAddress): Unit =
Expand All @@ -201,36 +205,40 @@ object NetworkServer extends ScorexLogging {
}
)

def scheduleConnectTask(): Unit = if (!shutdownInitiated) {
val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) +
(Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously
log.trace(s"Next connection attempt in $delay")
def logConnections(): Unit = {
def mkAddressString(addresses: IterableOnce[SocketAddress]) =
addresses.iterator.map(_.toString).toVector.sorted.mkString("[", ",", "]")

workerGroup.schedule(delay) {
val outgoing = outgoingChannels.keySet.iterator().asScala.toVector
val incoming = peerInfo.values().asScala.view.map(_.remoteAddress).filterNot(outgoingChannels.containsKey)

def outgoingStr = outgoing.map(_.toString).sorted.mkString("[", ", ", "]")
lazy val incomingStr = mkAddressString(incoming)
lazy val outgoingStr = mkAddressString(outgoingChannels.keySet.iterator().asScala)

val all = peerInfo.values().iterator().asScala.flatMap(_.declaredAddress).toVector
val incoming = all.filterNot(outgoing.contains)
val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress])

def incomingStr = incoming.map(_.toString).sorted.mkString("[", ", ", "]")
log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr")

log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr")
Metrics.write(
Point
.measurement("connections")
.addField("outgoing", outgoingStr)
.addField("incoming", incomingStr)
.addField("n", all.size)
)
}

def scheduleConnectTask(): Unit = if (!shutdownInitiated) {
val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) +
(Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously

workerGroup.schedule(delay) {
if (outgoingChannels.size() < settings.networkSettings.maxOutboundConnections) {
val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress])
peerDatabase
.randomPeer(excluded = excludedAddresses ++ all)
.foreach(doConnect)
}

Metrics.write(
Point
.measurement("connections")
.addField("outgoing", outgoingStr)
.addField("incoming", incomingStr)
.addField("n", all.size)
)

scheduleConnectTask()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package com.wavesplatform.network

import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, RemovalNotification}
import com.google.common.collect.EvictingQueue
import com.wavesplatform.settings.NetworkSettings
import com.wavesplatform.utils.{JsonFileStorage, ScorexLogging}
import io.netty.channel.Channel
import io.netty.channel.socket.nio.NioSocketChannel

import scala.jdk.CollectionConverters.*
import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.*
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters.*
import scala.util.Random
import scala.util.control.NonFatal

Expand All @@ -36,23 +36,12 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor
}

private type PeersPersistenceType = Set[String]
private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime, Some(nonExpiringKnownPeers))
private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime)
private val blacklist = cache[InetAddress](settings.blackListResidenceTime)
private val suspension = cache[InetAddress](settings.suspensionResidenceTime)
private val reasons = mutable.Map.empty[InetAddress, String]
private val unverifiedPeers = EvictingQueue.create[InetSocketAddress](settings.maxUnverifiedPeers)

private val knownPeersAddresses = settings.knownPeers.map(inetSocketAddress(_, 6863))

private def nonExpiringKnownPeers(n: PeerRemoved[InetSocketAddress]): Unit =
if (n.wasEvicted() && knownPeersAddresses.contains(n.getKey))
peersPersistence.put(n.getKey, n.getValue)

for (a <- knownPeersAddresses) {
// add peers from config with max timestamp so they never get evicted from the list of known peers
doTouch(a, Long.MaxValue)
}

for (f <- settings.file if f.exists()) try {
JsonFileStorage.load[PeersPersistenceType](f.getCanonicalPath).foreach(a => touch(inetSocketAddress(a, 6863)))
log.info(s"Loaded ${peersPersistence.size} known peer(s) from ${f.getName}")
Expand All @@ -62,7 +51,7 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor

override def addCandidate(socketAddress: InetSocketAddress): Boolean = unverifiedPeers.synchronized {
val r = !socketAddress.getAddress.isAnyLocalAddress &&
!(socketAddress.getAddress.isLoopbackAddress && socketAddress.getPort == settings.bindAddress.getPort) &&
!(socketAddress.getAddress.isLoopbackAddress && settings.bindAddress.exists(_.getPort == socketAddress.getPort)) &&
Option(peersPersistence.getIfPresent(socketAddress)).isEmpty &&
!unverifiedPeers.contains(socketAddress)
if (r) unverifiedPeers.add(socketAddress)
Expand Down Expand Up @@ -112,26 +101,29 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor
override def suspendedHosts: immutable.Set[InetAddress] = suspension.asMap().asScala.keys.toSet

override def detailedBlacklist: immutable.Map[InetAddress, (Long, String)] =
blacklist.asMap().asScala.view.mapValues(_.toLong).map { case ((h, t)) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap
blacklist.asMap().asScala.view.mapValues(_.toLong).map { case (h, t) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap

override def detailedSuspended: immutable.Map[InetAddress, Long] = suspension.asMap().asScala.view.mapValues(_.toLong).toMap

override def randomPeer(excluded: immutable.Set[InetSocketAddress]): Option[InetSocketAddress] = unverifiedPeers.synchronized {
def excludeAddress(isa: InetSocketAddress): Boolean = {
excluded(isa) || Option(isa.getAddress).exists(blacklistedHosts) || suspendedHosts(isa.getAddress)
}
// excluded only contains local addresses, our declared address, and external declared addresses we already have
// connection to, so it's safe to filter out all matching candidates
unverifiedPeers.removeIf(excluded(_))
val unverified = Option(unverifiedPeers.peek()).filterNot(excludeAddress)
val verified = Random.shuffle(knownPeers.keySet.diff(excluded).toSeq).headOption.filterNot(excludeAddress)

(unverified, verified) match {
case (Some(_), v @ Some(_)) => if (Random.nextBoolean()) Some(unverifiedPeers.poll()) else v
case (Some(_), None) => Some(unverifiedPeers.poll())
case (None, v @ Some(_)) => v
case _ => None

@tailrec
def nextUnverified(): Option[InetSocketAddress] = {
unverifiedPeers.poll() match {
case null => None
case nonNull =>
if (!excludeAddress(nonNull)) Some(nonNull) else nextUnverified()
}
}

nextUnverified() orElse Random
.shuffle(
(knownPeers.keySet ++ settings.knownPeers.map(p => inetSocketAddress(p, 6868))).filterNot(excludeAddress)
)
.headOption
}

def clearBlacklist(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ case class UPnPSettings(enable: Boolean, gatewayTimeout: FiniteDuration, discove

case class NetworkSettings(
file: Option[File],
bindAddress: InetSocketAddress,
bindAddress: Option[InetSocketAddress],
declaredAddress: Option[InetSocketAddress],
nodeName: String,
nonce: Long,
Expand Down Expand Up @@ -48,7 +48,7 @@ object NetworkSettings {

private[this] def fromConfig(config: Config): NetworkSettings = {
val file = config.getAs[File]("file")
val bindAddress = new InetSocketAddress(config.as[String]("bind-address"), config.as[Int]("port"))
val bindAddress = config.getAs[String]("bind-address").map(addr => new InetSocketAddress(addr, config.as[Int]("port")))
val nonce = config.getOrElse("nonce", randomNonce)
val nodeName = config.getOrElse("node-name", s"Node-$nonce")
require(nodeName.utf8Bytes.length <= MaxNodeNameBytesLength, s"Node name should have length less than $MaxNodeNameBytesLength bytes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ class PeerDatabaseImplSpecification extends FreeSpec {
.resolve()
private val settings2 = config2.as[NetworkSettings]("waves.network")

private val config3 = ConfigFactory
.parseString(s"""waves.network {
| file = null
| known-peers = ["$host1:1"]
| peers-data-residence-time = 2s
| enable-peers-exchange = no
|}""".stripMargin)
.withFallback(ConfigFactory.load())
.resolve()
private val settings3 = config3.as[NetworkSettings]("waves.network")

private def withDatabase(settings: NetworkSettings)(f: PeerDatabase => Unit): Unit = {
val pdb = new PeerDatabaseImpl(settings)
f(pdb)
Expand Down Expand Up @@ -78,14 +67,6 @@ class PeerDatabaseImplSpecification extends FreeSpec {
database.randomPeer(Set()) shouldBe empty
}

"known-peers should be always in database" in withDatabase(settings3) { database3 =>
database3.knownPeers.keys should contain(address1)
sleepLong()
database3.knownPeers.keys should contain(address1)
sleepShort()
database3.knownPeers.keys should contain(address1)
}

"touching peer prevent it from obsoleting" in withDatabase(settings1) { database =>
database.addCandidate(address1)
database.touch(address1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class NetworkSettingsSpecification extends FlatSpec {
|}""".stripMargin))
val networkSettings = config.as[NetworkSettings]("waves.network")

networkSettings.bindAddress should be(new InetSocketAddress("127.0.0.1", 6868))
networkSettings.bindAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868)))
networkSettings.nodeName should be("default-node-name")
networkSettings.declaredAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868)))
networkSettings.nonce should be(0)
Expand Down