Skip to content

Commit

Permalink
Merge branch 'phase/daedalus' of github.com:input-output-hk/mantis in…
Browse files Browse the repository at this point in the history
…to fix/validateReceipts
  • Loading branch information
Nicolas Tallar committed Aug 16, 2017
2 parents 6fc6e00 + 6a0e219 commit 5fac2d7
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ mantis {
# size of the list that keeps track of peers that are failing to provide us with mpt node
# we switch them to download only blockchain elements
fastsync-block-chain-only-peers-pool = 100

# time between 2 consecutive requests to peer when doing fast sync, this is to prevent flagging us as spammer
fastsync-throttle = 2 seconds
}

pruning {
Expand Down
16 changes: 15 additions & 1 deletion src/main/scala/io/iohk/ethereum/blockchain/sync/FastSync.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.iohk.ethereum.blockchain.sync


import java.time.Instant
import java.util.Date

import akka.actor._
import akka.util.ByteString
import io.iohk.ethereum.domain.{Block, BlockHeader}
Expand Down Expand Up @@ -163,6 +167,7 @@ trait FastSync {
private var bestBlockHeaderNumber: BigInt = initialSyncState.bestBlockHeaderNumber

private var assignedHandlers: Map[ActorRef, Peer] = Map.empty
private var peerRequestsTime: Map[Peer, Instant] = Map.empty

private val syncStateStorageActor = context.actorOf(Props[FastSyncStateActor], "state-storage")

Expand Down Expand Up @@ -210,7 +215,7 @@ trait FastSync {
context unwatch sender()
assignedHandlers -= sender()
cleanupRequestedMaps(sender())
processSyncing()
self ! ProcessSyncing

case Terminated(ref) if assignedHandlers.contains(ref) =>
handleActorTerminate(ref)
Expand Down Expand Up @@ -251,6 +256,8 @@ trait FastSync {

private def handleActorTerminate(ref: ActorRef) = {
context unwatch ref
val peer = assignedHandlers(ref)
peerRequestsTime -= peer
assignedHandlers -= ref
mptNodesQueue ++= requestedMptNodes.getOrElse(ref, Nil)
nonMptNodesQueue ++= requestedNonMptNodes.getOrElse(ref, Nil)
Expand Down Expand Up @@ -364,6 +371,7 @@ trait FastSync {
appStateStorage.fastSyncDone()
context become idle
blockChainOnlyPeers = Seq.empty
peerRequestsTime = Map.empty
self ! FastSyncDone
}

Expand All @@ -383,7 +391,9 @@ trait FastSync {
scheduler.scheduleOnce(syncRetryInterval, self, ProcessSyncing)
}
} else {
val now = Instant.now()
val peers = unassignedPeers
.filter(p => peerRequestsTime.get(p).forall(d => d.plusMillis(fastSyncThrottle.toMillis).isBefore(now)))
val blockChainPeers = blockChainOnlyPeers.toSet
(peers -- blockChainPeers)
.take(maxConcurrentRequests - assignedHandlers.size)
Expand Down Expand Up @@ -422,6 +432,7 @@ trait FastSync {
peer, etcPeerManager, peerEventBus, receiptsToGet, appStateStorage, blockchain, validators.blockValidator))
context watch handler
assignedHandlers += (handler -> peer)
peerRequestsTime += (peer -> Instant.now())
receiptsQueue = remainingReceipts
requestedReceipts += handler -> receiptsToGet
}
Expand All @@ -431,6 +442,7 @@ trait FastSync {
val handler = context.actorOf(SyncBlockBodiesRequestHandler.props(peer, etcPeerManager, peerEventBus, blockBodiesToGet))
context watch handler
assignedHandlers += (handler -> peer)
peerRequestsTime += (peer -> Instant.now())
blockBodiesQueue = remainingBlockBodies
requestedBlockBodies += handler -> blockBodiesToGet
}
Expand All @@ -447,6 +459,7 @@ trait FastSync {
blockHeadersHandlerName)
context watch handler
assignedHandlers += (handler -> peer)
peerRequestsTime += (peer -> Instant.now())
}

def requestNodes(peer: Peer): Unit = {
Expand All @@ -457,6 +470,7 @@ trait FastSync {
blockchainStorages.nodesKeyValueStorageFor(Some(initialSyncState.targetBlock.number))))
context watch handler
assignedHandlers += (handler -> peer)
peerRequestsTime += (peer -> Instant.now())
nonMptNodesQueue = remainingNonMptNodes
mptNodesQueue = remainingMptNodes
requestedMptNodes += handler -> mptNodesToGet
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/io/iohk/ethereum/utils/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ object Config {
val checkForNewBlockInterval: FiniteDuration = syncConfig.getDuration("check-for-new-block-interval").toMillis.millis
val blockResolveDepth: Int = syncConfig.getInt("block-resolving-depth")
val blockChainOnlyPeersPoolSize: Int = syncConfig.getInt("fastsync-block-chain-only-peers-pool")
val fastSyncThrottle: FiniteDuration = syncConfig.getDuration("fastsync-throttle").toMillis.millis
}

trait DbConfig {
Expand Down
1 change: 1 addition & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mantis {
peers-scan-interval = 500.millis
blacklist-duration = 5.seconds
start-retry-interval = 500.millis
fastsync-throttle = 100.millis
sync-retry-interval = 1.second
peer-response-timeout = 1.second
print-status-interval = 1.hour
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ import akka.actor.{ActorSystem, Props}
import akka.testkit.{TestActorRef, TestProbe}
import akka.util.ByteString
import com.miguno.akka.testing.VirtualTime
import io.iohk.ethereum.{Mocks, Timeouts}
import io.iohk.ethereum.blockchain.sync.FastSync.{StateMptNodeHash, SyncState}
import io.iohk.ethereum.blockchain.sync.SyncController.MinedBlock
import io.iohk.ethereum.domain.{Account, Block, BlockHeader, SignedTransaction}
import io.iohk.ethereum.domain.{Account, Block, BlockHeader}
import io.iohk.ethereum.ledger.{BloomFilter, Ledger}
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo}
import io.iohk.ethereum.network.PeerEventBusActor.PeerEvent.MessageFromPeer
import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.{MessageClassifier, PeerDisconnectedClassifier}
import io.iohk.ethereum.network.PeerEventBusActor.{PeerSelector, Subscribe, Unsubscribe}
import io.iohk.ethereum.network.EtcPeerManagerActor.{GetHandshakedPeers, HandshakedPeers, PeerInfo}
import io.iohk.ethereum.network.p2p.messages.CommonMessages.{NewBlock, Status}
import io.iohk.ethereum.network.p2p.messages.PV62.{BlockBody, _}
import io.iohk.ethereum.network.p2p.messages.PV63.{GetNodeData, GetReceipts, NodeData, Receipts}
import io.iohk.ethereum.network.{EtcPeerManagerActor, Peer}
import io.iohk.ethereum.ommers.OmmersPool.{AddOmmers, RemoveOmmers}
import io.iohk.ethereum.transactions.PendingTransactionsManager.{AddTransactions, RemoveTransactions}
import io.iohk.ethereum.utils.Config
import io.iohk.ethereum.{Mocks, Timeouts}
import org.scalatest.{FlatSpec, Matchers}
import org.spongycastle.util.encoders.Hex

Expand Down Expand Up @@ -121,19 +121,27 @@ class SyncControllerSpec extends FlatSpec with Matchers {
peerMessageBus.reply(MessageFromPeer(NodeData(Seq(stateMptLeafWithAccount)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer2.id))))

//wait for peers throttle
Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)
//trigger scheduling
time.advance(2.second)
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
GetBlockHeaders(Left(targetBlockHeader.number), expectedTargetBlock - bestBlockHeaderNumber, 0, reverse = false),
peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(BlockHeaders(Seq(targetBlockHeader)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))

Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)
time.advance(2.second)
etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetReceipts(Seq(targetBlockHeader.hash)), peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(Receipts(Seq(Nil)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))

Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)
time.advance(2.second)
etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetBlockBodies(Seq(targetBlockHeader.hash)), peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer2.id))))
Expand Down Expand Up @@ -194,19 +202,25 @@ class SyncControllerSpec extends FlatSpec with Matchers {
peerMessageBus.reply(MessageFromPeer(NodeData(Seq(stateMptLeafWithAccount)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer2.id))))

Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)
time.advance(2.second)
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
GetBlockHeaders(Left(targetBlockHeader.number), expectedTargetBlock - bestBlockHeaderNumber, 0, reverse = false),
peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(BlockHeaders(Seq(targetBlockHeader)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer2.id))))

Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)
time.advance(2.second)
etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetReceipts(Seq(targetBlockHeader.hash)), peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))
peerMessageBus.reply(MessageFromPeer(Receipts(Seq(Nil)), peer2.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(Receipts.code), PeerSelector.WithId(peer2.id))))

