Skip to content

Commit

Permalink
Merge pull request #2205 from marta-lokhova/avoid_resolving_buckets_o…
Browse files Browse the repository at this point in the history
…n_publish

Remove bucketlist shadows, avoid resolving snapshots on publish

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita committed Sep 16, 2019
2 parents 14a0f41 + 119b651 commit 1e4ebb5
Show file tree
Hide file tree
Showing 30 changed files with 811 additions and 91 deletions.
33 changes: 31 additions & 2 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,18 @@ calculateMergeProtocolVersion(
protocolVersion = std::max(oi.getMetadata().ledgerVersion,
ni.getMetadata().ledgerVersion);

// Starting with FIRST_PROTOCOL_SHADOWS_REMOVED,
// protocol version is determined as a max of curr, snap, and any shadow of
// version < FIRST_PROTOCOL_SHADOWS_REMOVED. This means that a bucket may
// still perform an old style merge despite the presence of the new protocol
// shadows.
for (auto const& si : shadowIterators)
{
protocolVersion =
std::max(si.getMetadata().ledgerVersion, protocolVersion);
auto version = si.getMetadata().ledgerVersion;
if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
protocolVersion = std::max(version, protocolVersion);
}
}

CLOG(TRACE, "Bucket") << "Bucket merge protocolVersion=" << protocolVersion
Expand Down Expand Up @@ -387,6 +395,19 @@ calculateMergeProtocolVersion(
{
++mc.mPostInitEntryProtocolMerges;
}

if (protocolVersion < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
++mc.mPreShadowRemovalProtocolMerges;
}
else
{
if (!shadowIterators.empty())
{
throw std::runtime_error("Shadows are not supported");
}
++mc.mPostShadowRemovalProtocolMerges;
}
}

// There are 4 "easy" cases for merging: exhausted iterators on either
Expand Down Expand Up @@ -615,4 +636,12 @@ Bucket::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
shadows};
return out.getBucket(bucketManager, &mk);
}

// Returns the `ledgerVersion` field of the metadata exposed by a
// BucketInputIterator opened over `bucket`. The caller must pass a non-null
// bucket; this is enforced with an assert.
uint32_t
Bucket::getBucketVersion(std::shared_ptr<Bucket> const& bucket)
{
    assert(bucket);
    // The iterator only needs to live long enough to read the metadata, so a
    // temporary scoped to this expression is sufficient.
    return BucketInputIterator(bucket).getMetadata().ledgerVersion;
}
}
3 changes: 3 additions & 0 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr uint32_t
FIRST_PROTOCOL_SUPPORTING_INITENTRY_AND_METAENTRY = 11;
static constexpr uint32_t FIRST_PROTOCOL_SHADOWS_REMOVED = 12;

static void checkProtocolLegality(BucketEntry const& entry,
uint32_t protocolVersion);
Expand Down Expand Up @@ -103,5 +104,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
std::shared_ptr<Bucket> const& newBucket,
std::vector<std::shared_ptr<Bucket>> const& shadows,
bool keepDeadEntries, bool countMergeEvents, bool doFsync);

static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
};
}
71 changes: 68 additions & 3 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "main/Application.h"
#include "util/Logging.h"
#include "util/XDRStream.h"
#include "util/format.h"
#include "util/types.h"
#include <cassert>

Expand Down Expand Up @@ -154,8 +155,12 @@ BucketLevel::prepare(Application& app, uint32_t currLedger,
}
}

mNextCurr = FutureBucket(app, curr, snap, shadows, currLedgerProtocol,
countMergeEvents, mLevel);
auto shadowsBasedOnProtocol =
Bucket::getBucketVersion(snap) >= Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED
? std::vector<std::shared_ptr<Bucket>>()
: shadows;
mNextCurr = FutureBucket(app, curr, snap, shadowsBasedOnProtocol,
currLedgerProtocol, countMergeEvents, mLevel);
assert(mNextCurr.isMerging());
}

Expand Down Expand Up @@ -421,6 +426,14 @@ BucketList::resolveAnyReadyFutures()
}
}

bool
BucketList::futuresAllResolved() const
{
return std::all_of(
mLevels.begin(), mLevels.end(),
[](BucketLevel const& bl) { return !bl.getNext().isMerging(); });
}

