Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove bucketlist shadows, avoid resolving snapshots on publish #2205

Merged
33 changes: 31 additions & 2 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,18 @@ calculateMergeProtocolVersion(
protocolVersion = std::max(oi.getMetadata().ledgerVersion,
ni.getMetadata().ledgerVersion);

// Starting with FIRST_PROTOCOL_SHADOWS_REMOVED,
// protocol version is determined as a max of curr, snap, and any shadow of
// version < FIRST_PROTOCOL_SHADOWS_REMOVED. This means that a bucket may
// still perform an old style merge despite the presence of the new protocol
// shadows.
for (auto const& si : shadowIterators)
{
protocolVersion =
std::max(si.getMetadata().ledgerVersion, protocolVersion);
auto version = si.getMetadata().ledgerVersion;
if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
MonsieurNicolas marked this conversation as resolved.
Show resolved Hide resolved
{
protocolVersion = std::max(version, protocolVersion);
}
}

CLOG(TRACE, "Bucket") << "Bucket merge protocolVersion=" << protocolVersion
Expand Down Expand Up @@ -387,6 +395,19 @@ calculateMergeProtocolVersion(
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersion < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}
}

// There are 4 "easy" cases for merging: exhausted iterators on either
Expand Down Expand Up @@ -615,4 +636,12 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
shadows};
return out.getBucket(bucketManager, &mk);
}

uint32_t
Bucket::getBucketVersion(std::shared_ptr<Bucket> const& bucket)
{
assert(bucket);
BucketInputIterator it(bucket);
return it.getMetadata().ledgerVersion;
}
}
3 changes: 3 additions & 0 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr uint32_t
FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY = 11;
static constexpr uint32_t FIRST_PROTOCOL_SHADOWS_REMOVED = 12;

static void checkProtocolLegality(BucketEntry const& entry,
uint32_t protocolVersion);
Expand Down Expand Up @@ -103,5 +104,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents, bool doFsync);

static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
};
}
71 changes: 68 additions & 3 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "main/Application.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "util/format.h"
#include "util/types.h"
#include <cassert>

Expand Down Expand Up @@ -154,8 +155,12 @@ BucketLevel::prepare(Application& app, uint32_t currLedger,
}
}

mNextCurr = FutureBucket(app, curr, snap, shadows, currLedgerProtocol,
countMergeEvents, mLevel);
auto shadowsBasedOnProtocol =
Bucket::getBucketVersion(snap) >= Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED
? std::vector<std::shared_ptr<Bucket>>()
: shadows;
mNextCurr = FutureBucket(app, curr, snap, shadowsBasedOnProtocol,
currLedgerProtocol, countMergeEvents, mLevel);
assert(mNextCurr.isMerging());
}

Expand Down Expand Up @@ -421,6 +426,14 @@ BucketList::resolveAnyReadyFutures()
}
}

bool
BucketList::futuresAllResolved() const
{
return std::all_of(
mLevels.begin(), mLevels.end(),
[](BucketLevel const& bl) { return !bl.getNext().isMerging(); });
}

void
BucketList::addBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
Expand Down Expand Up @@ -551,7 +564,8 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
}

void
BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion,
MonsieurNicolas marked this conversation as resolved.
Show resolved Hide resolved
uint32_t ledger)
{
for (uint32_t i = 0; i < static_cast<uint32>(mLevels.size()); i++)
{
Expand All @@ -566,6 +580,57 @@ BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
<< "Restarted merge on BucketList level " << i;
}
}
// The next block assumes we are re-starting a
// FIRST_PROTOCOL_SHADOWS_REMOVED or later merge, which has no shadows
// _and_ no stored inputs/outputs, and ensures that we are only
// re-starting buckets of correct version.
else if (next.isClear() && i > 0)
graydon marked this conversation as resolved.
Show resolved Hide resolved
{
// Recover merge by iterating through bucketlist levels and
// using snaps and currs. The only time we don't use level's
// curr is when a level is about to snap; in that case, when the
// merge is needed, level's curr will snap, and merge will be
// promoted into curr. Therefore, when starting merge, we use an
// empty curr.
// Additionally, it is safe to recover a merge at any point
// before the merge is needed (meaning it should be promoted
// into level's curr after ledger close). This is due to the
// fact that future bucket inputs _do not change_ until level
// spill, and after such spills, new merges are started with new
// inputs.
auto snap = mLevels[i - 1].getSnap();
Hash const emptyHash;

// Exit early if a level has not been initialized yet;
graydon marked this conversation as resolved.
Show resolved Hide resolved
// There are two possibilities for empty buckets: it is either truly
// untouched (meaning not enough ledgers were produced to populate
// given level) or it's a protocol 10-or-earlier bucket (since it
// does not contain a metadata entry). If we are dealing with
// 10-or-earlier bucket, it must have had an output published, and
// would be handled in the previous `if` block. Therefore, we must
// be dealing with an untouched level.
if (snap->getHash() == emptyHash)
{
return;
}

auto version = Bucket::getBucketVersion(snap);
if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
auto msg =
fmt::format("Invalid state: bucketlist level {} has clear "
"future bucket but version {} snap",
i, version);
throw std::runtime_error(msg);
}

