Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/phase/beta1' into feature/eip155…
Browse files Browse the repository at this point in the history
…conf
  • Loading branch information
adamsmo committed Jul 19, 2017
2 parents 29913e7 + 251c8be commit c585470
Show file tree
Hide file tree
Showing 20 changed files with 585 additions and 72 deletions.
11 changes: 7 additions & 4 deletions src/it/scala/io/iohk/ethereum/txExecTest/util/DumpChainApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.iohk.ethereum.utils.{BlockchainConfig, Config, NodeStatus, ServerStatu
import org.spongycastle.util.encoders.Hex

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder with AuthHandshakerBuilder {
val conf = ConfigFactory.load("txExecTest/chainDump.conf")
Expand All @@ -43,6 +43,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
override val fastSyncHostConfiguration: PeerManagerActor.FastSyncHostConfiguration = Config.Network.peer.fastSyncHostConfiguration
override val maxPeers: Int = Config.Network.peer.maxPeers
override val networkId: Int = privateNetworkId
override val updateNodesInitialDelay: FiniteDuration = 5.seconds
override val updateNodesInterval: FiniteDuration = 20.seconds
}

val actorSystem = ActorSystem("etc-client_system")
Expand All @@ -53,7 +55,8 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
val nodeStatus =
NodeStatus(
key = nodeKey,
serverStatus = ServerStatus.NotListening)
serverStatus = ServerStatus.NotListening,
discoveryStatus = ServerStatus.NotListening)

lazy val nodeStatusHolder = Agent(nodeStatus)

Expand All @@ -75,10 +78,10 @@ object DumpChainApp extends App with NodeKeyBuilder with SecureRandomBuilder wit
val peerMessageBus = actorSystem.actorOf(PeerEventBusActor.props)

val peerManager = actorSystem.actorOf(PeerManagerActor.props(
peerDiscoveryManager = actorSystem.deadLetters, // TODO: fixme
nodeStatusHolder = nodeStatusHolder,
peerConfiguration = peerConfig,
bootstrapNodes = Set(node),
peerMessageBus,
peerMessageBus = peerMessageBus,
handshaker = handshaker,
authHandshaker = authHandshaker,
messageDecoder = EthereumMessageDecoder), "peer-manager")
Expand Down
28 changes: 26 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ grothendieck {
}