void
BucketList::addBatch(Application& app, uint32_t currLedger,
uint32_t currLedgerProtocol,
Expand Down Expand Up @@ -551,7 +564,8 @@ BucketList::addBatch(Application& app, uint32_t currLedger,
}

void
BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion,
uint32_t ledger)
{
for (uint32_t i = 0; i < static_cast<uint32>(mLevels.size()); i++)
{
Expand All @@ -566,6 +580,57 @@ BucketList::restartMerges(Application& app, uint32_t maxProtocolVersion)
<< "Restarted merge on BucketList level " << i;
}
}
// The next block assumes we are re-starting a
// FIRST_PROTOCOL_SHADOWS_REMOVED or later merge, which has no shadows
// _and_ no stored inputs/outputs, and ensures that we are only
// re-starting buckets of correct version.
else if (next.isClear() && i > 0)
{
// Recover merge by iterating through bucketlist levels and
// using snaps and currs. The only time we don't use level's
// curr is when a level is about to snap; in that case, when the
// merge is needed, level's curr will snap, and merge will be
// promoted into curr. Therefore, when starting merge, we use an
// empty curr.
// Additionally, it is safe to recover a merge at any point
// before the merge is needed (meaning it should be promoted
// into level's curr after ledger close). This is due to the
// fact that future bucket inputs _do not change_ until level
// spill, and after such spills, new merges are started with new
// inputs.
auto snap = mLevels[i - 1].getSnap();
Hash const emptyHash;

// Exit early if a level has not been initialized yet.
// There are two possibilities for empty buckets: it is either truly
// untouched (meaning not enough ledgers were produced to populate
// given level) or it's a protocol 10-or-earlier bucket (since it
// does not contain a metadata entry). If we are dealing with
// 10-or-earlier bucket, it must have had an output published, and
// would be handled in the previous `if` block. Therefore, we must
// be dealing with an untouched level.
if (snap->getHash() == emptyHash)
{
return;
}

auto version = Bucket::getBucketVersion(snap);
if (version < Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
auto msg =
fmt::format("Invalid state: bucketlist level {} has clear "
"future bucket but version {} snap",
i, version);
throw std::runtime_error(msg);
}

// Round down the current ledger to when the merge was started, and
// re-start the merge via prepare, mimicking the logic in `addBatch`
auto mergeStartLedger = mask(ledger, BucketList::levelHalf(i - 1));
level.prepare(
app, mergeStartLedger, version, snap, /* shadows= */ {},
!app.getConfig().ARTIFICIALLY_REDUCE_MERGE_COUNTS_FOR_TESTING);
}
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,16 @@ class BucketList
// merging buckets between levels. This needs to be called after forcing a
// BucketList to adopt a new state, either at application restart or when
// catching up from buckets loaded over the network.
void restartMerges(Application& app, uint32_t maxProtocolVersion);

// There are two ways to re-start a merge:
// 1. Use non-LIVE inputs/outputs from HAS file. This function will make
// these FutureBuckets LIVE (either FB_INPUT_LIVE or FB_OUTPUT_LIVE)
// using hashes of inputs and outputs from the HAS
// 2. Introduced with FIRST_PROTOCOL_SHADOWS_REMOVED: skip using
// input/output hashes, and restart live merges from currs and snaps of the
// bucketlist at that ledger.
void restartMerges(Application& app, uint32_t maxProtocolVersion,
uint32_t ledger);

// Run through the levels and check for FutureBuckets that are done merging;
// if so, call resolve() on them, changing state from FB_LIVE_INPUTS to
Expand All @@ -413,6 +422,8 @@ class BucketList
// HistoryArchiveStates, that can cause repeated merges when re-activated.
void resolveAnyReadyFutures();

bool futuresAllResolved() const;

// Add a batch of initial (created), live (updated) and dead entries to the
// bucketlist, representing the entries affected by closing
// `currLedger`. The bucketlist will incorporate these into the smallest
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ struct MergeCounters
uint64_t mRunningMergeReattachments{0};
uint64_t mFinishedMergeReattachments{0};

uint64_t mPreShadowRemovalProtocolMerges{0};
uint64_t mPostShadowRemovalProtocolMerges{0};