Thread.sleep(2.second.toMillis)
time.advance(2.second)
etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetBlockBodies(Seq(targetBlockHeader.hash)), peer2.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer2.id))))
Expand All @@ -224,6 +238,57 @@ class SyncControllerSpec extends FlatSpec with Matchers {
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockBodies.code), PeerSelector.WithId(peer1.id))))
}

it should "throttle requests to peer" in new TestSetup() {
val peerTestProbe: TestProbe = TestProbe()(system)
val peer = Peer(new InetSocketAddress("127.0.0.1", 0), peerTestProbe.ref, incomingConnection = false)

val expectedTargetBlock = 399500
val targetBlockHeader: BlockHeader = baseBlockHeader.copy(
number = expectedTargetBlock,
stateRoot = ByteString(Hex.decode("deae1dfad5ec8dcef15915811e1f044d2543674fd648f94345231da9fc2646cc")))
val bestBlockHeaderNumber: BigInt = targetBlockHeader.number - 1
storagesInstance.storages.fastSyncStateStorage.putSyncState(SyncState(targetBlockHeader)
.copy(bestBlockHeaderNumber = bestBlockHeaderNumber,
mptNodesQueue = Seq(StateMptNodeHash(targetBlockHeader.stateRoot))))

time.advance(1.seconds)

val peerStatus = Status(1, 1, 20, ByteString("peer2_bestHash"), ByteString("unused"))

etcPeerManager.send(syncController, HandshakedPeers(Map(
peer -> PeerInfo(peerStatus, forkAccepted = true, totalDifficulty = peerStatus.totalDifficulty, maxBlockNumber = 0))))

syncController ! SyncController.StartSync

val stateMptLeafWithAccount =
ByteString(Hex.decode("f86d9e328415c225a782bb339b22acad1c739e42277bc7ef34de3623114997ce78b84cf84a0186cb7d8738d800a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a0c5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))

val watcher = TestProbe()
watcher.watch(syncController)

etcPeerManager.expectMsg(
EtcPeerManagerActor.SendMessage(GetNodeData(Seq(targetBlockHeader.stateRoot)), peer.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer.id))))
peerMessageBus.reply(MessageFromPeer(NodeData(Seq(stateMptLeafWithAccount)), peer.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(NodeData.code), PeerSelector.WithId(peer.id))))

//trigger scheduling
time.advance(2.second)
etcPeerManager.expectNoMsg()

//wait for peers throttle
Thread.sleep(Config.Sync.fastSyncThrottle.toMillis)

//trigger scheduling again
time.advance(2.second)
etcPeerManager.expectMsg(EtcPeerManagerActor.SendMessage(
GetBlockHeaders(Left(targetBlockHeader.number), expectedTargetBlock - bestBlockHeaderNumber, 0, reverse = false),
peer.id))
peerMessageBus.expectMsg(Subscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id))))
peerMessageBus.reply(MessageFromPeer(BlockHeaders(Seq(targetBlockHeader)), peer.id))
peerMessageBus.expectMsg(Unsubscribe(MessageClassifier(Set(BlockHeaders.code), PeerSelector.WithId(peer.id))))
}

it should "not use (blacklist) a peer that fails to respond within time limit" in new TestSetup() {
val peer2TestProbe: TestProbe = TestProbe()(system)

Expand Down

0 comments on commit 5fac2d7

Please sign in to comment.