discovery {
# Listening interface for discovery protocol
interface = "0.0.0.0"

# Listening port for discovery protocol
port = 30303

# Set of initial nodes
bootstrap-nodes = [
"enode://18a551bee469c2e02de660ab01dede06503c986f6b8520cb5a65ad122df88b17b285e3fef09a40a0d44f99e014f8616cf1ebc2e094f96c6e09e2f390f5d34857@47.90.36.129:30303",
Expand All @@ -46,8 +52,20 @@ grothendieck {
"enode://fba5a07e283d517a2680bcfc7aeb498ac2d246d756556a2ebd5edeb39496491c47a6d27e27f82833b7d7d12defc8de994de04bb58beb72472649f9a323006820@41.135.121.6:30303"
]

# TODO: remove me
bootstrap-nodes-scan-interval = 2 minutes
# Maximum discovered nodes stored (TODO: remove me once full protocol is in place)
nodes-limit = 1000

# Initial delay for discovery scan
scan-initial-delay = 10.seconds

# Scan interval for discovery
scan-interval = 1.minute

# Discovery message expiration time
message-expiration = 90.minutes

# (TODO: remove me once full protocol is in place)
scan-max-nodes = 10
}

peer {
Expand Down Expand Up @@ -88,6 +106,12 @@ grothendieck {
# Ethereum network identifier:
# 1 - mainnet, 2 - morden
network-id = 1

# Initial delay before connecting to nodes
update-nodes-initial-delay = 5.seconds

# Newly discovered nodes connect attempt interval
update-nodes-interval = 20.seconds
}

rpc {
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/io/iohk/ethereum/App.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.iohk.ethereum

import io.iohk.ethereum.blockchain.sync.SyncController
import io.iohk.ethereum.network.discovery.DiscoveryListener
import io.iohk.ethereum.network.{PeerManagerActor, ServerActor}
import io.iohk.ethereum.utils.Logger
import io.iohk.ethereum.nodebuilder.Node
Expand Down Expand Up @@ -28,8 +29,11 @@ object App {

peerManager ! PeerManagerActor.StartConnecting
server ! ServerActor.StartServer(networkConfig.Server.listenAddress)
discoveryListener ! DiscoveryListener.Start
syncController ! SyncController.StartSync

peerDiscoveryManager // unlazy

if (jsonRpcHttpServerConfig.enabled) jsonRpcHttpServer.run()
}

Expand Down
51 changes: 23 additions & 28 deletions src/main/scala/io/iohk/ethereum/network/PeerManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,34 @@ import akka.actor._
import akka.agent.Agent
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
import io.iohk.ethereum.network.PeerEventBusActor.Publish
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager
import io.iohk.ethereum.network.handshaker.Handshaker
import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable}
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.utils.{Config, NodeStatus}
import io.iohk.ethereum.utils.NodeStatus

import scala.util.{Failure, Success}

class PeerManagerActor(
peerEventBus: ActorRef,
peerDiscoveryManager: ActorRef,
peerConfiguration: PeerConfiguration,
peerFactory: (ActorContext, InetSocketAddress) => ActorRef,
externalSchedulerOpt: Option[Scheduler] = None,
bootstrapNodes: Set[String] = Config.Network.Discovery.bootstrapNodes)
externalSchedulerOpt: Option[Scheduler] = None)
extends Actor with ActorLogging with Stash {

import akka.pattern.{ask, pipe}
import PeerManagerActor._
import Config.Network.Discovery._

var peers: Map[PeerId, Peer] = Map.empty

private def scheduler = externalSchedulerOpt getOrElse context.system.scheduler

scheduler.schedule(0.seconds, bootstrapNodesScanInterval, self, ScanBootstrapNodes)
scheduler.schedule(peerConfiguration.updateNodesInitialDelay, peerConfiguration.updateNodesInterval) {
peerDiscoveryManager ! PeerDiscoveryManager.GetDiscoveredNodes
}

override val supervisorStrategy: OneForOneStrategy =
OneForOneStrategy() {
Expand Down Expand Up @@ -82,14 +84,18 @@ class PeerManagerActor(
log.info("Maximum number of connected peers reached. Not connecting to {}", msg.uri)
}

case ScanBootstrapNodes =>
val nodesToConnect = bootstrapNodes
.map(new URI(_))
.filterNot { uri => isConnectionHandled(new InetSocketAddress(uri.getHost, uri.getPort)) }
case PeerDiscoveryManager.DiscoveredNodes(nodes) =>
val peerAddresses = peers.values.map(_.remoteAddress).toSet

val nodesToConnect = nodes
.filterNot(n => peerAddresses.contains(n.addr)) // not already connected to
.toSeq
.sortBy(-_.addTimestamp)
.take(peerConfiguration.maxPeers - peerAddresses.size)

if (nodesToConnect.nonEmpty) {
log.info("Trying to connect to {} bootstrap nodes", nodesToConnect.size)
nodesToConnect.foreach(self ! ConnectToPeer(_))
log.info("Trying to connect to {} nodes", nodesToConnect.size)
nodesToConnect.foreach(n => self ! ConnectToPeer(n.toUri))
}
}

Expand All @@ -114,7 +120,7 @@ class PeerManagerActor(
}

def tryingToConnect: Receive = handleCommonMessages orElse {
case _: HandlePeerConnection | _: ConnectToPeer | ScanBootstrapNodes =>
case _: HandlePeerConnection | _: ConnectToPeer | PeerDiscoveryManager.DiscoveredNodes =>
stash()

case (HandlePeerConnection(connection, remoteAddress), Success(_)) =>
Expand Down Expand Up @@ -200,25 +206,14 @@ class PeerManagerActor(

object PeerManagerActor {
def props[R <: HandshakeResult](nodeStatusHolder: Agent[NodeStatus],
peerDiscoveryManager: ActorRef,
peerConfiguration: PeerConfiguration,
peerEventBus: ActorRef,
handshaker: Handshaker[R],
authHandshaker: AuthHandshaker,
messageDecoder: MessageDecoder): Props =
Props(new PeerManagerActor(peerEventBus, peerConfiguration,
peerFactory(nodeStatusHolder, peerConfiguration, peerEventBus, handshaker, authHandshaker, messageDecoder)))

def props[R <: HandshakeResult](nodeStatusHolder: Agent[NodeStatus],
peerConfiguration: PeerConfiguration,
bootstrapNodes: Set[String],
peerMessageBus: ActorRef,
handshaker: Handshaker[R],
authHandshaker: AuthHandshaker,
messageDecoder: MessageDecoder): Props =
Props(new PeerManagerActor(peerMessageBus, peerConfiguration,
peerFactory = peerFactory(nodeStatusHolder, peerConfiguration, peerMessageBus, handshaker, authHandshaker, messageDecoder),
bootstrapNodes = bootstrapNodes)
)
Props(new PeerManagerActor(peerMessageBus, peerDiscoveryManager, peerConfiguration,
peerFactory = peerFactory(nodeStatusHolder, peerConfiguration, peerMessageBus, handshaker, authHandshaker, messageDecoder)))

def peerFactory[R <: HandshakeResult](nodeStatusHolder: Agent[NodeStatus],
peerConfiguration: PeerConfiguration,
Expand All @@ -243,6 +238,8 @@ object PeerManagerActor {
val rlpxConfiguration: RLPxConfiguration
val maxPeers: Int
val networkId: Int
val updateNodesInitialDelay: FiniteDuration
val updateNodesInterval: FiniteDuration
}

trait FastSyncHostConfiguration {
Expand All @@ -263,7 +260,5 @@ object PeerManagerActor {
def handshaked: Seq[Peer] = peers.collect{ case (peer, Handshaked) => peer }.toSeq
}

private case object ScanBootstrapNodes

case class SendMessage(message: MessageSerializable, peerId: PeerId)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.iohk.ethereum.network.discovery

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

case class DiscoveryConfig(
interface: String,
port: Int,
bootstrapNodes: Set[String],
nodesLimit: Int /* TODO: remove once proper discovery protocol is in place */,
scanMaxNodes: Int /* TODO: remove once proper discovery protocol is in place */,
scanInitialDelay: FiniteDuration,
scanInterval: FiniteDuration,
messageExpiration: FiniteDuration)

object DiscoveryConfig {
def apply(etcClientConfig: com.typesafe.config.Config): DiscoveryConfig = {
import scala.collection.JavaConverters._
val discoveryConfig = etcClientConfig.getConfig("network.discovery")
val bootstrapNodes = discoveryConfig.getStringList("bootstrap-nodes").asScala.toSet
DiscoveryConfig(
interface = discoveryConfig.getString("interface"),
port = discoveryConfig.getInt("port"),
bootstrapNodes = bootstrapNodes,
nodesLimit = discoveryConfig.getInt("nodes-limit"),
scanMaxNodes = discoveryConfig.getInt("scan-max-nodes"),
scanInitialDelay = discoveryConfig.getDuration("scan-initial-delay").toMillis.millis,
scanInterval = discoveryConfig.getDuration("scan-interval").toMillis.millis,
messageExpiration = discoveryConfig.getDuration("message-expiration").toMillis.millis)
}
}

0 comments on commit c585470

Please sign in to comment.