Skip to content

Commit

Permalink
Wallet passive startup
Browse files Browse the repository at this point in the history
Move wallet startup code closer to a simple model where the wallet attaches to
the chain with a single chain.handleNotifications() call, and just starts
passively receiving blocks and mempool notifications from the last update point,
instead having to actively rescan blocks and request a mempool snapshot, and
deal with the tip changing, and deal with early or stale notifications.

Also, stop locking the cs_wallet mutex and registering for validationinterface
notifications before the rescan. This was new behavior since
6a72f26
bitcoin#16426 and is not ideal because it stops
other wallets and rpcs and the gui from receiving new notifications until after the
scan completes.

This change is a half-step towards implementing multiwallet parallel scans
(bitcoin#11756), since it provides needed
locator and birthday timestamp information to the Chain interface, and it
rationalizes locking and event ordering in the startup code. The second half of
implementing parallel rescans requires moving the ScanForWalletTransactions
implementation (which this PR does not touch) from the wallet to the node.
  • Loading branch information
ryanofsky committed Feb 14, 2020
1 parent d205c0c commit ba3cdcc
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 226 deletions.
121 changes: 80 additions & 41 deletions src/interfaces/chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,22 @@
#include <validation.h>
#include <validationinterface.h>

#include <future>
#include <memory>
#include <utility>

namespace interfaces {
namespace {

//! Return whether block data is missing in block range
bool MissingBlockData(const CBlockIndex* start, const CBlockIndex* end)
{
for (const CBlockIndex* block = end; block != start; block = block->pprev) {
if ((block->nStatus & BLOCK_HAVE_DATA) == 0 || block->nTx == 0) return true;
}
return false;
}

bool FillBlock(const CBlockIndex* index, const FoundBlock& block, UniqueLock<RecursiveMutex>& lock)
{
if (!index) return false;
Expand Down Expand Up @@ -164,29 +174,6 @@ class ChainImpl : public Chain
}
return nullopt;
}
uint256 getBlockHash(int height) override
{
LOCK(::cs_main);
CBlockIndex* block = ::ChainActive()[height];
assert(block);
return block->GetBlockHash();
}
bool haveBlockOnDisk(int height) override
{
LOCK(cs_main);
CBlockIndex* block = ::ChainActive()[height];
return block && ((block->nStatus & BLOCK_HAVE_DATA) != 0) && block->nTx > 0;
}
Optional<int> findFirstBlockWithTimeAndHeight(int64_t time, int height, uint256* hash) override
{
LOCK(cs_main);
CBlockIndex* block = ::ChainActive().FindEarliestAtLeast(time, height);
if (block) {
if (hash) *hash = block->GetBlockHash();
return block->nHeight;
}
return nullopt;
}
CBlockLocator getTipLocator() override
{
LOCK(cs_main);
Expand All @@ -197,14 +184,6 @@ class ChainImpl : public Chain
LOCK(cs_main);
return CheckFinalTx(tx);
}
Optional<int> findLocatorFork(const CBlockLocator& locator) override
{
LOCK(cs_main);
if (CBlockIndex* fork = FindForkInGlobalIndex(::ChainActive(), locator)) {
return fork->nHeight;
}
return nullopt;
}
bool findBlock(const uint256& hash, const FoundBlock& block) override
{
WAIT_LOCK(cs_main, lock);
Expand Down Expand Up @@ -351,9 +330,76 @@ class ChainImpl : public Chain
{
::uiInterface.ShowProgress(title, progress, resume_possible);
}
std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) override
{
return MakeUnique<NotificationsHandlerImpl>(std::move(notifications));
std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications,
ScanFn scan_fn,
MempoolFn mempool_fn,
const CBlockLocator* scan_locator,
int64_t scan_time,
const FoundBlock& tip,
bool& missing_block_data) override LOCKS_EXCLUDED(::cs_main, ::mempool.cs)
{
// Declare an asynchronous task to send a mempool snapshot immediately
// before enabling notifications.
std::vector<CTransactionRef> mempool_snapshot;
std::packaged_task<std::unique_ptr<Handler>()> register_task{[&] {
if (mempool_fn) mempool_fn(std::move(mempool_snapshot));
return MakeUnique<NotificationsHandlerImpl>(std::move(notifications));
}};
std::future<std::unique_ptr<Handler>> register_future{register_task.get_future()};

// Lock cs_main to find forks and trigger rescans, then lock mempool.cs
// to build a mempool snapshot, then release both locks and
// asynchronously send the mempool snapshot to the caller, and enable
// notifications starting from the point when the snapshot was created.
{
AssertLockNotHeld(::cs_main);
WAIT_LOCK(::cs_main, main_lock);

// Call scan_fn until it has scanned all blocks after specified
// location and time. Looping is necessary because new blocks may
// be connected during rescans.
missing_block_data = false;
if (scan_fn) {
CBlockIndex* scan_start = scan_locator ? FindForkInGlobalIndex(ChainActive(), *scan_locator) : nullptr;
scan_start = ChainActive().FindEarliestAtLeast(scan_time, scan_start ? scan_start->nHeight : 0);
while (scan_start) {
if (MissingBlockData(scan_start, ChainActive().Tip())) {
missing_block_data = true;
return nullptr;
}
uint256 scan_tip_hash = ChainActive().Tip()->GetBlockHash();
int scan_tip_height = ChainActive().Height();
Optional<uint256> scanned_hash;
{
REVERSE_LOCK(main_lock);
scanned_hash = scan_fn(scan_start->GetBlockHash(), scan_start->nHeight, scan_tip_hash, scan_tip_height);
}
if (!scanned_hash) return nullptr;
scan_start = ChainActive().Next(ChainActive().FindFork(LookupBlockIndex(*scanned_hash)));
}
}
FillBlock(ChainActive().Tip(), tip, main_lock);

// Take a snapshot of mempool transactions if needed
AssertLockNotHeld(::mempool.cs);
LOCK(::mempool.cs);
if (mempool_fn) {
for (const CTxMemPoolEntry& entry : ::mempool.mapTx) {
mempool_snapshot.push_back(entry.GetSharedTx());
}
}

// Register for notifications. Avoid receiving stale notifications
// that may be backed up in the queue by delaying registration with
// CallFunctionInValidationInterfaceQueue. Avoid missing any new
// notifications that happen after scanning blocks and taking the
// mempool snapshot above by holding on to cs_main and mempool.cs
// while calling CallFunctionInValidationInterfaceQueue, so the new
// notifications get enqueued after register_task, and won't be
// handled until after it returns
CallFunctionInValidationInterfaceQueue([&] { register_task(); });
}
return register_future.get();
}
void waitForNotificationsIfTipChanged(const uint256& old_tip) override
{
Expand All @@ -373,13 +419,6 @@ class ChainImpl : public Chain
RPCRunLater(name, std::move(fn), seconds);
}
int rpcSerializationFlags() override { return RPCSerializationFlags(); }
void requestMempoolTransactions(Notifications& notifications) override
{
LOCK2(::cs_main, ::mempool.cs);
for (const CTxMemPoolEntry& entry : ::mempool.mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx());
}
}
NodeContext& m_node;
};
} // namespace
Expand Down
77 changes: 43 additions & 34 deletions src/interfaces/chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ class FoundBlock
//! wallet cache it, fee estimation being driven by node mempool, wallet
//! should be the consumer.
//!
//! * The `guessVerificationProgress`, `getBlockHeight`, `getBlockHash`, etc
//! methods can go away if rescan logic is moved on the node side, and wallet
//! only register rescan request.
//! * The `guessVerificationProgress`, `getBlockHeight`, and similar methods can
//! go away if rescan logic is moved on the node side, and the wallet just
//! requests scans from the node
//! (https://github.com/bitcoin/bitcoin/issues/11756#issuecomment-637038714).
class Chain
{
public:
Expand All @@ -96,28 +97,9 @@ class Chain
//! included in the current chain.
virtual Optional<int> getBlockHeight(const uint256& hash) = 0;

//! Get block hash. Height must be valid or this function will abort.
virtual uint256 getBlockHash(int height) = 0;

//! Check that the block is available on disk (i.e. has not been
//! pruned), and contains transactions.
virtual bool haveBlockOnDisk(int height) = 0;

//! Return height of the first block in the chain with timestamp equal
//! or greater than the given time and height equal or greater than the
//! given height, or nullopt if there is no block with a high enough
//! timestamp and height. Also return the block hash as an optional output parameter
//! (to avoid the cost of a second lookup in case this information is needed.)
virtual Optional<int> findFirstBlockWithTimeAndHeight(int64_t time, int height, uint256* hash) = 0;

//! Get locator for the current chain tip.
virtual CBlockLocator getTipLocator() = 0;

//! Return height of the highest block on chain in common with the locator,
//! which will either be the original block used to create the locator,
//! or one of its ancestors.
virtual Optional<int> findLocatorFork(const CBlockLocator& locator) = 0;

//! Check if transaction will be final given chain height current time.
virtual bool checkFinalTx(const CTransaction& tx) = 0;

Expand Down Expand Up @@ -251,8 +233,45 @@ class Chain
virtual void chainStateFlushed(const CBlockLocator& locator) {}
};

