Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package scorex.consensus

import scorex.account.{PrivateKeyAccount, Account}
import scorex.block.{BlockProcessingModule, Block}
import scorex.transaction.{TransactionModule, History, State}
import scorex.account.{Account, PrivateKeyAccount}
import scorex.block.{Block, BlockProcessingModule}
import scorex.transaction.TransactionModule

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future


Expand All @@ -30,5 +31,10 @@ trait ConsensusModule[ConsensusBlockData] extends BlockProcessingModule[Consensu
def generateNextBlock[TT](account: PrivateKeyAccount)
(implicit transactionModule: TransactionModule[TT]): Future[Option[Block]]

def generateNextBlocks[T](accounts: Seq[PrivateKeyAccount])
(implicit transactionModule: TransactionModule[T]): Future[Seq[Block]] = {
Future.sequence(accounts.map(acc => generateNextBlock(acc))).map(_.flatten)
}

def consensusBlockData(block: Block): ConsensusBlockData
}
22 changes: 22 additions & 0 deletions scorex-basics/src/main/scala/scorex/utils/utils.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package scorex

import scala.annotation.tailrec

import scala.concurrent.duration._

package object utils {

@tailrec
final def untilTimeout[T](timeout: FiniteDuration,
delay: FiniteDuration = 100.milliseconds)(fn: => T): T = {
util.Try {
fn
} match {
case util.Success(x) => x
case _ if timeout > delay =>
Thread.sleep(delay.toMillis)
untilTimeout(timeout - delay, delay)(fn)
case util.Failure(e) => throw e
}
}
}
116 changes: 55 additions & 61 deletions src/main/scala/scorex/lagonaki/network/BlockchainSyncer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,20 @@ package scorex.lagonaki.network

import java.net.InetSocketAddress

import akka.actor.FSM
import scorex.lagonaki.server.LagonakiApplication
import akka.actor.{ActorRef, FSM}
import scorex.block.Block
import scorex.lagonaki.network.BlockchainSyncer._
import scorex.lagonaki.network.message.{BlockMessage, GetSignaturesMessage}
import scorex.lagonaki.server.LagonakiApplication

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}


case class NewBlock(block: Block, sender: Option[InetSocketAddress])


//todo: reduce boilerplate code
case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status, Unit] {

private lazy val networkController = application.networkController
class BlockchainSyncer(application: LagonakiApplication, networkController: ActorRef) extends FSM[Status, Unit] {

private val stateTimeout = 1.second

Expand All @@ -33,75 +29,76 @@ case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status

case Event(Unit, _) =>
log.info("Initializing")
stay
stay()
}

when(Syncing) {
case Event(MaxChainScore(scoreOpt), _) => scoreOpt match {
case Some(maxScore) =>
val localScore = application.blockchainImpl.score()
log.info(s"maxScore: $maxScore, localScore: $localScore")
if (maxScore > localScore) {
case Event(MaxChainScore(scoreOpt), _) =>
processMaxScore(
scoreOpt,
onMax = () => {
val sigs = application.blockchainImpl.lastSignatures(application.settings.MaxBlocksChunks)
val msg = GetSignaturesMessage(sigs)
networkController ! NetworkController.SendMessageToBestPeer(msg)
stay
} else goto(Generating)

case None =>
if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline)
}
stay()
}
)

case Event(NewBlock(block, remoteOpt), _) =>
assert(remoteOpt.isDefined, "Local generation attempt while syncing")
processNewBlock(block, remoteOpt)
stay
stay()
}

when(Generating) {
case Event(NewBlock(block, remoteOpt), _) =>
processNewBlock(block, remoteOpt)
stay

case Event(MaxChainScore(scoreOpt), _) => scoreOpt match {
case Some(maxScore) =>
val localScore = application.blockchainImpl.score()
log.info(s"maxScore: $maxScore, localScore: $localScore")
if (maxScore > localScore) goto(Syncing)
else {
stay()

case Event(MaxChainScore(scoreOpt), _) =>
processMaxScore(
scoreOpt,
onNone = () => {
tryToGenerateABlock()
stay()
},
onLocal = () => {
tryToGenerateABlock()
stay
stay()
}

case None =>
tryToGenerateABlock()
stay
}
)
}

//common logic for all the states
whenUnhandled {
case Event(MaxChainScore(scoreOpt), _) => scoreOpt match {
case Some(maxScore) =>
val localScore = application.blockchainImpl.score()
log.info(s"maxScore: $maxScore, localScore: $localScore")
if (maxScore > localScore) goto(Syncing) else goto(Generating)

case None =>
if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline)
}
case Event(MaxChainScore(scoreOpt), _) =>
processMaxScore(scoreOpt)

case Event(GetStatus, _) =>
sender() ! super.stateName.name
stay
stay()

case Event(e, s) =>
log.warning(s"received unhandled request {$e} in state {$stateName}/{$s}")
stay
stay()
}

initialize()