// Round down the current ledger to when the merge was started, and
// re-start the merge via prepare, mimicking the logic in `addBatch`
auto mergeStartLedger = mask(ledger, BucketList::levelHalf(i - 1));
level.prepare(
app, mergeStartLedger, version, snap, /* shadows= */ {},
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,16 @@ class BucketList
// merging buckets between levels. This needs to be called after forcing a
// BucketList to adopt a new state, either at application restart or when
// catching up from buckets loaded over the network.
void restartMerges(Application& app, uint32_t maxProtocolVersion);

// There are two ways to re-start a merge:
// 1. Use non-LIVE inputs/outputs from HAS file. This function will make
// these FutureBuckets LIVE (either FB_INPUT_LIVE or FB_OUTPUT_LIVE)
// using hashes of inputs and outputs from the HAS
// 2. Introduced with FIRST_PROTOCOL_SHADOWS_REMOVED: skip using
// input/output hashes, and restart live merges from currs and snaps of the
// bucketlist at that ledger.
void restartMerges(Application& app, uint32_t maxProtocolVersion,
uint32_t ledger);

// Run through the levels and check for FutureBuckets that are done merging;
// if so, call resolve() on them, changing state from FB_LIVE_INPUTS to
Expand All @@ -413,6 +422,8 @@ class BucketList
// HistoryArchiveStates, that can cause repeated merges when re-activated.
void resolveAnyReadyFutures();

bool futuresAllResolved() const;

// Add a batch of initial (created), live (updated) and dead entries to the
// bucketlist, representing the entries effected by closing
// `currLedger`. The bucketlist will incorporate these into the smallest
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ struct MergeCounters
uint64_t mRunningMergeReattachments{0};
uint64_t mFinishedMergeReattachments{0};

uint64_t mPreShadowRemovalProtocolMerges{0};
uint64_t mPostShadowRemovalProtocolMerges{0};

uint64_t mNewMetaEntries{0};
uint64_t mNewInitEntries{0};
uint64_t mNewLiveEntries{0};
Expand Down
5 changes: 4 additions & 1 deletion src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ MergeCounters::operator+=(MergeCounters const& delta)
mRunningMergeReattachments += delta.mRunningMergeReattachments;
mFinishedMergeReattachments += delta.mFinishedMergeReattachments;

mPreShadowRemovalProtocolMerges += delta.mPreShadowRemovalProtocolMerges;
mPostShadowRemovalProtocolMerges += delta.mPostShadowRemovalProtocolMerges;

mNewMetaEntries += delta.mNewMetaEntries;
mNewInitEntries += delta.mNewInitEntries;
mNewLiveEntries += delta.mNewLiveEntries;
Expand Down Expand Up @@ -732,7 +735,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
mBucketList.getLevel(i).setNext(has.currentBuckets.at(i).next);
}

mBucketList.restartMerges(mApp, maxProtocolVersion);
mBucketList.restartMerges(mApp, maxProtocolVersion, has.currentLedger);
cleanupStaleFiles();
}

Expand Down
15 changes: 15 additions & 0 deletions src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ FutureBucket::FutureBucket(Application& app,
assert(snap);
mInputCurrBucketHash = binToHex(curr->getHash());
mInputSnapBucketHash = binToHex(snap->getHash());
if (Bucket::getBucketVersion(snap) >=
Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
if (!mInputShadowBuckets.empty())
{
throw std::runtime_error(
"Invalid FutureBucket: ledger version doesn't support shadows");
}
}
for (auto const& b : mInputShadowBuckets)
{
mInputShadowBucketHashes.push_back(binToHex(b->getHash()));
Expand Down Expand Up @@ -204,6 +213,12 @@ FutureBucket::hasHashes() const
return (mState == FB_HASH_INPUTS || mState == FB_HASH_OUTPUT);
}

bool
FutureBucket::isClear() const
{
return mState == FB_CLEAR;
}

bool
FutureBucket::mergeComplete() const
{
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/FutureBucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class FutureBucket
// running).
bool isMerging() const;

// Returns whether this object is in a FB_CLEAR state.
bool isClear() const;

// Returns whether this object is in a FB_HASH_FOO state.
bool hasHashes() const;

Expand Down
24 changes: 20 additions & 4 deletions src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ TEST_CASE("bucket list", "[bucket][bucketlist]")
}
}

TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
TEST_CASE("bucket list shadowing pre/post proto 12", "[bucket][bucketlist]")
{
VirtualClock clock;
Config const& cfg = getTestConfig();
Expand All @@ -187,8 +187,10 @@ TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
autocheck::generator<std::vector<LedgerKey>> deadGen;
CLOG(DEBUG, "Bucket") << "Adding batches to bucket list";

auto totalNumEntries = 1200;
for (uint32_t i = 1;
!app->getClock().getIOContext().stopped() && i < 1200; ++i)
!app->getClock().getIOContext().stopped() && i <= totalNumEntries;
++i)
{
app->getClock().crank(false);
auto liveBatch = LedgerTestUtils::generateValidLedgerEntries(5);
Expand Down Expand Up @@ -242,11 +244,25 @@ TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
bool hasBob =
(curr->containsBucketIdentity(BucketEntryBob) ||
snap->containsBucketIdentity(BucketEntryBob));
CHECK(!hasAlice);
CHECK(!hasBob);
if (app->getConfig().LEDGER_PROTOCOL_VERSION <
Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED ||
j > 5)
{
CHECK(!hasAlice);
CHECK(!hasBob);
}
// On the last iteration, when bucket list population is
// complete, ensure that post-FIRST_PROTOCOL_SHADOWS_REMOVED
// Alice and Bob appear on lower levels unshadowed.
else if (i == totalNumEntries)
{
CHECK(hasAlice);
CHECK(hasBob);
}
}
}
}

});
}

Expand Down
Loading