Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds poolshare index for BucketsDB #4224

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
74 changes: 3 additions & 71 deletions src/bucket/Bucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,78 +236,10 @@ Bucket::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
}
}

void
Bucket::loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& seenTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys)
std::vector<PoolID> const&
Bucket::getPoolIDsByAsset(Asset const& asset) const
{
ZoneScoped;

// Takes a LedgerKey or LedgerEntry::_data_t, returns true if entry is a
// poolshare trusline for the given accountID
auto trustlineCheck = [&accountID](auto const& entry) {
return entry.type() == TRUSTLINE &&
entry.trustLine().asset.type() == ASSET_TYPE_POOL_SHARE &&
entry.trustLine().accountID == accountID;
};

// Get upper and lower bound for poolshare trustline range associated
// with this account
auto searchRange = getIndex().getPoolshareTrustlineRange(accountID);
if (!searchRange)
{
// No poolshare trustlines, exit
return;
}

BucketEntry be;
auto& stream = getIndexStream();
stream.seek(searchRange->first);
while (stream && stream.pos() < searchRange->second && stream.readOne(be))
{
LedgerEntry entry;
switch (be.type())
{
case LIVEENTRY:
case INITENTRY:
entry = be.liveEntry();
break;
case DEADENTRY:
{
auto key = be.deadEntry();

// If we find a valid trustline key and we have not seen the
// key yet, mark it as dead so we do not load a shadowed version
// later
if (trustlineCheck(key))
{
seenTrustlines.emplace(key);
}
continue;
}
case METAENTRY:
default:
throw std::invalid_argument("Indexed METAENTRY");
}

// If this is a pool share trustline that matches the accountID and
// is the newest version of the key, add it to results
if (trustlineCheck(entry.data) &&
seenTrustlines.find(LedgerEntryKey(entry)) == seenTrustlines.end())
{
seenTrustlines.emplace(LedgerEntryKey(entry));
auto const& poolshareID =
entry.data.trustLine().asset.liquidityPoolID();

LedgerKey key;
key.type(LIQUIDITY_POOL);
key.liquidityPool().liquidityPoolID = poolshareID;

liquidityPoolKeyToTrustline.emplace(key, entry);
liquidityPoolKeys.emplace(key);
}
}
return getIndex().getPoolIDsByAsset(asset);
}

#ifdef BUILD_TESTS
Expand Down
13 changes: 3 additions & 10 deletions src/bucket/Bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,9 @@ class Bucket : public std::enable_shared_from_this<Bucket>,
void loadKeys(std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<LedgerEntry>& result);

// Loads all poolshare trustlines for the given account. Trustlines are
// stored with their corresponding liquidity pool key in
// liquidityPoolKeyToTrustline. All liquidity pool keys corresponding to
// loaded trustlines are also reduntantly stored in liquidityPoolKeys.
// If a trustline key is in seenTrustlines, it is not loaded. Whenever a
// dead trustline is found, its key is added to seenTrustlines.
void loadPoolShareTrustLinessByAccount(
AccountID const& accountID, UnorderedSet<LedgerKey>& seenTrustlines,
UnorderedMap<LedgerKey, LedgerEntry>& liquidityPoolKeyToTrustline,
LedgerKeySet& liquidityPoolKeys);
// Return all PoolIDs that contain the given asset on either side of the
// pool
std::vector<PoolID> const& getPoolIDsByAsset(Asset const& asset) const;

// At version 11, we added support for INITENTRY and METAENTRY. Before this
// we were only supporting LIVEENTRY and DEADENTRY.
Expand Down
7 changes: 6 additions & 1 deletion src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class BucketIndex : public NonMovableOrCopyable
IndividualIndex::const_iterator>;

inline static const std::string DB_BACKEND_STATE = "bl";
inline static const uint32_t BUCKET_INDEX_VERSION = 1;
inline static const uint32_t BUCKET_INDEX_VERSION = 2;

// Returns true if LedgerEntryType not supported by BucketListDB
static bool typeNotSupported(LedgerEntryType t);
Expand Down Expand Up @@ -113,6 +113,11 @@ class BucketIndex : public NonMovableOrCopyable
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getPoolshareTrustlineRange(AccountID const& accountID) const = 0;

// Return all PoolIDs that contain the given asset on either side of the
// pool
virtual std::vector<PoolID> const&
getPoolIDsByAsset(Asset const& asset) const = 0;

// Returns page size for index. InidividualIndex returns 0 for page size
virtual std::streamoff getPageSize() const = 0;

