From 5dfb9160fae95a67e2db5ad9f51db5d7fcc20251 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 1/8] Bump protocol version

---
 src/main/Config.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/Config.cpp b/src/main/Config.cpp
index bbf6f832ed..6cc7abcaf3 100644
--- a/src/main/Config.cpp
+++ b/src/main/Config.cpp
@@ -25,7 +25,7 @@
 namespace stellar
 {
 
-const uint32 Config::CURRENT_LEDGER_PROTOCOL_VERSION = 11;
+const uint32 Config::CURRENT_LEDGER_PROTOCOL_VERSION = 12;
 
 // Options that must only be used for testing
 static const std::unordered_set<std::string> TESTING_ONLY_OPTIONS = {
From 52d53ebb31969ce124a0d28dd8319531d1e15cf7 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 2/8] Stop enforcing futures resolvedness in HAS and add
 utility functions

---
 src/bucket/FutureBucket.cpp             |  6 ++++++
 src/bucket/FutureBucket.h               |  3 +++
 src/history/HistoryArchive.cpp          | 14 ++++++++------
 src/history/HistoryArchive.h            |  3 +++
 src/historywork/ResolveSnapshotWork.cpp |  4 ++++
 5 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/src/bucket/FutureBucket.cpp b/src/bucket/FutureBucket.cpp
index cc8689a8a4..ecfd8b4a54 100644
--- a/src/bucket/FutureBucket.cpp
+++ b/src/bucket/FutureBucket.cpp
@@ -204,6 +204,12 @@ FutureBucket::hasHashes() const
     return (mState == FB_HASH_INPUTS || mState == FB_HASH_OUTPUT);
 }
 
+bool
+FutureBucket::isClear() const
+{
+    return mState == FB_CLEAR;
+}
+
 bool
 FutureBucket::mergeComplete() const
 {
diff --git a/src/bucket/FutureBucket.h b/src/bucket/FutureBucket.h
index 226e1730af..84b3d03e81 100644
--- a/src/bucket/FutureBucket.h
+++ b/src/bucket/FutureBucket.h
@@ -101,6 +101,9 @@ class FutureBucket
     // running).
     bool isMerging() const;
 
+    // Returns whether this object is in a FB_CLEAR state.
+    bool isClear() const;
+
     // Returns whether this object is in a FB_HASH_FOO state.
     bool hasHashes() const;
 
diff --git a/src/history/HistoryArchive.cpp b/src/history/HistoryArchive.cpp
index 98b63b5403..1d885e9dca 100644
--- a/src/history/HistoryArchive.cpp
+++ b/src/history/HistoryArchive.cpp
@@ -67,6 +67,14 @@ HistoryArchiveState::futuresAllResolved() const
     return true;
 }
 
+bool
+HistoryArchiveState::futuresAllClear() const
+{
+    return std::all_of(
+        currentBuckets.begin(), currentBuckets.end(),
+        [](HistoryStateBucket const& bl) { return bl.next.isClear(); });
+}
+
 void
 HistoryArchiveState::resolveAllFutures()
 {
@@ -94,10 +102,6 @@ HistoryArchiveState::resolveAnyReadyFutures()
 void
 HistoryArchiveState::save(std::string const& outFile) const
 {
-    // We only ever write fully-resolved HASs to files, when making
-    // checkpoints. This may change in the future if we start publishing
-    // input-only HASs.
-    assert(futuresAllResolved());
     std::ofstream out(outFile);
     cereal::JSONOutputArchive ar(out);
     serialize(ar);
@@ -129,7 +133,6 @@ HistoryArchiveState::load(std::string const& inFile)
             << "Unexpected history archive state version: " << version;
         throw std::runtime_error("unexpected history archive state version");
     }
-    assert(futuresAllResolved());
 }
 
 void
@@ -138,7 +141,6 @@ HistoryArchiveState::fromString(std::string const& str)
     std::istringstream in(str);
     cereal::JSONInputArchive ar(in);
     serialize(ar);
-    assert(futuresAllResolved());
 }
 
 std::string
diff --git a/src/history/HistoryArchive.h b/src/history/HistoryArchive.h
index b2d8173259..ba5d86330a 100644
--- a/src/history/HistoryArchive.h
+++ b/src/history/HistoryArchive.h
@@ -108,6 +108,9 @@ struct HistoryArchiveState
                 CEREAL_NVP(currentBuckets));
     }
 
+    // Return true if all futures are in FB_CLEAR state
+    bool futuresAllClear() const;
+
    // Return true if all futures have already been resolved, otherwise false.
    bool futuresAllResolved() const;
 
diff --git a/src/historywork/ResolveSnapshotWork.cpp b/src/historywork/ResolveSnapshotWork.cpp
index f14058ef36..9e8583360c 100644
--- a/src/historywork/ResolveSnapshotWork.cpp
+++ b/src/historywork/ResolveSnapshotWork.cpp
@@ -16,6 +16,10 @@ ResolveSnapshotWork::ResolveSnapshotWork(
     , mSnapshot(snapshot)
     , mTimer(std::make_unique<VirtualTimer>(app.getClock()))
 {
+    if (!mSnapshot)
+    {
+        throw std::runtime_error("ResolveSnapshotWork: invalid snapshot");
+    }
 }
 
 BasicWork::State
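With the asserts gone from save/load/fromString, a HAS written to or read from disk may legitimately carry unresolved futures, and the new helpers let callers ask precisely which situation they are in. As a reading aid, here is a minimal sketch of the FutureBucket state space that isClear() and hasHashes() discriminate; the state names match those used elsewhere in these patches, but the real class in FutureBucket.h also tracks the input/output hashes and the merge's shared_future, so treat this as orientation only:

    // Sketch only; see FutureBucket.h for the real definition.
    enum State
    {
        FB_CLEAR,        // no merge: no inputs, no outputs, nothing pending
        FB_HASH_INPUTS,  // deserialized HAS: unresolved merge, input hashes only
        FB_HASH_OUTPUT,  // deserialized HAS: resolved merge, output hash only
        FB_LIVE_INPUTS,  // in-memory merge running on live input buckets
        FB_LIVE_OUTPUT   // in-memory merge resolved to a live output bucket
    };

    // isClear()   <=> state == FB_CLEAR
    // hasHashes() <=> state == FB_HASH_INPUTS || state == FB_HASH_OUTPUT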
From 6f0f424d530de18473853456a901cd3d833813ae Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 3/8] Do not use shadows in new-style merges

---
 src/bucket/Bucket.cpp            | 33 ++++++++++++++-
 src/bucket/Bucket.h              |  3 ++
 src/bucket/BucketList.cpp        | 71 ++++++++++++++++++++++++++++++--
 src/bucket/BucketList.h          | 13 +++++-
 src/bucket/BucketManager.h       |  3 ++
 src/bucket/BucketManagerImpl.cpp |  5 ++-
 src/bucket/FutureBucket.cpp      |  9 ++++
 7 files changed, 130 insertions(+), 7 deletions(-)

diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp
index a4ffedef9b..08f423a83f 100644
--- a/src/bucket/Bucket.cpp
+++ b/src/bucket/Bucket.cpp
@@ -356,10 +356,18 @@ calculateMergeProtocolVersion(
     protocolVersion = std::max(oi.getMetadata().ledgerVersion,
                                ni.getMetadata().ledgerVersion);
 
+    // Starting with FIRST_PROTOCOL_SHADOWS_REMOVED, the merge protocol
+    // version is the max of curr, snap, and any shadow of version
+    // < FIRST_PROTOCOL_SHADOWS_REMOVED. This means that a bucket may still
+    // perform an old-style merge despite the presence of newer-protocol
+    // shadows.
     for (auto const& si : shadowIterators)
     {
-        protocolVersion =
-            std::max(si.getMetadata().ledgerVersion, protocolVersion);
+        auto version = si.getMetadata().ledgerVersion;
+        if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+        {
+            protocolVersion = std::max(version, protocolVersion);
+        }
     }
 
     CLOG(TRACE, "Bucket") << "Bucket merge protocolVersion=" << protocolVersion
@@ -387,6 +395,19 @@ calculateMergeProtocolVersion(
     {
         ++mc.mPostInitEntryProtocolMerges;
     }
+
+    if (protocolVersion < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+    {
+        ++mc.mPreShadowRemovalProtocolMerges;
+    }
+    else
+    {
+        if (!shadowIterators.empty())
+        {
+            throw std::runtime_error("Shadows are not supported");
+        }
+        ++mc.mPostShadowRemovalProtocolMerges;
+    }
 }
 
 // There are 4 "easy" cases for merging: exhausted iterators on either
@@ -615,4 +636,12 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
               shadows};
     return out.getBucket(bucketManager, &mk);
 }
+
+uint32_t
+Bucket::getBucketVersion(std::shared_ptr<Bucket> const& bucket)
+{
+    assert(bucket);
+    BucketInputIterator it(bucket);
+    return it.getMetadata().ledgerVersion;
+}
 }
diff --git a/src/bucket/Bucket.h b/src/bucket/Bucket.h
index 9de815a68f..fae9273f06 100644
--- a/src/bucket/Bucket.h
+++ b/src/bucket/Bucket.h
@@ -60,6 +60,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
     // we were only supporting LIVEENTRY and DEADENTRY.
     static constexpr uint32_t
         FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY = 11;
+    static constexpr uint32_t FIRST_PROTOCOL_SHADOWS_REMOVED = 12;
 
     static void checkProtocolLegality(BucketEntry const& entry,
                                       uint32_t protocolVersion);
@@ -103,5 +104,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
           std::shared_ptr<Bucket> const& newBucket,
           std::vector<std::shared_ptr<Bucket>> const& shadows,
           bool keepDeadEntries, bool countMergeEvents, bool doFsync);
+
+    static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
 };
 }
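The version-selection rule above is easiest to see with concrete numbers. The following is a self-contained sketch of just the rule, not the stellar-core API: calculateMergeProtocolVersion additionally checks maxProtocolVersion, updates the merge counters, and throws if a new-style merge is handed any shadows.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    constexpr uint32_t FIRST_PROTOCOL_SHADOWS_REMOVED = 12;

    uint32_t
    mergeVersion(uint32_t curr, uint32_t snap, std::vector<uint32_t> const& shadows)
    {
        // Max of the two inputs, and of any shadow older than the cutoff;
        // newer shadows cannot force a new-style merge.
        uint32_t v = std::max(curr, snap);
        for (auto s : shadows)
        {
            if (s < FIRST_PROTOCOL_SHADOWS_REMOVED)
            {
                v = std::max(v, s);
            }
        }
        return v;
    }

These values mirror the tests added in patch 8: mergeVersion(11, 11, {12}) == 11 (a v12 shadow does not promote the merge), mergeVersion(10, 10, {12, 11}) == 11 (mixed shadows pick the old-style maximum), and a v12/v12 merge with any shadows present is rejected outright.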
diff --git a/src/bucket/BucketList.cpp b/src/bucket/BucketList.cpp
index baa6b7e497..d2de048e07 100644
--- a/src/bucket/BucketList.cpp
+++ b/src/bucket/BucketList.cpp
@@ -12,6 +12,7 @@
 #include "main/Application.h"
 #include "util/Logging.h"
 #include "util/XDRStream.h"
+#include "util/format.h"
 #include "util/types.h"
 
 #include <cassert>
@@ -154,8 +155,12 @@ BucketLevel::prepare(Application& app, uint32_t currLedger,
         }
     }
 
-    mNextCurr = FutureBucket(app, curr, snap, shadows, currLedgerProtocol,
-                             countMergeEvents, mLevel);
+    auto shadowsBasedOnProtocol =
+        Bucket::getBucketVersion(snap) >= Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED
+            ? std::vector<std::shared_ptr<Bucket>>()
+            : shadows;
+    mNextCurr = FutureBucket(app, curr, snap, shadowsBasedOnProtocol,
+                             currLedgerProtocol, countMergeEvents, mLevel);
     assert(mNextCurr.isMerging());
 }
 
@@ -421,6 +426,14 @@ BucketList::resolveAnyReadyFutures()
     }
 }
 
+bool
+BucketList::futuresAllResolved() const
+{
+    return std::all_of(
+        mLevels.begin(), mLevels.end(),
+        [](BucketLevel const& bl) { return !bl.getNext().isMerging(); });
+}
+
 void
 BucketList::addBatch(Application& app, uint32_t currLedger,
                      uint32_t currLedgerProtocol,
@@ -551,7 +564,8 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
 }
 
 void
-BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
+BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion,
+                          uint32_t ledger)
 {
     for (uint32_t i = 0; i < static_cast<uint32_t>(mLevels.size()); i++)
     {
@@ -566,6 +580,57 @@ BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
                 << "Restarted merge on BucketList level " << i;
         }
+        // The next block assumes we are re-starting a
+        // FIRST_PROTOCOL_SHADOWS_REMOVED or later merge, which has no
+        // shadows _and_ no stored inputs/outputs, and ensures that we only
+        // re-start buckets of the correct version.
+        else if (next.isClear() && i > 0)
+        {
+            // Recover the merge by iterating through bucketlist levels and
+            // using snaps and currs. The only time we don't use a level's
+            // curr is when the level is about to snap; in that case, when
+            // the merge is needed, the level's curr will snap, and the merge
+            // will be promoted into curr. Therefore, when starting the
+            // merge, we use an empty curr.
+            // Additionally, it is safe to recover a merge at any point
+            // before the merge is needed (that is, before it is promoted
+            // into the level's curr after ledger close), because future
+            // bucket inputs _do not change_ until the level spills, and
+            // after such spills new merges are started with new inputs.
+            auto snap = mLevels[i - 1].getSnap();
+            Hash const emptyHash;
+
+            // Exit early if a level has not been initialized yet. There are
+            // two possibilities for an empty bucket: either it is truly
+            // untouched (not enough ledgers have closed to populate the
+            // level), or it is a protocol-10-or-earlier bucket (since it
+            // contains no metadata entry). A 10-or-earlier bucket must have
+            // had an output published, and is handled by the previous `if`
+            // block. Therefore, we must be dealing with an untouched level.
+            if (snap->getHash() == emptyHash)
+            {
+                return;
+            }
+
+            auto version = Bucket::getBucketVersion(snap);
+            if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+            {
+                auto msg =
+                    fmt::format("Invalid state: bucketlist level {} has clear "
+                                "future bucket but version {} snap",
+                                i, version);
+                throw std::runtime_error(msg);
+            }
+
+            // Round the current ledger down to when the merge was started,
+            // and re-start the merge via prepare, mimicking the logic in
+            // `addBatch`
+            auto mergeStartLedger = mask(ledger, BucketList::levelHalf(i - 1));
+            level.prepare(
+                app, mergeStartLedger, version, snap, /* shadows= */ {},
+                !app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING);
+        }
     }
 }
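The round-down step at the end is worth a worked example. Assuming BucketList's usual level geometry — levelSize(i) = 4^(i+1) and levelHalf(i) = levelSize(i)/2, giving halves of 2, 8, 32, 128, ... — and the conventional power-of-two mask:

    // mask(v, m) rounds v down to the nearest multiple of m (m a power of 2)
    uint32_t mask(uint32_t v, uint32_t m) { return v & ~(m - 1); }

    // Re-starting the level-3 merge from a HAS taken at ledger 70: level 3
    // merges in level 2's snap, which last spilled at
    //     mask(70, levelHalf(2)) == mask(70, 32) == 64,
    // so prepare() runs as if at ledger 64 -- the same ledger addBatch used
    // when it originally started this merge.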
diff --git a/src/bucket/BucketList.h b/src/bucket/BucketList.h
index 90ef40a8c9..70ee2ad276 100644
--- a/src/bucket/BucketList.h
+++ b/src/bucket/BucketList.h
@@ -404,7 +404,16 @@ class BucketList
     // merging buckets between levels. This needs to be called after forcing a
     // BucketList to adopt a new state, either at application restart or when
     // catching up from buckets loaded over the network.
-    void restartMerges(Application& app, uint32_t maxProtocolVersion);
+
+    // There are two ways to re-start a merge:
+    // 1. Use non-LIVE inputs/outputs from the HAS file. This function will
+    // make these FutureBuckets LIVE (either FB_INPUT_LIVE or FB_OUTPUT_LIVE)
+    // using the hashes of inputs and outputs from the HAS.
+    // 2. Introduced with FIRST_PROTOCOL_SHADOWS_REMOVED: skip the
+    // input/output hashes, and restart live merges from the currs and snaps
+    // of the bucketlist at that ledger.
+    void restartMerges(Application& app, uint32_t maxProtocolVersion,
+                       uint32_t ledger);
 
@@ -413,6 +422,8 @@ class BucketList
     // Run through the levels and check for FutureBuckets that are done
     // merging; if so, call resolve() on them, changing state from
     // FB_LIVE_INPUTS to FB_LIVE_OUTPUT. Live objects are not serialized to
     // HistoryArchiveStates, which can cause repeated merges when re-activated.
     void resolveAnyReadyFutures();
 
+    bool futuresAllResolved() const;
+
     // Add a batch of initial (created), live (updated) and dead entries to
     // the bucketlist, representing the entries effected by closing
     // `currLedger`. The bucketlist will incorporate these into the smallest
diff --git a/src/bucket/BucketManager.h b/src/bucket/BucketManager.h
index 7929a9527b..c03dd22875 100644
--- a/src/bucket/BucketManager.h
+++ b/src/bucket/BucketManager.h
@@ -34,6 +34,9 @@ struct MergeCounters
     uint64_t mRunningMergeReattachments{0};
     uint64_t mFinishedMergeReattachments{0};
 
+    uint64_t mPreShadowRemovalProtocolMerges{0};
+    uint64_t mPostShadowRemovalProtocolMerges{0};
+
     uint64_t mNewMetaEntries{0};
     uint64_t mNewInitEntries{0};
     uint64_t mNewLiveEntries{0};
diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp
index 9426cc67bb..3ae2983283 100644
--- a/src/bucket/BucketManagerImpl.cpp
+++ b/src/bucket/BucketManagerImpl.cpp
@@ -212,6 +212,9 @@ MergeCounters::operator+=(MergeCounters const& delta)
     mRunningMergeReattachments += delta.mRunningMergeReattachments;
     mFinishedMergeReattachments += delta.mFinishedMergeReattachments;
 
+    mPreShadowRemovalProtocolMerges += delta.mPreShadowRemovalProtocolMerges;
+    mPostShadowRemovalProtocolMerges += delta.mPostShadowRemovalProtocolMerges;
+
     mNewMetaEntries += delta.mNewMetaEntries;
     mNewInitEntries += delta.mNewInitEntries;
     mNewLiveEntries += delta.mNewLiveEntries;
@@ -732,7 +735,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
         mBucketList.getLevel(i).setNext(has.currentBuckets.at(i).next);
     }
 
-    mBucketList.restartMerges(mApp, maxProtocolVersion);
+    mBucketList.restartMerges(mApp, maxProtocolVersion, has.currentLedger);
     cleanupStaleFiles();
 }
diff --git a/src/bucket/FutureBucket.cpp b/src/bucket/FutureBucket.cpp
index ecfd8b4a54..1e77429425 100644
--- a/src/bucket/FutureBucket.cpp
+++ b/src/bucket/FutureBucket.cpp
@@ -44,6 +44,15 @@ FutureBucket::FutureBucket(Application& app,
     assert(snap);
     mInputCurrBucketHash = binToHex(curr->getHash());
     mInputSnapBucketHash = binToHex(snap->getHash());
+    if (Bucket::getBucketVersion(snap) >=
+        Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+    {
+        if (!mInputShadowBuckets.empty())
+        {
+            throw std::runtime_error(
+                "Invalid FutureBucket: ledger version doesn't support shadows");
+        }
+    }
     for (auto const& b : mInputShadowBuckets)
     {
         mInputShadowBucketHashes.push_back(binToHex(b->getHash()));
From 72181f1f9059ac174bd5f991396b0feb560dab91 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 4/8] Clear new-style future buckets for publish and validate
 buckets during catchup

---
 src/history/HistoryArchive.cpp          | 79 +++++++++++++++++++++++++
 src/history/HistoryArchive.h            |  3 +
 src/history/HistoryManager.h            |  3 -
 src/history/HistoryManagerImpl.cpp      | 13 +---
 src/history/HistoryManagerImpl.h        |  2 -
 src/history/StateSnapshot.cpp           | 28 +--------
 src/history/StateSnapshot.h             |  1 -
 .../PutHistoryArchiveStateWork.cpp      |  4 ++
 src/historywork/ResolveSnapshotWork.cpp |  2 +-
 9 files changed, 92 insertions(+), 43 deletions(-)

diff --git a/src/history/HistoryArchive.cpp b/src/history/HistoryArchive.cpp
index 1d885e9dca..23faa50044 100644
--- a/src/history/HistoryArchive.cpp
+++ b/src/history/HistoryArchive.cpp
@@ -9,6 +9,7 @@
 #include "history/HistoryArchive.h"
 #include "bucket/Bucket.h"
 #include "bucket/BucketList.h"
+#include "bucket/BucketManager.h"
 #include "crypto/Hex.h"
 #include "crypto/SHA.h"
 #include "history/HistoryManager.h"
@@ -261,6 +262,84 @@ HistoryArchiveState::allBuckets() const
     return std::vector<std::string>(buckets.begin(), buckets.end());
 }
 
+bool
+HistoryArchiveState::containsValidBuckets(Application& app) const
+{
+    // This function assumes that the buckets required to verify the state
+    // are present.
+    // Level 0 future buckets are always clear
+    assert(currentBuckets[0].next.isClear());
+
+    for (uint32_t i = 1; i < BucketList::kNumLevels; i++)
+    {
+        auto& level = currentBuckets[i];
+        auto& prev = currentBuckets[i - 1];
+        Hash const emptyHash;
+
+        auto snap =
+            app.getBucketManager().getBucketByHash(hexToBin256(prev.snap));
+        assert(snap);
+        if (snap->getHash() == emptyHash)
+        {
+            continue;
+        }
+        else if (Bucket::getBucketVersion(snap) >=
+                 Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+        {
+            if (!level.next.isClear())
+            {
+                CLOG(ERROR, "History")
+                    << "Invalid HAS: future must be cleared ";
+                return false;
+            }
+        }
+        else if (!level.next.hasOutputHash())
+        {
+            CLOG(ERROR, "History")
+                << "Invalid HAS: future must have resolved output";
+            return false;
+        }
+    }
+    return true;
+}
+
+void
+HistoryArchiveState::prepareForPublish(Application& app)
+{
+    // Level 0 future buckets are always clear
+    assert(currentBuckets[0].next.isClear());
+
+    for (uint32_t i = 1; i < BucketList::kNumLevels; i++)
+    {
+        auto& level = currentBuckets[i];
+        auto& prev = currentBuckets[i - 1];
+
+        auto snap =
+            app.getBucketManager().getBucketByHash(hexToBin256(prev.snap));
+        if (Bucket::getBucketVersion(snap) >=
+            Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+        {
+            level.next.clear();
+        }
+        else if (level.next.hasHashes() && !level.next.isLive())
+        {
+            // Note: this `maxProtocolVersion` is over-approximate. The actual
+            // max for the ledger being published might be lower, but if the
+            // "true" (lower) max-value were actually in conflict with the
+            // state we're about to publish it should have caused an error
+            // earlier anyways, back when the bucket list and HAS for this
+            // state was initially formed. Since we're just reconstituting a
+            // HAS here, we assume it was legit when formed. Given that
+            // getting the true value here therefore doesn't seem to add much
+            // checking, and given that it'd be somewhat convoluted _to_
+            // materialize the true value here, we're going to live with the
+            // approximate value for now.
+            uint32_t maxProtocolVersion =
+                Config::CURRENT_LEDGER_PROTOCOL_VERSION;
+            level.next.makeLive(app, maxProtocolVersion, i);
+        }
+    }
+    assert(containsValidBuckets(app));
+}
+
 HistoryArchiveState::HistoryArchiveState() : server(STELLAR_CORE_VERSION)
 {
     uint256 u;
diff --git a/src/history/HistoryArchive.h b/src/history/HistoryArchive.h
index ba5d86330a..d8e38a062d 100644
--- a/src/history/HistoryArchive.h
+++ b/src/history/HistoryArchive.h
@@ -129,6 +129,9 @@ struct HistoryArchiveState
 
     std::string toString() const;
     void fromString(std::string const& str);
+
+    void prepareForPublish(Application& app);
+    bool containsValidBuckets(Application& app) const;
 };
 
 class HistoryArchive : public std::enable_shared_from_this<HistoryArchive>
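The per-level invariant that containsValidBuckets enforces (and that prepareForPublish establishes) condenses to a small table, where `snap` is the snap of the level above and `next` is this level's future bucket:

    state of the level-above snap     required state of this level's `next`
    ------------------------------    -------------------------------------
    empty hash (level untouched)      unchecked (level is skipped)
    version >= 12 (new-style)         clear
    version < 12 (old-style)          resolved output hash present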
diff --git a/src/history/HistoryManager.h b/src/history/HistoryManager.h
index bd37031063..bf02c74359 100644
--- a/src/history/HistoryManager.h
+++ b/src/history/HistoryManager.h
@@ -285,9 +285,6 @@ class HistoryManager
                            std::vector<std::string> const& originalBuckets,
                            bool success) = 0;
 
-    // Return the HistoryArchiveState of the LedgerManager's LCL
-    virtual HistoryArchiveState getLastClosedHistoryArchiveState() const = 0;
-
     // Infer a quorum set by reading SCP messages in history archives.
     virtual InferredQuorum inferQuorum(uint32_t ledgerNum) = 0;
 
diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp
index 512f69d056..f8648c7aed 100644
--- a/src/history/HistoryManagerImpl.cpp
+++ b/src/history/HistoryManagerImpl.cpp
@@ -67,7 +67,6 @@ HistoryManagerImpl::HistoryManagerImpl(Application& app)
     : mApp(app)
     , mWorkDir(nullptr)
     , mPublishWork(nullptr)
-
     , mPublishSuccess(
          app.getMetrics().NewMeter({"history", "publish", "success"}, "event"))
     , mPublishFailure(
@@ -175,14 +174,6 @@ HistoryManagerImpl::localFilename(std::string const& basename)
     return this->getTmpDir() + "/" + basename;
 }
 
-HistoryArchiveState
-HistoryManagerImpl::getLastClosedHistoryArchiveState() const
-{
-    auto seq = mApp.getLedgerManager().getLastClosedLedgerNum();
-    auto& bl = mApp.getBucketManager().getBucketList();
-    return HistoryArchiveState(seq, bl);
-}
-
 InferredQuorum
 HistoryManagerImpl::inferQuorum(uint32_t ledgerNum)
 {
@@ -251,9 +242,9 @@ HistoryManagerImpl::maybeQueueHistoryCheckpoint()
 void
 HistoryManagerImpl::queueCurrentHistory()
 {
-    auto has = getLastClosedHistoryArchiveState();
+    auto ledger = mApp.getLedgerManager().getLastClosedLedgerNum();
+    HistoryArchiveState has(ledger, mApp.getBucketManager().getBucketList());
 
-    auto ledger = has.currentLedger;
     CLOG(DEBUG, "History") << "Queueing publish state for ledger " << ledger;
     mEnqueueTimes.emplace(ledger, std::chrono::steady_clock::now());
diff --git a/src/history/HistoryManagerImpl.h b/src/history/HistoryManagerImpl.h
index 8a4b4d7f89..bdc64a140b 100644
--- a/src/history/HistoryManagerImpl.h
+++ b/src/history/HistoryManagerImpl.h
@@ -79,8 +79,6 @@ class HistoryManagerImpl : public HistoryManager
                    std::vector<std::string> const& originalBuckets,
                    bool success) override;
 
-    HistoryArchiveState getLastClosedHistoryArchiveState() const override;
-
     InferredQuorum inferQuorum(uint32_t ledgerNum) override;
 
     std::string const& getTmpDir() override;
diff --git a/src/history/StateSnapshot.cpp b/src/history/StateSnapshot.cpp
index d60860c035..a07bdcc466 100644
--- a/src/history/StateSnapshot.cpp
+++ b/src/history/StateSnapshot.cpp
@@ -39,33 +39,11 @@ StateSnapshot::StateSnapshot(Application& app, HistoryArchiveState const& state)
           mSnapDir, HISTORY_FILE_TYPE_SCP, mLocalState.currentLedger))
 {
-    makeLive();
-}
-
-void
-StateSnapshot::makeLive()
-{
-    for (uint32_t i = 0;
-         i < static_cast<uint32_t>(mLocalState.currentBuckets.size()); i++)
+    if (mLocalState.currentBuckets.size() != BucketList::kNumLevels)
     {
-        auto& hb = mLocalState.currentBuckets[i];
-        if (hb.next.hasHashes() && !hb.next.isLive())
-        {
-            // Note: this `maxProtocolVersion` is over-approximate. The actual
-            // max for the ledger being published might be lower, but if the
-            // "true" (lower) max-value were actually in conflict with the
-            // state we're about to publish it should have caused an error
-            // earlier anyways, back when the bucket list and HAS for this
-            // state was initially formed. Since we're just reconstituting a
-            // HAS here, we assume it was legit when formed. Given that
-            // getting the true value here therefore doesn't seem to add much
-            // checking, and given that it'd be somewhat convoluted _to_
-            // materialize the true value here, we're going to live with the
-            // approximate value for now.
-            uint32_t maxProtocolVersion =
-                Config::CURRENT_LEDGER_PROTOCOL_VERSION;
-            hb.next.makeLive(mApp, maxProtocolVersion, i);
-        }
+        throw std::runtime_error("Invalid HAS: malformed bucketlist");
     }
+    mLocalState.prepareForPublish(mApp);
 }
 
 bool
diff --git a/src/history/StateSnapshot.h b/src/history/StateSnapshot.h
index 4cbb044a6c..08987db80e 100644
--- a/src/history/StateSnapshot.h
+++ b/src/history/StateSnapshot.h
@@ -28,7 +28,6 @@ struct StateSnapshot : public std::enable_shared_from_this<StateSnapshot>
     std::shared_ptr<FileTransferInfo> mSCPHistorySnapFile;
 
     StateSnapshot(Application& app, HistoryArchiveState const& state);
-    void makeLive();
     bool writeHistoryBlocks() const;
     std::vector<std::shared_ptr<FileTransferInfo>>
     differingHASFiles(HistoryArchiveState const& other);
diff --git a/src/historywork/PutHistoryArchiveStateWork.cpp b/src/historywork/PutHistoryArchiveStateWork.cpp
index c555b663f9..d5810146d2 100644
--- a/src/historywork/PutHistoryArchiveStateWork.cpp
+++ b/src/historywork/PutHistoryArchiveStateWork.cpp
@@ -22,6 +22,10 @@ PutHistoryArchiveStateWork::PutHistoryArchiveStateWork(
     , mArchive(archive)
     , mLocalFilename(HistoryArchiveState::localName(app, archive->getName()))
 {
+    if (!mState.containsValidBuckets(mApp))
+    {
+        throw std::runtime_error("Malformed HAS, unable to publish");
+    }
 }
 
 void
diff --git a/src/historywork/ResolveSnapshotWork.cpp b/src/historywork/ResolveSnapshotWork.cpp
index 9e8583360c..d7d67e2460 100644
--- a/src/historywork/ResolveSnapshotWork.cpp
+++ b/src/historywork/ResolveSnapshotWork.cpp
@@ -31,7 +31,7 @@ ResolveSnapshotWork::onRun()
     }
 
     mSnapshot->mLocalState.resolveAnyReadyFutures();
-    mSnapshot->makeLive();
+    mSnapshot->mLocalState.prepareForPublish(mApp);
     if ((mApp.getLedgerManager().getLastClosedLedgerNum() >
          mSnapshot->mLocalState.currentLedger) &&
         mSnapshot->mLocalState.futuresAllResolved())

From 015369205494fc76c25923699d3d04f41f561c52 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 5/8] Wait for merges to resolve after bucket application

---
 src/catchup/ApplyBucketsWork.cpp | 64 +++++++++++++++++++++++++++++++-
 src/catchup/ApplyBucketsWork.h   | 23 +++++++++++-
 src/catchup/CatchupWork.cpp      | 16 ++++++--
 src/main/ApplicationUtils.cpp    |  3 +-
 4 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/src/catchup/ApplyBucketsWork.cpp b/src/catchup/ApplyBucketsWork.cpp
index b512d1228f..4b59d3e190 100644
--- a/src/catchup/ApplyBucketsWork.cpp
+++ b/src/catchup/ApplyBucketsWork.cpp
@@ -25,7 +25,8 @@ namespace stellar
 ApplyBucketsWork::ApplyBucketsWork(
     Application& app,
     std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
-    HistoryArchiveState const& applyState, uint32_t maxProtocolVersion)
+    HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
+    bool resolveMerges)
     : BasicWork(app, "apply-buckets", BasicWork::RETRY_NEVER)
     , mBuckets(buckets)
     , mApplyState(applyState)
@@ -40,6 +41,7 @@ ApplyBucketsWork::ApplyBucketsWork(
     , mBucketApplyFailure(app.getMetrics().NewMeter(
           {"history", "bucket-apply", "failure"}, "event"))
     , mCounters(app.getClock().now())
+    , mResolveMerges(resolveMerges)
 {
 }
 
@@ -92,6 +94,8 @@ ApplyBucketsWork::onReset()
     mLevel = BucketList::kNumLevels - 1;
     mApplying = false;
+
+    mDelayTimer.reset();
     mSnapBucket.reset();
     mCurrBucket.reset();
     mSnapApplicator.reset();
@@ -145,6 +149,42 @@ ApplyBucketsWork::startLevel()
 BasicWork::State
 ApplyBucketsWork::onRun()
 {
+    if (mLevel == BucketList::kNumLevels - 1 &&
+        !mApplyState.containsValidBuckets(mApp))
+    {
+        CLOG(ERROR, "History") << "Malformed HAS: unable to apply buckets";
+        return State::WORK_FAILURE;
+    }
+
+    if (mResolveMerges && mDelayTimer)
+    {
+        CLOG(INFO, "History") << "ApplyBucketsWork: application completed; "
+                                 "waiting for merge resolution";
+        auto& bl = mApp.getBucketManager().getBucketList();
+        bl.resolveAnyReadyFutures();
+        if (bl.futuresAllResolved())
+        {
+            return State::WORK_SUCCESS;
+        }
+        else
+        {
+            std::weak_ptr<ApplyBucketsWork> weak(
+                std::static_pointer_cast<ApplyBucketsWork>(
+                    shared_from_this()));
+            auto handler = [weak](asio::error_code const& ec) {
+                auto self = weak.lock();
+                if (self)
+                {
+                    self->wakeUp();
+                }
+            };
+
+            // Check back later
+            mDelayTimer->expires_from_now(std::chrono::milliseconds(500));
+            mDelayTimer->async_wait(handler);
+            return State::WORK_WAITING;
+        }
+    }
+
     // Check if we're at the beginning of the new level
     if (isLevelComplete())
     {
@@ -192,8 +232,15 @@ ApplyBucketsWork::onRun()
         return State::WORK_RUNNING;
     }
 
-    CLOG(DEBUG, "History") << "ApplyBuckets : done, restarting merges";
+    CLOG(INFO, "History") << "ApplyBuckets : done, restarting merges";
     mApp.getBucketManager().assumeState(mApplyState, mMaxProtocolVersion);
+
+    if (mResolveMerges)
+    {
+        mDelayTimer = std::make_unique<VirtualTimer>(mApp.getClock());
+        return State::WORK_RUNNING;
+    }
+
     return State::WORK_SUCCESS;
 }
 
@@ -257,4 +304,17 @@ ApplyBucketsWork::onFailureRetry()
 {
     mBucketApplyFailure.Mark();
 }
+
+void
+ApplyBucketsWork::onSuccess()
+{
+    if (mResolveMerges)
+    {
+        if (!mApp.getBucketManager().getBucketList().futuresAllResolved())
+        {
+            throw std::runtime_error(
+                "Not all futures were resolved after bucket application!");
+        }
+    }
+}
 }
diff --git a/src/catchup/ApplyBucketsWork.h b/src/catchup/ApplyBucketsWork.h
index cd5cbc0d0a..7b1c8da0c3 100644
--- a/src/catchup/ApplyBucketsWork.h
+++ b/src/catchup/ApplyBucketsWork.h
@@ -46,6 +46,25 @@ class ApplyBucketsWork : public BasicWork
     medida::Meter& mBucketApplyFailure;
     BucketApplicator::Counters mCounters;
 
+    // With FIRST_PROTOCOL_SHADOWS_REMOVED or higher, buckets are applied
+    // without resolved merge outputs being available before applying
+    // transactions and joining the network. If online catchup does not wait
+    // for merges to be resolved, but instead marks the node as "in sync" and
+    // begins closing ledgers, a large (but not yet finished) merge might be
+    // needed at an applied ledger. At that point, `closeLedger` will block
+    // waiting for the merge to resolve, and if the delay is long enough, the
+    // node might go out of sync. Specifically, this applies to the following
+    // scenarios:
+    // - A large merge is needed during ApplyLedgerChainWork
+    // - A large merge is needed during syncing-ledgers replay
+    // - A large merge is needed after successful catchup, during normal
+    //   in-sync ledger close
+    // To prevent this, wait for restarted merges to resolve before
+    // proceeding. This approach conservatively waits for all merges,
+    // regardless of the HAS ledger number and the number of ledgers to
+    // replay.
+    bool const mResolveMerges;
+    std::unique_ptr<VirtualTimer> mDelayTimer;
+
     void advance(std::string const& name, BucketApplicator& applicator);
     std::shared_ptr<Bucket> getBucket(std::string const& bucketHash);
     BucketLevel& getBucketLevel(uint32_t level);
@@ -56,7 +75,8 @@ class ApplyBucketsWork : public BasicWork
     ApplyBucketsWork(
         Application& app,
         std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
-        HistoryArchiveState const& applyState, uint32_t maxProtocolVersion);
+        HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
+        bool resolveMerges);
     ~ApplyBucketsWork() = default;
 
 protected:
@@ -69,5 +89,6 @@ class ApplyBucketsWork : public BasicWork
     };
     void onFailureRaise() override;
     void onFailureRetry() override;
+    void onSuccess() override;
 };
 }
diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp
index 2a845a7fb5..dcb8d14202 100644
--- a/src/catchup/CatchupWork.cpp
+++ b/src/catchup/CatchupWork.cpp
@@ -3,6 +3,7 @@
 // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
 
 #include "catchup/CatchupWork.h"
+#include "bucket/BucketList.h"
 #include "catchup/ApplyBucketsWork.h"
 #include "catchup/ApplyCheckpointWork.h"
 #include "catchup/CatchupConfiguration.h"
@@ -27,7 +28,7 @@ CatchupWork::CatchupWork(Application& app,
                          CatchupConfiguration catchupConfiguration,
                          ProgressHandler progressHandler)
     : Work(app, "catchup", BasicWork::RETRY_NEVER)
-    , mLocalState{app.getHistoryManager().getLastClosedHistoryArchiveState()}
+    , mLocalState{app.getLedgerManager().getLastClosedLedgerHAS()}
     , mDownloadDir{std::make_unique<TmpDir>(
          mApp.getTmpDirManager().tmpDir(getName()))}
     , mCatchupConfiguration{catchupConfiguration}
@@ -110,8 +111,17 @@ CatchupWork::downloadApplyBuckets()
     std::vector<std::string> hashes = has.differingBuckets(mLocalState);
     auto getBuckets = std::make_shared<DownloadBucketsWork>(
         mApp, mBuckets, hashes, *mDownloadDir);
+
+    // As a consequence of the FIRST_PROTOCOL_SHADOWS_REMOVED upgrade, merge
+    // inputs and outputs aren't published to the archives anymore: new-style
+    // merges are re-started from scratch. To avoid going out of sync during
+    // online catchup, allow the bucket application work to wait for merges
+    // to complete before marking itself as "successful".
+    bool waitForMerges =
+        mCatchupConfiguration.mode() == CatchupConfiguration::Mode::ONLINE;
     auto applyBuckets = std::make_shared<ApplyBucketsWork>(
-        mApp, mBuckets, has, mVerifiedLedgerRangeStart.header.ledgerVersion);
+        mApp, mBuckets, has, mVerifiedLedgerRangeStart.header.ledgerVersion,
+        waitForMerges);
     std::vector<std::shared_ptr<BasicWork>> seq{getBuckets, applyBuckets};
     return std::make_shared<WorkSequence>(mApp, "download-verify-apply-buckets",
                                           seq);
@@ -391,7 +401,7 @@ computeCatchupledgers(uint32_t lastClosedLedger,
         historyManager.checkpointContainingLedger(smallestLedgerToApply);
 
     // if first ledger that should be applied is on checkpoint boundary then
-    // we do an bucket-apply, and apply ledgers from netx one
+    // we do a bucket-apply, and apply ledgers from the next one
     if (smallestCheckpointToApply == smallestLedgerToApply)
     {
         return {smallestLedgerToApply + 1,
diff --git a/src/main/ApplicationUtils.cpp b/src/main/ApplicationUtils.cpp
index fb21b8718c..6d938733ab 100644
--- a/src/main/ApplicationUtils.cpp
+++ b/src/main/ApplicationUtils.cpp
@@ -235,7 +235,8 @@ rebuildLedgerFromBuckets(Config cfg)
     has.fromString(hasStr);
 
     auto applyBucketsWork = ws.executeWork<ApplyBucketsWork>(
-        localBuckets, has, Config::CURRENT_LEDGER_PROTOCOL_VERSION);
+        localBuckets, has, Config::CURRENT_LEDGER_PROTOCOL_VERSION,
+        /* resolveMerges */ false);
     auto ok = applyBucketsWork->getState() == BasicWork::State::WORK_SUCCESS;
     if (ok)
     {
From 901534e2b9fe0c480983f035a7f61c077523eaf9 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 6/8] Test protocol 12 upgrade

---
 src/herder/test/UpgradesTests.cpp | 104 ++++++++++++++++++++++++++++++
 1 file changed, 104 insertions(+)

diff --git a/src/herder/test/UpgradesTests.cpp b/src/herder/test/UpgradesTests.cpp
index 1bd6dccfcd..91fdf1825c 100644
--- a/src/herder/test/UpgradesTests.cpp
+++ b/src/herder/test/UpgradesTests.cpp
@@ -1524,6 +1524,110 @@ TEST_CASE("upgrade to version 11", "[upgrades]")
     }
 }
 
+TEST_CASE("upgrade to version 12", "[upgrades]")
+{
+    VirtualClock clock;
+    auto cfg = getTestConfig();
+    cfg.USE_CONFIG_FOR_GENESIS = false;
+
+    auto app = createTestApplication(clock, cfg);
+    app->start();
+    executeUpgrade(*app, makeProtocolVersionUpgrade(11));
+
+    auto& lm = app->getLedgerManager();
+    uint32_t oldProto = 11;
+    uint32_t newProto = 12;
+    auto root = TestAccount{*app, txtest::getRoot(app->getNetworkID())};
+
+    for (size_t i = 0; i < 10; ++i)
+    {
+        auto stranger =
+            TestAccount{*app, txtest::getAccount(fmt::format("stranger{}", i))};
+        TxSetFramePtr txSet =
+            std::make_shared<TxSetFrame>(lm.getLastClosedLedgerHeader().hash);
+        uint32_t ledgerSeq = lm.getLastClosedLedgerNum() + 1;
+        uint64_t minBalance = lm.getLastMinBalance(5);
+        uint64_t big = minBalance + ledgerSeq;
+        uint64_t closeTime = 60 * 5 * ledgerSeq;
+        txSet->add(root.tx({txtest::createAccount(stranger, big)}));
+        // Provoke sortForHash and hash-caching:
+        txSet->getContentsHash();
+
+        // On the 4th iteration of advance (i.e. ledgerSeq 5), perform a
+        // ledger-protocol version upgrade to the new protocol, to
+        // start new-style merges (no shadows)
+        auto upgrades = xdr::xvector<UpgradeType, 6>{};
+        if (ledgerSeq == 5)
+        {
+            auto ledgerUpgrade = LedgerUpgrade{LEDGER_UPGRADE_VERSION};
+            ledgerUpgrade.newLedgerVersion() = newProto;
+            auto v = xdr::xdr_to_opaque(ledgerUpgrade);
+            upgrades.push_back(UpgradeType{v.begin(), v.end()});
+            CLOG(INFO, "Ledger")
+                << "Ledger " << ledgerSeq << " upgrading to v" << newProto;
+        }
+        StellarValue sv(txSet->getContentsHash(), closeTime, upgrades,
+                        STELLAR_VALUE_BASIC);
+        lm.closeLedger(LedgerCloseData(ledgerSeq, txSet, sv));
+        auto& bm = app->getBucketManager();
+        auto mc = bm.readMergeCounters();
+        auto& bl = bm.getBucketList();
+        while (!bl.futuresAllResolved())
+        {
+            bl.resolveAnyReadyFutures();
+        }
+
+        if (ledgerSeq < 5)
+        {
+            REQUIRE(mc.mPreShadowRemovalProtocolMerges != 0);
+        }
+        else
+        {
+            auto& lev0 = bm.getBucketList().getLevel(0);
+            auto& lev1 = bm.getBucketList().getLevel(1);
+            auto lev0Curr = lev0.getCurr();
+            auto lev0Snap = lev0.getSnap();
+            auto lev1Curr = lev1.getCurr();
+            auto lev1Snap = lev1.getSnap();
+            auto getVers = [](std::shared_ptr<Bucket> b) -> uint32_t {
+                return BucketInputIterator(b).getMetadata().ledgerVersion;
+            };
+            switch (ledgerSeq)
+            {
+            case 8:
+                REQUIRE(getVers(lev1Curr) == newProto);
+                REQUIRE(getVers(lev1Snap) == oldProto);
+                REQUIRE(mc.mPostShadowRemovalProtocolMerges == 6);
+                // One more old-style merge despite the upgrade:
+                // at ledger 8, level 2 spills and starts an old-style merge,
+                // as level 1's snap is still of the old version
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 7);
+                break;
+            case 7:
+                REQUIRE(getVers(lev0Snap) == newProto);
+                REQUIRE(getVers(lev1Curr) == oldProto);
+                REQUIRE(mc.mPostShadowRemovalProtocolMerges == 4);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
+                break;
+            case 6:
+                REQUIRE(getVers(lev0Snap) == newProto);
+                REQUIRE(getVers(lev1Curr) == oldProto);
+                REQUIRE(mc.mPostShadowRemovalProtocolMerges == 3);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
+                break;
+            case 5:
+                REQUIRE(getVers(lev0Curr) == newProto);
+                REQUIRE(getVers(lev0Snap) == oldProto);
+                REQUIRE(mc.mPostShadowRemovalProtocolMerges == 1);
+                REQUIRE(mc.mPreShadowRemovalProtocolMerges == 6);
+                break;
+            default:
+                break;
+            }
+        }
+    }
+}
+
 TEST_CASE("upgrade base reserve", "[upgrades]")
 {
     VirtualClock clock;
From 73a4e2ed4659615dc9a3a9e17f011dbd5643a6d8 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 7/8] Test publish and catchup after protocol upgrade

---
 src/history/test/HistoryTests.cpp      | 159 +++++++++++++++++++++++++
 src/history/test/HistoryTestsUtils.cpp |  18 ++-
 src/history/test/HistoryTestsUtils.h   |   4 +
 3 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp
index 566a2f68e7..83065b29ee 100644
--- a/src/history/test/HistoryTests.cpp
+++ b/src/history/test/HistoryTests.cpp
@@ -320,6 +320,91 @@ TEST_CASE("History publish to multiple archives", "[history]")
     REQUIRE(catchupSimulation.catchupOffline(catchupApp, checkpointLedger));
 }
 
+TEST_CASE("Publish works correctly post shadow removal", "[history]")
+{
+    // Given a HAS, verify that the appropriate levels have "next" cleared,
+    // while the remaining initialized levels have output hashes.
+    auto checkFuture = [](uint32_t maxLevelCleared,
+                          uint32_t maxLevelInitialized,
+                          HistoryArchiveState const& has) {
+        REQUIRE(maxLevelCleared <= maxLevelInitialized);
+        for (uint32_t i = 0; i <= maxLevelInitialized; ++i)
+        {
+            auto next = has.currentBuckets[i].next;
+            if (i <= maxLevelCleared)
+            {
+                REQUIRE(next.isClear());
+            }
+            else
+            {
+                REQUIRE(next.hasOutputHash());
+            }
+        }
+    };
+
+    auto verifyFutureBucketsInHAS = [&](CatchupSimulation& sim,
+                                        uint32_t upgradeLedger,
+                                        uint32_t expectedLevelsCleared) {
+        // Perform a publish: 2 checkpoints (or 127 ledgers) correspond to 3
+        // levels being initialized and partially filled in the bucketlist
+        sim.setProto12UpgradeLedger(upgradeLedger);
+        auto checkpointLedger = sim.getLastCheckpointLedger(2);
+        auto maxLevelTouched = 3;
+        sim.ensureOfflineCatchupPossible(checkpointLedger);
+
+        auto& app = sim.getApp();
+        auto w =
+            app.getWorkScheduler().executeWork<GetHistoryArchiveStateWork>();
+        auto has = w->getHistoryArchiveState();
+        REQUIRE(w->getState() == BasicWork::State::WORK_SUCCESS);
+        checkFuture(expectedLevelsCleared, maxLevelTouched, has);
+    };
+
+    auto configurator =
+        std::make_shared<TmpDirHistoryConfigurator>();
+    CatchupSimulation catchupSimulation{VirtualClock::VIRTUAL_TIME,
+                                        configurator};
+
+    uint32_t oldProto = Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED - 1;
+    catchupSimulation.generateRandomLedger(oldProto);
+
+    // The next sections reflect how the future buckets in the HAS change,
+    // depending on the protocol version of the merge. Intuitively, the
+    // closer the upgrade is to the publish, the fewer levels have had time
+    // to start a new-style merge, so some levels will still have an output
+    // hash in them.
+    SECTION("upgrade way before publish")
+    {
+        // The upgrade happened early enough for new-style buckets to
+        // propagate down to level 2's snap, so all levels up to 3 perform
+        // new-style merges.
+        uint32_t upgradeLedger = 64;
+        verifyFutureBucketsInHAS(catchupSimulation, upgradeLedger, 3);
+    }
+    SECTION("upgrade slightly later")
+    {
+        // Between ledgers 80 and 127, there are not enough ledgers to
+        // propagate a new-style bucket to level 2's snap, so level 3 still
+        // performs an old-style merge, while all levels above it perform
+        // new-style merges.
+        uint32_t upgradeLedger = 80;
+        verifyFutureBucketsInHAS(catchupSimulation, upgradeLedger, 2);
+    }
+    SECTION("upgrade close to publish")
+    {
+        // At upgrade ledger 125, level0Curr is new-style. Then, at ledger
+        // 126, a new-style merge for level 1 is started (lev0Snap is
+        // new-style), so levels 0 and 1 should be clear.
+        uint32_t upgradeLedger = 125;
+        verifyFutureBucketsInHAS(catchupSimulation, upgradeLedger, 1);
+    }
+    SECTION("upgrade right before publish")
+    {
+        // At ledger 127, only level0Curr is of the new version, so all
+        // levels below are left as-is.
+        uint32_t upgradeLedger = 127;
+        verifyFutureBucketsInHAS(catchupSimulation, upgradeLedger, 0);
+    }
+}
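Collecting the four sections into one view (publish happens at the checkpoint ledger, 127, and level 0's `next` is always clear; these figures simply restate the section comments above):

    upgrade ledger    newest new-style bucket by 127    levels with clear `next`
    --------------    ------------------------------    ------------------------
    64                level 2 snap                      0-3
    80                level 1 snap                      0-2
    125               level 0 snap                      0-1
    127               level 0 curr                      0 only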
 
 static std::string
 resumeModeName(uint32_t count)
 {
@@ -512,6 +597,80 @@ TEST_CASE("History prefix catchup", "[history][catchup]")
     REQUIRE(b->getLedgerManager().getLastClosedLedgerNum() == 2 * freq + 7);
 }
 
+TEST_CASE("Catchup post-shadow-removal works", "[history]")
+{
+    uint32_t newProto = Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED;
+    uint32_t oldProto = newProto - 1;
+
+    auto configurator =
+        std::make_shared<TmpDirHistoryConfigurator>();
+    CatchupSimulation catchupSimulation{VirtualClock::VIRTUAL_TIME,
+                                        configurator};
+
+    catchupSimulation.generateRandomLedger(oldProto);
+
+    // Different counts: with proto 12, catchup should adapt and switch merge
+    // logic
+    std::vector<uint32_t> counts = {0, std::numeric_limits<uint32_t>::max(),
+                                    60};
+
+    SECTION("Upgrade at checkpoint start")
+    {
+        uint32_t upgradeLedger = 64;
+        catchupSimulation.setProto12UpgradeLedger(upgradeLedger);
+        auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(3);
+        catchupSimulation.ensureOnlineCatchupPossible(checkpointLedger);
+
+        for (auto count : counts)
+        {
+            auto a = catchupSimulation.createCatchupApplication(
+                count, Config::TESTDB_IN_MEMORY_SQLITE,
+                std::string("full, ") + resumeModeName(count) + ", " +
+                    dbModeName(Config::TESTDB_IN_MEMORY_SQLITE));
+
+            REQUIRE(catchupSimulation.catchupOnline(a, checkpointLedger));
+        }
+    }
+    SECTION("Upgrade mid-checkpoint")
+    {
+        // Notice the effect of shifting the upgrade by one ledger:
+        // at ledger 64, levels 1, 2 and 3 spill, starting merges with
+        // _old-style_ logic. Then at ledger 65 the upgrade happens, but the
+        // old merges remain valid.
+        uint32_t upgradeLedger = 65;
+        catchupSimulation.setProto12UpgradeLedger(upgradeLedger);
+        auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(3);
+        catchupSimulation.ensureOnlineCatchupPossible(checkpointLedger);
+
+        for (auto count : counts)
+        {
+            auto a = catchupSimulation.createCatchupApplication(
+                count, Config::TESTDB_IN_MEMORY_SQLITE,
+                std::string("full, ") + resumeModeName(count) + ", " +
+                    dbModeName(Config::TESTDB_IN_MEMORY_SQLITE));
+
+            REQUIRE(catchupSimulation.catchupOnline(a, checkpointLedger));
+        }
+    }
+    SECTION("Apply-buckets old-style merges, upgrade during tx replay")
+    {
+        // Ensure that ApplyBucketsWork correctly restarts old-style merges
+        // during catchup. The upgrade happens at ledger 70, so catchup
+        // applies buckets for the first checkpoint, then replays ledgers
+        // 64...127.
+        uint32_t upgradeLedger = 70;
+        catchupSimulation.setProto12UpgradeLedger(upgradeLedger);
+        auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2);
+        catchupSimulation.ensureOnlineCatchupPossible(checkpointLedger);
+
+        auto a = catchupSimulation.createCatchupApplication(
+            32, Config::TESTDB_IN_MEMORY_SQLITE,
+            std::string("full, ") + resumeModeName(32) + ", " +
+                dbModeName(Config::TESTDB_IN_MEMORY_SQLITE));
+
+        REQUIRE(catchupSimulation.catchupOnline(a, checkpointLedger));
+    }
+}
+
 TEST_CASE("Catchup non-initentry buckets to initentry-supporting works",
           "[history][bucket][acceptance]")
 {
diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp
index 536909dcb7..c0862e5386 100644
--- a/src/history/test/HistoryTestsUtils.cpp
+++ b/src/history/test/HistoryTestsUtils.cpp
@@ -540,6 +540,13 @@ CatchupSimulation::generateRandomLedger(uint32_t version)
     carolSeqs.push_back(carol.loadSequenceNumber());
 }
 
+void
+CatchupSimulation::setProto12UpgradeLedger(uint32_t ledger)
+{
+    REQUIRE(mApp.getLedgerManager().getLastClosedLedgerNum() < ledger);
+    mTestProtocolShadowsRemovedLedgerSeq = ledger;
+}
+
 void
 CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger)
 {
@@ -547,7 +554,16 @@ CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger)
     auto& hm = mApp.getHistoryManager();
     while (lm.getLastClosedLedgerNum() < targetLedger)
     {
-        generateRandomLedger();
+        if (lm.getLastClosedLedgerNum() + 1 ==
+            mTestProtocolShadowsRemovedLedgerSeq)
+        {
+            // Force the proto 12 upgrade
+            generateRandomLedger(Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED);
+        }
+        else
+        {
+            generateRandomLedger();
+        }
 
         auto seq = mApp.getLedgerManager().getLastClosedLedgerNum() + 1;
         if (seq == hm.nextCheckpointLedger(seq))
diff --git a/src/history/test/HistoryTestsUtils.h b/src/history/test/HistoryTestsUtils.h
index d4af5133fa..8bd0b2a66c 100644
--- a/src/history/test/HistoryTestsUtils.h
+++ b/src/history/test/HistoryTestsUtils.h
@@ -227,6 +227,8 @@ class CatchupSimulation
     std::vector<SequenceNumber> bobSeqs;
     std::vector<SequenceNumber> carolSeqs;
 
+    uint32_t mTestProtocolShadowsRemovedLedgerSeq{0};
+
     CatchupMetrics getCatchupMetrics(Application::pointer app);
     CatchupPerformedWork computeCatchupPerformedWork(
         uint32_t lastClosedLedger,
@@ -286,6 +288,8 @@ class CatchupSimulation
     void crankUntil(Application::pointer app,
                     std::function<bool()> const& predicate,
                     VirtualClock::duration duration);
+
+    void setProto12UpgradeLedger(uint32_t ledger);
 };
 }
 }

From 119b6514714e962f2054f54db473f96917c5f608 Mon Sep 17 00:00:00 2001
From: marta-lokhova
Date: Mon, 16 Sep 2019 12:23:37 -0700
Subject: [PATCH 8/8] Update BucketList tests to reflect shadow removal logic

---
 src/bucket/test/BucketListTests.cpp           |  24 +++-
 src/bucket/test/BucketManagerTests.cpp        |  43 ++++++-
 src/bucket/test/BucketTests.cpp               | 118 ++++++++++++++++--
 ...ucketListIsConsistentWithDatabaseTests.cpp |  22 ++--
 4 files changed, 181 insertions(+), 26 deletions(-)
batches to bucket list"; + auto totalNumEntries = 1200; for (uint32_t i = 1; - !app->getClock().getIOContext().stopped() && i < 1200; ++i) + !app->getClock().getIOContext().stopped() && i <= totalNumEntries; + ++i) { app->getClock().crank(false); auto liveBatch = LedgerTestUtils::generateValidLedgerEntries(5); @@ -242,11 +244,25 @@ TEST_CASE("bucket list shadowing", "[bucket][bucketlist]") bool hasBob = (curr->containsBucketIdentity(BucketEntryBob) || snap->containsBucketIdentity(BucketEntryBob)); - CHECK(!hasAlice); - CHECK(!hasBob); + if (app->getConfig().LEDGER_PROTOCOL_VERSION < + Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED || + j > 5) + { + CHECK(!hasAlice); + CHECK(!hasBob); + } + // On the last iteration, when bucket list population is + // complete, ensure that post-FIRST_PROTOCOL_SHADOWS_REMOVED + // Alice and Bob appear on lower levels unshadowed. + else if (i == totalNumEntries) + { + CHECK(hasAlice); + CHECK(hasBob); + } } } } + }); } diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp index 98bb980eda..034b6841f3 100644 --- a/src/bucket/test/BucketManagerTests.cpp +++ b/src/bucket/test/BucketManagerTests.cpp @@ -653,6 +653,12 @@ class StopAndRestartBucketMergesTest << mMergeCounters.mPreInitEntryProtocolMerges; CLOG(INFO, "Bucket") << "PostInitEntryProtocolMerges: " << mMergeCounters.mPostInitEntryProtocolMerges; + CLOG(INFO, "Bucket") + << "mPreShadowRemovalProtocolMerges: " + << mMergeCounters.mPreShadowRemovalProtocolMerges; + CLOG(INFO, "Bucket") + << "mPostShadowRemovalProtocolMerges: " + << mMergeCounters.mPostShadowRemovalProtocolMerges; CLOG(INFO, "Bucket") << "RunningMergeReattachments: " << mMergeCounters.mRunningMergeReattachments; CLOG(INFO, "Bucket") << "FinishedMergeReattachments: " @@ -709,9 +715,17 @@ class StopAndRestartBucketMergesTest } void - checkSensiblePostInitEntryMergeCounters() const + checkSensiblePostInitEntryMergeCounters(uint32_t protocol) const { CHECK(mMergeCounters.mPostInitEntryProtocolMerges != 0); + if (protocol < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED) + { + CHECK(mMergeCounters.mPostShadowRemovalProtocolMerges == 0); + } + else + { + CHECK(mMergeCounters.mPostShadowRemovalProtocolMerges != 0); + } CHECK(mMergeCounters.mNewMetaEntries == 0); CHECK(mMergeCounters.mNewInitEntries != 0); @@ -730,9 +744,18 @@ class StopAndRestartBucketMergesTest CHECK(mMergeCounters.mOldInitEntriesMergedWithNewDead != 0); CHECK(mMergeCounters.mNewEntriesMergedWithOldNeitherInit != 0); - CHECK(mMergeCounters.mShadowScanSteps != 0); + if (protocol < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED) + { + CHECK(mMergeCounters.mShadowScanSteps != 0); + CHECK(mMergeCounters.mLiveEntryShadowElisions != 0); + } + else + { + CHECK(mMergeCounters.mShadowScanSteps == 0); + CHECK(mMergeCounters.mLiveEntryShadowElisions == 0); + } + CHECK(mMergeCounters.mMetaEntryShadowElisions == 0); - CHECK(mMergeCounters.mLiveEntryShadowElisions != 0); CHECK(mMergeCounters.mInitEntryShadowElisions == 0); CHECK(mMergeCounters.mDeadEntryShadowElisions == 0); @@ -746,6 +769,7 @@ class StopAndRestartBucketMergesTest checkSensiblePreInitEntryMergeCounters() const { CHECK(mMergeCounters.mPreInitEntryProtocolMerges != 0); + CHECK(mMergeCounters.mPreShadowRemovalProtocolMerges != 0); CHECK(mMergeCounters.mNewMetaEntries == 0); CHECK(mMergeCounters.mNewInitEntries == 0); @@ -784,6 +808,11 @@ class StopAndRestartBucketMergesTest CHECK(mMergeCounters.mPostInitEntryProtocolMerges == other.mMergeCounters.mPostInitEntryProtocolMerges); + 
diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp
index 98bb980eda..034b6841f3 100644
--- a/src/bucket/test/BucketManagerTests.cpp
+++ b/src/bucket/test/BucketManagerTests.cpp
@@ -653,6 +653,12 @@ class StopAndRestartBucketMergesTest
             << mMergeCounters.mPreInitEntryProtocolMerges;
         CLOG(INFO, "Bucket") << "PostInitEntryProtocolMerges: "
                              << mMergeCounters.mPostInitEntryProtocolMerges;
+        CLOG(INFO, "Bucket")
+            << "mPreShadowRemovalProtocolMerges: "
+            << mMergeCounters.mPreShadowRemovalProtocolMerges;
+        CLOG(INFO, "Bucket")
+            << "mPostShadowRemovalProtocolMerges: "
+            << mMergeCounters.mPostShadowRemovalProtocolMerges;
         CLOG(INFO, "Bucket") << "RunningMergeReattachments: "
                              << mMergeCounters.mRunningMergeReattachments;
         CLOG(INFO, "Bucket") << "FinishedMergeReattachments: "
@@ -709,9 +715,17 @@ class StopAndRestartBucketMergesTest
     }
 
     void
-    checkSensiblePostInitEntryMergeCounters() const
+    checkSensiblePostInitEntryMergeCounters(uint32_t protocol) const
     {
         CHECK(mMergeCounters.mPostInitEntryProtocolMerges != 0);
+        if (protocol < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+        {
+            CHECK(mMergeCounters.mPostShadowRemovalProtocolMerges == 0);
+        }
+        else
+        {
+            CHECK(mMergeCounters.mPostShadowRemovalProtocolMerges != 0);
+        }
 
         CHECK(mMergeCounters.mNewMetaEntries == 0);
         CHECK(mMergeCounters.mNewInitEntries != 0);
@@ -730,9 +744,18 @@ class StopAndRestartBucketMergesTest
         CHECK(mMergeCounters.mOldInitEntriesMergedWithNewDead != 0);
         CHECK(mMergeCounters.mNewEntriesMergedWithOldNeitherInit != 0);
 
-        CHECK(mMergeCounters.mShadowScanSteps != 0);
+        if (protocol < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
+        {
+            CHECK(mMergeCounters.mShadowScanSteps != 0);
+            CHECK(mMergeCounters.mLiveEntryShadowElisions != 0);
+        }
+        else
+        {
+            CHECK(mMergeCounters.mShadowScanSteps == 0);
+            CHECK(mMergeCounters.mLiveEntryShadowElisions == 0);
+        }
+
         CHECK(mMergeCounters.mMetaEntryShadowElisions == 0);
-        CHECK(mMergeCounters.mLiveEntryShadowElisions != 0);
         CHECK(mMergeCounters.mInitEntryShadowElisions == 0);
         CHECK(mMergeCounters.mDeadEntryShadowElisions == 0);
 
@@ -746,6 +769,7 @@ class StopAndRestartBucketMergesTest
     checkSensiblePreInitEntryMergeCounters() const
     {
         CHECK(mMergeCounters.mPreInitEntryProtocolMerges != 0);
+        CHECK(mMergeCounters.mPreShadowRemovalProtocolMerges != 0);
 
         CHECK(mMergeCounters.mNewMetaEntries == 0);
         CHECK(mMergeCounters.mNewInitEntries == 0);
@@ -784,6 +808,11 @@ class StopAndRestartBucketMergesTest
         CHECK(mMergeCounters.mPostInitEntryProtocolMerges ==
               other.mMergeCounters.mPostInitEntryProtocolMerges);
 
+        CHECK(mMergeCounters.mPreShadowRemovalProtocolMerges ==
+              other.mMergeCounters.mPreShadowRemovalProtocolMerges);
+        CHECK(mMergeCounters.mPostShadowRemovalProtocolMerges ==
+              other.mMergeCounters.mPostShadowRemovalProtocolMerges);
+
         CHECK(mMergeCounters.mRunningMergeReattachments ==
               other.mMergeCounters.mRunningMergeReattachments);
         CHECK(mMergeCounters.mFinishedMergeReattachments ==
@@ -1229,7 +1258,7 @@ class StopAndRestartBucketMergesTest
             mControlSurveys.rbegin()->second.dumpMergeCounters(
                 "control, Post-INITENTRY", mDesignatedLevel);
             mControlSurveys.rbegin()
-                ->second.checkSensiblePostInitEntryMergeCounters();
+                ->second.checkSensiblePostInitEntryMergeCounters(mProtocol);
         }
         else
         {
@@ -1248,7 +1277,8 @@ TEST_CASE("bucket persistence over app restart with initentry",
 {
     for (uint32_t protocol :
          {Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY - 1,
-          Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY})
+          Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY,
+          Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED})
     {
         for (uint32_t level : {2, 3})
         {
@@ -1264,7 +1294,8 @@ TEST_CASE("bucket persistence over app restart with initentry - extended",
 {
     for (uint32_t protocol :
          {Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY - 1,
-          Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY})
+          Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY,
+          Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED})
     {
         for (uint32_t level : {2, 3, 4, 5})
        {
diff --git a/src/bucket/test/BucketTests.cpp b/src/bucket/test/BucketTests.cpp
index 2d63b15b3a..470df43fdc 100644
--- a/src/bucket/test/BucketTests.cpp
+++ b/src/bucket/test/BucketTests.cpp
@@ -55,6 +55,15 @@ getAppLedgerVersion(Application::pointer app)
 void
 for_versions_with_differing_bucket_logic(
     Config const& cfg, std::function<void(Config const&)> const& f)
+{
+    for_versions({Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY - 1,
+                  Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED},
+                 cfg, f);
+}
+
+void
+for_versions_with_differing_initentry_logic(
+    Config const& cfg, std::function<void(Config const&)> const& f)
 {
     for_versions({Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY - 1,
                   Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY},
                  cfg, f);
 }
@@ -319,6 +328,73 @@ generateDifferentAccount(std::vector<LedgerEntry> const& others)
     }
 }
 
+TEST_CASE("merges proceed old-style despite newer shadows",
+          "[bucket][bucketmaxprotocol]")
+{
+    VirtualClock clock;
+    Config const& cfg = getTestConfig();
+    Application::pointer app = createTestApplication(clock, cfg);
+    auto& bm = app->getBucketManager();
+    auto v12 = Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED;
+    auto v11 = v12 - 1;
+    auto v10 = v11 - 1;
+
+    LedgerEntry liveEntry = generateAccount();
+    LedgerEntry otherLiveA = generateDifferentAccount({liveEntry});
+
+    auto b10first = Bucket::fresh(bm, v10, {}, {liveEntry}, {},
+                                  /*countMergeEvents=*/true,
+                                  /*doFsync=*/true);
+    auto b10second = Bucket::fresh(bm, v10, {}, {otherLiveA}, {},
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
+
+    auto b11first = Bucket::fresh(bm, v11, {}, {liveEntry}, {},
+                                  /*countMergeEvents=*/true,
+                                  /*doFsync=*/true);
+    auto b11second = Bucket::fresh(bm, v11, {}, {otherLiveA}, {},
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
+
+    auto b12first =
+        Bucket::fresh(bm, v12, {}, {liveEntry}, {}, /*countMergeEvents=*/true,
+                      /*doFsync=*/true);
+    auto b12second = Bucket::fresh(bm, v12, {}, {otherLiveA}, {},
+                                   /*countMergeEvents=*/true,
+                                   /*doFsync=*/true);
+
+    SECTION("shadow version 12")
+    {
+        // With proto 12, the new bucket version depends solely on the
+        // versions of the inputs, not the shadows
+        auto bucket = Bucket::merge(bm, v12, b11first, b11second,
+                                    /*shadows=*/{b12first},
+                                    /*keepDeadEntries=*/true,
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
+        REQUIRE(Bucket::getBucketVersion(bucket) == v11);
+    }
+    SECTION("shadow versions mixed, pick lower")
+    {
+        // Merging older-version (10) buckets with mixed-version shadows
+        // (11, 12) picks the initentry (11) style merge
+        auto bucket = Bucket::merge(bm, v12, b10first, b10second,
+                                    /*shadows=*/{b12first, b11second},
+                                    /*keepDeadEntries=*/true,
+                                    /*countMergeEvents=*/true,
+                                    /*doFsync=*/true);
+        REQUIRE(Bucket::getBucketVersion(bucket) == v11);
+    }
+    SECTION("refuse to merge new version with shadow")
+    {
+        REQUIRE_THROWS_AS(Bucket::merge(bm, v12, b12first, b12second,
+                                        /*shadows=*/{b12first},
+                                        /*keepDeadEntries=*/true,
+                                        /*countMergeEvents=*/true,
+                                        /*doFsync=*/true),
+                          std::runtime_error);
+    }
+}
+
 TEST_CASE("merges refuse to exceed max protocol version",
           "[bucket][bucketmaxprotocol]")
 {
@@ -347,12 +423,6 @@ TEST_CASE("merges refuse to exceed max protocol version",
                                     /*countMergeEvents=*/true,
                                     /*doFsync=*/true),
                       std::runtime_error);
-    REQUIRE_THROWS_AS(Bucket::merge(bm, vers - 1, bold1, bold2,
-                                    /*shadows=*/{bnew1},
-                                    /*keepDeadEntries=*/true,
-                                    /*countMergeEvents=*/true,
-                                    /*doFsync=*/true),
-                      std::runtime_error);
 }
 
 TEST_CASE("bucket output iterator rejects wrong-version entries",
@@ -554,6 +624,41 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
             CHECK(emerge2.nInit == 0);
             CHECK(emerge2.nLive == 3);
         }
+    });
+}
+
+TEST_CASE("merging bucket entries with initentry with shadows",
+          "[bucket][initentry]")
+{
+    VirtualClock clock;
+    Config const& cfg = getTestConfig();
+    for_versions_with_differing_initentry_logic(cfg, [&](Config const& cfg) {
+        CLOG(INFO, "Bucket") << "=== starting test app == ";
+        Application::pointer app = createTestApplication(clock, cfg);
+        auto& bm = app->getBucketManager();
+        auto vers = getAppLedgerVersion(app);
+
+        // Whether we're in the era of supporting or not-supporting INITENTRY.
+        bool initEra =
+            (vers >= Bucket::FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY);
+
+        CLOG(INFO, "Bucket") << "=== finished buckets for initial account == ";
+
+        LedgerEntry liveEntry = generateAccount();
+        LedgerEntry liveEntry2 = generateSameAccountDifferentState({liveEntry});
+        LedgerEntry liveEntry3 =
+            generateSameAccountDifferentState({liveEntry, liveEntry2});
+        LedgerEntry otherLiveA = generateDifferentAccount({liveEntry});
+        LedgerEntry otherLiveB =
+            generateDifferentAccount({liveEntry, otherLiveA});
+        LedgerEntry otherLiveC =
+            generateDifferentAccount({liveEntry, otherLiveA, otherLiveB});
+        LedgerEntry initEntry = generateSameAccountDifferentState(
+            {liveEntry, liveEntry2, liveEntry3});
+        LedgerEntry initEntry2 = generateSameAccountDifferentState(
+            {initEntry, liveEntry, liveEntry2, liveEntry3});
+        LedgerEntry otherInitA = generateDifferentAccount({initEntry});
+        LedgerKey deadEntry = LedgerEntryKey(liveEntry);
 
         SECTION("shadows influence lifecycle entries appropriately")
         {
@@ -590,7 +695,6 @@ TEST_CASE("merging bucket entries with initentry", "[bucket][initentry]")
                 CHECK(e.nDead == 0);
             }
         }
-
         SECTION("shadowing does not revive dead entries")
         {
             // This is the first contrived example of what might go wrong if we
diff --git a/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp b/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
index 4624f98147..3fa01c1432 100644
--- a/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
+++ b/src/invariant/test/BucketListIsConsistentWithDatabaseTests.cpp
@@ -59,10 +59,11 @@ struct BucketListGenerator
     {
         std::map<std::string, std::shared_ptr<Bucket>> buckets;
         auto has = getHistoryArchiveState();
+        has.prepareForPublish(*mAppApply);
         auto& wm = mAppApply->getWorkScheduler();
-        wm.executeWork<T>(buckets, has,
-                          mAppApply->getConfig().LEDGER_PROTOCOL_VERSION,
-                          std::forward<Args>(args)...);
+        wm.executeWork<T>(
+            buckets, has, mAppApply->getConfig().LEDGER_PROTOCOL_VERSION,
+            /* resolveMerges= */ false, std::forward<Args>(args)...);
     }
 
     void
@@ -277,8 +278,9 @@ class ApplyBucketsWorkAddEntry : public ApplyBucketsWork
         Application& app,
         std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
         HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
-        LedgerEntry const& entry)
-        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion)
+        bool resolve, LedgerEntry const& entry)
+        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion,
+                           resolve)
         , mEntry(entry)
         , mAdded{false}
     {
@@ -328,8 +330,9 @@ class ApplyBucketsWorkDeleteEntry : public ApplyBucketsWork
         Application& app,
         std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
         HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
-        LedgerEntry const& target)
-        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion)
+        bool resolve, LedgerEntry const& target)
+        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion,
+                           resolve)
         , mKey(LedgerEntryKey(target))
         , mEntry(target)
        , mDeleted{false}
@@ -414,8 +417,9 @@ class ApplyBucketsWorkModifyEntry : public ApplyBucketsWork
         Application& app,
         std::map<std::string, std::shared_ptr<Bucket>> const& buckets,
         HistoryArchiveState const& applyState, uint32_t maxProtocolVersion,
-        LedgerEntry const& target)
-        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion)
+        bool resolve, LedgerEntry const& target)
+        : ApplyBucketsWork(app, buckets, applyState, maxProtocolVersion,
+                           resolve)
         , mKey(LedgerEntryKey(target))
         , mEntry(target)
         , mModified{false}