Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions src/index/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain&
return chain.Next(chain.FindFork(pindex_prev));
}

bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data)
bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, CDBBatch& batch, const CBlock* block_data)
{
interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data);

Expand All @@ -173,7 +173,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
block_info.undo_data = &block_undo;
}

if (!CustomAppend(block_info)) {
if (!CustomAppend(block_info, batch)) {
FatalErrorf("Failed to write block %s to index database",
pindex->GetBlockHash().ToString());
return false;
Expand All @@ -185,6 +185,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data
void BaseIndex::Sync()
{
const CBlockIndex* pindex = m_best_block_index.load();
CDBBatch batch{GetDB()};
if (!m_synced) {
auto last_log_time{NodeClock::now()};
auto last_locator_write_time{last_log_time};
Expand All @@ -196,7 +197,7 @@ void BaseIndex::Sync()
// No need to handle errors in Commit. If it fails, the error will be already be
// logged. The best way to recover is to continue, as index cannot be corrupted by
// a missed commit to disk for an advanced index state.
Commit();
Commit(batch);
return;
}

Expand All @@ -206,7 +207,7 @@ void BaseIndex::Sync()
if (!pindex_next) {
SetBestBlockIndex(pindex);
// No need to handle errors in Commit. See rationale above.
Commit();
Commit(batch);

// If pindex is still the chain tip after committing, exit the
// sync loop. It is important for cs_main to be locked while
Expand All @@ -227,7 +228,7 @@ void BaseIndex::Sync()
pindex = pindex_next;


if (!ProcessBlock(pindex)) return; // error logged internally
if (!ProcessBlock(pindex, batch)) return; // error logged internally

auto current_time{NodeClock::now()};
if (current_time - last_log_time >= SYNC_LOG_INTERVAL) {
Expand All @@ -239,36 +240,44 @@ void BaseIndex::Sync()
SetBestBlockIndex(pindex);
last_locator_write_time = current_time;
// No need to handle errors in Commit. See rationale above.
Commit();
Commit(batch);
}
}
}

GetDB().WriteBatch(batch);
batch.Clear();

if (pindex) {
LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight);
} else {
LogInfo("%s is enabled", GetName());
}
}

bool BaseIndex::Commit()
bool BaseIndex::Commit(CDBBatch& batch)
{
// Don't commit anything if we haven't indexed any block yet
// (this could happen if init is interrupted).
bool ok = m_best_block_index != nullptr;
if (ok) {
CDBBatch batch(GetDB());
ok = CustomCommit(batch);
if (ok) {
GetDB().WriteBestBlock(batch, GetLocator(*m_chain, m_best_block_index.load()->GetBlockHash()));
ok = GetDB().WriteBatch(batch);
}
// Don't write if we haven't indexed any blocks yet
if (m_best_block_index == nullptr) {
return true;
}

// Write accumulated data
bool ok = GetDB().WriteBatch(batch);
batch.Clear();

// Add index-specific data (like DB_MUHASH for coinstatsindex)
if (CustomCommit(batch)) {
// Add best block locator if custom commit succeeded
GetDB().WriteBestBlock(batch, GetLocator(*m_chain, m_best_block_index.load()->GetBlockHash()));
ok = GetDB().WriteBatch(batch);
}
batch.Clear();

if (!ok) {
LogError("Failed to commit latest %s state", GetName());
return false;
}
return true;
return ok;
}

bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
Expand Down Expand Up @@ -354,11 +363,12 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr<const
}

// Dispatch block to child class; errors are logged internally and abort the node.
if (ProcessBlock(pindex, block.get())) {
if (CDBBatch batch{GetDB()}; ProcessBlock(pindex, batch, block.get())) {
// Setting the best block index is intentionally the last step of this
// function, so BlockUntilSyncedToCurrentChain callers waiting for the
// best block index to be updated can rely on the block being fully
// processed, and the index object being safe to delete.
GetDB().WriteBatch(batch);
SetBestBlockIndex(pindex);
}
}
Expand Down Expand Up @@ -405,7 +415,8 @@ void BaseIndex::ChainStateFlushed(ChainstateRole role, const CBlockLocator& loca
// No need to handle errors in Commit. If it fails, the error will be already be logged. The
// best way to recover is to continue, as index cannot be corrupted by a missed commit to disk
// for an advanced index state.
Commit();
CDBBatch batch{GetDB()};
Commit(batch);
}