Expand Down
42 changes: 42 additions & 0 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,33 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
{
++count;
LedgerKey key = getBucketLedgerKey(be);

// We need an asset to poolID mapping for
// loadPoolshareTrustlineByAccountAndAsset queries. For this
// query, we only need to index INIT entries because:
// 1. PoolID is the hash of the Assets it refers to, so this
// index cannot be invalidated by newer LIVEENTRY updates
// 2. We do a join over all bucket indexes so we avoid storing
// multiple redundant index entries (i.e. LIVEENTRY updates)
// 3. We only use this index to collect the possible set of
// Trustline keys, then we load those keys. This means that
// we don't need to keep track of DEADENTRY. Even if a given
// INITENTRY has been deleted by a newer DEADENTRY, the
// trustline load will not return deleted trustlines, so the
// load result is still correct even if the index has a few
// deleted mappings.
if (be.type() == INITENTRY && key.type() == LIQUIDITY_POOL)
{
auto const& poolParams = be.liveEntry()
.data.liquidityPool()
.body.constantProduct()
.params;
mData.assetToPoolID[poolParams.assetA].emplace_back(
key.liquidityPool().liquidityPoolID);
mData.assetToPoolID[poolParams.assetB].emplace_back(
key.liquidityPool().liquidityPoolID);
}

if constexpr (std::is_same<IndexT, RangeIndex>::value)
{
if (pos >= pageUpperBound)
Expand Down Expand Up @@ -450,6 +477,21 @@ BucketIndexImpl<IndexT>::getPoolshareTrustlineRange(
return std::make_pair(startOff, endOff);
}

template <class IndexT>
std::vector<PoolID> const&
BucketIndexImpl<IndexT>::getPoolIDsByAsset(Asset const& asset) const
{
static const std::vector<PoolID> emptyVec = {};

auto iter = mData.assetToPoolID.find(asset);
if (iter == mData.assetToPoolID.end())
{
return emptyVec;
}

return iter->second;
}

#ifdef BUILD_TESTS
template <class IndexT>
bool
Expand Down
11 changes: 9 additions & 2 deletions src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "bucket/BucketIndex.h"
#include "medida/meter.h"

#include <cereal/types/map.hpp>
#include <map>

class bloom_filter;

namespace stellar
Expand All @@ -28,13 +31,14 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
IndexT keysToOffset{};
std::streamoff pageSize{};
std::unique_ptr<bloom_filter> filter{};
std::map<Asset, std::vector<PoolID>> assetToPoolID{};

template <class Archive>
void
save(Archive& ar) const
{
auto version = BUCKET_INDEX_VERSION;
ar(version, pageSize, keysToOffset, filter);
ar(version, pageSize, assetToPoolID, keysToOffset, filter);
}

// Note: version and pageSize must be loaded before this function is
Expand All @@ -45,7 +49,7 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
void
load(Archive& ar)
{
ar(keysToOffset, filter);
ar(assetToPoolID, keysToOffset, filter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a risk of loading the old version with the new code (or vice versa)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, in BucketIndex::load, we load the version and pageSize first. These two values are always serialized first no matter the index version. Then we check the index version, and if the on-disk version number doesn't match the expected version number, we drop the on-disk index and reindex from scratch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we delete the outdated index right away instead of keeping both until the bucket gets cleaned up? (so that we don't accidentally use it)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually have both indexes persisted, but we generate a new index, then rename the new index the the same name as the old index. On linux, this deletes the old file. However, I looked up in the standard and this behavior is implementation defined, so I agree we should delete explicitly here.

}
} mData;

Expand Down Expand Up @@ -74,6 +78,9 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
virtual std::optional<std::pair<std::streamoff, std::streamoff>>
getPoolshareTrustlineRange(AccountID const& accountID) const override;

virtual std::vector<PoolID> const&
getPoolIDsByAsset(Asset const& asset) const override;

virtual std::streamoff
getPageSize() const override
{
Expand Down
49 changes: 17 additions & 32 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,48 +441,33 @@ BucketList::loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const
return entries;
}

// This query has two steps:
// 1. For each bucket, determine what PoolIDs contain the target asset via the
// assetToPoolID index
// 2. Perform a bulk lookup for all possible trustline keys, that is, all
// trustlines with the given accountID and poolID from step 1
std::vector<LedgerEntry>
BucketList::loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
Asset const& asset) const
{
ZoneScoped;
UnorderedMap<LedgerKey, LedgerEntry> liquidityPoolToTrustline;
UnorderedSet<LedgerKey> seenTrustlines;
LedgerKeySet liquidityPoolKeysToSearch;
LedgerKeySet trustlinesToLoad;

// First get all the poolshare trustlines for the given account
auto trustLineLoop = [&](std::shared_ptr<Bucket> b) {
b->loadPoolShareTrustLinessByAccount(accountID, seenTrustlines,
liquidityPoolToTrustline,
liquidityPoolKeysToSearch);
return false; // continue
};
loopAllBuckets(trustLineLoop);

// Load all the LiquidityPool entries that the account has a trustline for.
auto liquidityPoolEntries = loadKeys(liquidityPoolKeysToSearch);
// pools always exist when there are trustlines
releaseAssertOrThrow(liquidityPoolEntries.size() ==
liquidityPoolKeysToSearch.size());
// Filter out liquidity pools that don't match the asset we're looking for
std::vector<LedgerEntry> result;
result.reserve(liquidityPoolEntries.size());
for (const auto& e : liquidityPoolEntries)
{
releaseAssert(e.data.type() == LIQUIDITY_POOL);
auto const& params =
e.data.liquidityPool().body.constantProduct().params;
if (compareAsset(params.assetA, asset) ||
compareAsset(params.assetB, asset))
for (auto const& poolID : b->getPoolIDsByAsset(asset))
{
auto trustlineIter =
liquidityPoolToTrustline.find(LedgerEntryKey(e));
releaseAssert(trustlineIter != liquidityPoolToTrustline.end());
result.emplace_back(trustlineIter->second);
LedgerKey trustlineKey(TRUSTLINE);
trustlineKey.trustLine().accountID = accountID;
trustlineKey.trustLine().asset.type(ASSET_TYPE_POOL_SHARE);
trustlineKey.trustLine().asset.liquidityPoolID() = poolID;
trustlinesToLoad.emplace(trustlineKey);
}
}

return result;
return false; // continue
};

