Skip to content

Commit

Permalink
Merge branch 'phase/beta1' of github.com:input-output-hk/etc-client i…
Browse files Browse the repository at this point in the history
…nto fix/handleInvalidMAC
  • Loading branch information
Nicolas Tallar committed Aug 1, 2017
2 parents 8c8ff34 + c0db1a5 commit 5109cd2
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 28 deletions.
6 changes: 5 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ grothendieck {
# Enabled JSON-RPC APIs over the HTTP endpoint
# Available choices are: eth, web3, net, personal
apis = "eth,web3,net"

net {
peer-manager-timeout = 5.seconds
}
}
}

Expand Down Expand Up @@ -339,7 +343,7 @@ grothendieck {
# Time at which a filter remains valid
filter-timeout = 10.minutes

filter-manager-query-timeout = 3.seconds
filter-manager-query-timeout = 3.minutes
}

}
Expand Down
29 changes: 17 additions & 12 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,10 @@ trait FastSync {

syncStateStorageActor ! fastSyncStateStorage

private val syncStatePersistCancellable =
scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval) {
syncStateStorageActor ! SyncState(
initialSyncState.targetBlock,
requestedMptNodes.values.flatten.toSeq.distinct ++ mptNodesQueue,
requestedNonMptNodes.values.flatten.toSeq.distinct ++ nonMptNodesQueue,
requestedBlockBodies.values.flatten.toSeq.distinct ++ blockBodiesQueue,
requestedReceipts.values.flatten.toSeq.distinct ++ receiptsQueue,
downloadedNodesCount,
bestBlockHeaderNumber)
}

private val syncStatePersistCancellable = scheduler.schedule(persistStateSnapshotInterval, persistStateSnapshotInterval, self, PersistSyncState)
private val heartBeat = scheduler.schedule(syncRetryInterval, syncRetryInterval * 2, self, ProcessSyncing)