//! Register handler for notifications.
virtual std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications) = 0;
using ScanFn = std::function<Optional<uint256>(const uint256& start_hash, int start_height, const uint256& tip_hash, int tip_height)>;
using MempoolFn = std::function<void(std::vector<CTransactionRef>)>;

//! Register handler for notifications. Call @ref scan_fn to send existing
//! blocks and @ref mempool_fn to send existing transactions before sending
//! the first notifications about new blocks and transactions back to the
//! caller.
//!
//! @param[in] notifications callback object receiving notifications
//! @param[in] scan_fn callback invoked before notifications are sent
//! to scan blocks after a specified location and
//! time. This should return the hash of the last
//! block scanned, and may be called more than once
//! if new blocks were connected during the scan.
//! @param[in] mempool_fn callback invoked before notifications are sent
//! with snapshot of mempool transactions
//! @param[in] scan_locator location of last block previously scanned.
//! scan_fn will be only be called for blocks after
//! this point. Can be null to scan from genesis.
//! @param[in] scan_time minimum block timestamp for beginning the scan
//! scan_fn will only be called for blocks starting
//! from this timestamp
//! @param[out] tip information about chain tip at the point where
//! notifications will begin
//!
//! @todo The handleNotifications interface should be simplified so callback
//! arguments @ref scan_fn and @ref mempool_fn are dropped and all arguments
//! except @ref notifications are optional. The @ref scan_fn argument will
//! be dropped when implementing
//! https://github.com/bitcoin/bitcoin/issues/11756, and the @ref mempool_fn
//! argument can be dropped by extending transactionAddedToMempool to accept
//! a span of transactions instead of just a single transaction.
virtual std::unique_ptr<Handler> handleNotifications(std::shared_ptr<Notifications> notifications,
ScanFn scan_fn,
MempoolFn mempool_fn,
const CBlockLocator* scan_locator,
int64_t scan_time,
const FoundBlock& tip,
bool& missing_block_data) = 0;

//! Wait for pending notifications to be processed unless block hash points to the current
//! chain tip.
Expand All @@ -270,16 +289,6 @@ class Chain

//! Current RPC serialization flags.
virtual int rpcSerializationFlags() = 0;

//! Synchronously send transactionAddedToMempool notifications about all
//! current mempool transactions to the specified handler and return after
//! the last one is sent. These notifications aren't coordinated with async
//! notifications sent by handleNotifications, so out of date async
//! notifications from handleNotifications can arrive during and after
//! synchronous notifications from requestMempoolTransactions. Clients need
//! to be prepared to handle this by ignoring notifications about unknown
//! removed transactions and already added new transactions.
virtual void requestMempoolTransactions(Notifications& notifications) = 0;
};

//! Interface to let node manage chain clients (wallets, or maybe tools for
Expand Down
Loading

0 comments on commit ba3cdcc

Please sign in to comment.