Skip to content

Commit

Permalink
Apply changes
Browse files Browse the repository at this point in the history
- make in ancientBlockHash condition simpler by using maxNewBlockHashAge > 0 instead od comparing blocks
- move notDownloading method to RegularSyncState
- fix log formatting
- make more specific scalastyle for handleBlockImport
- add description to importingBlocks param
  • Loading branch information
Agnieszka Kowal committed Oct 1, 2018
1 parent c66d342 commit 9dcae83
Showing 1 changed file with 28 additions and 30 deletions.
58 changes: 28 additions & 30 deletions src/main/scala/io/iohk/ethereum/blockchain/sync/RegularSync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class RegularSync(
resumeRegularSync(state)

case PrintStatus =>
log.info(s"Block: ${ blockchain.getBestBlockNumber() }. Peers: ${ handshakedPeers.size } (${ blacklistedPeers.size } blacklisted)")
log.info(s"Block: ${blockchain.getBestBlockNumber()}. Peers: ${handshakedPeers.size} (${blacklistedPeers.size} blacklisted)")

case CancelResume(toCancel) =>
toCancel.cancel()
Expand Down Expand Up @@ -116,13 +116,14 @@ class RegularSync(
def handleNewBlockMessages(state: RegularSyncState): Receive = {
case MessageFromPeer(NewBlock(newBlock, _), peerId) =>
// We allow inclusion of new block only if we are not syncing
if (notDownloading(state) && state.topOfTheChain && state.notImportingBlocks) {
if (state.notDownloading && state.topOfTheChain && state.notImportingBlocks) {
context become handleBlockImport(state.withImportingBlocks(true))
self ! NewBlockImport(newBlock, peerId)
}
}

// scalastyle:off
// scalastyle:off cyclomatic.complexity
// scalastyle:off method.length
def handleBlockImport(state: RegularSyncState): Receive = {
case NewBlockImport(newBlock, peerId) =>
log.debug(s"Handling NewBlock message for block (${newBlock.idTag})")
Expand Down Expand Up @@ -156,8 +157,8 @@ class RegularSync(
broadcastBlocks(newBranch, totalDifficulties)
val header = newBranch.last.header
log.debug(s"Imported block $newNumber ($headerHash) from $peerId, " +
s"resulting in chain reorganisation: new branch of length ${ newBranch.size } with head at block " +
s"${ header.number } (${ hash2string(header.hash) })")
s"resulting in chain reorganisation: new branch of length ${newBranch.size} with head at block " +
s"${header.number} (${hash2string(header.hash)})")

case BlockImportFailed(error) =>
blacklist(peerId, blacklistDuration, error)
Expand Down Expand Up @@ -213,7 +214,6 @@ class RegularSync(
context become running(state.withImportingBlocks(false))
}(context.dispatcher)
}
// scalastyle:on

private def hash2string(hash: ByteString): String = Hex.toHexString(hash.toArray[Byte])

Expand All @@ -224,7 +224,7 @@ class RegularSync(
val possiblePeer = peersToDownloadFrom.find{ case (peer, _) => peer.id == peerId }
// we allow asking for new hashes when we are not syncing and we can download from specified peer,
// we are on top of the chain and not resolving branches currently
if (notDownloading(state) && state.topOfTheChain && possiblePeer.isDefined) {
if (state.notDownloading && state.topOfTheChain && possiblePeer.isDefined) {
log.debug("Handling NewBlockHashes message: \n" + hashes.mkString("\n"))
val (peer, _) = possiblePeer.get
val hashesToCheck = hashes.take(syncConfig.maxNewHashes)
Expand Down Expand Up @@ -273,7 +273,7 @@ class RegularSync(
}

private def ancientBlockHash(blockNumber: BigInt, currentBestBlockNumber: BigInt): Boolean =
(currentBestBlockNumber > blockNumber) && (currentBestBlockNumber - blockNumber > syncConfig.maxNewBlockHashAge)
(syncConfig.maxNewBlockHashAge > 0) && (currentBestBlockNumber - blockNumber > syncConfig.maxNewBlockHashAge)

def handleResponseToRequest(state: RegularSyncState): Receive = {
case ResponseReceived(peer, BlockHeaders(headers), timeTaken) =>
Expand All @@ -296,7 +296,7 @@ class RegularSync(
handleReDownloadedStateNodes(peer, nodes, state.withWaitingForAnActor(None))

case PeerRequestHandler.RequestFailed(peer, reason) if state.waitingForAnActor.contains(sender()) =>
log.debug(s"Request to peer ({}) failed: {}", peer, reason)
log.debug("Request to peer ({}) failed: {}", peer, reason)
if (handshakedPeers.contains(peer)) blacklist(peer.id, blacklistDuration, reason)

scheduleResume(state.withWaitingForAnActor(None))
Expand All @@ -307,7 +307,7 @@ class RegularSync(
// We allow inclusion of mined block only if we are not syncing / reorganising chain
case MinedBlock(block) =>
val header = block.header
if (notDownloading(state) && state.notImportingBlocks) {
if (state.notDownloading && state.notImportingBlocks) {
context become handleBlockImport(state.withImportingBlocks(true))
self ! MinedBlockImport(block, header)
} else {
Expand All @@ -322,7 +322,7 @@ class RegularSync(
log.debug(s"Requesting $blockHeadersPerRequest headers, starting from $blockNumber")
val headers = GetBlockHeaders(Left(blockNumber), blockHeadersPerRequest, skip = 0, reverse = false)
val request = requestBlockHeaders(peer, headers)
context become running(state.withWaitingForAnActor(request).withImportingBlocks(true)) // todo start
context become running(state.withWaitingForAnActor(request).withImportingBlocks(true))

case None =>
log.debug("No peers to download from")
Expand Down Expand Up @@ -502,7 +502,7 @@ class RegularSync(
requestMissingNode(missingNodeEx.hash, syncState)

case Some(error) =>
resumeWithDifferentPeer(peer, s"a block execution error: ${ error.toString }", state)
resumeWithDifferentPeer(peer, s"a block execution error: ${error.toString}", state)

case None =>
val syncState = state.withHeadersQueue(headers)
Expand Down Expand Up @@ -585,11 +585,6 @@ class RegularSync(
broadcaster.broadcastBlock(NewBlock(block, td), handshakedPeers)
}
}

private def notDownloading(state: RegularSyncState): Boolean = {
state.hasEmptyHeadersQueue && state.notWaitingForAnActor && state.notResolvingBranches
}

}

object RegularSync {
Expand Down Expand Up @@ -633,19 +628,20 @@ object RegularSync {

case class MissingStateNodeRetry(nodeId: ByteString, p: Peer, blocksToRetry: Seq[Block])

/** Stores all state changes during regular synchronisation.
*
* @param waitingForAnActor when on top of the chain and handling newBlockHashes message
* @param headersQueue headers queue
* @param topOfTheChain is used as an optimisation to avoid handling broadcast messages when we haven't reached the top of the chain.
* Currently it's set to true after we receive 0 headers from a peer, which usually means it doesn't have any new headers.
* But there could be other reasons for receiving 0 blocks. It may be better to make handling broadcast messages
* dependent on our current best block info (stored in this actor to avoid DB look-ups).
* @param resolvingBranches defines if branch is being resolved or not
* @param resumeRegularSyncTimeout schedule if regular synchronization should be resumed by calling self with
* `ResumeRegularSync` message or cancelled by calling cancel
* @param missingStateNodeRetry state node that is missing
*/
/** Stores all state changes during regular synchronisation.
*
* @param waitingForAnActor when on top of the chain and handling newBlockHashes message
* @param headersQueue headers queue
* @param topOfTheChain is used as an optimisation to avoid handling broadcast messages when we haven't reached the top of the chain.
* Currently it's set to true after we receive 0 headers from a peer, which usually means it doesn't have any new headers.
* But there could be other reasons for receiving 0 blocks. It may be better to make handling broadcast messages
* dependent on our current best block info (stored in this actor to avoid DB look-ups).
* @param resolvingBranches defines if branch is being resolved or not
* @param resumeRegularSyncTimeout schedule if regular synchronization should be resumed by calling self with
* `ResumeRegularSync` message or cancelled by calling cancel
* @param missingStateNodeRetry state node that is missing
* @param importingBlocks defines if blocks are imported which allows concurrent header validation
*/
case class RegularSyncState(
waitingForAnActor: Option[ActorRef] = None,
headersQueue: Seq[BlockHeader] = Seq.empty,
Expand Down Expand Up @@ -680,5 +676,7 @@ object RegularSync {

def notImportingBlocks: Boolean = !importingBlocks

def notDownloading: Boolean = hasEmptyHeadersQueue && notWaitingForAnActor && notResolvingBranches

}
}

0 comments on commit 9dcae83

Please sign in to comment.