// scalastyle:off cyclomatic.complexity
def receive: Receive = handlePeerUpdates orElse {
case EnqueueNodes(hashes) =>
hashes.foreach {
Expand Down Expand Up @@ -229,6 +219,20 @@ trait FastSync {

case PrintStatus =>
printStatus()

case PersistSyncState =>
persistSyncState()
}

private def persistSyncState(): Unit = {
syncStateStorageActor ! SyncState(
initialSyncState.targetBlock,
requestedMptNodes.values.flatten.toSeq.distinct ++ mptNodesQueue,
requestedNonMptNodes.values.flatten.toSeq.distinct ++ nonMptNodesQueue,
requestedBlockBodies.values.flatten.toSeq.distinct ++ blockBodiesQueue,
requestedReceipts.values.flatten.toSeq.distinct ++ receiptsQueue,
downloadedNodesCount,
bestBlockHeaderNumber)
}

private def printStatus() = {
Expand Down Expand Up @@ -427,6 +431,7 @@ object FastSync {
private case object TargetBlockTimeout

private case object ProcessSyncing
private case object PersistSyncState

case class SyncState(
targetBlock: BlockHeader,
Expand Down
15 changes: 13 additions & 2 deletions src/main/scala/io/iohk/ethereum/jsonrpc/NetService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io.iohk.ethereum.jsonrpc
import akka.actor.ActorRef
import akka.agent.Agent
import akka.util.Timeout
import io.iohk.ethereum.jsonrpc.NetService.NetServiceConfig
import io.iohk.ethereum.network.PeerManagerActor
import io.iohk.ethereum.utils.ServerStatus.{Listening, NotListening}
import io.iohk.ethereum.utils.{Config, NodeStatus}
Expand All @@ -20,9 +21,19 @@ object NetService {

case class PeerCountRequest()
case class PeerCountResponse(value: Int)

case class NetServiceConfig(peerManagerTimeout: FiniteDuration)

object NetServiceConfig {
def apply(etcClientConfig: com.typesafe.config.Config): NetServiceConfig = {
val netServiceConfig = etcClientConfig.getConfig("network.rpc.net")
NetServiceConfig(
peerManagerTimeout = netServiceConfig.getDuration("peer-manager-timeout").toMillis.millis)
}
}
}

class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef) {
class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef, config: NetServiceConfig) {
import NetService._

def version(req: VersionRequest): ServiceResponse[VersionResponse] = {
Expand All @@ -42,7 +53,7 @@ class NetService(nodeStatusHolder: Agent[NodeStatus], peerManager: ActorRef) {

def peerCount(req: PeerCountRequest): ServiceResponse[PeerCountResponse] = {
import akka.pattern.ask
implicit val timeout = Timeout(2.seconds)
implicit val timeout = Timeout(config.peerManagerTimeout)

(peerManager ? PeerManagerActor.GetPeers)
.mapTo[PeerManagerActor.Peers]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.agent.Agent
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.PeerDisconnected
import io.iohk.ethereum.network.PeerEventBusActor.Publish
import io.iohk.ethereum.network.discovery.PeerDiscoveryManager
Expand All @@ -21,7 +20,8 @@ import io.iohk.ethereum.network.handshaker.Handshaker.HandshakeResult
import io.iohk.ethereum.network.p2p.{MessageDecoder, MessageSerializable}
import io.iohk.ethereum.network.rlpx.AuthHandshaker
import io.iohk.ethereum.network.rlpx.RLPxConnectionHandler.RLPxConfiguration
import io.iohk.ethereum.utils.NodeStatus

import scala.util.{Failure, Success}

class PeerManagerActor(
peerEventBus: ActorRef,
Expand Down Expand Up @@ -161,8 +161,9 @@ class PeerManagerActor(
Future.traverse(peers.values) { peer =>
(peer.ref ? PeerActor.GetStatus)
.mapTo[PeerActor.StatusResponse]
.map(sr => (peer, sr.status))
}.map(r => Peers.apply(r.toMap))
.map { sr => Success((peer, sr.status)) }
.recover { case ex => Failure(ex) }
}.map(r => Peers.apply(r.collect { case Success(v) => v }.toMap))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ class PeerDiscoveryManager(

if (discoveryConfig.discoveryEnabled) {
discoveryListener ! DiscoveryListener.Subscribe

context.system.scheduler.schedule(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval) {
scan()
}
context.system.scheduler.schedule(discoveryConfig.scanInitialDelay, discoveryConfig.scanInterval, self, Scan)
}

def scan(): Unit = {
Expand Down Expand Up @@ -78,6 +75,8 @@ class PeerDiscoveryManager(

case GetDiscoveredNodes =>
sender() ! DiscoveredNodes(nodes.values.toSet)

case Scan => scan()
}

private def sendPing(toNodeId: ByteString, toAddr: InetSocketAddress): Unit = {
Expand Down Expand Up @@ -130,4 +129,6 @@ object PeerDiscoveryManager {

case object GetDiscoveredNodes
case class DiscoveredNodes(nodes: Set[Node])

private case object Scan
}
9 changes: 5 additions & 4 deletions src/main/scala/io/iohk/ethereum/nodebuilder/NodeBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import io.iohk.ethereum.db.components.{SharedLevelDBDataSources, Storages}
import io.iohk.ethereum.db.storage.AppStateStorage
import io.iohk.ethereum.db.storage.pruning.PruningMode
import io.iohk.ethereum.domain.{Blockchain, BlockchainImpl}
import io.iohk.ethereum.jsonrpc.NetService.NetServiceConfig
import io.iohk.ethereum.ledger.{Ledger, LedgerImpl}
import io.iohk.ethereum.network.{PeerManagerActor, ServerActor}
import io.iohk.ethereum.jsonrpc._
Expand Down Expand Up @@ -226,7 +227,9 @@ trait Web3ServiceBuilder {
trait NetServiceBuilder {
this: PeerManagerActorBuilder with NodeStatusBuilder =>

lazy val netService = new NetService(nodeStatusHolder, peerManager)
lazy val netServiceConfig = NetServiceConfig(Config.config)

lazy val netService = new NetService(nodeStatusHolder, peerManager, netServiceConfig)
}

trait PendingTransactionsManagerBuilder {
Expand Down Expand Up @@ -259,9 +262,7 @@ trait FilterManagerBuilder {
keyStore,
pendingTransactionsManager,
filterConfig,
txPoolConfig
)
)
txPoolConfig), "filter-manager")
}

trait BlockGeneratorBuilder {
Expand Down
3 changes: 2 additions & 1 deletion src/test/scala/io/iohk/ethereum/jsonrpc/NetServiceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.iohk.ethereum.utils.{NodeStatus, ServerStatus}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class NetServiceSpec extends FlatSpec with Matchers with ScalaFutures with NormalPatience with SecureRandomBuilder {
Expand Down Expand Up @@ -46,7 +47,7 @@ class NetServiceSpec extends FlatSpec with Matchers with ScalaFutures with Norma

val nodeStatus = NodeStatus(crypto.generateKeyPair(secureRandom), ServerStatus.Listening(new InetSocketAddress(9000)),
discoveryStatus = ServerStatus.NotListening)
val netService = new NetService(Agent(nodeStatus), peerManager.ref)
val netService = new NetService(Agent(nodeStatus), peerManager.ref, NetServiceConfig(5.seconds))
}

}

0 comments on commit 5109cd2

Please sign in to comment.