From 33fe1e328281f583456d7654b9a1577a9e64997d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C5=91rinc?= Date: Wed, 10 Sep 2025 19:19:13 -0700 Subject: [PATCH] indexes: batch index writes --- src/index/base.cpp | 53 +++++++++++++++++----------- src/index/base.h | 6 ++-- src/index/blockfilterindex.cpp | 11 +++--- src/index/blockfilterindex.h | 4 +-- src/index/coinstatsindex.cpp | 5 +-- src/index/coinstatsindex.h | 2 +- src/index/txindex.cpp | 27 ++++++-------- src/index/txindex.h | 2 +- src/test/blockfilter_index_tests.cpp | 2 +- 9 files changed, 57 insertions(+), 55 deletions(-) diff --git a/src/index/base.cpp b/src/index/base.cpp index 82259ac046a6e..8e336e2731ba1 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -149,7 +149,7 @@ static const CBlockIndex* NextSyncBlock(const CBlockIndex* pindex_prev, CChain& return chain.Next(chain.FindFork(pindex_prev)); } -bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data) +bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, CDBBatch& batch, const CBlock* block_data) { interfaces::BlockInfo block_info = kernel::MakeBlockInfo(pindex, block_data); @@ -173,7 +173,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data block_info.undo_data = &block_undo; } - if (!CustomAppend(block_info)) { + if (!CustomAppend(block_info, batch)) { FatalErrorf("Failed to write block %s to index database", pindex->GetBlockHash().ToString()); return false; @@ -185,6 +185,7 @@ bool BaseIndex::ProcessBlock(const CBlockIndex* pindex, const CBlock* block_data void BaseIndex::Sync() { const CBlockIndex* pindex = m_best_block_index.load(); + CDBBatch batch{GetDB()}; if (!m_synced) { auto last_log_time{NodeClock::now()}; auto last_locator_write_time{last_log_time}; @@ -196,7 +197,7 @@ void BaseIndex::Sync() // No need to handle errors in Commit. If it fails, the error will be already be // logged. The best way to recover is to continue, as index cannot be corrupted by // a missed commit to disk for an advanced index state. - Commit(); + Commit(batch); return; } @@ -206,7 +207,7 @@ void BaseIndex::Sync() if (!pindex_next) { SetBestBlockIndex(pindex); // No need to handle errors in Commit. See rationale above. - Commit(); + Commit(batch); // If pindex is still the chain tip after committing, exit the // sync loop. It is important for cs_main to be locked while @@ -227,7 +228,7 @@ void BaseIndex::Sync() pindex = pindex_next; - if (!ProcessBlock(pindex)) return; // error logged internally + if (!ProcessBlock(pindex, batch)) return; // error logged internally auto current_time{NodeClock::now()}; if (current_time - last_log_time >= SYNC_LOG_INTERVAL) { @@ -239,11 +240,14 @@ void BaseIndex::Sync() SetBestBlockIndex(pindex); last_locator_write_time = current_time; // No need to handle errors in Commit. See rationale above. - Commit(); + Commit(batch); } } } + GetDB().WriteBatch(batch); + batch.Clear(); + if (pindex) { LogInfo("%s is enabled at height %d", GetName(), pindex->nHeight); } else { @@ -251,24 +255,29 @@ void BaseIndex::Sync() } } -bool BaseIndex::Commit() +bool BaseIndex::Commit(CDBBatch& batch) { - // Don't commit anything if we haven't indexed any block yet - // (this could happen if init is interrupted). - bool ok = m_best_block_index != nullptr; - if (ok) { - CDBBatch batch(GetDB()); - ok = CustomCommit(batch); - if (ok) { - GetDB().WriteBestBlock(batch, GetLocator(*m_chain, m_best_block_index.load()->GetBlockHash())); - ok = GetDB().WriteBatch(batch); - } + // Don't write if we haven't indexed any blocks yet + if (m_best_block_index == nullptr) { + return true; + } + + // Write accumulated data + bool ok = GetDB().WriteBatch(batch); + batch.Clear(); + + // Add index-specific data (like DB_MUHASH for coinstatsindex) + if (CustomCommit(batch)) { + // Add best block locator if custom commit succeeded + GetDB().WriteBestBlock(batch, GetLocator(*m_chain, m_best_block_index.load()->GetBlockHash())); + ok = GetDB().WriteBatch(batch); } + batch.Clear(); + if (!ok) { LogError("Failed to commit latest %s state", GetName()); - return false; } - return true; + return ok; } bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) @@ -354,11 +363,12 @@ void BaseIndex::BlockConnected(ChainstateRole role, const std::shared_ptr& block) { return true; } /// Write update index entries for a newly connected block. - [[nodiscard]] virtual bool CustomAppend(const interfaces::BlockInfo& block) { return true; } + [[nodiscard]] virtual bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) { return true; } /// Virtual method called internally by Commit that can be overridden to atomically /// commit more index state. diff --git a/src/index/blockfilterindex.cpp b/src/index/blockfilterindex.cpp index 2ccae3a221b22..a17cb9e8d6c08 100644 --- a/src/index/blockfilterindex.cpp +++ b/src/index/blockfilterindex.cpp @@ -271,16 +271,16 @@ std::optional BlockFilterIndex::ReadFilterHeader(int height, const uint return read_out.second.header; } -bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block) +bool BlockFilterIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) { BlockFilter filter(m_filter_type, *Assert(block.data), *Assert(block.undo_data)); const uint256& header = filter.ComputeHeader(m_last_header); - bool res = Write(filter, block.height, header); + bool res = Write(filter, block.height, header, batch); if (res) m_last_header = header; // update last header return res; } -bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header) +bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header, CDBBatch& batch) { size_t bytes_written = WriteFilterToDisk(m_next_filter_pos, filter); if (bytes_written == 0) return false; @@ -291,10 +291,7 @@ bool BlockFilterIndex::Write(const BlockFilter& filter, uint32_t block_height, c value.second.header = filter_header; value.second.pos = m_next_filter_pos; - if (!m_db->Write(DBHeightKey(block_height), value)) { - return false; - } - + batch.Write(DBHeightKey(block_height), value); m_next_filter_pos.nPos += bytes_written; return true; } diff --git a/src/index/blockfilterindex.h b/src/index/blockfilterindex.h index 983cf32206a42..964d0172e15dd 100644 --- a/src/index/blockfilterindex.h +++ b/src/index/blockfilterindex.h @@ -47,7 +47,7 @@ class BlockFilterIndex final : public BaseIndex bool AllowPrune() const override { return true; } - bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header); + bool Write(const BlockFilter& filter, uint32_t block_height, const uint256& filter_header, CDBBatch& batch); std::optional ReadFilterHeader(int height, const uint256& expected_block_hash); @@ -58,7 +58,7 @@ class BlockFilterIndex final : public BaseIndex bool CustomCommit(CDBBatch& batch) override; - bool CustomAppend(const interfaces::BlockInfo& block) override; + bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override; bool CustomRemove(const interfaces::BlockInfo& block) override; diff --git a/src/index/coinstatsindex.cpp b/src/index/coinstatsindex.cpp index af798e2913906..4dd7cec178c27 100644 --- a/src/index/coinstatsindex.cpp +++ b/src/index/coinstatsindex.cpp @@ -130,7 +130,7 @@ CoinStatsIndex::CoinStatsIndex(std::unique_ptr chain, size_t m_db = std::make_unique(path / "db", n_cache_size, f_memory, f_wipe); } -bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block) +bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) { const CAmount block_subsidy{GetBlockSubsidy(block.height, Params().GetConsensus())}; m_total_subsidy += block_subsidy; @@ -234,7 +234,8 @@ bool CoinStatsIndex::CustomAppend(const interfaces::BlockInfo& block) // Intentionally do not update DB_MUHASH here so it stays in sync with // DB_BEST_BLOCK, and the index is not corrupted if there is an unclean shutdown. - return m_db->Write(DBHeightKey(block.height), value); + batch.Write(DBHeightKey(block.height), value); + return true; } [[nodiscard]] static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch, diff --git a/src/index/coinstatsindex.h b/src/index/coinstatsindex.h index 7e48f4c4eef23..18bcb762946cb 100644 --- a/src/index/coinstatsindex.h +++ b/src/index/coinstatsindex.h @@ -51,7 +51,7 @@ class CoinStatsIndex final : public BaseIndex bool CustomCommit(CDBBatch& batch) override; - bool CustomAppend(const interfaces::BlockInfo& block) override; + bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override; bool CustomRemove(const interfaces::BlockInfo& block) override; diff --git a/src/index/txindex.cpp b/src/index/txindex.cpp index 11dd856e1b67b..468b7725533c3 100644 --- a/src/index/txindex.cpp +++ b/src/index/txindex.cpp @@ -26,9 +26,6 @@ class TxIndex::DB : public BaseIndex::DB /// Read the disk location of the transaction data with the given hash. Returns false if the /// transaction hash is not indexed. bool ReadTxPos(const Txid& txid, CDiskTxPos& pos) const; - - /// Write a batch of transaction positions to the DB. - [[nodiscard]] bool WriteTxs(const std::vector>& v_pos); }; TxIndex::DB::DB(size_t n_cache_size, bool f_memory, bool f_wipe) : @@ -40,35 +37,31 @@ bool TxIndex::DB::ReadTxPos(const Txid& txid, CDiskTxPos& pos) const return Read(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos); } -bool TxIndex::DB::WriteTxs(const std::vector>& v_pos) -{ - CDBBatch batch(*this); - for (const auto& [txid, pos] : v_pos) { - batch.Write(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos); - } - return WriteBatch(batch); -} - TxIndex::TxIndex(std::unique_ptr chain, size_t n_cache_size, bool f_memory, bool f_wipe) : BaseIndex(std::move(chain), "txindex"), m_db(std::make_unique(n_cache_size, f_memory, f_wipe)) {} TxIndex::~TxIndex() = default; -bool TxIndex::CustomAppend(const interfaces::BlockInfo& block) +bool TxIndex::CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) { // Exclude genesis block transaction because outputs are not spendable. if (block.height == 0) return true; assert(block.data); - CDiskTxPos pos({block.file_number, block.data_pos}, GetSizeOfCompactSize(block.data->vtx.size())); + CDiskTxPos tx_pos({block.file_number, block.data_pos}, GetSizeOfCompactSize(block.data->vtx.size())); + std::vector> vPos; vPos.reserve(block.data->vtx.size()); for (const auto& tx : block.data->vtx) { - vPos.emplace_back(tx->GetHash(), pos); - pos.nTxOffset += ::GetSerializeSize(TX_WITH_WITNESS(*tx)); + vPos.emplace_back(tx->GetHash(), tx_pos); + tx_pos.nTxOffset += ::GetSerializeSize(TX_WITH_WITNESS(*tx)); } - return m_db->WriteTxs(vPos); + + for (const auto& [txid, pos] : vPos) { + batch.Write(std::make_pair(DB_TXINDEX, txid.ToUint256()), pos); + } + return true; } BaseIndex::DB& TxIndex::GetDB() const { return *m_db; } diff --git a/src/index/txindex.h b/src/index/txindex.h index f8236c9284434..59e1ae4ddfa8e 100644 --- a/src/index/txindex.h +++ b/src/index/txindex.h @@ -25,7 +25,7 @@ class TxIndex final : public BaseIndex bool AllowPrune() const override { return false; } protected: - bool CustomAppend(const interfaces::BlockInfo& block) override; + bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override; BaseIndex::DB& GetDB() const override; diff --git a/src/test/blockfilter_index_tests.cpp b/src/test/blockfilter_index_tests.cpp index 224acb8b07936..0490e498f5d1e 100644 --- a/src/test/blockfilter_index_tests.cpp +++ b/src/test/blockfilter_index_tests.cpp @@ -326,7 +326,7 @@ class IndexReorgCrash : public BaseIndex bool AllowPrune() const override { return false; } BaseIndex::DB& GetDB() const override { return *m_db; } - bool CustomAppend(const interfaces::BlockInfo& block) override + bool CustomAppend(const interfaces::BlockInfo& block, CDBBatch& batch) override { // Simulate a delay so new blocks can get connected during the initial sync if (block.height == m_blocking_height) m_blocker.wait();