diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala index 0c5797f939..82dc089cb4 100644 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala +++ b/src/main/scala/io/iohk/ethereum/blockchain/sync/PeersClient.scala @@ -1,7 +1,5 @@ package io.iohk.ethereum.blockchain.sync -import java.util.Collections.newSetFromMap - import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef @@ -9,12 +7,9 @@ import akka.actor.Cancellable import akka.actor.Props import akka.actor.Scheduler -import scala.collection.mutable import scala.concurrent.ExecutionContext -import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag -import io.iohk.ethereum.blockchain.PeerComparator import io.iohk.ethereum.blockchain.sync.Blacklist.BlacklistReason import io.iohk.ethereum.blockchain.sync.PeerListSupportNg.PeerWithInfo import io.iohk.ethereum.network.EtcPeerManagerActor.PeerInfo @@ -44,10 +39,6 @@ class PeersClient( val statusSchedule: Cancellable = scheduler.scheduleWithFixedDelay(syncConfig.printStatusInterval, syncConfig.printStatusInterval, self, PrintStatus) - val numberOfPeersToFetchFrom = syncConfig.peersToFetchFrom - - val activeFetchingNodes: mutable.Set[PeerWithInfo] = lruSet(numberOfPeersToFetchFrom) - def receive: Receive = running(Map()) override def postStop(): Unit = { @@ -104,8 +95,7 @@ class PeersClient( private def selectPeer(peerSelector: PeerSelector): Option[Peer] = peerSelector match { - case BestPeer => bestPeer(peersToDownloadFrom) - case NextBestPeer => nextBestPeer(peersToDownloadFrom, activeFetchingNodes.toSet) + case BestPeer => bestPeer(peersToDownloadFrom) } private def responseClassTag[RequestMsg <: Message](requestMsg: RequestMsg): ClassTag[_ <: Message] = @@ -154,11 +144,6 @@ object PeersClient { type Requesters = Map[ActorRef, ActorRef] - private def lruSet[A](maxEntries: Int): mutable.Set[A] = - newSetFromMap[A](new java.util.LinkedHashMap[A, java.lang.Boolean]() { - override def removeEldestEntry(eldest: java.util.Map.Entry[A, java.lang.Boolean]): Boolean = size > maxEntries - }).asScala - sealed trait PeersClientMessage case class BlacklistPeer(peerId: PeerId, reason: BlacklistReason) extends PeersClientMessage case class Request[RequestMsg <: Message]( @@ -192,7 +177,6 @@ object PeersClient { sealed trait PeerSelector case object BestPeer extends PeerSelector - case object NextBestPeer extends PeerSelector def bestPeer(peersToDownloadFrom: Map[PeerId, PeerWithInfo]): Option[Peer] = { val peersToUse = peersToDownloadFrom.values @@ -207,26 +191,4 @@ object PeersClient { None } } - - //returns the next best peer after the one already returned previously - //TODO: whenever this method is called - do activeFetchingNodes.add(_) on the peer returned - def nextBestPeer( - peersToDownloadFrom: Map[PeerId, PeerWithInfo], - activeFetchingNodes: Set[PeerWithInfo] - ): Option[Peer] = { - val peersToUse = peersToDownloadFrom.values - .collect { case PeerWithInfo(peer, peerInfo @ PeerInfo(_, _, true, _, _)) => - (peer, peerInfo) - } - - val peer = - peersToUse - .filterNot { case (peer, _) => activeFetchingNodes.map(_.peer).contains(peer) } - .filterNot { case (_, peerInfo) => - activeFetchingNodes.map(_.peerInfo).exists(PeerComparator.doPeersHaveSameBestBlock(peerInfo, _)) - } - .maxByOption { case (_, peerInfo) => peerInfo.chainWeight } - .map { case (peer, _) => peer } - peer - } } diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ExecutionSync.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ExecutionSync.scala deleted file mode 100644 index a2a06092af..0000000000 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/ExecutionSync.scala +++ /dev/null @@ -1,80 +0,0 @@ -package io.iohk.ethereum.blockchain.sync.regular - -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.AllForOneStrategy -import akka.actor.Props -import akka.actor.Scheduler -import akka.actor.SupervisorStrategy - -import io.iohk.ethereum.blockchain.sync.SyncProtocol -import io.iohk.ethereum.domain.BlockchainReader -import io.iohk.ethereum.network.Peer -import io.iohk.ethereum.network.PeerEventBusActor.PeerSelector -import io.iohk.ethereum.network.PeerEventBusActor.Subscribe -import io.iohk.ethereum.network.PeerEventBusActor.SubscriptionClassifier.MessageClassifier -import io.iohk.ethereum.network.p2p.messages.Codes - -class ExecutionSync( - fetcherService: FetcherService, - peersClient: ActorRef, - peerEventBus: ActorRef, - blockchainReader: BlockchainReader, - scheduler: Scheduler -) extends Actor - with ActorLogging { - - override def receive: Receive = { - - case SyncProtocol.Start => - log.info("Starting regular sync") - peerEventBus ! Subscribe( - MessageClassifier( - Set(Codes.NewBlockCode, Codes.NewBlockHashesCode, Codes.BlockHeadersCode), - PeerSelector.AllPeers - ) - ) - val listOfPeersToFetchFromWithBestBlock: List[(Peer, Int)] = ??? //peerClient.get - val startFrom: BigInt = blockchainReader.getBestBlockNumber() - listOfPeersToFetchFromWithBestBlock.map { case (peerId, peerBestBlockNumber) => - fetcherService.fetchBlocksUntil(peerId, Left(startFrom), Left(peerBestBlockNumber)) - } - - //pass peers containing fetchedBlocks streams to the ChainManager - //TODO: how to run it in the loop? - - case SyncProtocol.MinedBlock(block) => - log.info(s"Block mined [number = {}, hash = {}]", block.number, block.header.hashAsHexString) - //pass mined block to the ChainManager? - - //TODO: add handling of messages from peerEventBus and passing it onto ChainManager - - //TODO: handle messages from ChainManagerService - } - - override def supervisorStrategy: SupervisorStrategy = AllForOneStrategy()(SupervisorStrategy.defaultDecider) - - override def postStop(): Unit = - log.info("Execution Sync stopped") -} - -object ExecutionSync { - // scalastyle:off parameter.number - def props( - fetcherService: FetcherService, - peersClient: ActorRef, - peerEventBus: ActorRef, - blockchainReader: BlockchainReader, - scheduler: Scheduler - ): Props = - Props( - new ExecutionSync( - fetcherService, - peersClient, - peerEventBus, - blockchainReader, - scheduler - ) - ) -} diff --git a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherService.scala b/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherService.scala deleted file mode 100644 index 387f7d9ae1..0000000000 --- a/src/main/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherService.scala +++ /dev/null @@ -1,92 +0,0 @@ -package io.iohk.ethereum.blockchain.sync.regular - -import akka.util.ByteString - -import cats.data.EitherT -import cats.implicits._ - -import monix.eval.Task - -import scala.annotation.tailrec - -import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.RequestFailed -import io.iohk.ethereum.consensus.validators.BlockValidator -import io.iohk.ethereum.domain.Block -import io.iohk.ethereum.domain.BlockBody -import io.iohk.ethereum.domain.BlockHeader -import io.iohk.ethereum.domain.BlockchainReader -import io.iohk.ethereum.network.Peer -import io.iohk.ethereum.network.p2p.messages.ETH62.BlockBodies -import io.iohk.ethereum.network.p2p.messages.ETH62.BlockHeaders -import io.iohk.ethereum.utils.Config.SyncConfig - -//not used atm, a part of the future ExecutionSync -class FetcherService(validator: BlockValidator, blockchainReader: BlockchainReader, syncConfig: SyncConfig) { - val batchSize = syncConfig.blockHeadersPerRequest - - private def requestHeaders( - block: Either[BigInt, ByteString], - amount: BigInt - ): Task[Either[RequestFailed, BlockHeaders]] = ??? - - private def requestBodies(hashes: Seq[ByteString]): Task[Either[RequestFailed, BlockBodies]] = ??? - -//TODO: add private def requestStateNode(hash: ByteString): Task[Either[RequestFailed, Seq[ByteString]]] = ??? - - private def placeBlockInPeerStream(block: Block, peer: Peer): Peer = ??? - - def fetchBlocksUntil( - peer: Peer, - startFromBlock: Either[BigInt, ByteString], - fetchUntilBlock: Either[BigInt, ByteString] - ): EitherT[Task, RequestFailed, Unit] = { - val endNumber: Option[BigInt] = - fetchUntilBlock.fold(Some(_), blockchainReader.getBlockHeaderByHash(_).map(_.number)) - val startNumber: Option[BigInt] = - startFromBlock.fold(Some(_), blockchainReader.getBlockHeaderByHash(_).map(_.number)) - - //TODO: make sure negatives are not possible - val startBatchBlocks: Option[Seq[BigInt]] = - for { - start <- startNumber - end <- endNumber - } yield start.to(end, batchSize) - - startBatchBlocks match { - case None => EitherT.leftT(RequestFailed(peer, "Couldn't find blocks to fetch")) - case Some(seq) => seq.traverse(num => fetchBlocks(peer, batchSize, Left(num))).map(_ => ()) - } - } - - private def fetchBlocks( - peer: Peer, - amount: BigInt, - block: Either[BigInt, ByteString] - ): EitherT[Task, RequestFailed, Peer] = - for { - headers <- EitherT(requestHeaders(block, amount)) - bodies <- EitherT(requestBodies(headers.headers.map(_.hash))) - blocks <- EitherT.fromOption[Task]( - bodiesAreOrderedSubsetOfRequested(headers.headers, bodies.bodies), - RequestFailed(peer, "Unmatching bodies") - ) - _ = blocks.foreach(placeBlockInPeerStream(_, peer)) - } yield peer - - // Checks that the received block bodies are an ordered subset of the ones requested - @tailrec - private def bodiesAreOrderedSubsetOfRequested( - requestedHeaders: Seq[BlockHeader], - respondedBodies: Seq[BlockBody], - matchedBlocks: Seq[Block] = Nil - ): Option[Seq[Block]] = - (requestedHeaders, respondedBodies) match { - case (Seq(), _ +: _) => None - case (_, Seq()) => Some(matchedBlocks) - case (header +: remainingHeaders, body +: remainingBodies) => - if (validator.validateHeaderAndBody(header, body).isRight) - bodiesAreOrderedSubsetOfRequested(remainingHeaders, remainingBodies, matchedBlocks :+ Block(header, body)) - else - bodiesAreOrderedSubsetOfRequested(remainingHeaders, respondedBodies, matchedBlocks) - } -} diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/PeersClientSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/PeersClientSpec.scala index f0f881b2f8..50a61fd78b 100644 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/PeersClientSpec.scala +++ b/src/test/scala/io/iohk/ethereum/blockchain/sync/PeersClientSpec.scala @@ -6,8 +6,6 @@ import akka.actor.ActorSystem import akka.testkit.TestProbe import akka.util.ByteString -import scala.collection.mutable - import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.scalatest.prop.TableFor3 @@ -24,109 +22,55 @@ import io.iohk.ethereum.network.p2p.messages.Capability class PeersClientSpec extends AnyFlatSpec with Matchers with ScalaCheckPropertyChecks { import Peers._ - val table: TableFor3[Map[PeerId, PeerWithInfo], Option[Peer], String] = - Table[Map[PeerId, PeerWithInfo], Option[Peer], String]( - ("PeerInfo map", "Expected best peer", "Scenario info (selected peer)"), - ( - Map(), - None, - "No peers" - ), - ( - Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false))), - None, - "Single peer" - ), - ( - Map( - peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false)), - peer2.id -> PeerWithInfo(peer2, peerInfo(0, 50, fork = true)) + + "PeerClient" should "determine the best peer based on its latest checkpoint number and total difficulty" in { + val table: TableFor3[Map[PeerId, PeerWithInfo], Option[Peer], String] = + Table[Map[PeerId, PeerWithInfo], Option[Peer], String]( + ("PeerInfo map", "Expected best peer", "Scenario info (selected peer)"), + ( + Map(), + None, + "No peers" ), - Some(peer2), - "Peer2 with lower TD but following the ETC fork" - ), - ( - Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101))), - Some(peer2), - "Peer2 with higher TD" - ), - ( - Map( - peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), - peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101)), - peer3.id -> PeerWithInfo(peer3, peerInfo(1, 50)) + ( + Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false))), + None, + "Single peer" ), - Some(peer3), - "Peer3 with lower TD but higher checkpoint number" - ), - ( - Map( - peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), - peer2.id -> PeerWithInfo(peer2, peerInfo(4, 101)), - peer3.id -> PeerWithInfo(peer3, peerInfo(4, 50)) + ( + Map( + peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false)), + peer2.id -> PeerWithInfo(peer2, peerInfo(0, 50, fork = true)) + ), + Some(peer2), + "Peer2 with lower TD but following the ETC fork" ), - Some(peer2), - "Peer2 with equal checkpoint number and higher TD" - ) - ) - - "PeerClient" should "determine the best peer based on its latest checkpoint number and total difficulty" in { - forAll(table) { (peerInfoMap, expectedPeer, _) => - PeersClient.bestPeer(peerInfoMap) shouldEqual expectedPeer - } - } - - it should "determine the next best peer (same as bestPeer when lru set is empty)" in { - forAll(table) { (peerInfoMap, expectedPeer, _) => - PeersClient.nextBestPeer(peerInfoMap, Set.empty) shouldEqual expectedPeer - } - } - - it should "determine the next best peer with a different best block each time" in { - val table = Table[Map[PeerId, PeerWithInfo], Option[PeerWithInfo], Option[Peer], String]( - ("PeerInfo map", "Used best peer", "Expected best peer", "Scenario info (selected peer)"), - ( - Map(), - None, - None, - "No peers" - ), - ( - Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false))), - None, - None, - "Single peer" - ), - ( - Map( - peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100, fork = false)), - peer2.id -> PeerWithInfo(peer2, peerInfo(0, 50, fork = true)) + ( + Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101))), + Some(peer2), + "Peer2 with higher TD" ), - Some(PeerWithInfo(peer2, peerInfo(0, 50, fork = true))), - None, - "Peer2 with lower TD but following the ETC fork, peer2 is already used" - ), - ( - Map(peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101))), - Some(PeerWithInfo(peer2, peerInfo(0, 101))), - None, - "Both peer are used" - ), - ( - Map( - peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), - peer2.id -> PeerWithInfo(peer2, peerInfo(1, 50)), - peer3.id -> PeerWithInfo(peer3, peerInfo(0, 80).copy(bestBlockHash = ByteString("differenthash"))) + ( + Map( + peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), + peer2.id -> PeerWithInfo(peer2, peerInfo(0, 101)), + peer3.id -> PeerWithInfo(peer3, peerInfo(1, 50)) + ), + Some(peer3), + "Peer3 with lower TD but higher checkpoint number" ), - Some(PeerWithInfo(peer2, peerInfo(1, 50))), - Some(peer3), - "Peer2 with lower TD but higher checkpoint number, peer 1 and 2 are used" + ( + Map( + peer1.id -> PeerWithInfo(peer1, peerInfo(0, 100)), + peer2.id -> PeerWithInfo(peer2, peerInfo(4, 101)), + peer3.id -> PeerWithInfo(peer3, peerInfo(4, 50)) + ), + Some(peer2), + "Peer2 with equal checkpoint number and higher TD" + ) ) - ) - val activeFetchingNodes: mutable.Set[PeerWithInfo] = mutable.Set.empty - forAll(table) { (peerInfoMap, usedPeerWithInfo, expectedPeer, _) => - usedPeerWithInfo.map(activeFetchingNodes.add) - PeersClient.nextBestPeer(peerInfoMap, activeFetchingNodes.toSet) shouldEqual expectedPeer + forAll(table) { (peerInfoMap, expectedPeer, _) => + PeersClient.bestPeer(peerInfoMap) shouldEqual expectedPeer } } diff --git a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherServiceSpec.scala b/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherServiceSpec.scala deleted file mode 100644 index c4f3e120a6..0000000000 --- a/src/test/scala/io/iohk/ethereum/blockchain/sync/regular/FetcherServiceSpec.scala +++ /dev/null @@ -1,94 +0,0 @@ -package io.iohk.ethereum.blockchain.sync.regular - -import java.net.InetSocketAddress - -import akka.actor.ActorSystem -import akka.testkit.TestKit -import akka.testkit.TestProbe -import akka.util.ByteString - -import cats.data.EitherT - -import monix.eval.Task - -import org.scalamock.scalatest.MockFactory -import org.scalatest.PrivateMethodTester -import org.scalatest.flatspec.AnyFlatSpecLike -import org.scalatest.matchers.should.Matchers - -import io.iohk.ethereum.Fixtures.Blocks.ValidBlock -import io.iohk.ethereum.Mocks.MockValidatorsAlwaysSucceed -import io.iohk.ethereum.Mocks.MockValidatorsFailingOnBlockBodies -import io.iohk.ethereum.blockchain.sync.PeerRequestHandler.RequestFailed -import io.iohk.ethereum.blockchain.sync.TestSyncConfig -import io.iohk.ethereum.consensus.validators.BlockValidator -import io.iohk.ethereum.domain.Block -import io.iohk.ethereum.domain.BlockBody -import io.iohk.ethereum.domain.BlockHeader -import io.iohk.ethereum.domain.BlockchainReader -import io.iohk.ethereum.network.Peer -import io.iohk.ethereum.network.PeerId -import io.iohk.ethereum.utils.Hex - -class FetcherServiceSpec - extends TestKit(ActorSystem("FetcherServiceSpec_System")) - with AnyFlatSpecLike - with Matchers - with PrivateMethodTester - with MockFactory - with TestSyncConfig { - - val genesisHash: ByteString = ByteString( - Hex.decode("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f") - ) - - //TODO: add a happy path scenario once the implementation for fetching is finished - "FetcherService" should "return RequestFailed when asking for a not existing blocks" in { - val blockchainReader: BlockchainReader = mock[BlockchainReader] - (blockchainReader.getBlockHeaderByHash _).expects(*).returning(None).anyNumberOfTimes() - val blockValidator: BlockValidator = mock[BlockValidator] - val fetcherService = new FetcherService(blockValidator, blockchainReader, syncConfig) - val peerProbe: TestProbe = TestProbe() - val peer = Peer(PeerId("peerId"), new InetSocketAddress(9191), peerProbe.ref, true) - val result = fetcherService.fetchBlocksUntil(peer, Right(ByteString("byteString")), Right(ByteString("byteString"))) - result.value shouldBe Task.now(Left(RequestFailed(peer, "Couldn't find blocks to fetch"))) - } - - //TODO: enable when the implementation for fetching is finished - it should "return an error when request to fetch headers in fetchBlocks fails" ignore { - val blockchainReader: BlockchainReader = mock[BlockchainReader] - (blockchainReader.getHashByBlockNumber _).expects(*, *).returning(Some(genesisHash)) - val blockValidator: BlockValidator = mock[BlockValidator] - val fetcherService = new FetcherService(blockValidator, blockchainReader, syncConfig) - val fetchBlocks = PrivateMethod[EitherT[Task, RequestFailed, Peer]]('fetchBlocks) - val eitherPeerOrError = fetcherService.invokePrivate(fetchBlocks()) - assert(eitherPeerOrError === RequestFailed) - } - - it should "fail to verify that bodies are a subset of headers if they don't match" in { - val blockchainReader: BlockchainReader = mock[BlockchainReader] - // Here we are forcing the mismatch between request headers and received bodies - val blockValidator = new MockValidatorsFailingOnBlockBodies - val fetcherService = new FetcherService(blockValidator.blockValidator, blockchainReader, syncConfig) - val bodiesAreOrderedSubsetOfRequested = PrivateMethod[Option[Seq[Block]]]('bodiesAreOrderedSubsetOfRequested) - val blocks = Seq(Block(ValidBlock.header.copy(number = 97), ValidBlock.body)) - val headers: Seq[BlockHeader] = blocks.map(_.header) - val bodies: Seq[BlockBody] = blocks.map(_.body) - val verifiedBlocks: Option[Seq[Block]] = - fetcherService.invokePrivate(bodiesAreOrderedSubsetOfRequested(headers, bodies, Nil)) - verifiedBlocks shouldBe None - } - - it should "verify that bodies are a subset of headers" in { - val blockchainReader: BlockchainReader = mock[BlockchainReader] - val blockValidator = new MockValidatorsAlwaysSucceed - val fetcherService = new FetcherService(blockValidator.blockValidator, blockchainReader, syncConfig) - val bodiesAreOrderedSubsetOfRequested = PrivateMethod[Option[Seq[Block]]]('bodiesAreOrderedSubsetOfRequested) - val blocks = Seq(Block(ValidBlock.header.copy(number = 97), ValidBlock.body)) - val headers: Seq[BlockHeader] = blocks.map(_.header) - val bodies: Seq[BlockBody] = blocks.map(_.body) - val verifiedBlocks: Option[Seq[Block]] = - fetcherService.invokePrivate(bodiesAreOrderedSubsetOfRequested(headers, bodies, Nil)) - verifiedBlocks shouldBe Some(blocks) - } -}