diff --git a/scorex-basics/src/main/scala/scorex/consensus/ConsensusModule.scala b/scorex-basics/src/main/scala/scorex/consensus/ConsensusModule.scala index 865659bc..e8a3056d 100644 --- a/scorex-basics/src/main/scala/scorex/consensus/ConsensusModule.scala +++ b/scorex-basics/src/main/scala/scorex/consensus/ConsensusModule.scala @@ -1,9 +1,10 @@ package scorex.consensus -import scorex.account.{PrivateKeyAccount, Account} -import scorex.block.{BlockProcessingModule, Block} -import scorex.transaction.{TransactionModule, History, State} +import scorex.account.{Account, PrivateKeyAccount} +import scorex.block.{Block, BlockProcessingModule} +import scorex.transaction.TransactionModule +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -30,5 +31,10 @@ trait ConsensusModule[ConsensusBlockData] extends BlockProcessingModule[Consensu def generateNextBlock[TT](account: PrivateKeyAccount) (implicit transactionModule: TransactionModule[TT]): Future[Option[Block]] + def generateNextBlocks[T](accounts: Seq[PrivateKeyAccount]) + (implicit transactionModule: TransactionModule[T]): Future[Seq[Block]] = { + Future.sequence(accounts.map(acc => generateNextBlock(acc))).map(_.flatten) + } + def consensusBlockData(block: Block): ConsensusBlockData } diff --git a/scorex-basics/src/main/scala/scorex/utils/utils.scala b/scorex-basics/src/main/scala/scorex/utils/utils.scala new file mode 100644 index 00000000..8167718a --- /dev/null +++ b/scorex-basics/src/main/scala/scorex/utils/utils.scala @@ -0,0 +1,22 @@ +package scorex + +import scala.annotation.tailrec + +import scala.concurrent.duration._ + +package object utils { + + @tailrec + final def untilTimeout[T](timeout: FiniteDuration, + delay: FiniteDuration = 100.milliseconds)(fn: => T): T = { + util.Try { + fn + } match { + case util.Success(x) => x + case _ if timeout > delay => + Thread.sleep(delay.toMillis) + untilTimeout(timeout - delay, delay)(fn) + case util.Failure(e) => throw e + } + } +} diff --git a/src/main/scala/scorex/lagonaki/network/BlockchainSyncer.scala b/src/main/scala/scorex/lagonaki/network/BlockchainSyncer.scala index 3ead4ec0..a3d7d3a5 100644 --- a/src/main/scala/scorex/lagonaki/network/BlockchainSyncer.scala +++ b/src/main/scala/scorex/lagonaki/network/BlockchainSyncer.scala @@ -2,24 +2,20 @@ package scorex.lagonaki.network import java.net.InetSocketAddress -import akka.actor.FSM -import scorex.lagonaki.server.LagonakiApplication +import akka.actor.{ActorRef, FSM} import scorex.block.Block import scorex.lagonaki.network.BlockchainSyncer._ import scorex.lagonaki.network.message.{BlockMessage, GetSignaturesMessage} +import scorex.lagonaki.server.LagonakiApplication -import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ +import scala.util.{Failure, Success} case class NewBlock(block: Block, sender: Option[InetSocketAddress]) - -//todo: reduce boilerplate code -case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status, Unit] { - - private lazy val networkController = application.networkController +class BlockchainSyncer(application: LagonakiApplication, networkController: ActorRef) extends FSM[Status, Unit] { private val stateTimeout = 1.second @@ -33,75 +29,76 @@ case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status case Event(Unit, _) => log.info("Initializing") - stay + stay() } when(Syncing) { - case Event(MaxChainScore(scoreOpt), _) => scoreOpt match { - case Some(maxScore) => - val localScore = application.blockchainImpl.score() - log.info(s"maxScore: $maxScore, localScore: $localScore") - if (maxScore > localScore) { + case Event(MaxChainScore(scoreOpt), _) => + processMaxScore( + scoreOpt, + onMax = () => { val sigs = application.blockchainImpl.lastSignatures(application.settings.MaxBlocksChunks) val msg = GetSignaturesMessage(sigs) networkController ! NetworkController.SendMessageToBestPeer(msg) - stay - } else goto(Generating) - - case None => - if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline) - } + stay() + } + ) case Event(NewBlock(block, remoteOpt), _) => assert(remoteOpt.isDefined, "Local generation attempt while syncing") processNewBlock(block, remoteOpt) - stay + stay() } when(Generating) { case Event(NewBlock(block, remoteOpt), _) => processNewBlock(block, remoteOpt) - stay - - case Event(MaxChainScore(scoreOpt), _) => scoreOpt match { - case Some(maxScore) => - val localScore = application.blockchainImpl.score() - log.info(s"maxScore: $maxScore, localScore: $localScore") - if (maxScore > localScore) goto(Syncing) - else { + stay() + + case Event(MaxChainScore(scoreOpt), _) => + processMaxScore( + scoreOpt, + onNone = () => { + tryToGenerateABlock() + stay() + }, + onLocal = () => { tryToGenerateABlock() - stay + stay() } - - case None => - tryToGenerateABlock() - stay - } + ) } //common logic for all the states whenUnhandled { - case Event(MaxChainScore(scoreOpt), _) => scoreOpt match { - case Some(maxScore) => - val localScore = application.blockchainImpl.score() - log.info(s"maxScore: $maxScore, localScore: $localScore") - if (maxScore > localScore) goto(Syncing) else goto(Generating) - - case None => - if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline) - } + case Event(MaxChainScore(scoreOpt), _) => + processMaxScore(scoreOpt) case Event(GetStatus, _) => sender() ! super.stateName.name - stay + stay() case Event(e, s) => log.warning(s"received unhandled request {$e} in state {$stateName}/{$s}") - stay + stay() } initialize() + def processMaxScore( + scoreOpt: Option[BigInt], + onLocal: () => State = () => goto(Generating), + onMax: () => State = () => goto(Syncing), + onNone: () => State = () => + if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline) + ): State = scoreOpt match { + case Some(maxScore) => + val localScore = application.blockchainImpl.score() + log.info(s"maxScore: $maxScore, localScore: $localScore") + if (maxScore > localScore) onMax() else onLocal() + case None => + onNone() + } def processNewBlock(block: Block, remoteOpt: Option[InetSocketAddress]) = { val fromStr = remoteOpt.map(_.toString).getOrElse("local") @@ -115,7 +112,7 @@ case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status //broadcast block only if it is generated locally if (remoteOpt.isEmpty) { - networkController ! NetworkController.BroadcastMessage(BlockMessage(height, block), List()) + networkController ! NetworkController.BroadcastMessage(BlockMessage(height, block)) } } else { log.warning(s"Non-valid block: $block from $fromStr") @@ -123,26 +120,23 @@ case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status } def tryToGenerateABlock() = { + val consModule = application.consensusModule + implicit val transModule = application.transactionModule + log.info("Trying to generate a new block") - val appState = application.storedState - val nonEmptyAccs = application.wallet.privateKeyAccounts().filter(acc => appState.balance(acc.address) > 0) - nonEmptyAccs.find { - privKeyAcc => - implicit val transactionModule = application.transactionModule - - //As Proof-of-Stake is being used for Scorex Lagonaki, generateNextBlock() finishes quickly - // (it should be, at least) so we're just going to wait for a result - Await.result(application.consensusModule.generateNextBlock(privKeyAcc), 500.millis) match { - case Some(block) => - self ! NewBlock(block, None) - true - case None => false + val accounts = application.wallet.privateKeyAccounts() + consModule.generateNextBlocks(accounts)(transModule) onComplete { + case Success(blocks: Seq[Block]) => + if (blocks.nonEmpty) { + val bestBlock = blocks.maxBy(consModule.blockScore) + self ! NewBlock(bestBlock, None) } + case Failure(ex) => log.error("Failed to generate new block: {}", ex) + case m => log.error("Unexpected message: {}", m) } } } - object BlockchainSyncer { sealed trait Status { diff --git a/src/main/scala/scorex/lagonaki/server/LagonakiApplication.scala b/src/main/scala/scorex/lagonaki/server/LagonakiApplication.scala index eabe24ef..b438a6f6 100644 --- a/src/main/scala/scorex/lagonaki/server/LagonakiApplication.scala +++ b/src/main/scala/scorex/lagonaki/server/LagonakiApplication.scala @@ -46,7 +46,7 @@ class LagonakiApplication(val settingsFilename: String) extends ScorexLogging { private implicit lazy val actorSystem = ActorSystem("lagonaki") lazy val networkController = actorSystem.actorOf(Props(classOf[NetworkController], this)) - lazy val blockchainSyncer = actorSystem.actorOf(Props(classOf[BlockchainSyncer], this)) + lazy val blockchainSyncer = actorSystem.actorOf(Props(classOf[BlockchainSyncer], this, networkController)) private lazy val walletFileOpt = settings.walletDirOpt.map(walletDir => new java.io.File(walletDir, "wallet.s.dat")) implicit lazy val wallet = new Wallet(walletFileOpt, settings.walletPassword, settings.walletSeed.get) diff --git a/src/test/scala/scorex/lagonaki/LagonakiTestSuite.scala b/src/test/scala/scorex/lagonaki/LagonakiTestSuite.scala index 30b803f8..439c8c11 100644 --- a/src/test/scala/scorex/lagonaki/LagonakiTestSuite.scala +++ b/src/test/scala/scorex/lagonaki/LagonakiTestSuite.scala @@ -1,21 +1,22 @@ package scorex.lagonaki import org.scalatest.{BeforeAndAfterAll, Suites} -import scorex.lagonaki.integration.{BlocksRoutingSpecification, ValidChainGenerationSpecification} +import scorex.lagonaki.integration.{BlockchainSyncerSpecification, BlocksRoutingSpecification, ValidChainGenerationSpecification} import scorex.lagonaki.unit._ class LagonakiTestSuite extends Suites( //unit tests new MessageSpecification - ,new BlockSpecification - ,new BlockchainStorageSpecification - ,new WalletSpecification - ,new BlocksRoutingSpecification + , new BlockSpecification + , new BlockchainStorageSpecification + , new WalletSpecification + , new BlockchainSyncerSpecification + , new BlocksRoutingSpecification //integration tests - slow! - ,new ValidChainGenerationSpecification + , new ValidChainGenerationSpecification -) with BeforeAndAfterAll { +) with BeforeAndAfterAll { override def beforeAll() = {} diff --git a/src/test/scala/scorex/lagonaki/integration/BlockchainSyncerSpecification.scala b/src/test/scala/scorex/lagonaki/integration/BlockchainSyncerSpecification.scala new file mode 100644 index 00000000..c64eaeed --- /dev/null +++ b/src/test/scala/scorex/lagonaki/integration/BlockchainSyncerSpecification.scala @@ -0,0 +1,37 @@ +package scorex.lagonaki.integration + +import akka.actor.ActorSystem +import akka.testkit._ +import org.scalatest.{Matchers, WordSpecLike} +import scorex.lagonaki.network.BlockchainSyncer.{Generating, GetStatus, Offline} +import scorex.lagonaki.server.LagonakiApplication +import scorex.utils.untilTimeout +import scala.concurrent.duration._ + +class BlockchainSyncerSpecification(_system: ActorSystem) + extends TestKit(_system) + with ImplicitSender + with WordSpecLike + with Matchers { + + def this() = this(ActorSystem("MySpec")) + + val application = new LagonakiApplication("settings-test.json") + application.checkGenesis() + val bcs = application.blockchainSyncer + + "BlockchainSyncer actor" must { + "be offline on load" in { + bcs ! GetStatus + expectMsg(Offline.name) + } + "generate after downloading state" in { + bcs ! Unit + //Wait up to 5 seconds to download blockchain and become generating + untilTimeout(5.seconds) { + bcs ! GetStatus + expectMsg(Generating.name) + } + } + } +} \ No newline at end of file