Skip to content

Commit

Permalink
Enable parallel BucketsDB loads
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Jan 30, 2024
1 parent 6cfd9fd commit 86f4619
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 28 deletions.
4 changes: 3 additions & 1 deletion src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ class BucketManager : NonMovableOrCopyable
virtual void maybeSetIndex(std::shared_ptr<Bucket> b,
std::unique_ptr<BucketIndex const>&& index) = 0;

virtual std::unique_ptr<SearchableBucketListSnapshot const>
virtual std::unique_ptr<SearchableBucketListSnapshot>
getSearchableBucketListSnapshot() const = 0;

virtual std::recursive_mutex& getBucketSnapshotMutex() const = 0;

// Scans BucketList for non-live entries to evict starting at the entry
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries
// have been evicted or maxEvictionScanSize bytes have been scanned.
Expand Down
8 changes: 7 additions & 1 deletion src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ BucketManagerImpl::scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
}
}

std::unique_ptr<SearchableBucketListSnapshot const>
std::unique_ptr<SearchableBucketListSnapshot>
BucketManagerImpl::getSearchableBucketListSnapshot() const
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB() && mBucketList);
Expand All @@ -919,6 +919,12 @@ BucketManagerImpl::getSearchableBucketListSnapshot() const
new SearchableBucketListSnapshot(mApp, *mBucketList));
}

std::recursive_mutex&
BucketManagerImpl::getBucketSnapshotMutex() const
{
return mBucketSnapshotMutex;
}

medida::Meter&
BucketManagerImpl::getBloomMissMeter() const
{
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ class BucketManagerImpl : public BucketManager
void scanForEvictionLegacySQL(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq) override;

std::unique_ptr<SearchableBucketListSnapshot const>
std::unique_ptr<SearchableBucketListSnapshot>
getSearchableBucketListSnapshot() const override;
std::recursive_mutex& getBucketSnapshotMutex() const override;

medida::Meter& getBloomMissMeter() const override;
medida::Meter& getBloomLookupMeter() const override;
Expand Down
44 changes: 28 additions & 16 deletions src/bucket/SearchableBucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,26 @@ SearchableBucketListSnapshot::isWithinAllowedLedgerDrift(
return mLCL >= minimumLCL;
}

void
SearchableBucketListSnapshot::maybeUpdateSnapshot()
{
auto currLCL = mApp.getLedgerManager().getLastClosedLedgerNum();
if (currLCL != mLCL)
{
mLCL = currLCL;
mLevels.clear();

std::lock_guard<std::recursive_mutex> lock(
mApp.getBucketManager().getBucketSnapshotMutex());
auto& bl = mApp.getBucketManager().getBucketList();
for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
{
auto const& level = bl.getLevel(i);
mLevels.emplace_back(SearchableBucketLevelSnapshot(level));
}
}
}

void
SearchableBucketListSnapshot::loopAllBuckets(
std::function<bool(SearchableBucketSnapshot const&)> f) const
Expand All @@ -85,13 +105,11 @@ SearchableBucketListSnapshot::loopAllBuckets(
}

std::shared_ptr<LedgerEntry>
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) const
SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k)
{
ZoneScoped;
auto timer = getPointLoadTimer(k.type()).TimeScope();

// Snapshots not currently supported, all access must be up to date
releaseAssert(isWithinAllowedLedgerDrift(0));
maybeUpdateSnapshot();

std::shared_ptr<LedgerEntry> result{};

Expand All @@ -117,13 +135,11 @@ SearchableBucketListSnapshot::getLedgerEntry(LedgerKey const& k) const

std::vector<LedgerEntry>
SearchableBucketListSnapshot::loadKeys(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys)
{
ZoneScoped;
auto timer = recordBulkLoadMetrics("prefetch", inKeys.size()).TimeScope();

// Snapshots not currently supported, all access must be up to date
releaseAssert(isWithinAllowedLedgerDrift(0));
maybeUpdateSnapshot();

std::vector<LedgerEntry> entries;

Expand All @@ -140,13 +156,11 @@ SearchableBucketListSnapshot::loadKeys(

std::vector<LedgerEntry>
SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
AccountID const& accountID, Asset const& asset) const
AccountID const& accountID, Asset const& asset)
{
ZoneScoped;
auto timer = recordBulkLoadMetrics("poolshareTrustlines", 0).TimeScope();

// Snapshots not currently supported, all access must be up to date
releaseAssert(isWithinAllowedLedgerDrift(0));
maybeUpdateSnapshot();

UnorderedMap<LedgerKey, LedgerEntry> liquidityPoolToTrustline;
UnorderedSet<LedgerKey> deadTrustlines;
Expand Down Expand Up @@ -189,13 +203,11 @@ SearchableBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(

std::vector<InflationWinner>
SearchableBucketListSnapshot::loadInflationWinners(size_t maxWinners,
int64_t minBalance) const
int64_t minBalance)
{
ZoneScoped;
auto timer = recordBulkLoadMetrics("inflationWinners", 0).TimeScope();

// Snapshots not currently supported, all access must be up to date
releaseAssert(isWithinAllowedLedgerDrift(0));
maybeUpdateSnapshot();

UnorderedMap<AccountID, int64_t> voteCount;
UnorderedSet<AccountID> seen;
Expand Down
15 changes: 9 additions & 6 deletions src/bucket/SearchableBucketListSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct SearchableBucketLevelSnapshot

// A lightweight wrapper around the BucketList for thread safe BucketListDB
// lookups
class SearchableBucketListSnapshot : public NonMovable
class SearchableBucketListSnapshot : public NonMovableOrCopyable
{
Application& mApp;
std::vector<SearchableBucketLevelSnapshot> mLevels;
Expand Down Expand Up @@ -53,22 +53,25 @@ class SearchableBucketListSnapshot : public NonMovable
// allowedLedgerDrift, lcl]
bool isWithinAllowedLedgerDrift(uint32_t allowedLedgerDrift) const;

// Checks if snapshot is behind the current LCL and updates as necessary
void maybeUpdateSnapshot();

SearchableBucketListSnapshot(Application& app, BucketList const& bl);

public:
std::vector<LedgerEntry>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const;
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys);

std::vector<LedgerEntry>
loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
Asset const& asset) const;
Asset const& asset);

std::vector<InflationWinner> loadInflationWinners(size_t maxWinners,
int64_t minBalance) const;
int64_t minBalance);