loopAllBuckets(trustLineLoop);
return loadKeys(trustlinesToLoad);
}

std::vector<InflationWinner>
Expand Down
25 changes: 22 additions & 3 deletions src/bucket/test/BucketIndexTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,27 @@ class BucketIndexPoolShareTest : public BucketIndexTest
buildTest(bool shouldMultiVersion)
{
auto f = [&](std::vector<LedgerEntry>& entries) {
std::vector<LedgerEntry> poolEntries;
std::vector<LedgerKey> toWriteNewVersion;
if (mDist(gRandomEngine) < 30)
{
auto pool = LedgerTestUtils::generateValidLiquidityPoolEntry();
// Make sure we generate a unique poolID for each entry
LiquidityPoolEntry pool;
for (;;)
{
pool = LedgerTestUtils::generateValidLiquidityPoolEntry();
for (auto e : poolEntries)
{
if (e.data.liquidityPool().liquidityPoolID ==
pool.liquidityPoolID)
{
continue;
}
}

break;
}

auto& params = pool.body.constantProduct().params;

auto trustlineToSearch =
Expand Down Expand Up @@ -348,7 +365,7 @@ class BucketIndexPoolShareTest : public BucketIndexTest
LedgerEntry poolEntry;
poolEntry.data.type(LIQUIDITY_POOL);
poolEntry.data.liquidityPool() = pool;
entries.emplace_back(poolEntry);
poolEntries.emplace_back(poolEntry);
entries.emplace_back(trustlineToSearch);
entries.emplace_back(trustline2);
}
Expand All @@ -371,8 +388,10 @@ class BucketIndexPoolShareTest : public BucketIndexTest
entries.emplace_back(iter->second);
}

// We only index liquidity pool INITENTRY, so they must be inserted
// as INITENTRY
mApp->getLedgerManager().setNextLedgerEntryBatchForBucketTesting(
{}, entries, toWriteNewVersion);
poolEntries, entries, toWriteNewVersion);
};

BucketIndexTest::buildBucketList(f);
Expand Down
3 changes: 2 additions & 1 deletion src/catchup/IndexBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ IndexBucketsWork::IndexWork::postWork()
self->mBucket->getSize());

// If we could not load the index from the file, file is out of
// date and will be overwritten when we create a new index
// date. Delete and create a new index.
if (!self->mIndex)
{
CLOG_WARNING(Bucket, "Outdated index file: {}",
indexFilename);
std::remove(indexFilename.c_str());
}
else
{
Expand Down