bool BaseIndex::BlockUntilSyncedToCurrentChain() const
Expand Down
6 changes: 3 additions & 3 deletions src/index/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ class BaseIndex : public CValidationInterface
/// from further behind on reboot. If the new state is not a successor of the previous state (due
/// to a chain reorganization), the index must halt until Commit succeeds or else it could end up
/// getting corrupted.
bool Commit();
bool Commit(CDBBatch& batch);

/// Loop over disconnected blocks and call CustomRemove.
bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);

bool ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data = nullptr);
bool ProcessBlock(const CBlockIndex* pindex, CDBBatch& batch, const CBlock* block_data = nullptr);

virtual bool AllowPrune() const = 0;

Expand All @@ -116,7 +116,7 @@ class BaseIndex : public CValidationInterface
[[nodiscard]] virtual bool CustomInit(const std::optional<interfaces::BlockRef>& block) { return true; }

/// Write update index entries for a newly connected block.
[[nodiscard]] virtual bool CustomAppend(const interfaces::BlockInfo& block) { return true; }
[[nodiscard]] virtual bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) { return true; }

/// Virtual method called internally by Commit that can be overridden to atomically
/// commit more index state.
Expand Down
11 changes: 4 additions & 7 deletions src/index/blockfilterindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,16 @@ std::optional<uint256> BlockFilterIndex::ReadFilterHeader(int height, const uint
return read_out.second.header;
}

bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block)
bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch)
{
BlockFilter filter(m_filter_type, *Assert(block.data), *Assert(block.undo_data));
const uint256& header = filter.ComputeHeader(m_last_header);
bool res = Write(filter, block.height, header);
bool res = Write(filter, block.height, header, batch);
if (res) m_last_header = header; // update last header
return res;
}

bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header)
bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header, CDBBatch& batch)
{
size_t bytes_written = WriteFilterToDisk(m_next_filter_pos, filter);
if (bytes_written == 0) return false;
Expand All @@ -291,10 +291,7 @@ bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, c
value.second.header = filter_header;
value.second.pos = m_next_filter_pos;

if (!m_db->Write(DBHeightKey(block_height), value)) {
return false;
}

batch.Write(DBHeightKey(block_height), value);
m_next_filter_pos.nPos += bytes_written;
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/index/blockfilterindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BlockFilterIndex final : public BaseIndex

bool AllowPrune() const override { return true; }

bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header);
bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header, CDBBatch& batch);

std::optional<uint256> ReadFilterHeader(int height, const uint256& expected_block_hash);

Expand All @@ -58,7 +58,7 @@ class BlockFilterIndex final : public BaseIndex

bool CustomCommit(CDBBatch& batch) override;

bool CustomAppend(const interfaces::BlockInfo& block) override;
bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override;

bool CustomRemove(const interfaces::BlockInfo& block) override;

Expand Down
5 changes: 3 additions & 2 deletions src/index/coinstatsindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ CoinStatsIndex::CoinStatsIndex(std::unique_ptr<interfaces::Chain> chain, size_t
m_db = std::make_unique<CoinStatsIndex::DB>(path / "db", n_cache_size, f_memory, f_wipe);
}

bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block)
bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch)
{
const CAmount block_subsidy{GetBlockSubsidy(block.height, Params().GetConsensus())};
m_total_subsidy += block_subsidy;
Expand Down Expand Up @@ -234,7 +234,8 @@ bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block)

