From 7986e9bde38e0b2276091cf17ab8d0627ee93a3f Mon Sep 17 00:00:00 2001 From: Jiri Peinlich Date: Thu, 14 Apr 2022 09:12:33 +0100 Subject: [PATCH] Improving backwards sync (#3638) Refactor Backwards sync to use a rocks db. Signed-off-by: Jiri Peinlich --- .../MergeBesuControllerBuilder.java | 8 +- .../TransitionBesuControllerBuilder.java | 6 - .../TransitionControllerBuilderTest.java | 5 + .../merge/TransitionBackwardSyncContext.java | 8 +- .../merge/blockcreation/MergeCoordinator.java | 3 +- .../keyvalue/KeyValueSegmentIdentifier.java | 3 +- .../eth/sync/backwardsync/BackwardChain.java | 240 ++++++---------- .../backwardsync/BackwardSyncContext.java | 258 +++++++++--------- .../BackwardSyncLookupService.java | 172 ------------ .../sync/backwardsync/BackwardSyncPhase.java | 193 ------------- .../sync/backwardsync/BackwardSyncStep.java | 112 ++++++++ .../sync/backwardsync/BackwardSyncTask.java | 58 ---- ...ardSyncPhase.java => ForwardSyncStep.java} | 91 ++---- .../GenericKeyValueStorageFacade.java | 20 ++ .../eth/sync/backwardsync/HashConvertor.java | 34 +++ .../eth/sync/backwardsync/SyncStepStep.java | 74 +++++ .../ethereum/eth/sync/state/SyncState.java | 11 + .../backwardsync/BackwardSyncContextTest.java | 132 ++++++--- .../BackwardSyncLookupServiceTest.java | 132 --------- ...aseTest.java => BackwardSyncStepTest.java} | 142 ++-------- .../backwardsync/BackwardSyncTaskTest.java | 111 -------- ...haseTest.java => ForwardSyncStepTest.java} | 100 ++----- .../InMemoryBackwardChainTest.java | 131 +++++---- plugin-api/build.gradle | 2 +- .../besu/plugin/services/BesuEvents.java | 12 + 25 files changed, 760 insertions(+), 1298 deletions(-) delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupService.java delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhase.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java delete mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTask.java rename ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/{ForwardSyncPhase.java => ForwardSyncStep.java} (73%) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/HashConvertor.java create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupServiceTest.java rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/{BackwardSyncPhaseTest.java => BackwardSyncStepTest.java} (58%) delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTaskTest.java rename ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/{ForwardSyncPhaseTest.java => ForwardSyncStepTest.java} (70%) diff --git a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java index 2e578e09659..6a59bbbf1a6 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/MergeBesuControllerBuilder.java @@ -28,11 +28,12 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator; import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator; +import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; -import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; +import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import java.util.List; @@ -68,9 +69,8 @@ protected MiningCoordinator createMiningCoordinator( metricsSystem, ethProtocolManager.ethContext(), syncState, - new BackwardSyncLookupService( - protocolSchedule, ethProtocolManager.ethContext(), metricsSystem, protocolContext), - storageProvider)); + BackwardChain.from( + storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule)))); } protected MiningCoordinator createTransitionMiningCoordinator( diff --git a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java index e4e596bb48f..3e257cbc30c 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/TransitionBesuControllerBuilder.java @@ -34,7 +34,6 @@ import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; -import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; @@ -98,11 +97,6 @@ protected MiningCoordinator createMiningCoordinator( metricsSystem, ethProtocolManager.ethContext(), syncState, - new BackwardSyncLookupService( - transitionProtocolSchedule, - ethProtocolManager.ethContext(), - metricsSystem, - protocolContext), storageProvider); final TransitionCoordinator composedCoordinator = diff --git a/besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java b/besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java index 89a2d559617..2e69d9fb9a7 100644 --- a/besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java +++ b/besu/src/test/java/org/hyperledger/besu/controller/TransitionControllerBuilderTest.java @@ -30,12 +30,14 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.Difficulty; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; +import org.hyperledger.besu.ethereum.storage.StorageProvider; import java.util.Optional; @@ -61,6 +63,7 @@ public class TransitionControllerBuilderTest { @Mock SyncState syncState; @Mock EthProtocolManager ethProtocolManager; @Mock PostMergeContext mergeContext; + StorageProvider storageProvider = new InMemoryKeyValueStorageProvider(); @Spy CliqueBesuControllerBuilder cliqueBuilder = new CliqueBesuControllerBuilder(); @Spy BesuControllerBuilder powBuilder = new MainnetBesuControllerBuilder(); @@ -76,6 +79,7 @@ public void setup() { new TransitionProtocolSchedule( preMergeProtocolSchedule, postMergeProtocolSchedule, mergeContext)); cliqueBuilder.nodeKey(NodeKeyUtils.generate()); + postMergeBuilder.storageProvider(storageProvider); when(protocolContext.getBlockchain()).thenReturn(mockBlockchain); when(transitionProtocolSchedule.getPostMergeSchedule()).thenReturn(postMergeProtocolSchedule); when(transitionProtocolSchedule.getPreMergeSchedule()).thenReturn(preMergeProtocolSchedule); @@ -152,6 +156,7 @@ public void assertPreMergeScheduleForBelowTerminalBlockWhenPostMergeIfNotFinaliz TransitionCoordinator buildTransitionCoordinator( final BesuControllerBuilder preMerge, final MergeBesuControllerBuilder postMerge) { var builder = new TransitionBesuControllerBuilder(preMerge, postMerge); + builder.storageProvider(storageProvider); var coordinator = builder.createMiningCoordinator( transitionProtocolSchedule, diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionBackwardSyncContext.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionBackwardSyncContext.java index 819644d1303..fe6bb8860f6 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionBackwardSyncContext.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/TransitionBackwardSyncContext.java @@ -18,9 +18,10 @@ import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardChain; import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncContext; -import org.hyperledger.besu.ethereum.eth.sync.backwardsync.BackwardSyncLookupService; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.plugin.services.MetricsSystem; @@ -34,7 +35,6 @@ public TransitionBackwardSyncContext( final MetricsSystem metricsSystem, final EthContext ethContext, final SyncState syncState, - final BackwardSyncLookupService backwardSyncLookupService, final StorageProvider storageProvider) { super( protocolContext, @@ -42,8 +42,8 @@ public TransitionBackwardSyncContext( metricsSystem, ethContext, syncState, - backwardSyncLookupService, - storageProvider); + BackwardChain.from( + storageProvider, ScheduleBasedBlockHeaderFunctions.create(transitionProtocolSchedule))); this.transitionProtocolSchedule = transitionProtocolSchedule; } diff --git a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java index 2554073d7ff..917badbe987 100644 --- a/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java +++ b/consensus/merge/src/main/java/org/hyperledger/besu/consensus/merge/blockcreation/MergeCoordinator.java @@ -16,7 +16,6 @@ import static org.hyperledger.besu.consensus.merge.TransitionUtils.isTerminalProofOfWorkBlock; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; -import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; import org.hyperledger.besu.consensus.merge.MergeContext; import org.hyperledger.besu.datatypes.Address; @@ -215,7 +214,7 @@ public Optional getOrSyncHeaderByHash(final Hash blockhash) { if (optHeader.isPresent()) { debugLambda(LOG, "BlockHeader {} is already present", () -> optHeader.get().toLogString()); } else { - infoLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString); + debugLambda(LOG, "appending block hash {} to backward sync", blockhash::toHexString); backwardSyncContext.syncBackwardsUntil(blockhash); } return optHeader; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java index 1dcb6468825..5c624acece9 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/storage/keyvalue/KeyValueSegmentIdentifier.java @@ -32,7 +32,8 @@ public enum KeyValueSegmentIdentifier implements SegmentIdentifier { GOQUORUM_PRIVATE_WORLD_STATE(new byte[] {11}), GOQUORUM_PRIVATE_STORAGE(new byte[] {12}), BACKWARD_SYNC_HEADERS(new byte[] {13}), - BACKWARD_SYNC_BLOCKS(new byte[] {14}); + BACKWARD_SYNC_BLOCKS(new byte[] {14}), + BACKWARD_SYNC_CHAIN(new byte[] {15}); private final byte[] id; private final int[] versionList; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java index d4150bb47b1..52ab61254c3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardChain.java @@ -16,7 +16,6 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; -import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda; import static org.slf4j.LoggerFactory.getLogger; import org.hyperledger.besu.datatypes.Hash; @@ -26,81 +25,75 @@ import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.Queue; import org.slf4j.Logger; public class BackwardChain { private static final Logger LOG = getLogger(BackwardChain.class); - private final List ancestors = new ArrayList<>(); - private final List successors = new ArrayList<>(); - - protected final GenericKeyValueStorageFacade headers; - protected final GenericKeyValueStorageFacade blocks; + private final GenericKeyValueStorageFacade headers; + private final GenericKeyValueStorageFacade blocks; + private final GenericKeyValueStorageFacade chainStorage; + private Optional firstStoredAncestor = Optional.empty(); + private Optional lastStoredPivot = Optional.empty(); + private final Queue hashesToAppend = new ArrayDeque<>(); public BackwardChain( - final StorageProvider provider, - final BlockHeaderFunctions blockHeaderFunctions, - final Block pivot) { - this( + final GenericKeyValueStorageFacade headersStorage, + final GenericKeyValueStorageFacade blocksStorage, + final GenericKeyValueStorageFacade chainStorage) { + this.headers = headersStorage; + this.blocks = blocksStorage; + this.chainStorage = chainStorage; + } + + public static BackwardChain from( + final StorageProvider storageProvider, final BlockHeaderFunctions blockHeaderFunctions) { + return new BackwardChain( new GenericKeyValueStorageFacade<>( Hash::toArrayUnsafe, BlocksHeadersConvertor.of(blockHeaderFunctions), - provider.getStorageBySegmentIdentifier( + storageProvider.getStorageBySegmentIdentifier( KeyValueSegmentIdentifier.BACKWARD_SYNC_HEADERS)), new GenericKeyValueStorageFacade<>( Hash::toArrayUnsafe, BlocksConvertor.of(blockHeaderFunctions), - provider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.BACKWARD_SYNC_BLOCKS)), - pivot); + storageProvider.getStorageBySegmentIdentifier( + KeyValueSegmentIdentifier.BACKWARD_SYNC_BLOCKS)), + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, + new HashConvertor(), + storageProvider.getStorageBySegmentIdentifier( + KeyValueSegmentIdentifier.BACKWARD_SYNC_CHAIN))); } - public BackwardChain( - final GenericKeyValueStorageFacade headersStorage, - final GenericKeyValueStorageFacade blocksStorage, - final Block pivot) { - - this.headers = headersStorage; - this.blocks = blocksStorage; - headersStorage.put(pivot.getHeader().getHash(), pivot.getHeader()); - blocksStorage.put(pivot.getHash(), pivot); - ancestors.add(pivot.getHeader().getHash()); - successors.add(pivot.getHash()); + public synchronized Optional getFirstAncestorHeader() { + return firstStoredAncestor; } - public Optional getFirstAncestorHeader() { - if (ancestors.isEmpty()) { - return Optional.empty(); + public synchronized List getFirstNAncestorHeaders(final int size) { + List result = new ArrayList<>(size); + Optional it = firstStoredAncestor; + while (it.isPresent() && result.size() < size) { + result.add(it.get()); + it = chainStorage.get(it.get().getHash()).flatMap(headers::get); } - return headers.get(ancestors.get(ancestors.size() - 1)); + return result; } - public List getFirstNAncestorHeaders(final int size) { - List resultList = new ArrayList<>(size); - for (int i = Math.min(size, ancestors.size()); i > 0; --i) { - resultList.add(ancestors.get(ancestors.size() - i)); + public synchronized void prependAncestorsHeader(final BlockHeader blockHeader) { + if (firstStoredAncestor.isEmpty()) { + firstStoredAncestor = Optional.of(blockHeader); + lastStoredPivot = Optional.of(blockHeader); + headers.put(blockHeader.getHash(), blockHeader); + return; } - return resultList.stream() - .map(h -> this.headers.get(h).orElseThrow()) - .collect(Collectors.toList()); - } - - public List getAllAncestors() { - return getFirstNAncestorHeaders(ancestors.size()); - } - - public void prependAncestorsHeader(final BlockHeader blockHeader) { - BlockHeader firstHeader = - getFirstAncestorHeader() - .orElseThrow( - () -> - new BackwardSyncException( - "Cannot save more headers during forward sync", true)); + BlockHeader firstHeader = firstStoredAncestor.get(); if (firstHeader.getNumber() != blockHeader.getNumber() + 1) { throw new BackwardSyncException( "Wrong height of header " @@ -118,134 +111,81 @@ public void prependAncestorsHeader(final BlockHeader blockHeader) { + firstHeader.getParentHash().toHexString()); } headers.put(blockHeader.getHash(), blockHeader); - ancestors.add(blockHeader.getHash()); + chainStorage.put(blockHeader.getHash(), firstStoredAncestor.get().getHash()); + firstStoredAncestor = Optional.of(blockHeader); debugLambda( LOG, "Added header {} on height {} to backward chain led by pivot {} on height {}", () -> blockHeader.getHash().toHexString(), blockHeader::getNumber, - () -> firstHeader.getHash().toHexString(), + () -> lastStoredPivot.orElseThrow().getHash().toHexString(), firstHeader::getNumber); } - public void prependChain(final BackwardChain historicalBackwardChain) { - BlockHeader firstHeader = - getFirstAncestorHeader() - .orElseThrow( - () -> new BackwardSyncException("Cannot merge when syncing forward...", true)); - Optional historicalHeader = - historicalBackwardChain.getHeaderOnHeight(firstHeader.getNumber() - 1); - if (historicalHeader.isEmpty()) { - return; - } - if (firstHeader.getParentHash().equals(historicalHeader.orElseThrow().getHash())) { - for (Block successor : historicalBackwardChain.getSuccessors()) { - if (successor.getHeader().getNumber() > getPivot().getHeader().getNumber()) { - this.successors.add(successor.getHeader().getHash()); - } - } - Collections.reverse(historicalBackwardChain.getSuccessors()); - for (Block successor : historicalBackwardChain.getSuccessors()) { - if (successor.getHeader().getNumber() - < getFirstAncestorHeader().orElseThrow().getNumber()) { - this.ancestors.add(successor.getHeader().getHash()); - } - } - for (BlockHeader ancestor : historicalBackwardChain.getAllAncestors()) { - if (ancestor.getNumber() < getFirstAncestorHeader().orElseThrow().getNumber()) { - this.ancestors.add(ancestor.getHash()); - } - } - debugLambda( - LOG, - "Merged backward chain. New chain starts at height {} and ends at height {}", - () -> getPivot().getHeader().getNumber(), - () -> getFirstAncestorHeader().orElseThrow().getNumber()); - } else { - warnLambda( - LOG, - "Cannot merge previous historical run because headers on height {} ({}) of {} and {} are not equal. Ignoring previous run. Did someone lie to us?", - () -> firstHeader.getNumber() - 1, - () -> historicalHeader.orElseThrow().getNumber(), - () -> firstHeader.getParentHash().toHexString(), - () -> historicalHeader.orElseThrow().getHash().toHexString()); + public synchronized Optional getPivot() { + if (lastStoredPivot.isEmpty()) { + return Optional.empty(); } + return blocks.get(lastStoredPivot.get().getHash()); } - public Block getPivot() { - return blocks.get(successors.get(successors.size() - 1)).orElseThrow(); - } - - public void dropFirstHeader() { - headers.drop(ancestors.get(ancestors.size() - 1)); - ancestors.remove(ancestors.size() - 1); + public synchronized void dropFirstHeader() { + if (firstStoredAncestor.isEmpty()) { + return; + } + headers.drop(firstStoredAncestor.get().getHash()); + final Optional hash = chainStorage.get(firstStoredAncestor.get().getHash()); + chainStorage.drop(firstStoredAncestor.get().getHash()); + firstStoredAncestor = hash.flatMap(headers::get); + if (firstStoredAncestor.isEmpty()) { + lastStoredPivot = Optional.empty(); + } } - public void appendExpectedBlock(final Block newPivot) { - successors.add(newPivot.getHash()); + public synchronized void appendTrustedBlock(final Block newPivot) { headers.put(newPivot.getHash(), newPivot.getHeader()); blocks.put(newPivot.getHash(), newPivot); + if (lastStoredPivot.isEmpty()) { + firstStoredAncestor = Optional.of(newPivot.getHeader()); + } else { + if (newPivot.getHeader().getParentHash().equals(lastStoredPivot.get().getHash())) { + chainStorage.put(lastStoredPivot.get().getHash(), newPivot.getHash()); + } else { + firstStoredAncestor = Optional.of(newPivot.getHeader()); + } + } + lastStoredPivot = Optional.of(newPivot.getHeader()); } - public List getSuccessors() { - return successors.stream() - .map(hash -> blocks.get(hash).orElseThrow()) - .collect(Collectors.toList()); - } - - public boolean isTrusted(final Hash hash) { + public synchronized boolean isTrusted(final Hash hash) { return blocks.get(hash).isPresent(); } - public Block getTrustedBlock(final Hash hash) { + public synchronized Block getTrustedBlock(final Hash hash) { return blocks.get(hash).orElseThrow(); } - public void clear() { - ancestors.clear(); - successors.clear(); + public synchronized void clear() { blocks.clear(); headers.clear(); + chainStorage.clear(); + firstStoredAncestor = Optional.empty(); + lastStoredPivot = Optional.empty(); + hashesToAppend.clear(); } - public void commit() {} + public synchronized Optional getHeader(final Hash hash) { + return headers.get(hash); + } - public Optional getHeaderOnHeight(final long height) { - if (ancestors.isEmpty()) { - return Optional.empty(); - } - final long firstAncestor = headers.get(ancestors.get(0)).orElseThrow().getNumber(); - if (firstAncestor >= height) { - if (firstAncestor - height < ancestors.size()) { - final Optional blockHeader = - headers.get(ancestors.get((int) (firstAncestor - height))); - blockHeader.ifPresent( - blockHeader1 -> - LOG.debug( - "First: {} Height: {}, result: {}", - firstAncestor, - height, - blockHeader.orElseThrow().getNumber())); - return blockHeader; - } else { - return Optional.empty(); - } - } else { - if (successors.isEmpty()) { - return Optional.empty(); - } - final long firstSuccessor = headers.get(successors.get(0)).orElseThrow().getNumber(); - if (height - firstSuccessor < successors.size()) { - LOG.debug( - "First: {} Height: {}, result: {}", - firstSuccessor, - height, - headers.get(successors.get((int) (height - firstSuccessor))).orElseThrow().getNumber()); - - return headers.get(successors.get((int) (height - firstSuccessor))); - } else { - return Optional.empty(); - } + public synchronized void addNewHash(final Hash newBlockHash) { + if (hashesToAppend.contains(newBlockHash)) { + return; } + this.hashesToAppend.add(newBlockHash); + } + + public synchronized Optional getFirstHash() { + return Optional.ofNullable(hashesToAppend.poll()); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java index acf97118e03..f72556a76b7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContext.java @@ -15,29 +15,27 @@ package org.hyperledger.besu.ethereum.eth.sync.backwardsync; import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.BlockValidator; import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.ethereum.mainnet.ScheduleBasedBlockHeaderFunctions; -import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.plugin.services.MetricsSystem; import java.time.Duration; -import java.util.Collections; -import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +50,10 @@ public class BackwardSyncContext { private final MetricsSystem metricsSystem; private final SyncState syncState; - private final Map backwardChainMap = new ConcurrentHashMap<>(); - private final AtomicReference currentChain = new AtomicReference<>(); private final AtomicReference> currentBackwardSyncFuture = new AtomicReference<>(); - private final BackwardSyncLookupService service; - private final StorageProvider storageProvider; + private final BackwardChain backwardChain; + private int batchSize = BATCH_SIZE; public BackwardSyncContext( final ProtocolContext protocolContext, @@ -65,16 +61,14 @@ public BackwardSyncContext( final MetricsSystem metricsSystem, final EthContext ethContext, final SyncState syncState, - final BackwardSyncLookupService backwardSyncLookupService, - final StorageProvider storageProvider) { + final BackwardChain backwardChain) { this.protocolContext = protocolContext; this.protocolSchedule = protocolSchedule; this.ethContext = ethContext; this.metricsSystem = metricsSystem; this.syncState = syncState; - this.service = backwardSyncLookupService; - this.storageProvider = storageProvider; + this.backwardChain = backwardChain; } public boolean isSyncing() { @@ -83,114 +77,49 @@ public boolean isSyncing() { .orElse(Boolean.FALSE); } - public CompletableFuture syncBackwardsUntil(final Hash newBlockhash) { - final Optional chain = getCurrentChain(); - CompletableFuture> completableFuture; - if (chain.isPresent() && chain.get().isTrusted(newBlockhash)) { + public CompletableFuture syncBackwardsUntil(final Hash newBlockHash) { + final CompletableFuture future = this.currentBackwardSyncFuture.get(); + if (backwardChain.isTrusted(newBlockHash)) { debugLambda( LOG, - "not fetching and appending hash {} to backwards sync since it is present in successors", - newBlockhash::toHexString); - completableFuture = CompletableFuture.completedFuture(Collections.emptyList()); - } else { - completableFuture = service.lookup(newBlockhash); + "not fetching or appending hash {} to backwards sync since it is present in successors", + newBlockHash::toHexString); + return future; } - - // kick off async process to fetch this block by hash then delegate to syncBackwardsUntil - final CompletableFuture future = - completableFuture.thenCompose( - blocks -> { - if (blocks.isEmpty()) { - return CompletableFuture.completedFuture(null); - } else return this.syncBackwardsUntil(blocks); - }); - this.currentBackwardSyncFuture.set(future); - return future; - } - - private CompletionStage syncBackwardsUntil(final List blocks) { - CompletableFuture future = CompletableFuture.completedFuture(null); - for (Block block : blocks) { - future = future.thenCompose(unused -> syncBackwardsUntil(block)); + backwardChain.addNewHash(newBlockHash); + if (future != null) { + return future; } - return future; + infoLambda(LOG, "Starting new backward sync towards a pivot {}", newBlockHash::toHexString); + this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry()); + return this.currentBackwardSyncFuture.get(); } public CompletableFuture syncBackwardsUntil(final Block newPivot) { - final BackwardChain backwardChain = currentChain.get(); - if (backwardChain == null) { + final CompletableFuture future = this.currentBackwardSyncFuture.get(); + if (backwardChain.isTrusted(newPivot.getHash())) { debugLambda( LOG, - "Starting new backward sync towards a pivot {} at height {}", - () -> newPivot.getHash().toString().substring(0, 20), - () -> newPivot.getHeader().getNumber()); - final BackwardChain newChain = - new BackwardChain( - storageProvider, - ScheduleBasedBlockHeaderFunctions.create(protocolSchedule), - newPivot); - this.currentChain.set(newChain); - backwardChainMap.put(newPivot.getHeader().getNumber(), newChain); - currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry(newChain)); - return currentBackwardSyncFuture.get(); + "not fetching or appending hash {} to backwards sync since it is present in successors", + () -> newPivot.getHash().toHexString()); + return future; } - if (newPivot.getHeader().getParentHash().equals(currentChain.get().getPivot().getHash())) { - LOG.debug( - "Backward sync is ongoing. Appending expected next block to the end of backward sync chain"); - backwardChain.appendExpectedBlock(newPivot); - backwardChainMap.put(newPivot.getHeader().getNumber(), backwardChain); - return currentBackwardSyncFuture.get(); + backwardChain.appendTrustedBlock(newPivot); + if (future != null) { + return future; } - debugLambda( + infoLambda( LOG, - "Stopping existing backward sync from pivot {} at height {} and restarting with pivot {} at height {}", - () -> backwardChain.getPivot().getHash().toString().substring(0, 20), - () -> backwardChain.getPivot().getHeader().getNumber(), - () -> newPivot.getHash().toString().substring(0, 20), - () -> newPivot.getHeader().getNumber()); - - BackwardChain newBackwardChain = - new BackwardChain( - storageProvider, ScheduleBasedBlockHeaderFunctions.create(protocolSchedule), newPivot); - backwardChainMap.put(newPivot.getHeader().getNumber(), newBackwardChain); - this.currentChain.set( - newBackwardChain); // the current ongoing backward sync will finish its current step and end - - currentBackwardSyncFuture.set( - currentBackwardSyncFuture - .get() - .handle( - (unused, error) -> { - if (error != null) { - if ((error.getCause() != null) - && (error.getCause() instanceof BackwardSyncException)) { - LOG.info( - "Previous Backward sync ended exceptionally with message {}", - error.getMessage()); - } else { - LOG.info( - "Previous Backward sync ended exceptionally with message {}", - error.getMessage()); - if (error instanceof RuntimeException) { - throw (RuntimeException) error; - } else { - throw new BackwardSyncException(error); - } - } - } else { - LOG.info("The previous backward sync finished without and exception"); - } - - return newBackwardChain; - }) - .thenCompose(this::prepareBackwardSyncFutureWithRetry)); - return currentBackwardSyncFuture.get(); + "Starting new backward sync towards a pivot {}({})", + () -> newPivot.getHeader().getNumber(), + () -> newPivot.getHash().toHexString()); + this.currentBackwardSyncFuture.set(prepareBackwardSyncFutureWithRetry()); + return this.currentBackwardSyncFuture.get(); } - private CompletableFuture prepareBackwardSyncFutureWithRetry( - final BackwardChain backwardChain) { + private CompletableFuture prepareBackwardSyncFutureWithRetry() { - CompletableFuture f = prepareBackwardSyncFuture(backwardChain); + CompletableFuture f = prepareBackwardSyncFuture(); for (int i = 0; i < MAX_RETRIES; i++) { f = f.thenApply(CompletableFuture::completedFuture) @@ -207,13 +136,13 @@ private CompletableFuture prepareBackwardSyncFutureWithRetry( return ethContext .getScheduler() .scheduleFutureTask( - () -> prepareBackwardSyncFuture(backwardChain), Duration.ofSeconds(5)); + () -> prepareBackwardSyncFuture(), Duration.ofSeconds(5)); }) .thenCompose(Function.identity()); } return f.handle( (unused, throwable) -> { - this.cleanup(backwardChain); + this.currentBackwardSyncFuture.set(null); if (throwable != null) { throw new BackwardSyncException(throwable); } @@ -221,20 +150,8 @@ private CompletableFuture prepareBackwardSyncFutureWithRetry( }); } - private CompletableFuture prepareBackwardSyncFuture(final BackwardChain backwardChain) { - return new BackwardSyncPhase(this, backwardChain) - .executeAsync(null) - .thenCompose(new ForwardSyncPhase(this, backwardChain)::executeAsync); - } - - private void cleanup(final BackwardChain chain) { - if (currentChain.compareAndSet(chain, null)) { - this.currentBackwardSyncFuture.set(null); - } - } - - public Optional getCurrentChain() { - return Optional.ofNullable(currentChain.get()); + private CompletableFuture prepareBackwardSyncFuture() { + return executeNextStep(null); } public ProtocolSchedule getProtocolSchedule() { @@ -261,15 +178,96 @@ public BlockValidator getBlockValidatorForBlock(final Block block) { return getBlockValidator(block.getHeader().getNumber()); } - public Optional findCorrectChainFromPivot(final long number) { - return Optional.ofNullable(backwardChainMap.get(number)); + public boolean isOnTTD() { + return syncState.hasReachedTerminalDifficulty().orElse(false); } - public void putCurrentChainToHeight(final long height, final BackwardChain backwardChain) { - backwardChainMap.put(height, backwardChain); + public CompletableFuture stop() { + return currentBackwardSyncFuture.get(); } - public boolean isOnTTD() { - return syncState.hasReachedTerminalDifficulty().orElse(false); + public CompletableFuture executeNextStep(final Void unused) { + final Optional firstHash = backwardChain.getFirstHash(); + if (firstHash.isPresent()) { + return executeSyncStep(firstHash.get()); + } + if (!isOnTTD()) { + return waitForTTD().thenCompose(this::executeNextStep); + } + final Optional firstAncestorHeader = backwardChain.getFirstAncestorHeader(); + if (firstAncestorHeader.isEmpty()) { + LOG.info("The Backward sync is done..."); + return CompletableFuture.completedFuture(null); + } + if (getProtocolContext().getBlockchain().getChainHead().getHeight() + > firstAncestorHeader.get().getNumber() - 1) { + LOG.info( + "Backward reached bellow previous head {}({}) : {} ({})", + getProtocolContext().getBlockchain().getChainHead().getHeight(), + getProtocolContext().getBlockchain().getChainHead().getHash().toHexString(), + firstAncestorHeader.get().getNumber(), + firstAncestorHeader.get().getHash()); + } + if (getProtocolContext().getBlockchain().contains(firstAncestorHeader.get().getParentHash())) { + return executeForwardAsync(firstAncestorHeader.get()); + } + return executeBackwardAsync(firstAncestorHeader.get()); + } + + private CompletableFuture executeSyncStep(final Hash hash) { + return new SyncStepStep(this, backwardChain).executeAsync(hash); + } + + @VisibleForTesting + protected CompletableFuture executeBackwardAsync(final BlockHeader firstHeader) { + return new BackwardSyncStep(this, backwardChain).executeAsync(firstHeader); + } + + @VisibleForTesting + protected CompletableFuture executeForwardAsync(final BlockHeader firstHeader) { + return new ForwardSyncStep(this, backwardChain).executeAsync(); + } + + @VisibleForTesting + protected CompletableFuture waitForTTD() { + final CountDownLatch latch = new CountDownLatch(1); + final long id = + syncState.subscribeTTDReached( + reached -> { + if (reached) { + latch.countDown(); + } + }); + return CompletableFuture.runAsync( + () -> { + try { + if (!isOnTTD()) { + LOG.info("Waiting for TTD..."); + final boolean await = latch.await(2, TimeUnit.MINUTES); + if (await) { + LOG.info("TTD reached..."); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new BackwardSyncException("Wait for TTD was interrupted"); + } finally { + syncState.unsubscribeTTDReached(id); + } + }); + } + + // In rare case when we request too many headers/blocks we get response that does not contain all + // data and we might want to retry with smaller batch size + public int getBatchSize() { + return batchSize; + } + + public void halveBatchSize() { + this.batchSize = batchSize / 2 + 1; + } + + public void resetBatchSize() { + this.batchSize = BATCH_SIZE; } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupService.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupService.java deleted file mode 100644 index 700ec2c046a..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupService.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright Hyperledger Besu Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.sync.backwardsync; - -import static org.slf4j.LoggerFactory.getLogger; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.ProtocolContext; -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; -import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import java.time.Duration; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; - -import org.slf4j.Logger; - -@ThreadSafe -public class BackwardSyncLookupService { - private static final Logger LOG = getLogger(BackwardSyncLookupService.class); - private static final int MAX_RETRIES = 100; - public static final int UNUSED = -1; - - @GuardedBy("this") - private final Queue hashes = new ArrayDeque<>(); - - @GuardedBy("this") - boolean running = false; - - private final ProtocolSchedule protocolSchedule; - private final EthContext ethContext; - private final MetricsSystem metricsSystem; - private List results = new ArrayList<>(); - private final ProtocolContext protocolContext; - - public BackwardSyncLookupService( - final ProtocolSchedule protocolSchedule, - final EthContext ethContext, - final MetricsSystem metricsSystem, - final ProtocolContext protocolContext) { - this.protocolSchedule = protocolSchedule; - this.ethContext = ethContext; - this.metricsSystem = metricsSystem; - this.protocolContext = protocolContext; - } - - public CompletableFuture> lookup(final Hash newBlockhash) { - synchronized (this) { - hashes.add(newBlockhash); - if (running) { - LOG.info( - "some other future is already running and will process our hash {} when time comes...", - newBlockhash.toHexString()); - return CompletableFuture.completedFuture(Collections.emptyList()); - } - running = true; - } - return findBlocksWithRetries() - .handle( - (blocks, throwable) -> { - synchronized (this) { - running = false; - } - if (throwable != null) { - throw new BackwardSyncException(throwable); - } - return blocks; - }); - } - - private CompletableFuture> findBlocksWithRetries() { - - CompletableFuture> f = tryToFindBlocks(); - for (int i = 0; i < MAX_RETRIES; i++) { - f = - f.thenApply(CompletableFuture::completedFuture) - .exceptionally( - ex -> { - synchronized (this) { - if (!results.isEmpty()) { - List copy = new ArrayList<>(results); - results = new ArrayList<>(); - return CompletableFuture.completedFuture(copy); - } - } - LOG.error( - "Failed to fetch blocks because {} Current peers: {}. Waiting for few seconds ...", - ex.getMessage(), - ethContext.getEthPeers().peerCount()); - return ethContext - .getScheduler() - .scheduleFutureTask(this::tryToFindBlocks, Duration.ofSeconds(5)); - }) - .thenCompose(Function.identity()); - } - return f.thenApply(this::rememberResults).thenCompose(this::possibleNextHash); - } - - private CompletableFuture> tryToFindBlocks() { - return CompletableFuture.supplyAsync(this::getNextHash) - .thenCompose(this::tryToFindBlock) - .thenApply(this::rememberResult) - .thenCompose(this::possibleNextHash); - } - - private CompletableFuture> possibleNextHash(final List blocks) { - synchronized (this) { - hashes.poll(); - if (hashes.isEmpty()) { - results = new ArrayList<>(); - running = false; - return CompletableFuture.completedFuture(blocks); - } - } - return tryToFindBlocks(); - } - - private List rememberResult(final Block block) { - this.results.add(block); - return results; - } - - private List rememberResults(final List blocks) { - this.results.addAll(blocks); - return results; - } - - private synchronized Hash getNextHash() { - return hashes.peek(); - } - - private CompletableFuture tryToFindBlock(final Hash targetHash) { - - final RetryingGetBlockFromPeersTask getBlockTask = - RetryingGetBlockFromPeersTask.create( - protocolContext, - protocolSchedule, - ethContext, - metricsSystem, - ethContext.getEthPeers().getMaxPeers(), - Optional.of(targetHash), - UNUSED); - return ethContext - .getScheduler() - .scheduleSyncWorkerTask(getBlockTask::run) - .thenApply(AbstractPeerTask.PeerTaskResult::getResult); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhase.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhase.java deleted file mode 100644 index f40a4bc77c2..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhase.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Copyright Hyperledger Besu Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.sync.backwardsync; - -import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; -import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.eth.manager.task.GetHeadersFromPeerByHashTask; -import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetHeadersEndingAtFromPeerByHashTask; - -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class BackwardSyncPhase extends BackwardSyncTask { - private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncPhase.class); - - public BackwardSyncPhase(final BackwardSyncContext context, final BackwardChain backwardChain) { - super(context, backwardChain); - } - - @VisibleForTesting - protected CompletableFuture waitForTTD() { - if (context.isOnTTD()) { - return CompletableFuture.completedFuture(null); - } - LOG.debug("Did not reach TTD yet, falling asleep..."); - return context - .getEthContext() - .getScheduler() - .scheduleFutureTask(this::waitForTTD, Duration.ofSeconds(5)); - } - - @Override - public CompletableFuture executeStep() { - return CompletableFuture.supplyAsync(this::waitForTTD) - .thenCompose(Function.identity()) - .thenApply(this::earliestUnprocessedHash) - .thenCompose(this::requestHeaders) - .thenApply(this::saveHeaders) - .thenApply(this::possibleMerge) - .thenCompose(this::possiblyMoreBackwardSteps); - } - - @VisibleForTesting - protected Hash earliestUnprocessedHash(final Void unused) { - BlockHeader firstHeader = - backwardChain - .getFirstAncestorHeader() - .orElseThrow( - () -> - new BackwardSyncException( - "No unprocessed hashes during backward sync. that is probably a bug.", - true)); - Hash parentHash = firstHeader.getParentHash(); - debugLambda( - LOG, - "First unprocessed hash for current pivot is {} expected on height {}", - parentHash::toHexString, - () -> firstHeader.getNumber() - 1); - return parentHash; - } - - @VisibleForTesting - protected CompletableFuture requestHeader(final Hash hash) { - debugLambda(LOG, "Requesting header for hash {}", hash::toHexString); - return GetHeadersFromPeerByHashTask.forSingleHash( - context.getProtocolSchedule(), - context.getEthContext(), - hash, - context.getProtocolContext().getBlockchain().getChainHead().getHeight(), - context.getMetricsSystem()) - .run() - .thenApply( - peerResult -> { - final List result = peerResult.getResult(); - if (result.isEmpty()) { - throw new BackwardSyncException( - "Did not receive a header for hash {}" + hash.toHexString(), true); - } - BlockHeader blockHeader = result.get(0); - debugLambda( - LOG, - "Got header {} with height {}", - () -> blockHeader.getHash().toHexString(), - blockHeader::getNumber); - return blockHeader; - }); - } - - @VisibleForTesting - protected CompletableFuture> requestHeaders(final Hash hash) { - debugLambda(LOG, "Requesting header for hash {}", hash::toHexString); - final RetryingGetHeadersEndingAtFromPeerByHashTask - retryingGetHeadersEndingAtFromPeerByHashTask = - RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash( - context.getProtocolSchedule(), - context.getEthContext(), - hash, - context.getProtocolContext().getBlockchain().getChainHead().getHeight(), - BackwardSyncContext.BATCH_SIZE, - context.getMetricsSystem()); - return context - .getEthContext() - .getScheduler() - .scheduleSyncWorkerTask(retryingGetHeadersEndingAtFromPeerByHashTask::run) - .thenApply( - blockHeaders -> { - if (blockHeaders.isEmpty()) { - throw new BackwardSyncException( - "Did not receive a header for hash {}" + hash.toHexString(), true); - } - debugLambda( - LOG, - "Got headers {} -> {}", - blockHeaders.get(0)::getNumber, - blockHeaders.get(blockHeaders.size() - 1)::getNumber); - return blockHeaders; - }); - } - - @VisibleForTesting - protected Void saveHeader(final BlockHeader blockHeader) { - backwardChain.prependAncestorsHeader(blockHeader); - context.putCurrentChainToHeight(blockHeader.getNumber(), backwardChain); - return null; - } - - @VisibleForTesting - protected Void saveHeaders(final List blockHeaders) { - for (BlockHeader blockHeader : blockHeaders) { - saveHeader(blockHeader); - } - infoLambda( - LOG, - "Saved headers {} -> {}", - () -> blockHeaders.get(0).getNumber(), - () -> blockHeaders.get(blockHeaders.size() - 1).getNumber()); - return null; - } - - @VisibleForTesting - protected BlockHeader possibleMerge(final Void unused) { - Optional maybeHistoricalBackwardChain = - context.findCorrectChainFromPivot( - backwardChain.getFirstAncestorHeader().orElseThrow().getNumber() - 1); - maybeHistoricalBackwardChain.ifPresent(backwardChain::prependChain); - return backwardChain.getFirstAncestorHeader().orElseThrow(); - } - - // if the previous header is not present yet, we need to go deeper - @VisibleForTesting - protected CompletableFuture possiblyMoreBackwardSteps(final BlockHeader blockHeader) { - CompletableFuture completableFuture = new CompletableFuture<>(); - if (context.getProtocolContext().getBlockchain().contains(blockHeader.getHash())) { - LOG.info("Backward Phase finished."); - completableFuture.complete(null); - return completableFuture; - } - if (context.getProtocolContext().getBlockchain().getChainHead().getHeight() - > blockHeader.getNumber() - 1) { - LOG.warn( - "Backward sync is following unknown branch {} ({}) and reached bellow previous head {}({})", - blockHeader.getNumber(), - blockHeader.getHash(), - context.getProtocolContext().getBlockchain().getChainHead().getHeight(), - context.getProtocolContext().getBlockchain().getChainHead().getHash().toHexString()); - } - LOG.debug("Backward sync did not reach a know block, need to go deeper"); - completableFuture.complete(null); - return completableFuture.thenCompose(this::executeAsync); - } -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java new file mode 100644 index 00000000000..6f39c1ad696 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStep.java @@ -0,0 +1,112 @@ +/* + * Copyright Hyperledger Besu Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.sync.backwardsync; + +import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetHeadersEndingAtFromPeerByHashTask; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BackwardSyncStep { + private static final Logger LOG = LoggerFactory.getLogger(BackwardSyncStep.class); + private final BackwardSyncContext context; + private final BackwardChain backwardChain; + + public BackwardSyncStep(final BackwardSyncContext context, final BackwardChain backwardChain) { + this.context = context; + this.backwardChain = backwardChain; + } + + public CompletableFuture executeAsync(final BlockHeader firstHeader) { + return CompletableFuture.supplyAsync(() -> firstHeader) + .thenApply(this::possibleRestoreOldNodes) + .thenCompose(this::requestHeaders) + .thenApply(this::saveHeaders) + .thenCompose(context::executeNextStep); + } + + @VisibleForTesting + protected Hash possibleRestoreOldNodes(final BlockHeader firstAncestor) { + Hash lastHash = firstAncestor.getParentHash(); + Optional iterator = backwardChain.getHeader(lastHash); + while (iterator.isPresent()) { + backwardChain.prependAncestorsHeader(iterator.get()); + lastHash = iterator.get().getParentHash(); + iterator = backwardChain.getHeader(lastHash); + } + return lastHash; + } + + @VisibleForTesting + protected CompletableFuture> requestHeaders(final Hash hash) { + debugLambda(LOG, "Requesting header for hash {}", hash::toHexString); + final RetryingGetHeadersEndingAtFromPeerByHashTask + retryingGetHeadersEndingAtFromPeerByHashTask = + RetryingGetHeadersEndingAtFromPeerByHashTask.endingAtHash( + context.getProtocolSchedule(), + context.getEthContext(), + hash, + context.getProtocolContext().getBlockchain().getChainHead().getHeight(), + context.getBatchSize(), + context.getMetricsSystem()); + return context + .getEthContext() + .getScheduler() + .scheduleSyncWorkerTask(retryingGetHeadersEndingAtFromPeerByHashTask::run) + .thenApply( + blockHeaders -> { + if (blockHeaders.isEmpty()) { + throw new BackwardSyncException( + "Did not receive a header for hash {}" + hash.toHexString(), true); + } + debugLambda( + LOG, + "Got headers {} -> {}", + blockHeaders.get(0)::getNumber, + blockHeaders.get(blockHeaders.size() - 1)::getNumber); + return blockHeaders; + }); + } + + @VisibleForTesting + protected Void saveHeader(final BlockHeader blockHeader) { + backwardChain.prependAncestorsHeader(blockHeader); + return null; + } + + @VisibleForTesting + protected Void saveHeaders(final List blockHeaders) { + for (BlockHeader blockHeader : blockHeaders) { + saveHeader(blockHeader); + } + infoLambda( + LOG, + "Saved headers {} -> {} (head: {})", + () -> blockHeaders.get(0).getNumber(), + () -> blockHeaders.get(blockHeaders.size() - 1).getNumber(), + () -> context.getProtocolContext().getBlockchain().getChainHead().getHeight()); + return null; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTask.java deleted file mode 100644 index e71ad9a186e..00000000000 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTask.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright Hyperledger Besu Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.sync.backwardsync; - -import static org.slf4j.LoggerFactory.getLogger; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import org.slf4j.Logger; - -public abstract class BackwardSyncTask { - protected BackwardSyncContext context; - protected BackwardChain backwardChain; - private static final Logger LOG = getLogger(BackwardSyncTask.class); - - protected BackwardSyncTask(final BackwardSyncContext context, final BackwardChain backwardChain) { - this.context = context; - this.backwardChain = backwardChain; - } - - CompletableFuture executeAsync(final Void unused) { - Optional currentChain = context.getCurrentChain(); - if (currentChain.isPresent()) { - if (!backwardChain.equals(currentChain.get())) { - LOG.debug( - "The pivot changed, we should stop current flow, some new flow is waiting to take over..."); - return CompletableFuture.completedFuture(null); - } - if (backwardChain.getFirstAncestorHeader().isEmpty()) { - LOG.info("The Backwards sync is already finished..."); - return CompletableFuture.completedFuture(null); - } - return executeStep(); - - } else { - CompletableFuture result = new CompletableFuture<>(); - result.completeExceptionally( - new BackwardSyncException( - "No pivot... that is weird and should not have happened. This method should have been called after the pivot was set...")); - return result; - } - } - - abstract CompletableFuture executeStep(); -} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhase.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java similarity index 73% rename from ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhase.java rename to ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java index 6bf708636bc..428d8044ad1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhase.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStep.java @@ -16,7 +16,7 @@ import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.infoLambda; -import static org.hyperledger.besu.util.Slf4jLambdaHelper.warnLambda; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; @@ -30,31 +30,31 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ForwardSyncPhase extends BackwardSyncTask { +public class ForwardSyncStep { - private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncPhase.class); - private int batchSize = BackwardSyncContext.BATCH_SIZE; + private static final Logger LOG = LoggerFactory.getLogger(ForwardSyncStep.class); + private final BackwardSyncContext context; + private final BackwardChain backwardChain; - public ForwardSyncPhase(final BackwardSyncContext context, final BackwardChain backwardChain) { - super(context, backwardChain); + public ForwardSyncStep(final BackwardSyncContext context, final BackwardChain backwardChain) { + this.context = context; + this.backwardChain = backwardChain; } - @Override - public CompletableFuture executeStep() { + public CompletableFuture executeAsync() { return CompletableFuture.supplyAsync(() -> returnFirstNUnknownHeaders(null)) .thenCompose(this::possibleRequestBodies) .thenApply(this::processKnownAncestors) - .thenCompose(this::possiblyMoreForwardSteps); + .thenCompose(context::executeNextStep); } @VisibleForTesting - protected BlockHeader processKnownAncestors(final Void unused) { + protected Void processKnownAncestors(final Void unused) { while (backwardChain.getFirstAncestorHeader().isPresent()) { BlockHeader header = backwardChain.getFirstAncestorHeader().orElseThrow(); if (context.getProtocolContext().getBlockchain().contains(header.getHash())) { @@ -63,7 +63,8 @@ protected BlockHeader processKnownAncestors(final Void unused) { "Block {} is already imported, we can ignore it for the sync process", () -> header.getHash().toHexString()); backwardChain.dropFirstHeader(); - } else if (backwardChain.isTrusted(header.getHash())) { + } else if (context.getProtocolContext().getBlockchain().contains(header.getParentHash()) + && backwardChain.isTrusted(header.getHash())) { debugLambda( LOG, "Importing trusted block {}({})", @@ -72,7 +73,7 @@ protected BlockHeader processKnownAncestors(final Void unused) { saveBlock(backwardChain.getTrustedBlock(header.getHash())); } else { debugLambda(LOG, "First unprocessed header is {}", header::getNumber); - return header; + return null; } } return null; @@ -86,7 +87,7 @@ protected List returnFirstNUnknownHeaders(final Void unused) { debugLambda( LOG, "Block {}({}) is already imported, we can ignore it for the sync process", - () -> header.getNumber(), + header::getNumber, () -> header.getHash().toHexString()); backwardChain.dropFirstHeader(); } else if (backwardChain.isTrusted(header.getHash())) { @@ -96,7 +97,7 @@ protected List returnFirstNUnknownHeaders(final Void unused) { () -> header.getHash().toHexString()); saveBlock(backwardChain.getTrustedBlock(header.getHash())); } else { - return backwardChain.getFirstNAncestorHeaders(batchSize); + return backwardChain.getFirstNAncestorHeaders(context.getBatchSize()); } } return Collections.emptyList(); @@ -167,7 +168,7 @@ protected CompletableFuture> requestBodies(final List b @VisibleForTesting protected Void saveBlock(final Block block) { - debugLambda(LOG, "Going to validate block {}", () -> block.getHeader().getHash().toHexString()); + traceLambda(LOG, "Going to validate block {}", () -> block.getHeader().getHash().toHexString()); var optResult = context .getBlockValidatorForBlock(block) @@ -179,7 +180,7 @@ protected Void saveBlock(final Block block) { optResult.blockProcessingOutputs.ifPresent( result -> { - debugLambda( + traceLambda( LOG, "Block {} was validated, going to import it", () -> block.getHeader().getHash().toHexString()); @@ -191,6 +192,11 @@ protected Void saveBlock(final Block block) { @VisibleForTesting protected Void saveBlocks(final List blocks) { + if (blocks.isEmpty()) { + LOG.info("No blocks to save..."); + context.halveBatchSize(); + return null; + } for (Block block : blocks) { final Optional parent = @@ -199,59 +205,20 @@ protected Void saveBlocks(final List blocks) { .getBlockchain() .getBlockByHash(block.getHeader().getParentHash()); if (parent.isEmpty()) { - batchSize = batchSize / 2 + 1; + context.halveBatchSize(); return null; } else { - batchSize = BackwardSyncContext.BATCH_SIZE; saveBlock(block); } } - backwardChain.commit(); infoLambda( LOG, - "Saved blocks {}->{}", + "Saved blocks {} -> {} (target: {})", () -> blocks.get(0).getHeader().getNumber(), - () -> blocks.get(blocks.size() - 1).getHeader().getNumber()); + () -> blocks.get(blocks.size() - 1).getHeader().getNumber(), + () -> + backwardChain.getPivot().orElse(blocks.get(blocks.size() - 1)).getHeader().getNumber()); + context.resetBatchSize(); return null; } - - @VisibleForTesting - protected CompletableFuture possiblyMoreForwardSteps(final BlockHeader firstNotSynced) { - CompletableFuture completableFuture = CompletableFuture.completedFuture(null); - if (firstNotSynced == null) { - final List successors = backwardChain.getSuccessors(); - LOG.info( - "Forward Sync Phase is finished. Importing {} block(s) provided by consensus layer...", - successors.size()); - successors.forEach( - block -> { - if (!context.getProtocolContext().getBlockchain().contains(block.getHash())) { - saveBlock(block); - } - }); - LOG.info("The Backward sync is done..."); - backwardChain.clear(); - return CompletableFuture.completedFuture(null); - } - if (context.getProtocolContext().getBlockchain().contains(firstNotSynced.getParentHash())) { - debugLambda( - LOG, - "Block {} is not yet imported, we need to run another step of ForwardSync", - firstNotSynced::toLogString); - return completableFuture.thenCompose(this::executeAsync); - } - - warnLambda( - LOG, - "Block {} is not yet imported but its parent {} is not imported either... " - + "This should not normally happen and indicates a wrong behaviour somewhere...", - firstNotSynced::toLogString, - () -> firstNotSynced.getParentHash().toHexString()); - return completableFuture.thenCompose(this::executeBackwardAsync); - } - - @VisibleForTesting - protected CompletionStage executeBackwardAsync(final Void unused) { - return new BackwardSyncPhase(context, backwardChain).executeAsync(unused); - } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java index 7e5de1bbd64..eae0b4e3bf3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/GenericKeyValueStorageFacade.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Map; import java.util.Optional; public class GenericKeyValueStorageFacade implements Closeable { @@ -40,12 +41,22 @@ public Optional get(final K key) { return storage.get(keyConvertor.toBytes(key)).map(valueConvertor::fromBytes); } + public Optional get(final byte[] key) { + return storage.get(key).map(valueConvertor::fromBytes); + } + public void put(final K key, final V value) { final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); keyValueStorageTransaction.put(keyConvertor.toBytes(key), valueConvertor.toBytes(value)); keyValueStorageTransaction.commit(); } + public void put(final byte[] key, final V value) { + final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); + keyValueStorageTransaction.put(key, valueConvertor.toBytes(value)); + keyValueStorageTransaction.commit(); + } + public void drop(final K key) { storage.tryDelete(keyConvertor.toBytes(key)); } @@ -58,4 +69,13 @@ public void clear() { public void close() throws IOException { storage.close(); } + + public void putAll(final Map map) { + final KeyValueStorageTransaction keyValueStorageTransaction = storage.startTransaction(); + for (Map.Entry entry : map.entrySet()) { + keyValueStorageTransaction.put( + keyConvertor.toBytes(entry.getKey()), valueConvertor.toBytes(entry.getValue())); + } + keyValueStorageTransaction.commit(); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/HashConvertor.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/HashConvertor.java new file mode 100644 index 00000000000..ac3a5585a26 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/HashConvertor.java @@ -0,0 +1,34 @@ +/* + * + * * Copyright Hyperledger Besu Contributors. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.ethereum.eth.sync.backwardsync; + +import org.hyperledger.besu.datatypes.Hash; + +import org.apache.tuweni.bytes.Bytes32; + +public class HashConvertor implements ValueConvertor { + @Override + public Hash fromBytes(final byte[] bytes) { + return Hash.wrap(Bytes32.wrap(bytes)); + } + + @Override + public byte[] toBytes(final Hash value) { + return value.toArrayUnsafe(); + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java new file mode 100644 index 00000000000..c73d343fe18 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java @@ -0,0 +1,74 @@ +/* + * + * * Copyright Hyperledger Besu Contributors. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * * specific language governing permissions and limitations under the License. + * * + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.hyperledger.besu.ethereum.eth.sync.backwardsync; + +import static org.slf4j.LoggerFactory.getLogger; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask; +import org.hyperledger.besu.ethereum.eth.manager.task.RetryingGetBlockFromPeersTask; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.slf4j.Logger; + +public class SyncStepStep { + private static final Logger LOG = getLogger(SyncStepStep.class); + + public static final int UNUSED = -1; + private final BackwardSyncContext context; + private final BackwardChain backwardChain; + + public SyncStepStep(final BackwardSyncContext context, final BackwardChain backwardChain) { + this.context = context; + this.backwardChain = backwardChain; + } + + public CompletableFuture executeAsync(final Hash hash) { + return CompletableFuture.supplyAsync(() -> hash) + .thenCompose(this::requestBlock) + .thenApply(this::saveBlock) + .thenCompose(context::executeNextStep); + } + + private CompletableFuture requestBlock(final Hash targetHash) { + final RetryingGetBlockFromPeersTask getBlockTask = + RetryingGetBlockFromPeersTask.create( + context.getProtocolContext(), + context.getProtocolSchedule(), + context.getEthContext(), + context.getMetricsSystem(), + context.getEthContext().getEthPeers().getMaxPeers(), + Optional.of(targetHash), + UNUSED); + return context + .getEthContext() + .getScheduler() + .scheduleSyncWorkerTask(getBlockTask::run) + .thenApply(AbstractPeerTask.PeerTaskResult::getResult); + } + + private Void saveBlock(final Block block) { + LOG.debug( + "Appending block {}({})", block.getHeader().getNumber(), block.getHash().toHexString()); + backwardChain.appendTrustedBlock(block); + return null; + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index 7ff7954f761..69e754a33ac 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; +import org.hyperledger.besu.plugin.services.BesuEvents.TTDReachedListener; import org.hyperledger.besu.util.Subscribers; import java.util.Map; @@ -42,6 +43,7 @@ public class SyncState { private final AtomicLong inSyncSubscriberId = new AtomicLong(); private final Map inSyncTrackers = new ConcurrentHashMap<>(); private final Subscribers syncStatusListeners = Subscribers.create(); + private final Subscribers ttdReachedListeners = Subscribers.create(); private volatile long chainHeightListenerId; private volatile Optional syncTarget = Optional.empty(); private Optional worldStateDownloadStatus = Optional.empty(); @@ -109,10 +111,18 @@ public long subscribeSyncStatus(final SyncStatusListener listener) { return syncStatusListeners.subscribe(listener); } + public long subscribeTTDReached(final TTDReachedListener listener) { + return ttdReachedListeners.subscribe(listener); + } + public boolean unsubscribeSyncStatus(final long listenerId) { return syncStatusListeners.unsubscribe(listenerId); } + public boolean unsubscribeTTDReached(final long listenerId) { + return ttdReachedListeners.unsubscribe(listenerId); + } + public Optional syncStatus() { return syncStatus(syncTarget); } @@ -143,6 +153,7 @@ public void setReachedTerminalDifficulty(final boolean stoppedAtTerminalDifficul // TODO: this is an inexpensive way to stop sync when we reach TTD, // we should revisit when merge sync is better defined this.reachedTerminalDifficulty = Optional.of(stoppedAtTerminalDifficulty); + ttdReachedListeners.forEach(listener -> listener.onTTDReached(stoppedAtTerminalDifficulty)); } public Optional hasReachedTerminalDifficulty() { diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java index ea8b96bce42..f0b5abd8eed 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncContextTest.java @@ -21,7 +21,10 @@ import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.hyperledger.besu.config.StubGenesisConfigOptions; @@ -32,29 +35,35 @@ import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockDataGenerator; -import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; +import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.referencetests.ReferenceTestWorldState; +import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; -import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; @@ -86,13 +95,12 @@ public class BackwardSyncContextTest { @Mock private BlockValidator blockValidator; @Mock private SyncState syncState; - @Mock private BackwardSyncLookupService backwardSyncLookupService; + private BackwardChain backwardChain; @Before public void setup() { when(mockProtocolSpec.getBlockValidator()).thenReturn(blockValidator); when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec); - when(syncState.hasReachedTerminalDifficulty()).thenReturn(Optional.of(true)); Block genesisBlock = blockDataGenerator.genesisBlock(); remoteBlockchain = createInMemoryBlockchain(genesisBlock); localBlockchain = createInMemoryBlockchain(genesisBlock); @@ -126,6 +134,7 @@ public void setup() { new ReferenceTestWorldState(), blockDataGenerator.receipts(block))); }); + backwardChain = newBackwardChain(); context = spy( new BackwardSyncContext( @@ -134,15 +143,31 @@ public void setup() { metricsSystem, ethContext, syncState, - backwardSyncLookupService, - new InMemoryKeyValueStorageProvider())); + backwardChain)); + doReturn(true).when(context).isOnTTD(); + doReturn(2).when(context).getBatchSize(); + } + + private BackwardChain newBackwardChain() { + final GenericKeyValueStorageFacade headersStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, + new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()), + new InMemoryKeyValueStorage()); + final GenericKeyValueStorageFacade blocksStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, + new BlocksConvertor(new MainnetBlockHeaderFunctions()), + new InMemoryKeyValueStorage()); + final GenericKeyValueStorageFacade chainStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage()); + return new BackwardChain(headersStorage, blocksStorage, chainStorage); } @Test public void shouldSyncUntilHash() throws Exception { final Hash hash = getBlockByNumber(REMOTE_HEIGHT).getHash(); - when(backwardSyncLookupService.lookup(hash)) - .thenReturn(CompletableFuture.completedFuture(List.of(getBlockByNumber(REMOTE_HEIGHT)))); final CompletableFuture future = context.syncBackwardsUntil(hash); respondUntilFutureIsDone(future); @@ -151,15 +176,6 @@ public void shouldSyncUntilHash() throws Exception { assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock()); } - @Test - public void shouldNotAppendWhenAlreadySyncingHash() { - final Hash hash = getBlockByNumber(REMOTE_HEIGHT).getHash(); - when(backwardSyncLookupService.lookup(hash)) - .thenReturn(CompletableFuture.completedFuture(Collections.emptyList())); - final CompletableFuture fut2 = context.syncBackwardsUntil(hash); - assertThat(fut2).isCompleted(); - } - @Test public void shouldSyncUntilRemoteBranch() throws Exception { @@ -189,32 +205,80 @@ public void shouldAddExpectedBlock() throws Exception { assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock()); } + private void respondUntilFutureIsDone(final CompletableFuture future) { + final RespondingEthPeer.Responder responder = + RespondingEthPeer.blockchainResponder(remoteBlockchain); + + peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); + } + + @Nonnull + private Block getBlockByNumber(final int number) { + return remoteBlockchain.getBlockByNumber(number).orElseThrow(); + } + + @Captor ArgumentCaptor captor; + @Test - public void shouldReplaceFlowWhenBlockWasSkipped() throws Exception { + public void shouldWaitWhenTTDNotReached() + throws ExecutionException, InterruptedException, TimeoutException { + doReturn(false).when(context).isOnTTD(); + when(syncState.subscribeTTDReached(any())).thenReturn(88L); - final CompletableFuture future = - context.syncBackwardsUntil(getBlockByNumber(REMOTE_HEIGHT - 10)); + final CompletableFuture voidCompletableFuture = context.waitForTTD(); - final CompletableFuture secondFuture = - context.syncBackwardsUntil(getBlockByNumber(REMOTE_HEIGHT)); + verify(syncState).subscribeTTDReached(captor.capture()); + verify(syncState, never()).unsubscribeTTDReached(anyLong()); + assertThat(voidCompletableFuture).isNotCompleted(); - assertThat(future).isNotSameAs(secondFuture); + captor.getValue().onTTDReached(true); - respondUntilFutureIsDone(secondFuture); + voidCompletableFuture.get(1, TimeUnit.SECONDS); - secondFuture.get(); - assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(remoteBlockchain.getChainHeadBlock()); + verify(syncState).unsubscribeTTDReached(88L); } - private void respondUntilFutureIsDone(final CompletableFuture future) { - final RespondingEthPeer.Responder responder = - RespondingEthPeer.blockchainResponder(remoteBlockchain); + @Test + public void shouldNotWaitWhenTTDReached() + throws ExecutionException, InterruptedException, TimeoutException { + doReturn(true).when(context).isOnTTD(); + when(syncState.subscribeTTDReached(any())).thenReturn(88L); + final CompletableFuture voidCompletableFuture = context.waitForTTD(); + voidCompletableFuture.get(1, TimeUnit.SECONDS); + assertThat(voidCompletableFuture).isCompleted(); + + verify(syncState).subscribeTTDReached(captor.capture()); + verify(syncState).unsubscribeTTDReached(88L); + } - peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); + @Test + public void shouldStartForwardStepWhenOnLocalHeight() { + createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10); + doReturn(CompletableFuture.completedFuture(null)).when(context).executeForwardAsync(any()); + + context.executeNextStep(null); + verify(context).executeForwardAsync(any()); } - @Nonnull - private Block getBlockByNumber(final int number) { - return remoteBlockchain.getBlockByNumber(number).orElseThrow(); + @Test + public void shouldFinishWhenWorkIsDonw() { + + final CompletableFuture completableFuture = context.executeNextStep(null); + assertThat(completableFuture.isDone()).isTrue(); + } + + @Test + public void shouldCreateAnotherBackwardStepWhenNotOnLocalHeight() { + createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10); + doReturn(CompletableFuture.completedFuture(null)).when(context).executeBackwardAsync(any()); + + context.executeNextStep(null); + verify(context).executeBackwardAsync(any()); + } + + private void createBackwardChain(final int from, final int until) { + for (int i = until; i > from; --i) { + backwardChain.prependAncestorsHeader(getBlockByNumber(i - 1).getHeader()); + } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupServiceTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupServiceTest.java deleted file mode 100644 index 07b282a188a..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncLookupServiceTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright Hyperledger Besu Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.hyperledger.besu.ethereum.eth.sync.backwardsync; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import org.hyperledger.besu.config.StubGenesisConfigOptions; -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.ProtocolContext; -import org.hyperledger.besu.ethereum.chain.MutableBlockchain; -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockDataGenerator; -import org.hyperledger.besu.ethereum.core.TransactionReceipt; -import org.hyperledger.besu.ethereum.eth.manager.EthContext; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; -import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; -import org.hyperledger.besu.ethereum.mainnet.MainnetProtocolSchedule; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; -import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; -import org.hyperledger.besu.plugin.services.MetricsSystem; - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import javax.annotation.Nonnull; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Answers; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class BackwardSyncLookupServiceTest { - public static final int REMOTE_HEIGHT = 50; - private static final BlockDataGenerator blockDataGenerator = new BlockDataGenerator(); - - @Spy - private final ProtocolSchedule protocolSchedule = - MainnetProtocolSchedule.fromConfig(new StubGenesisConfigOptions()); - - @Spy private final ProtocolSpec mockProtocolSpec = protocolSchedule.getByBlockNumber(0L); - private MutableBlockchain remoteBlockchain; - private RespondingEthPeer peer; - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private MetricsSystem metricsSystem; - - @Mock private ProtocolContext protocolContext; - - private BackwardSyncLookupService backwardSyncLookupService; - - @Before - public void setup() throws NoSuchFieldException, IllegalAccessException { - when(protocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec); - Block genesisBlock = blockDataGenerator.genesisBlock(); - remoteBlockchain = createInMemoryBlockchain(genesisBlock); - final Field max_retries = BackwardSyncLookupService.class.getDeclaredField("MAX_RETRIES"); - - Field modifiersField = Field.class.getDeclaredField("modifiers"); - modifiersField.setAccessible(true); - modifiersField.setInt(max_retries, max_retries.getModifiers() & ~Modifier.FINAL); - - max_retries.setAccessible(true); - max_retries.set(null, 1); - - for (int i = 1; i <= REMOTE_HEIGHT; i++) { - final BlockDataGenerator.BlockOptions options = - new BlockDataGenerator.BlockOptions() - .setBlockNumber(i) - .setParentHash(remoteBlockchain.getBlockHashByNumber(i - 1).orElseThrow()); - final Block block = blockDataGenerator.block(options); - final List receipts = blockDataGenerator.receipts(block); - - remoteBlockchain.appendBlock(block, receipts); - } - EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); - - peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); - EthContext ethContext = ethProtocolManager.ethContext(); - - backwardSyncLookupService = - spy( - new BackwardSyncLookupService( - protocolSchedule, ethContext, metricsSystem, protocolContext)); - } - - @Test - public void shouldFindABlockWhenResponding() throws Exception { - final Hash hash = getBlockByNumber(23).getHash(); - - final CompletableFuture> future = backwardSyncLookupService.lookup(hash); - - respondUntilFutureIsDone(future); - - final List blocks = future.get(); - assertThat(blocks.get(0)).isEqualTo(getBlockByNumber(23)); - } - - private void respondUntilFutureIsDone(final CompletableFuture future) { - final RespondingEthPeer.Responder responder = - RespondingEthPeer.blockchainResponder(remoteBlockchain); - - peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); - } - - @Nonnull - private Block getBlockByNumber(final int number) { - return remoteBlockchain.getBlockByNumber(number).orElseThrow(); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhaseTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java similarity index 58% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhaseTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java index db4682be762..841e48669ad 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncPhaseTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncStepTest.java @@ -17,8 +17,6 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,10 +38,7 @@ import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.junit.Before; @@ -55,7 +50,7 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class BackwardSyncPhaseTest { +public class BackwardSyncStepTest { public static final int REMOTE_HEIGHT = 50; public static final int LOCAL_HEIGHT = 25; @@ -71,6 +66,7 @@ public class BackwardSyncPhaseTest { private RespondingEthPeer peer; GenericKeyValueStorageFacade headersStorage; GenericKeyValueStorageFacade blocksStorage; + GenericKeyValueStorageFacade chainStorage; @Before public void setup() { @@ -85,6 +81,10 @@ public void setup() { new BlocksConvertor(new MainnetBlockHeaderFunctions()), new InMemoryKeyValueStorage()); + chainStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage()); + Block genesisBlock = blockDataGenerator.genesisBlock(); remoteBlockchain = createInMemoryBlockchain(genesisBlock); MutableBlockchain localBlockchain = createInMemoryBlockchain(genesisBlock); @@ -104,6 +104,9 @@ public void setup() { } when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain); when(context.getProtocolSchedule()).thenReturn(protocolSchedule); + when(context.getBatchSize()).thenReturn(5); + when(context.executeNextStep(null)).thenReturn(CompletableFuture.completedFuture(null)); + EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); @@ -111,26 +114,17 @@ public void setup() { when(context.getEthContext()).thenReturn(ethContext); } - @Test - public void shouldWaitWhenTTDNotReached() - throws ExecutionException, InterruptedException, TimeoutException { - final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3); - when(context.isOnTTD()).thenReturn(false).thenReturn(false).thenReturn(true); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); - step.waitForTTD(); - verify(context, Mockito.times(2)).getEthContext(); - } - @Test public void shouldFindHeaderWhenRequested() throws Exception { final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3); - when(context.isOnTTD()).thenReturn(true); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); + when(context.getBatchSize()).thenReturn(5); + BackwardSyncStep step = spy(new BackwardSyncStep(context, backwardChain)); final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(remoteBlockchain); - final CompletableFuture future = step.executeStep(); + final CompletableFuture future = + step.executeAsync(backwardChain.getFirstAncestorHeader().orElseThrow()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); future.get(); } @@ -138,49 +132,38 @@ public void shouldFindHeaderWhenRequested() throws Exception { @Test public void shouldFindHashToSync() { - BackwardSyncPhase step = - new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 4, REMOTE_HEIGHT)); - - final Hash hash = step.earliestUnprocessedHash(null); - + final BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 4, REMOTE_HEIGHT); + BackwardSyncStep step = new BackwardSyncStep(context, backwardChain); + final Hash hash = + step.possibleRestoreOldNodes(backwardChain.getFirstAncestorHeader().orElseThrow()); assertThat(hash).isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 4).getHeader().getParentHash()); } - @Test - public void shouldFailWhenNothingToSync() { - final BackwardChain chain = createBackwardChain(REMOTE_HEIGHT); - chain.dropFirstHeader(); - BackwardSyncPhase step = new BackwardSyncPhase(context, chain); - assertThatThrownBy(() -> step.earliestUnprocessedHash(null)) - .isInstanceOf(BackwardSyncException.class) - .hasMessageContaining("No unprocessed hashes during backward sync"); - } - @Test public void shouldRequestHeaderWhenAsked() throws Exception { - BackwardSyncPhase step = new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 1)); + BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1)); final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2); final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(remoteBlockchain); - final CompletableFuture future = - step.requestHeader(lookingForBlock.getHeader().getHash()); + final CompletableFuture> future = + step.requestHeaders(lookingForBlock.getHeader().getHash()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); - final BlockHeader blockHeader = future.get(); + final BlockHeader blockHeader = future.get().get(0); assertThat(blockHeader).isEqualTo(lookingForBlock.getHeader()); } @Test public void shouldThrowWhenResponseIsEmptyWhenRequestingHeader() throws Exception { - BackwardSyncPhase step = new BackwardSyncPhase(context, createBackwardChain(REMOTE_HEIGHT - 1)); + BackwardSyncStep step = new BackwardSyncStep(context, createBackwardChain(REMOTE_HEIGHT - 1)); final Block lookingForBlock = getBlockByNumber(REMOTE_HEIGHT - 2); final RespondingEthPeer.Responder responder = RespondingEthPeer.emptyResponder(); - final CompletableFuture future = - step.requestHeader(lookingForBlock.getHeader().getHash()); + final CompletableFuture> future = + step.requestHeaders(lookingForBlock.getHeader().getHash()); peer.respondWhileOtherThreadsWork(responder, () -> !future.isDone()); assertThatThrownBy(future::get) @@ -194,80 +177,11 @@ public void shouldSaveHeaderDelegatesProperly() { final BackwardChain chain = Mockito.mock(BackwardChain.class); final BlockHeader header = Mockito.mock(BlockHeader.class); - when(header.getNumber()).thenReturn(12345L); - - BackwardSyncPhase step = new BackwardSyncPhase(context, chain); + BackwardSyncStep step = new BackwardSyncStep(context, chain); step.saveHeader(header); verify(chain).prependAncestorsHeader(header); - verify(context).putCurrentChainToHeight(12345L, chain); - } - - @Test - public void shouldMergeWhenPossible() { - BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 3, REMOTE_HEIGHT); - backwardChain = spy(backwardChain); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); - - final BackwardChain historicalChain = - createBackwardChain(REMOTE_HEIGHT - 10, REMOTE_HEIGHT - 4); - when(context.findCorrectChainFromPivot(REMOTE_HEIGHT - 4)) - .thenReturn(Optional.of(historicalChain)); - - assertThat(backwardChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 3).getHeader()); - step.possibleMerge(null); - assertThat(backwardChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(REMOTE_HEIGHT - 10).getHeader()); - - verify(backwardChain).prependChain(historicalChain); - } - - @Test - public void shouldNotMergeWhenNotPossible() { - BackwardChain backwardChain = createBackwardChain(REMOTE_HEIGHT - 5, REMOTE_HEIGHT); - backwardChain = spy(backwardChain); - when(context.findCorrectChainFromPivot(any(Long.class))).thenReturn(Optional.empty()); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); - - step.possibleMerge(null); - - verify(backwardChain, never()).prependChain(any()); - } - - @Test - public void shouldFinishWhenNoMoreSteps() { - BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); - - final CompletableFuture completableFuture = - step.possiblyMoreBackwardSteps(getBlockByNumber(LOCAL_HEIGHT).getHeader()); - - assertThat(completableFuture.isDone()).isTrue(); - assertThat(completableFuture.isCompletedExceptionally()).isFalse(); - } - - @Test - public void shouldFinishExceptionallyWhenHeaderIsBellowBlockchainHeightButUnknown() { - BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 10); - BackwardSyncPhase step = new BackwardSyncPhase(context, backwardChain); - - final CompletableFuture completableFuture = - step.possiblyMoreBackwardSteps( - ChainForTestCreator.createEmptyBlock((long) LOCAL_HEIGHT - 1).getHeader()); - - assertThat(completableFuture.isCompletedExceptionally()).isTrue(); - } - - @Test - public void shouldCreateAnotherStepWhenThereIsWorkToBeDone() { - BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10); - BackwardSyncPhase step = spy(new BackwardSyncPhase(context, backwardChain)); - - step.possiblyMoreBackwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow()); - - verify(step).executeAsync(any()); } private BackwardChain createBackwardChain(final int from, final int until) { @@ -280,8 +194,10 @@ private BackwardChain createBackwardChain(final int from, final int until) { @Nonnull private BackwardChain createBackwardChain(final int number) { - return new BackwardChain( - headersStorage, blocksStorage, remoteBlockchain.getBlockByNumber(number).orElseThrow()); + final BackwardChain backwardChain = + new BackwardChain(headersStorage, blocksStorage, chainStorage); + backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow()); + return backwardChain; } @Nonnull diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTaskTest.java deleted file mode 100644 index fb9514d7668..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/BackwardSyncTaskTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright Hyperledger Besu Contributors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.sync.backwardsync; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; -import static org.mockito.Mockito.when; - -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.core.Block; -import org.hyperledger.besu.ethereum.core.BlockHeader; -import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; -import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import javax.annotation.Nonnull; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class BackwardSyncTaskTest { - - public static final int HEIGHT = 20_000; - - @Mock private BackwardSyncContext context; - private List blocks; - - GenericKeyValueStorageFacade headersStorage; - GenericKeyValueStorageFacade blocksStorage; - - @Before - public void initContextAndChain() { - blocks = ChainForTestCreator.prepareChain(2, HEIGHT); - headersStorage = - new GenericKeyValueStorageFacade<>( - Hash::toArrayUnsafe, - new BlocksHeadersConvertor(new MainnetBlockHeaderFunctions()), - new InMemoryKeyValueStorage()); - blocksStorage = - new GenericKeyValueStorageFacade<>( - Hash::toArrayUnsafe, - new BlocksConvertor(new MainnetBlockHeaderFunctions()), - new InMemoryKeyValueStorage()); - } - - @Test - public void shouldFailWhenPivotNotSetInContext() { - when(context.getCurrentChain()).thenReturn(Optional.empty()); - BackwardSyncTask step = createBackwardSyncTask(); - CompletableFuture completableFuture = step.executeAsync(null); - assertThatThrownBy(completableFuture::get) - .getCause() - .isInstanceOf(BackwardSyncException.class) - .hasMessageContaining("No pivot"); - } - - @Nonnull - private BackwardSyncTask createBackwardSyncTask() { - final BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(1)); - return createBackwardSyncTask(backwardChain); - } - - @Nonnull - private BackwardSyncTask createBackwardSyncTask(final BackwardChain backwardChain) { - return new BackwardSyncTask(context, backwardChain) { - @Override - CompletableFuture executeStep() { - return CompletableFuture.completedFuture(null); - } - }; - } - - @Test - public void shouldFinishImmediatelyFailWhenPivotIsDifferent() { - final BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(0)); - when(context.getCurrentChain()).thenReturn(Optional.of(backwardChain)); - BackwardSyncTask step = createBackwardSyncTask(); - CompletableFuture completableFuture = step.executeAsync(null); - assertThat(completableFuture.isDone()).isTrue(); - } - - @Test - public void shouldExecuteWhenPivotIsCorrect() { - final BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(1)); - BackwardSyncTask step = createBackwardSyncTask(); - when(context.getCurrentChain()).thenReturn(Optional.of(backwardChain)); - CompletableFuture completableFuture = step.executeAsync(null); - assertThat(completableFuture.isDone()).isTrue(); - } -} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhaseTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java similarity index 70% rename from ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhaseTest.java rename to ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java index 7c58d57226b..958db596c02 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncPhaseTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/ForwardSyncStepTest.java @@ -20,8 +20,6 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.hyperledger.besu.config.StubGenesisConfigOptions; @@ -46,6 +44,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import org.junit.Before; @@ -56,7 +55,7 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) -public class ForwardSyncPhaseTest { +public class ForwardSyncStepTest { public static final int REMOTE_HEIGHT = 50; public static final int LOCAL_HEIGHT = 25; @@ -74,6 +73,8 @@ public class ForwardSyncPhaseTest { GenericKeyValueStorageFacade headersStorage; GenericKeyValueStorageFacade blocksStorage; + GenericKeyValueStorageFacade chainStorage; + @Before public void setup() { headersStorage = @@ -86,6 +87,9 @@ public void setup() { Hash::toArrayUnsafe, new BlocksConvertor(new MainnetBlockHeaderFunctions()), new InMemoryKeyValueStorage()); + chainStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage()); Block genesisBlock = blockDataGenerator.genesisBlock(); remoteBlockchain = createInMemoryBlockchain(genesisBlock); @@ -107,6 +111,8 @@ public void setup() { when(context.getProtocolContext().getBlockchain()).thenReturn(localBlockchain); when(context.getProtocolSchedule()).thenReturn(protocolSchedule); + when(context.getBatchSize()).thenReturn(2); + when(context.executeNextStep(null)).thenReturn(CompletableFuture.completedFuture(null)); EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); @@ -129,12 +135,12 @@ public void setup() { @Test public void shouldExecuteForwardSyncWhenPossible() throws Exception { final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT, LOCAL_HEIGHT + 3); - ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); + ForwardSyncStep step = new ForwardSyncStep(context, backwardChain); final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(remoteBlockchain); - final CompletableFuture completableFuture = step.executeStep(); + final CompletableFuture completableFuture = step.executeAsync(); peer.respondWhile( responder, @@ -153,7 +159,7 @@ public void shouldExecuteForwardSyncWhenPossible() throws Exception { @Test public void shouldDropHeadersAsLongAsWeKnowThem() { final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT + 3); - ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); + ForwardSyncStep step = new ForwardSyncStep(context, backwardChain); assertThat(backwardChain.getFirstAncestorHeader().orElseThrow()) .isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader()); @@ -162,41 +168,9 @@ public void shouldDropHeadersAsLongAsWeKnowThem() { .isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 1).getHeader()); } - @Test - public void shouldDropBlocksThatWeTrust() { - final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT); - backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1)); - final BackwardChain finalChain = createBackwardChain(LOCAL_HEIGHT + 2, LOCAL_HEIGHT + 5); - finalChain.prependChain(backwardChain); - - ForwardSyncPhase step = new ForwardSyncPhase(context, finalChain); - - assertThat(finalChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader()); - step.processKnownAncestors(null); - assertThat(finalChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 2).getHeader()); - } - - @Test - public void shouldMergeEvenLongerChains() { - final BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 5, LOCAL_HEIGHT + 7); - backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1)); - final BackwardChain finalChain = createBackwardChain(LOCAL_HEIGHT + 2, LOCAL_HEIGHT + 5); - finalChain.prependChain(backwardChain); - - ForwardSyncPhase step = new ForwardSyncPhase(context, finalChain); - - assertThat(finalChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(LOCAL_HEIGHT - 5).getHeader()); - step.processKnownAncestors(null); - assertThat(finalChain.getFirstAncestorHeader().orElseThrow()) - .isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 2).getHeader()); - } - @Test public void shouldNotRequestWhenNull() { - ForwardSyncPhase phase = new ForwardSyncPhase(null, null); + ForwardSyncStep phase = new ForwardSyncStep(context, null); final CompletableFuture completableFuture = phase.possibleRequestBlock(null); assertThat(completableFuture.isDone()).isTrue(); @@ -207,8 +181,8 @@ public void shouldNotRequestWhenNull() { @Test public void shouldFindBlockWhenRequested() throws Exception { - ForwardSyncPhase step = - new ForwardSyncPhase(context, createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 3)); + ForwardSyncStep step = + new ForwardSyncStep(context, createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 3)); final RespondingEthPeer.Responder responder = RespondingEthPeer.blockchainResponder(remoteBlockchain); @@ -221,37 +195,19 @@ public void shouldFindBlockWhenRequested() throws Exception { } @Test - public void shouldCreateAnotherStepWhenThereIsWorkToBeDone() { - BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 1, LOCAL_HEIGHT + 10); - ForwardSyncPhase step = spy(new ForwardSyncPhase(context, backwardChain)); - - step.possiblyMoreForwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow()); - - verify(step).executeAsync(any()); - } - - @Test - public void shouldCreateBackwardStepWhenParentOfWorkIsNotImportedYet() { - BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT + 3, LOCAL_HEIGHT + 10); - ForwardSyncPhase step = spy(new ForwardSyncPhase(context, backwardChain)); - - step.possiblyMoreForwardSteps(backwardChain.getFirstAncestorHeader().orElseThrow()); - - verify(step).executeBackwardAsync(any()); - } - - @Test - public void shouldAddSuccessorsWhenNoUnknownBlockSet() { + public void shouldAddSuccessorsWhenNoUnknownBlockSet() throws Exception { BackwardChain backwardChain = createBackwardChain(LOCAL_HEIGHT - 3, LOCAL_HEIGHT); - backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 1)); - backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 2)); - backwardChain.appendExpectedBlock(getBlockByNumber(LOCAL_HEIGHT + 3)); + backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 1)); + backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 2)); + backwardChain.appendTrustedBlock(getBlockByNumber(LOCAL_HEIGHT + 3)); + + ForwardSyncStep step = new ForwardSyncStep(context, backwardChain); + step.processKnownAncestors(null); + assertThat(backwardChain.getFirstAncestorHeader()).isEmpty(); - ForwardSyncPhase step = new ForwardSyncPhase(context, backwardChain); - final BlockHeader header = step.processKnownAncestors(null); - assertThat(header).isNull(); + final CompletableFuture future = step.executeAsync(); - final CompletableFuture future = step.possiblyMoreForwardSteps(null); + future.get(1, TimeUnit.SECONDS); assertThat(future.isDone()).isTrue(); assertThat(localBlockchain.getChainHeadBlock()).isEqualTo(getBlockByNumber(LOCAL_HEIGHT + 3)); } @@ -266,8 +222,10 @@ private BackwardChain createBackwardChain(final int from, final int until) { @Nonnull private BackwardChain backwardChainFromBlock(final int number) { - return new BackwardChain( - headersStorage, blocksStorage, remoteBlockchain.getBlockByNumber(number).orElseThrow()); + final BackwardChain backwardChain = + new BackwardChain(headersStorage, blocksStorage, chainStorage); + backwardChain.appendTrustedBlock(remoteBlockchain.getBlockByNumber(number).orElseThrow()); + return backwardChain; } @Nonnull diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/InMemoryBackwardChainTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/InMemoryBackwardChainTest.java index da35e384d93..8dd3d9d8711 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/InMemoryBackwardChainTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/InMemoryBackwardChainTest.java @@ -22,10 +22,15 @@ import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.core.BlockHeaderFunctions; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; +import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import java.util.List; +import java.util.Optional; +import javax.annotation.Nonnull; import org.junit.Before; import org.junit.Test; @@ -41,6 +46,7 @@ public class InMemoryBackwardChainTest { GenericKeyValueStorageFacade headersStorage; GenericKeyValueStorageFacade blocksStorage; + GenericKeyValueStorageFacade chainStorage; @Before public void prepareData() { @@ -54,14 +60,16 @@ public void prepareData() { Hash::toArrayUnsafe, new BlocksConvertor(new MainnetBlockHeaderFunctions()), new InMemoryKeyValueStorage()); + chainStorage = + new GenericKeyValueStorageFacade<>( + Hash::toArrayUnsafe, new HashConvertor(), new InMemoryKeyValueStorage()); blocks = prepareChain(ELEMENTS, HEIGHT); } @Test public void shouldReturnFirstHeaderCorrectly() { - BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); + BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1)); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader()); @@ -69,10 +77,17 @@ public void shouldReturnFirstHeaderCorrectly() { assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 4).getHeader()); } + @Nonnull + private BackwardChain createChainFromBlock(final Block pivot) { + final BackwardChain backwardChain = + new BackwardChain(headersStorage, blocksStorage, chainStorage); + backwardChain.appendTrustedBlock(pivot); + return backwardChain; + } + @Test public void shouldSaveHeadersWhenHeightAndHashMatches() { - BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); + BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1)); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 4).getHeader()); @@ -82,8 +97,7 @@ public void shouldSaveHeadersWhenHeightAndHashMatches() { @Test public void shouldNotSaveHeadersWhenWrongHeight() { - BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); + BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1)); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); assertThatThrownBy( @@ -96,8 +110,7 @@ public void shouldNotSaveHeadersWhenWrongHeight() { @Test public void shouldNotSaveHeadersWhenWrongHash() { - BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); + BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1)); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); BlockHeader wrongHashHeader = prepareWrongParentHash(blocks.get(blocks.size() - 4).getHeader()); @@ -108,55 +121,10 @@ public void shouldNotSaveHeadersWhenWrongHash() { assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader()); } - @Test - public void shouldMergeConnectedChains() { - - BackwardChain firstChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); - firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); - firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); - - BackwardChain secondChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 4)); - secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 5).getHeader()); - secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 6).getHeader()); - - BlockHeader firstHeader = firstChain.getFirstAncestorHeader().orElseThrow(); - assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader()); - - firstChain.prependChain(secondChain); - - firstHeader = firstChain.getFirstAncestorHeader().orElseThrow(); - assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 6).getHeader()); - } - - @Test - public void shouldNotMergeNotConnectedChains() { - - BackwardChain firstChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); - firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); - firstChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); - - BackwardChain secondChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 5)); - secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 6).getHeader()); - secondChain.prependAncestorsHeader(blocks.get(blocks.size() - 7).getHeader()); - - BlockHeader firstHeader = firstChain.getFirstAncestorHeader().orElseThrow(); - assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader()); - - firstChain.prependChain(secondChain); - - firstHeader = firstChain.getFirstAncestorHeader().orElseThrow(); - assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 3).getHeader()); - } - @Test public void shouldDropFromTheEnd() { - BackwardChain backwardChain = - new BackwardChain(headersStorage, blocksStorage, blocks.get(blocks.size() - 1)); + BackwardChain backwardChain = createChainFromBlock(blocks.get(blocks.size() - 1)); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 2).getHeader()); backwardChain.prependAncestorsHeader(blocks.get(blocks.size() - 3).getHeader()); @@ -173,4 +141,59 @@ public void shouldDropFromTheEnd() { firstHeader = backwardChain.getFirstAncestorHeader().orElseThrow(); assertThat(firstHeader).isEqualTo(blocks.get(blocks.size() - 1).getHeader()); } + + @Test + public void shouldCreateChainFromScheduleAndFunctions() { + final StorageProvider provider = new InMemoryKeyValueStorageProvider(); + BlockHeaderFunctions functions = new MainnetBlockHeaderFunctions(); + + final BackwardChain chain = BackwardChain.from(provider, functions); + assertThat(chain).isNotNull(); + + chain.clear(); + } + + @Test + public void shouldAddHeaderToQueue() { + BackwardChain backwardChain = createChainFromBlock(blocks.get(3)); + Optional firstHash = backwardChain.getFirstHash(); + assertThat(firstHash).isNotPresent(); + backwardChain.addNewHash(blocks.get(7).getHash()); + backwardChain.addNewHash(blocks.get(9).getHash()); + backwardChain.addNewHash(blocks.get(9).getHash()); + backwardChain.addNewHash(blocks.get(11).getHash()); + + firstHash = backwardChain.getFirstHash(); + assertThat(firstHash).isPresent(); + assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(7).getHash()); + firstHash = backwardChain.getFirstHash(); + assertThat(firstHash).isPresent(); + assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(9).getHash()); + firstHash = backwardChain.getFirstHash(); + assertThat(firstHash).isPresent(); + assertThat(firstHash.orElseThrow()).isEqualTo(blocks.get(11).getHash()); + } + + @Test + public void shouldChangeFirstAncestorIfPivotIsToFar() { + BackwardChain backwardChain = createChainFromBlock(blocks.get(3)); + backwardChain.appendTrustedBlock(blocks.get(4)); + + Optional firstAncestorHeader = backwardChain.getFirstAncestorHeader(); + assertThat(firstAncestorHeader).isPresent(); + assertThat(firstAncestorHeader.orElseThrow()).isEqualTo(blocks.get(3).getHeader()); + Optional pivot = backwardChain.getPivot(); + assertThat(pivot).isPresent(); + assertThat(pivot.orElseThrow()).isEqualTo(blocks.get(4)); + + backwardChain.appendTrustedBlock(blocks.get(7)); + + firstAncestorHeader = backwardChain.getFirstAncestorHeader(); + + assertThat(firstAncestorHeader).isPresent(); + assertThat(firstAncestorHeader.orElseThrow()).isEqualTo(blocks.get(7).getHeader()); + pivot = backwardChain.getPivot(); + assertThat(pivot).isPresent(); + assertThat(pivot.orElseThrow()).isEqualTo(blocks.get(7)); + } } diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index d4588ad028d..224a7e82677 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -64,7 +64,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'hRHEr0UjfMKObN5VFIHk9GgcYlcEsEXEYfj0qVl5XQk=' + knownHash = 'Q0degxzD4dwmIvROpsxR3krWexb/lQUOfs7YjH2Sr5M=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java index 304e96b7b76..9c430ba86c0 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java @@ -236,4 +236,16 @@ interface LogListener { */ void onLogEmitted(LogWithMetadata logWithMetadata); } + + interface TTDReachedListener { + + /** + * Emitted when Total Terminal Difficulty is reached on a chain and dependent merge + * functionality should trigger. + * + * @param reached is true when we reached TTD, can be potentially false in case we reorg under + * TTD + */ + void onTTDReached(boolean reached); + } }