def processMaxScore(
scoreOpt: Option[BigInt],
onLocal: () => State = () => goto(Generating),
onMax: () => State = () => goto(Syncing),
onNone: () => State = () =>
if (application.settings.offlineGeneration) goto(Generating).using(Unit) else goto(Offline)
): State = scoreOpt match {
case Some(maxScore) =>
val localScore = application.blockchainImpl.score()
log.info(s"maxScore: $maxScore, localScore: $localScore")
if (maxScore > localScore) onMax() else onLocal()
case None =>
onNone()
}

def processNewBlock(block: Block, remoteOpt: Option[InetSocketAddress]) = {
val fromStr = remoteOpt.map(_.toString).getOrElse("local")
Expand All @@ -115,34 +112,31 @@ case class BlockchainSyncer(application: LagonakiApplication) extends FSM[Status

//broadcast block only if it is generated locally
if (remoteOpt.isEmpty) {
networkController ! NetworkController.BroadcastMessage(BlockMessage(height, block), List())
networkController ! NetworkController.BroadcastMessage(BlockMessage(height, block))
}
} else {
log.warning(s"Non-valid block: $block from $fromStr")
}
}

def tryToGenerateABlock() = {
val consModule = application.consensusModule
implicit val transModule = application.transactionModule

log.info("Trying to generate a new block")
val appState = application.storedState
val nonEmptyAccs = application.wallet.privateKeyAccounts().filter(acc => appState.balance(acc.address) > 0)
nonEmptyAccs.find {
privKeyAcc =>
implicit val transactionModule = application.transactionModule

//As Proof-of-Stake is being used for Scorex Lagonaki, generateNextBlock() finishes quickly
// (it should be, at least) so we're just going to wait for a result
Await.result(application.consensusModule.generateNextBlock(privKeyAcc), 500.millis) match {
case Some(block) =>
self ! NewBlock(block, None)
true
case None => false
val accounts = application.wallet.privateKeyAccounts()
consModule.generateNextBlocks(accounts)(transModule) onComplete {
case Success(blocks: Seq[Block]) =>
if (blocks.nonEmpty) {
val bestBlock = blocks.maxBy(consModule.blockScore)
self ! NewBlock(bestBlock, None)
}
case Failure(ex) => log.error("Failed to generate new block: {}", ex)
case m => log.error("Unexpected message: {}", m)
}
}
}


object BlockchainSyncer {

sealed trait Status {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class LagonakiApplication(val settingsFilename: String) extends ScorexLogging {

private implicit lazy val actorSystem = ActorSystem("lagonaki")
lazy val networkController = actorSystem.actorOf(Props(classOf[NetworkController], this))
lazy val blockchainSyncer = actorSystem.actorOf(Props(classOf[BlockchainSyncer], this))
lazy val blockchainSyncer = actorSystem.actorOf(Props(classOf[BlockchainSyncer], this, networkController))

private lazy val walletFileOpt = settings.walletDirOpt.map(walletDir => new java.io.File(walletDir, "wallet.s.dat"))
implicit lazy val wallet = new Wallet(walletFileOpt, settings.walletPassword, settings.walletSeed.get)
Expand Down
15 changes: 8 additions & 7 deletions src/test/scala/scorex/lagonaki/LagonakiTestSuite.scala
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package scorex.lagonaki

import org.scalatest.{BeforeAndAfterAll, Suites}
import scorex.lagonaki.integration.{BlocksRoutingSpecification, ValidChainGenerationSpecification}
import scorex.lagonaki.integration.{BlockchainSyncerSpecification, BlocksRoutingSpecification, ValidChainGenerationSpecification}
import scorex.lagonaki.unit._

class LagonakiTestSuite extends Suites(
//unit tests
new MessageSpecification
,new BlockSpecification
,new BlockchainStorageSpecification
,new WalletSpecification
,new BlocksRoutingSpecification
, new BlockSpecification
, new BlockchainStorageSpecification
, new WalletSpecification
, new BlockchainSyncerSpecification
, new BlocksRoutingSpecification

//integration tests - slow!
,new ValidChainGenerationSpecification
, new ValidChainGenerationSpecification

) with BeforeAndAfterAll {
) with BeforeAndAfterAll {

override def beforeAll() = {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package scorex.lagonaki.integration

import akka.actor.ActorSystem
import akka.testkit._
import org.scalatest.{Matchers, WordSpecLike}
import scorex.lagonaki.network.BlockchainSyncer.{Generating, GetStatus, Offline}
import scorex.lagonaki.server.LagonakiApplication
import scorex.utils.untilTimeout
import scala.concurrent.duration._

class BlockchainSyncerSpecification(_system: ActorSystem)
extends TestKit(_system)
with ImplicitSender
with WordSpecLike
with Matchers {

def this() = this(ActorSystem("MySpec"))

val application = new LagonakiApplication("settings-test.json")
application.checkGenesis()
val bcs = application.blockchainSyncer

"BlockchainSyncer actor" must {
"be offline on load" in {
bcs ! GetStatus
expectMsg(Offline.name)
}
"generate after downloading state" in {
bcs ! Unit
//Wait up to 5 seconds to download blockchain and become generating
untilTimeout(5.seconds) {
bcs ! GetStatus
expectMsg(Generating.name)
}
}
}
}