// Intentionally do not update DB_MUHASH here so it stays in sync with
// DB_BEST_BLOCK, and the index is not corrupted if there is an unclean shutdown.
return m_db->Write(DBHeightKey(block.height), value);
batch.Write(DBHeightKey(block.height), value);
return true;
}

[[nodiscard]] static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch,
Expand Down
2 changes: 1 addition & 1 deletion src/index/coinstatsindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CoinStatsIndex final : public BaseIndex

bool CustomCommit(CDBBatch& batch) override;

bool CustomAppend(const interfaces::BlockInfo& block) override;
bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override;

bool CustomRemove(const interfaces::BlockInfo& block) override;

Expand Down
27 changes: 10 additions & 17 deletions src/index/txindex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ class TxIndex::DB : public BaseIndex::DB
/// Read the disk location of the transaction data with the given hash. Returns false if the
/// transaction hash is not indexed.
bool ReadTxPos(const Txid& txid, CDiskTxPos& pos) const;

/// Write a batch of transaction positions to the DB.
[[nodiscard]] bool WriteTxs(const std::vector<std::pair<Txid, CDiskTxPos>>& v_pos);
};

TxIndex::DB::DB(size_t n_cache_size, bool f_memory, bool f_wipe) :
Expand All @@ -40,35 +37,31 @@ bool TxIndex::DB::ReadTxPos(const Txid& txid, CDiskTxPos& pos) const
return Read(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos);
}

bool TxIndex::DB::WriteTxs(const std::vector<std::pair<Txid, CDiskTxPos>>& v_pos)
{
CDBBatch batch(*this);
for (const auto& [txid, pos] : v_pos) {
batch.Write(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos);
}
return WriteBatch(batch);
}

TxIndex::TxIndex(std::unique_ptr<interfaces::Chain> chain, size_t n_cache_size, bool f_memory, bool f_wipe)
: BaseIndex(std::move(chain), "txindex"), m_db(std::make_unique<TxIndex::DB>(n_cache_size, f_memory, f_wipe))
{}

TxIndex::~TxIndex() = default;

bool TxIndex::CustomAppend(const interfaces::BlockInfo& block)
bool TxIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch)
{
// Exclude genesis block transaction because outputs are not spendable.
if (block.height == 0) return true;

assert(block.data);
CDiskTxPos pos({block.file_number, block.data_pos}, GetSizeOfCompactSize(block.data->vtx.size()));
CDiskTxPos tx_pos({block.file_number, block.data_pos}, GetSizeOfCompactSize(block.data->vtx.size()));

std::vector<std::pair<Txid, CDiskTxPos>> vPos;
vPos.reserve(block.data->vtx.size());
for (const auto& tx : block.data->vtx) {
vPos.emplace_back(tx->GetHash(), pos);
pos.nTxOffset += ::GetSerializeSize(TX_WITH_WITNESS(*tx));
vPos.emplace_back(tx->GetHash(), tx_pos);
tx_pos.nTxOffset += ::GetSerializeSize(TX_WITH_WITNESS(*tx));
}
return m_db->WriteTxs(vPos);

for (const auto& [txid, pos] : vPos) {
batch.Write(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos);
}
return true;
}

BaseIndex::DB& TxIndex::GetDB() const { return *m_db; }
Expand Down
2 changes: 1 addition & 1 deletion src/index/txindex.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TxIndex final : public BaseIndex
bool AllowPrune() const override { return false; }

protected:
bool CustomAppend(const interfaces::BlockInfo& block) override;
bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override;

BaseIndex::DB& GetDB() const override;

Expand Down
2 changes: 1 addition & 1 deletion src/test/blockfilter_index_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class IndexReorgCrash : public BaseIndex
bool AllowPrune() const override { return false; }
BaseIndex::DB& GetDB() const override { return *m_db; }

bool CustomAppend(const interfaces::BlockInfo& block) override
bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override
{
// Simulate a delay so new blocks can get connected during the initial sync
if (block.height == m_blocking_height) m_blocker.wait();
Expand Down
Loading