std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k) const;
std::shared_ptr<LedgerEntry> getLedgerEntry(LedgerKey const& k);

friend std::unique_ptr<SearchableBucketListSnapshot const>
friend std::unique_ptr<SearchableBucketListSnapshot>
BucketManagerImpl::getSearchableBucketListSnapshot() const;
};
}
42 changes: 42 additions & 0 deletions src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "bucket/BucketList.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketOutputIterator.h"
#include "bucket/SearchableBucketListSnapshot.h"
#include "bucket/test/BucketTestUtils.h"
#include "ledger/test/LedgerTestUtils.h"
#include "lib/catch.hpp"
Expand Down Expand Up @@ -1113,6 +1114,47 @@ TEST_CASE_VERSIONS("eviction scan", "[bucketlist]")
});
}

TEST_CASE_VERSIONS("Searchable BucketListDB snapshots", "[bucketlist]")
{
VirtualClock clock;
Config cfg(getTestConfig(0, Config::TESTDB_IN_MEMORY_SQLITE));
cfg.EXPERIMENTAL_BUCKETLIST_DB = true;

auto app = createTestApplication<BucketTestApplication>(clock, cfg);
for_versions_from(20, *app, [&] {
LedgerManagerForBucketTests& lm = app->getLedgerManager();
auto& bm = app->getBucketManager();

auto entry =
LedgerTestUtils::generateValidLedgerEntryOfType(CLAIMABLE_BALANCE);
entry.data.claimableBalance().amount = 0;

auto searchableBL = bm.getSearchableBucketListSnapshot();

// Update entry every 5 ledgers so we can see bucket merge events
for (auto ledgerSeq = 1; ledgerSeq < 101; ++ledgerSeq)
{
if ((ledgerSeq - 1) % 5 == 0)
{
++entry.data.claimableBalance().amount;
entry.lastModifiedLedgerSeq = ledgerSeq;
lm.setNextLedgerEntryBatchForBucketTesting({}, {entry}, {});
}
else
{
lm.setNextLedgerEntryBatchForBucketTesting({}, {}, {});
}

closeLedger(*app);

// Snapshot should automatically update with latest version
auto loadedEntry =
searchableBL->getLedgerEntry(LedgerEntryKey(entry));
REQUIRE((loadedEntry && *loadedEntry == entry));
}
});
}

static std::string
formatX32(uint32_t v)
{
Expand Down
2 changes: 1 addition & 1 deletion src/ledger/LedgerTxn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3371,7 +3371,7 @@ LedgerTxnRoot::Impl::areEntriesMissingInCacheForOffer(OfferEntry const& oe)
return false;
}

SearchableBucketListSnapshot const&
SearchableBucketListSnapshot&
LedgerTxnRoot::Impl::getSearchableBucketListSnapshot() const
{
releaseAssert(mApp.getConfig().isUsingBucketListDB());
Expand Down
4 changes: 2 additions & 2 deletions src/ledger/LedgerTxnImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ class LedgerTxnRoot::Impl
mutable BestOffers mBestOffers;
mutable uint64_t mPrefetchHits{0};
mutable uint64_t mPrefetchMisses{0};
mutable std::unique_ptr<SearchableBucketListSnapshot const>
mutable std::unique_ptr<SearchableBucketListSnapshot>
mSearchableBucketListSnapshot{};

size_t mBulkLoadBatchSize;
Expand Down Expand Up @@ -874,7 +874,7 @@ class LedgerTxnRoot::Impl

bool areEntriesMissingInCacheForOffer(OfferEntry const& oe);

SearchableBucketListSnapshot const& getSearchableBucketListSnapshot() const;
SearchableBucketListSnapshot& getSearchableBucketListSnapshot() const;

public:
// Constructor has the strong exception safety guarantee
Expand Down

0 comments on commit 86f4619

Please sign in to comment.