Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel bl db #4176

Merged
merged 1 commit into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
134 changes: 11 additions & 123 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,35 +77,6 @@ Bucket::Bucket()
{
}

// Opens a new XDRInputFileStream on this bucket's backing file and hands
// ownership of it to the caller. The bucket must have a non-empty filename;
// this precondition is checked with releaseAssertOrThrow.
std::unique_ptr<XDRInputFileStream>
Bucket::openStream()
{
    releaseAssertOrThrow(!mFilename.empty());
    auto streamPtr = std::make_unique<XDRInputFileStream>();
    streamPtr->open(mFilename.string());
    // Return the local directly: it is implicitly treated as an rvalue here,
    // and wrapping it in std::move would only inhibit copy elision.
    return streamPtr;
}

// Returns the stream used for index-based reads, opening it on first use.
// The stream is cached in mIndexStream, so its position may be left over
// from an earlier read; callers are expected to seek before reading.
XDRInputFileStream&
Bucket::getIndexStream()
{
    if (mIndexStream)
    {
        return *mIndexStream;
    }

    mIndexStream = openStream();
    return *mIndexStream;
}

// Returns the stream used for eviction scans, opening it on first use.
// The stream is cached in mEvictionStream and reused across calls.
XDRInputFileStream&
Bucket::getEvictionStream()
{
    if (mEvictionStream)
    {
        return *mEvictionStream;
    }

    mEvictionStream = openStream();
    return *mEvictionStream;
}

Hash const&
Bucket::getHash() const
{
Expand Down Expand Up @@ -156,90 +127,6 @@ void
Bucket::freeIndex()
{
mIndex.reset(nullptr);
mIndexStream.reset(nullptr);
}

// Loads the bucket entry for LedgerKey k, starting at file offset pos.
// A pageSize of 0 reads a single entry via readOne; otherwise the lookup is
// delegated to readPage with the key and page size. Returns std::nullopt and
// records a bloom miss on the index when no entry is produced.
std::optional<BucketEntry>
Bucket::getEntryAtOffset(LedgerKey const& k, std::streamoff pos,
                         size_t pageSize)
{
    ZoneScoped;
    auto& indexStream = getIndexStream();
    indexStream.seek(pos);

    BucketEntry entry;
    bool const found =
        (pageSize == 0)
            ? static_cast<bool>(indexStream.readOne(entry))
            : static_cast<bool>(indexStream.readPage(entry, k, pageSize));

    if (!found)
    {
        // Mark entry miss for metrics
        getIndex().markBloomMiss();
        return std::nullopt;
    }

    return std::make_optional(entry);
}

// Looks up LedgerKey k in the index and, if the index yields a file offset,
// loads the entry at that offset. Returns std::nullopt when the index has no
// offset for the key.
std::optional<BucketEntry>
Bucket::getBucketEntry(LedgerKey const& k)
{
    ZoneScoped;
    auto offset = getIndex().lookup(k);
    if (!offset.has_value())
    {
        return std::nullopt;
    }

    return getEntryAtOffset(k, offset.value(), getIndex().getPageSize());
}

// BucketList invokes this on every bucket when searching for entries. The
// keys are sorted, so the index is scanned forward from where the previous
// key's scan stopped. A key that is found in this bucket is erased from keys
// so that later buckets do not load shadowed entries; its entry is appended
// to result unless it is a DEADENTRY. A key that is not found stays in keys
// so that it will be searched for again at a lower level.
void
Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
                 std::vector<LedgerEntry>& result)
{
    ZoneScoped;

    auto const& index = getIndex();
    auto indexPos = index.begin();

    for (auto keyIt = keys.begin();
         keyIt != keys.end() && indexPos != index.end();)
    {
        auto [maybeOffset, nextIndexPos] = index.scan(indexPos, *keyIt);
        indexPos = nextIndexPos;

        // Index has no candidate offset: leave the key for lower levels.
        if (!maybeOffset)
        {
            ++keyIt;
            continue;
        }

        auto maybeEntry =
            getEntryAtOffset(*keyIt, *maybeOffset, getIndex().getPageSize());

        // Candidate offset did not produce the entry: keep the key.
        if (!maybeEntry)
        {
            ++keyIt;
            continue;
        }

        if (maybeEntry->type() != DEADENTRY)
        {
            result.push_back(maybeEntry->liveEntry());
        }

        // Found here (live or dead), so stop searching for this key.
        keyIt = keys.erase(keyIt);
    }
}

// Forwards to the bucket's index to obtain the PoolIDs associated with the
// given asset. The returned reference comes from the index.
std::vector<PoolID> const&
Bucket::getPoolIDsByAsset(Asset const& asset) const
{
    auto const& index = getIndex();
    return index.getPoolIDsByAsset(asset);
}

#ifdef BUILD_TESTS
Expand Down Expand Up @@ -787,12 +674,12 @@ mergeCasesWithEqualKeys(MergeCounters& mc, BucketInputIterator& oi,
}