uint64_t mNewMetaEntries{0};
uint64_t mNewInitEntries{0};
uint64_t mNewLiveEntries{0};
Expand Down
5 changes: 4 additions & 1 deletion src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ MergeCounters::operator+=(MergeCounters const& delta)
mRunningMergeReattachments += delta.mRunningMergeReattachments;
mFinishedMergeReattachments += delta.mFinishedMergeReattachments;

mPreShadowRemovalProtocolMerges += delta.mPreShadowRemovalProtocolMerges;
mPostShadowRemovalProtocolMerges += delta.mPostShadowRemovalProtocolMerges;

mNewMetaEntries += delta.mNewMetaEntries;
mNewInitEntries += delta.mNewInitEntries;
mNewLiveEntries += delta.mNewLiveEntries;
Expand Down Expand Up @@ -732,7 +735,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
mBucketList.getLevel(i).setNext(has.currentBuckets.at(i).next);
}

mBucketList.restartMerges(mApp, maxProtocolVersion);
mBucketList.restartMerges(mApp, maxProtocolVersion, has.currentLedger);
cleanupStaleFiles();
}

Expand Down
15 changes: 15 additions & 0 deletions src/bucket/FutureBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ FutureBucket::FutureBucket(Application& app,
assert(snap);
mInputCurrBucketHash = binToHex(curr->getHash());
mInputSnapBucketHash = binToHex(snap->getHash());
if (Bucket::getBucketVersion(snap) >=
Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED)
{
if (!mInputShadowBuckets.empty())
{
throw std::runtime_error(
"Invalid FutureBucket: ledger version doesn't support shadows");
}
}
for (auto const& b : mInputShadowBuckets)
{
mInputShadowBucketHashes.push_back(binToHex(b->getHash()));
Expand Down Expand Up @@ -204,6 +213,12 @@ FutureBucket::hasHashes() const
return (mState == FB_HASH_INPUTS || mState == FB_HASH_OUTPUT);
}

// Reports whether this FutureBucket's state is currently FB_CLEAR.
bool
FutureBucket::isClear() const
{
    bool const inClearState = (FB_CLEAR == mState);
    return inClearState;
}

bool
FutureBucket::mergeComplete() const
{
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/FutureBucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class FutureBucket
// running).
bool isMerging() const;

// Returns whether this object is in a FB_CLEAR state.
bool isClear() const;

// Returns whether this object is in a FB_HASH_FOO state.
bool hasHashes() const;

Expand Down
24 changes: 20 additions & 4 deletions src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ TEST_CASE("bucket list", "[bucket][bucketlist]")
}
}

TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
TEST_CASE("bucket list shadowing pre/post proto 12", "[bucket][bucketlist]")
{
VirtualClock clock;
Config const& cfg = getTestConfig();
Expand All @@ -187,8 +187,10 @@ TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
autocheck::generator<std::vector<LedgerKey>> deadGen;
CLOG(DEBUG, "Bucket") << "Adding batches to bucket list";

auto totalNumEntries = 1200;
for (uint32_t i = 1;
!app->getClock().getIOContext().stopped() && i < 1200; ++i)
!app->getClock().getIOContext().stopped() && i <= totalNumEntries;
++i)
{
app->getClock().crank(false);
auto liveBatch = LedgerTestUtils::generateValidLedgerEntries(5);
Expand Down Expand Up @@ -242,11 +244,25 @@ TEST_CASE("bucket list shadowing", "[bucket][bucketlist]")
bool hasBob =
(curr->containsBucketIdentity(BucketEntryBob) ||
snap->containsBucketIdentity(BucketEntryBob));
CHECK(!hasAlice);
CHECK(!hasBob);
if (app->getConfig().LEDGER_PROTOCOL_VERSION <
Bucket::FIRST_PROTOCOL_SHADOWS_REMOVED ||
j > 5)
{
CHECK(!hasAlice);
CHECK(!hasBob);
}
// On the last iteration, when bucket list population is
// complete, ensure that post-FIRST_PROTOCOL_SHADOWS_REMOVED
// Alice and Bob appear on lower levels unshadowed.
else if (i == totalNumEntries)
{
CHECK(hasAlice);
CHECK(hasBob);
}
}
}
}

});
}

Expand Down

0 comments on commit 1e4ebb5

Please sign in to comment.