bool
Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics)
Bucket::scanForEvictionLegacySQL(
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const
{
ZoneScoped;
if (isEmpty() ||
Expand All @@ -809,7 +696,8 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
return true;
}

auto& stream = getEvictionStream();
XDRInputFileStream stream{};
stream.open(mFilename);
stream.seek(iter.bucketFileOffset);

BucketEntry be;
Expand Down Expand Up @@ -844,10 +732,10 @@ Bucket::scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
if (shouldEvict())
{
ZoneNamedN(evict, "evict entry", true);
if (metrics.has_value())
if (stats.has_value())
{
++metrics->numEntriesEvicted;
metrics->evictedEntriesAgeSum +=
++stats->numEntriesEvicted;
stats->evictedEntriesAgeSum +=
ledgerSeq - liveUntilLedger;
}

Expand Down
65 changes: 15 additions & 50 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace stellar
class AbstractLedgerTxn;
class Application;
class BucketManager;
struct EvictionMetrics;
struct EvictionStatistics;

class Bucket : public std::enable_shared_from_this<Bucket>,
public NonMovableOrCopyable
Expand All @@ -49,33 +49,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

std::unique_ptr<BucketIndex const> mIndex{};

// Lazily-constructed and retained for read path, one for BucketListDB reads
// and one for eviction scans
std::unique_ptr<XDRInputFileStream> mIndexStream;
std::unique_ptr<XDRInputFileStream> mEvictionStream;

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;

// Returns (lazily-constructed) file stream for bucketDB search. Note
// this might be in some random position left over from a previous read --
// must be seek()'ed before use.
XDRInputFileStream& getIndexStream();

// Returns (lazily-constructed) file stream for eviction scans. Unlike the
// indexStream, this should retain its position in-between calls. However, a
// node performing catchup or joining the network may need to begin evicting
// mid-bucket, so this stream should still be seeked to the proper position
// before reading.
XDRInputFileStream& getEvictionStream();

// Loads the bucket entry for LedgerKey k. Starts at file offset pos and
// reads until key is found or the end of the page.
std::optional<BucketEntry>
getEntryAtOffset(LedgerKey const& k, std::streamoff pos, size_t pageSize);

std::unique_ptr<XDRInputFileStream> openStream();

static std::string randomFileName(std::string const& tmpDir,
std::string ext);

Expand Down Expand Up @@ -109,18 +85,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);

// Loads bucket entry for LedgerKey k.
std::optional<BucketEntry> getBucketEntry(LedgerKey const& k);

// Loads LedgerEntry's for given keys. When a key is found, the
// entry is added to result and the key is removed from keys.
void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result);

// Return all PoolIDs that contain the given asset on either side of the
// pool
std::vector<PoolID> const& getPoolIDsByAsset(Asset const& asset) const;

// At version 11, we added support for INITENTRY and METAENTRY. Before this
// we were only supporting LIVEENTRY and DEADENTRY.
static constexpr ProtocolVersion
Expand All @@ -141,19 +105,6 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static std::string randomBucketName(std::string const& tmpDir);
static std::string randomBucketIndexName(std::string const& tmpDir);

// Returns false if eof reached or if Bucket protocol version < 20, true
// otherwise. Modifies iter as the bucket is scanned. Also modifies
// bytesToScan and remainingEntriesToEvict such that after this function
// returns:
// bytesToScan -= amount_bytes_scanned
// remainingEntriesToEvict -= entries_evicted
bool scanForEviction(AbstractLedgerTxn& ltx, EvictionIterator& iter,
uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionMetrics>& metrics);

#ifdef BUILD_TESTS
// "Applies" the bucket to the database. For each entry in the bucket,
// if the entry is init or live, creates or updates the corresponding
Expand All @@ -169,6 +120,18 @@ class Bucket : public std::enable_shared_from_this<Bucket>,

#endif // BUILD_TESTS

// Returns false if eof reached, true otherwise. Modifies iter as the bucket
// is scanned. Also modifies bytesToScan and remainingEntriesToEvict such
// that after this function returns:
// bytesToScan -= amount_bytes_scanned
// remainingEntriesToEvict -= entries_evicted
bool scanForEvictionLegacySQL(
AbstractLedgerTxn& ltx, EvictionIterator& iter, uint32_t& bytesToScan,
uint32_t& remainingEntriesToEvict, uint32_t ledgerSeq,
medida::Counter& entriesEvictedCounter,
medida::Counter& bytesScannedForEvictionCounter,
std::optional<EvictionStatistics>& stats) const;

// Create a fresh bucket from given vectors of init (created) and live
// (updated) LedgerEntries, and dead LedgerEntryKeys. The bucket will
// be sorted, hashed, and adopted in the provided BucketManager.
Expand Down Expand Up @@ -201,5 +164,7 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
static uint32_t getBucketVersion(std::shared_ptr<Bucket> const& bucket);
static uint32_t
getBucketVersion(std::shared_ptr<Bucket const> const& bucket);

friend class BucketSnapshot;
};
}