From 5f1a872efa8b831a3b35bfdd2b6bd749fc7113a7 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 18 Nov 2025 11:13:19 +0100 Subject: [PATCH 1/2] Prefactor: Move `ChainSource` creation before `Wallet` creation In the following commits we will use the chain source to poll a best tip before intializing the listener objects. As a prefactor, we here move the creation of our onchain wallet after creation of the chain source, which in turn means we'll need to use the same pattern as for the other listeners, i.e., not giving the wallet reference to `ChainSource` on creation but rather handing it in when it's being used at runtime. --- src/builder.rs | 120 ++++++++++++++++++++---------------------- src/chain/bitcoind.rs | 54 +++++++++---------- src/chain/electrum.rs | 20 +++---- src/chain/esplora.rs | 21 ++++---- src/chain/mod.rs | 57 +++++++++++--------- src/lib.rs | 19 +++++-- 6 files changed, 150 insertions(+), 141 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index b45f03f6d..98650aa1a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1178,54 +1178,6 @@ fn build_with_store_internal( } }, }; - - // Initialize the on-chain wallet and chain access - let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { - log_error!(logger, "Failed to derive master secret: {}", e); - BuildError::InvalidSeedBytes - })?; - - let descriptor = Bip84(xprv, KeychainKind::External); - let change_descriptor = Bip84(xprv, KeychainKind::Internal); - let mut wallet_persister = - KVStoreWalletPersister::new(Arc::clone(&kv_store), Arc::clone(&logger)); - let wallet_opt = BdkWallet::load() - .descriptor(KeychainKind::External, Some(descriptor.clone())) - .descriptor(KeychainKind::Internal, Some(change_descriptor.clone())) - .extract_keys() - .check_network(config.network) - .load_wallet(&mut wallet_persister) - .map_err(|e| match e { - bdk_wallet::LoadWithPersistError::InvalidChangeSet( - bdk_wallet::LoadError::Mismatch(bdk_wallet::LoadMismatch::Network { - loaded, - expected, - }), - ) => { - log_error!( - logger, - "Failed to setup wallet: Networks do not match. Expected {} but got {}", - expected, - loaded - ); - BuildError::NetworkMismatch - }, - _ => { - log_error!(logger, "Failed to set up wallet: {}", e); - BuildError::WalletSetupFailed - }, - })?; - let bdk_wallet = match wallet_opt { - Some(wallet) => wallet, - None => BdkWallet::create(descriptor, change_descriptor) - .network(config.network) - .create_wallet(&mut wallet_persister) - .map_err(|e| { - log_error!(logger, "Failed to set up wallet: {}", e); - BuildError::WalletSetupFailed - })?, - }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); @@ -1243,16 +1195,6 @@ fn build_with_store_internal( }, }; - let wallet = Arc::new(Wallet::new( - bdk_wallet, - wallet_persister, - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&payment_store), - Arc::clone(&config), - Arc::clone(&logger), - )); - let chain_source = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); @@ -1260,7 +1202,6 @@ fn build_with_store_internal( server_url.clone(), headers.clone(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), @@ -1274,7 +1215,6 @@ fn build_with_store_internal( Arc::new(ChainSource::new_electrum( server_url.clone(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), @@ -1295,7 +1235,6 @@ fn build_with_store_internal( *rpc_port, rpc_user.clone(), rpc_password.clone(), - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), @@ -1309,7 +1248,6 @@ fn build_with_store_internal( *rpc_port, rpc_user.clone(), rpc_password.clone(), - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), @@ -1327,7 +1265,6 @@ fn build_with_store_internal( server_url.clone(), HashMap::new(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), @@ -1338,6 +1275,63 @@ fn build_with_store_internal( }, }; + // Initialize the on-chain wallet and chain access + let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { + log_error!(logger, "Failed to derive master secret: {}", e); + BuildError::InvalidSeedBytes + })?; + + let descriptor = Bip84(xprv, KeychainKind::External); + let change_descriptor = Bip84(xprv, KeychainKind::Internal); + let mut wallet_persister = + KVStoreWalletPersister::new(Arc::clone(&kv_store), Arc::clone(&logger)); + let wallet_opt = BdkWallet::load() + .descriptor(KeychainKind::External, Some(descriptor.clone())) + .descriptor(KeychainKind::Internal, Some(change_descriptor.clone())) + .extract_keys() + .check_network(config.network) + .load_wallet(&mut wallet_persister) + .map_err(|e| match e { + bdk_wallet::LoadWithPersistError::InvalidChangeSet( + bdk_wallet::LoadError::Mismatch(bdk_wallet::LoadMismatch::Network { + loaded, + expected, + }), + ) => { + log_error!( + logger, + "Failed to setup wallet: Networks do not match. Expected {} but got {}", + expected, + loaded + ); + BuildError::NetworkMismatch + }, + _ => { + log_error!(logger, "Failed to set up wallet: {}", e); + BuildError::WalletSetupFailed + }, + })?; + let bdk_wallet = match wallet_opt { + Some(wallet) => wallet, + None => BdkWallet::create(descriptor, change_descriptor) + .network(config.network) + .create_wallet(&mut wallet_persister) + .map_err(|e| { + log_error!(logger, "Failed to set up wallet: {}", e); + BuildError::WalletSetupFailed + })?, + }; + + let wallet = Arc::new(Wallet::new( + bdk_wallet, + wallet_persister, + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&payment_store), + Arc::clone(&config), + Arc::clone(&logger), + )); + // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { log_error!(logger, "Failed to get current time: {}", e); diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 4b7cd588f..4d7a4a0fe 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -47,7 +47,6 @@ pub(super) struct BitcoindChainSource { api_client: Arc, header_cache: tokio::sync::Mutex, latest_chain_tip: RwLock>, - onchain_wallet: Arc, wallet_polling_status: Mutex, fee_estimator: Arc, kv_store: Arc, @@ -59,9 +58,8 @@ pub(super) struct BitcoindChainSource { impl BitcoindChainSource { pub(crate) fn new_rpc( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, logger: Arc, - node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, ) -> Self { let api_client = Arc::new(BitcoindClient::new_rpc( rpc_host.clone(), @@ -77,7 +75,6 @@ impl BitcoindChainSource { api_client, header_cache, latest_chain_tip, - onchain_wallet, wallet_polling_status, fee_estimator, kv_store, @@ -89,9 +86,9 @@ impl BitcoindChainSource { pub(crate) fn new_rest( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, - logger: Arc, node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + rest_client_config: BitcoindRestClientConfig, logger: Arc, + node_metrics: Arc>, ) -> Self { let api_client = Arc::new(BitcoindClient::new_rest( rest_client_config.rest_host, @@ -111,7 +108,6 @@ impl BitcoindChainSource { header_cache, latest_chain_tip, wallet_polling_status, - onchain_wallet, fee_estimator, kv_store, config, @@ -126,8 +122,8 @@ impl BitcoindChainSource { pub(super) async fn continuously_sync_wallets( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, - channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) { // First register for the wallet polling status to make sure `Node::sync_wallets` calls // wait on the result before proceeding. @@ -155,14 +151,10 @@ impl BitcoindChainSource { let channel_manager_best_block_hash = channel_manager.current_best_block().block_hash; let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; - let onchain_wallet_best_block_hash = - self.onchain_wallet.current_best_block().block_hash; + let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash; let mut chain_listeners = vec![ - ( - onchain_wallet_best_block_hash, - &*self.onchain_wallet as &(dyn Listen + Send + Sync), - ), + (onchain_wallet_best_block_hash, &*onchain_wallet as &(dyn Listen + Send + Sync)), (channel_manager_best_block_hash, &*channel_manager as &(dyn Listen + Send + Sync)), (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), ]; @@ -307,6 +299,7 @@ impl BitcoindChainSource { return; } _ = self.poll_and_update_listeners( + Arc::clone(&onchain_wallet), Arc::clone(&channel_manager), Arc::clone(&chain_monitor), Arc::clone(&output_sweeper) @@ -337,8 +330,8 @@ impl BitcoindChainSource { } pub(super) async fn poll_and_update_listeners( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.wallet_polling_status.lock().unwrap(); @@ -355,7 +348,12 @@ impl BitcoindChainSource { } let res = self - .poll_and_update_listeners_inner(channel_manager, chain_monitor, output_sweeper) + .poll_and_update_listeners_inner( + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + ) .await; self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); @@ -364,8 +362,8 @@ impl BitcoindChainSource { } async fn poll_and_update_listeners_inner( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); let chain_tip = if let Some(tip) = latest_chain_tip_opt { @@ -386,7 +384,7 @@ impl BitcoindChainSource { let mut locked_header_cache = self.header_cache.lock().await; let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network); let chain_listener = ChainListener { - onchain_wallet: Arc::clone(&self.onchain_wallet), + onchain_wallet: Arc::clone(&onchain_wallet), channel_manager: Arc::clone(&channel_manager), chain_monitor: Arc::clone(&chain_monitor), output_sweeper, @@ -422,7 +420,7 @@ impl BitcoindChainSource { let cur_height = channel_manager.current_best_block().height; let now = SystemTime::now(); - let bdk_unconfirmed_txids = self.onchain_wallet.get_unconfirmed_txids(); + let bdk_unconfirmed_txids = onchain_wallet.get_unconfirmed_txids(); match self .api_client .get_updated_mempool_transactions(cur_height, bdk_unconfirmed_txids) @@ -436,11 +434,11 @@ impl BitcoindChainSource { evicted_txids.len(), now.elapsed().unwrap().as_millis() ); - self.onchain_wallet - .apply_mempool_txs(unconfirmed_txs, evicted_txids) - .unwrap_or_else(|e| { + onchain_wallet.apply_mempool_txs(unconfirmed_txs, evicted_txids).unwrap_or_else( + |e| { log_error!(self.logger, "Failed to apply mempool transactions: {:?}", e); - }); + }, + ); }, Err(e) => { log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e); diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index dbd0d9f7f..9e05dfaee 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -47,7 +47,6 @@ pub(super) struct ElectrumChainSource { server_url: String, pub(super) sync_config: ElectrumSyncConfig, electrum_runtime_status: RwLock, - onchain_wallet: Arc, onchain_wallet_sync_status: Mutex, lightning_wallet_sync_status: Mutex, fee_estimator: Arc, @@ -59,7 +58,7 @@ pub(super) struct ElectrumChainSource { impl ElectrumChainSource { pub(super) fn new( - server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc, + server_url: String, sync_config: ElectrumSyncConfig, fee_estimator: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { @@ -70,7 +69,6 @@ impl ElectrumChainSource { server_url, sync_config, electrum_runtime_status, - onchain_wallet, onchain_wallet_sync_status, lightning_wallet_sync_status, fee_estimator, @@ -94,7 +92,9 @@ impl ElectrumChainSource { self.electrum_runtime_status.write().unwrap().stop(); } - pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -108,14 +108,14 @@ impl ElectrumChainSource { })?; } - let res = self.sync_onchain_wallet_inner().await; + let res = self.sync_onchain_wallet_inner(onchain_wallet).await; self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); res } - async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { Arc::clone(client) @@ -133,7 +133,7 @@ impl ElectrumChainSource { let apply_wallet_update = |update_res: Result, now: Instant| match update_res { - Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(update) => match onchain_wallet.apply_update(update) { Ok(()) => { log_info!( self.logger, @@ -160,10 +160,10 @@ impl ElectrumChainSource { Err(e) => Err(e), }; - let cached_txs = self.onchain_wallet.get_cached_txs(); + let cached_txs = onchain_wallet.get_cached_txs(); let res = if incremental_sync { - let incremental_sync_request = self.onchain_wallet.get_incremental_sync_request(); + let incremental_sync_request = onchain_wallet.get_incremental_sync_request(); let incremental_sync_fut = electrum_client .get_incremental_sync_wallet_update(incremental_sync_request, cached_txs); @@ -171,7 +171,7 @@ impl ElectrumChainSource { let update_res = incremental_sync_fut.await.map(|u| u.into()); apply_wallet_update(update_res, now) } else { - let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let full_scan_request = onchain_wallet.get_full_scan_request(); let full_scan_fut = electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs); let now = Instant::now(); diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index be6f2fb86..f6f313955 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -34,7 +34,6 @@ use crate::{Error, NodeMetrics}; pub(super) struct EsploraChainSource { pub(super) sync_config: EsploraSyncConfig, esplora_client: EsploraAsyncClient, - onchain_wallet: Arc, onchain_wallet_sync_status: Mutex, tx_sync: Arc>>, lightning_wallet_sync_status: Mutex, @@ -48,9 +47,8 @@ pub(super) struct EsploraChainSource { impl EsploraChainSource { pub(crate) fn new( server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, logger: Arc, - node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, ) -> Self { let mut client_builder = esplora_client::Builder::new(&server_url); client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); @@ -68,7 +66,6 @@ impl EsploraChainSource { Self { sync_config, esplora_client, - onchain_wallet, onchain_wallet_sync_status, tx_sync, lightning_wallet_sync_status, @@ -80,7 +77,9 @@ impl EsploraChainSource { } } - pub(super) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(super) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -94,14 +93,14 @@ impl EsploraChainSource { })?; } - let res = self.sync_onchain_wallet_inner().await; + let res = self.sync_onchain_wallet_inner(onchain_wallet).await; self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); res } - async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = @@ -112,7 +111,7 @@ impl EsploraChainSource { let now = Instant::now(); match $sync_future.await { Ok(res) => match res { - Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(update) => match onchain_wallet.apply_update(update) { Ok(()) => { log_info!( self.logger, @@ -182,14 +181,14 @@ impl EsploraChainSource { } if incremental_sync { - let sync_request = self.onchain_wallet.get_incremental_sync_request(); + let sync_request = onchain_wallet.get_incremental_sync_request(); let wallet_sync_timeout_fut = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), ); get_and_apply_wallet_update!(wallet_sync_timeout_fut) } else { - let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let full_scan_request = onchain_wallet.get_full_scan_request(); let wallet_sync_timeout_fut = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), self.esplora_client.full_scan( diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 309d60eab..9c7ddd817 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -99,15 +99,14 @@ enum ChainSourceKind { impl ChainSource { pub(crate) fn new_esplora( server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - logger: Arc, node_metrics: Arc>, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, ) -> Self { let esplora_chain_source = EsploraChainSource::new( server_url, headers, sync_config, - onchain_wallet, fee_estimator, kv_store, config, @@ -119,7 +118,7 @@ impl ChainSource { } pub(crate) fn new_electrum( - server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc, + server_url: String, sync_config: ElectrumSyncConfig, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, @@ -127,7 +126,6 @@ impl ChainSource { let electrum_chain_source = ElectrumChainSource::new( server_url, sync_config, - onchain_wallet, fee_estimator, kv_store, config, @@ -140,16 +138,15 @@ impl ChainSource { pub(crate) fn new_bitcoind_rpc( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - logger: Arc, node_metrics: Arc>, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, ) -> Self { let bitcoind_chain_source = BitcoindChainSource::new_rpc( rpc_host, rpc_port, rpc_user, rpc_password, - onchain_wallet, fee_estimator, kv_store, config, @@ -162,17 +159,15 @@ impl ChainSource { pub(crate) fn new_bitcoind_rest( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - rest_client_config: BitcoindRestClientConfig, logger: Arc, - node_metrics: Arc>, + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, + logger: Arc, node_metrics: Arc>, ) -> Self { let bitcoind_chain_source = BitcoindChainSource::new_rest( rpc_host, rpc_port, rpc_user, rpc_password, - onchain_wallet, fee_estimator, kv_store, config, @@ -223,7 +218,7 @@ impl ChainSource { } pub(crate) async fn continuously_sync_wallets( - &self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, + &self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, onchain_wallet: Arc, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) { @@ -234,6 +229,7 @@ impl ChainSource { { self.start_tx_based_sync_loop( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -256,6 +252,7 @@ impl ChainSource { { self.start_tx_based_sync_loop( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -276,6 +273,7 @@ impl ChainSource { bitcoind_chain_source .continuously_sync_wallets( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -287,9 +285,9 @@ impl ChainSource { async fn start_tx_based_sync_loop( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, - channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, background_sync_config: &BackgroundSyncConfig, - logger: Arc, + onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, + background_sync_config: &BackgroundSyncConfig, logger: Arc, ) { // Setup syncing intervals let onchain_wallet_sync_interval_secs = background_sync_config @@ -328,7 +326,7 @@ impl ChainSource { return; } _ = onchain_wallet_sync_interval.tick() => { - let _ = self.sync_onchain_wallet().await; + let _ = self.sync_onchain_wallet(Arc::clone(&onchain_wallet)).await; } _ = fee_rate_update_interval.tick() => { let _ = self.update_fee_rate_estimates().await; @@ -346,13 +344,15 @@ impl ChainSource { // Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum, // etc.) - pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.sync_onchain_wallet().await + esplora_chain_source.sync_onchain_wallet(onchain_wallet).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.sync_onchain_wallet().await + electrum_chain_source.sync_onchain_wallet(onchain_wallet).await }, ChainSourceKind::Bitcoind { .. } => { // In BitcoindRpc mode we sync lightning and onchain wallet in one go via @@ -388,8 +388,8 @@ impl ChainSource { } pub(crate) async fn poll_and_update_listeners( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { match &self.kind { ChainSourceKind::Esplora { .. } => { @@ -404,7 +404,12 @@ impl ChainSource { }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source - .poll_and_update_listeners(channel_manager, chain_monitor, output_sweeper) + .poll_and_update_listeners( + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + ) .await }, } diff --git a/src/lib.rs b/src/lib.rs index 9c2a733b0..4d84c3c99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -238,12 +238,19 @@ impl Node { // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); + let sync_wallet = Arc::clone(&self.wallet); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); self.runtime.spawn_background_task(async move { chain_source - .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) + .continuously_sync_wallets( + stop_sync_receiver, + sync_wallet, + sync_cman, + sync_cmon, + sync_sweeper, + ) .await; }); @@ -1235,6 +1242,7 @@ impl Node { } let chain_source = Arc::clone(&self.chain_source); + let sync_wallet = Arc::clone(&self.wallet); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); @@ -1244,11 +1252,16 @@ impl Node { chain_source .sync_lightning_wallet(sync_cman, sync_cmon, Arc::clone(&sync_sweeper)) .await?; - chain_source.sync_onchain_wallet().await?; + chain_source.sync_onchain_wallet(sync_wallet).await?; } else { chain_source.update_fee_rate_estimates().await?; chain_source - .poll_and_update_listeners(sync_cman, sync_cmon, Arc::clone(&sync_sweeper)) + .poll_and_update_listeners( + sync_wallet, + sync_cman, + sync_cmon, + Arc::clone(&sync_sweeper), + ) .await?; } let _ = sync_sweeper.regenerate_and_broadcast_spend_if_necessary().await; From c0880d9d974f5cd6740151401bd6bfa4193172dd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 18 Nov 2025 09:58:15 +0100 Subject: [PATCH 2/2] Try to poll chain tip on initialization Previously, we couldn't poll the chain tip in `Builder::build` as we wouldn't have a runtime available. Since we now do, we can at least attempt to poll for the chain tip before initializing objects, avoiding that fresh nodes need to re-validate everything from genesis. --- src/builder.rs | 112 +++++++++++++++++++++++++----------------- src/chain/bitcoind.rs | 46 +++++++++++------ src/chain/mod.rs | 24 ++++----- 3 files changed, 111 insertions(+), 71 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 98650aa1a..183c7513b 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1195,10 +1195,10 @@ fn build_with_store_internal( }, }; - let chain_source = match chain_data_source_config { + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); - Arc::new(ChainSource::new_esplora( + ChainSource::new_esplora( server_url.clone(), headers.clone(), sync_config, @@ -1208,11 +1208,11 @@ fn build_with_store_internal( Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, Some(ChainDataSourceConfig::Electrum { server_url, sync_config }) => { let sync_config = sync_config.unwrap_or(ElectrumSyncConfig::default()); - Arc::new(ChainSource::new_electrum( + ChainSource::new_electrum( server_url.clone(), sync_config, Arc::clone(&fee_estimator), @@ -1221,7 +1221,7 @@ fn build_with_store_internal( Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, Some(ChainDataSourceConfig::Bitcoind { rpc_host, @@ -1230,38 +1230,44 @@ fn build_with_store_internal( rpc_password, rest_client_config, }) => match rest_client_config { - Some(rest_client_config) => Arc::new(ChainSource::new_bitcoind_rest( - rpc_host.clone(), - *rpc_port, - rpc_user.clone(), - rpc_password.clone(), - Arc::clone(&fee_estimator), - Arc::clone(&tx_broadcaster), - Arc::clone(&kv_store), - Arc::clone(&config), - rest_client_config.clone(), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )), - None => Arc::new(ChainSource::new_bitcoind_rpc( - rpc_host.clone(), - *rpc_port, - rpc_user.clone(), - rpc_password.clone(), - Arc::clone(&fee_estimator), - Arc::clone(&tx_broadcaster), - Arc::clone(&kv_store), - Arc::clone(&config), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )), + Some(rest_client_config) => runtime.block_on(async { + ChainSource::new_bitcoind_rest( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + rest_client_config.clone(), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .await + }), + None => runtime.block_on(async { + ChainSource::new_bitcoind_rpc( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .await + }), }, None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); let sync_config = EsploraSyncConfig::default(); - Arc::new(ChainSource::new_esplora( + ChainSource::new_esplora( server_url.clone(), HashMap::new(), sync_config, @@ -1271,9 +1277,10 @@ fn build_with_store_internal( Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, }; + let chain_source = Arc::new(chain_source); // Initialize the on-chain wallet and chain access let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { @@ -1313,13 +1320,31 @@ fn build_with_store_internal( })?; let bdk_wallet = match wallet_opt { Some(wallet) => wallet, - None => BdkWallet::create(descriptor, change_descriptor) - .network(config.network) - .create_wallet(&mut wallet_persister) - .map_err(|e| { - log_error!(logger, "Failed to set up wallet: {}", e); - BuildError::WalletSetupFailed - })?, + None => { + let mut wallet = BdkWallet::create(descriptor, change_descriptor) + .network(config.network) + .create_wallet(&mut wallet_persister) + .map_err(|e| { + log_error!(logger, "Failed to set up wallet: {}", e); + BuildError::WalletSetupFailed + })?; + + if let Some(best_block) = chain_tip_opt { + // Insert the first checkpoint if we have it, to avoid resyncing from genesis. + // TODO: Use a proper wallet birthday once BDK supports it. + let mut latest_checkpoint = wallet.latest_checkpoint(); + let block_id = + bdk_chain::BlockId { height: best_block.height, hash: best_block.block_hash }; + latest_checkpoint = latest_checkpoint.insert(block_id); + let update = + bdk_wallet::Update { chain: Some(latest_checkpoint), ..Default::default() }; + wallet.apply_update(update).map_err(|e| { + log_error!(logger, "Failed to apply checkpoint during wallet setup: {}", e); + BuildError::WalletSetupFailed + })?; + } + wallet + }, }; let wallet = Arc::new(Wallet::new( @@ -1499,13 +1524,10 @@ fn build_with_store_internal( channel_manager } else { // We're starting a fresh node. - let genesis_block_hash = - bitcoin::blockdata::constants::genesis_block(config.network).block_hash(); + let best_block = + chain_tip_opt.unwrap_or_else(|| BestBlock::from_network(config.network)); - let chain_params = ChainParameters { - network: config.network.into(), - best_block: BestBlock::new(genesis_block_hash, 0), - }; + let chain_params = ChainParameters { network: config.network.into(), best_block }; channelmanager::ChannelManager::new( Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 4d7a4a0fe..b3d7880d6 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -14,7 +14,7 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use bitcoin::{BlockHash, FeeRate, Network, Transaction, Txid}; use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; -use lightning::chain::Listen; +use lightning::chain::{BestBlock, Listen}; use lightning::util::ser::Writeable; use lightning_block_sync::gossip::UtxoSource; use lightning_block_sync::http::{HttpEndpoint, JsonResponse}; @@ -42,6 +42,7 @@ use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; +const CHAIN_POLLING_TIMEOUT_SECS: u64 = 10; pub(super) struct BitcoindChainSource { api_client: Arc, @@ -329,6 +330,33 @@ impl BitcoindChainSource { } } + pub(super) async fn poll_best_block(&self) -> Result { + self.poll_chain_tip().await.map(|tip| tip.to_best_block()) + } + + async fn poll_chain_tip(&self) -> Result { + let validate_res = tokio::time::timeout( + Duration::from_secs(CHAIN_POLLING_TIMEOUT_SECS), + validate_best_block_header(self.api_client.as_ref()), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + Error::TxSyncTimeout + })?; + + match validate_res { + Ok(tip) => { + *self.latest_chain_tip.write().unwrap() = Some(tip); + Ok(tip) + }, + Err(e) => { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + return Err(Error::TxSyncFailed); + }, + } + } + pub(super) async fn poll_and_update_listeners( &self, onchain_wallet: Arc, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, @@ -366,20 +394,8 @@ impl BitcoindChainSource { chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); - let chain_tip = if let Some(tip) = latest_chain_tip_opt { - tip - } else { - match validate_best_block_header(self.api_client.as_ref()).await { - Ok(tip) => { - *self.latest_chain_tip.write().unwrap() = Some(tip); - tip - }, - Err(e) => { - log_error!(self.logger, "Failed to poll for chain data: {:?}", e); - return Err(Error::TxSyncFailed); - }, - } - }; + let chain_tip = + if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? }; let mut locked_header_cache = self.header_cache.lock().await; let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network); diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 9c7ddd817..2cd98e20d 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use bitcoin::{Script, Txid}; -use lightning::chain::Filter; +use lightning::chain::{BestBlock, Filter}; use lightning_block_sync::gossip::UtxoSource; use crate::chain::bitcoind::BitcoindChainSource; @@ -102,7 +102,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> (Self, Option) { let esplora_chain_source = EsploraChainSource::new( server_url, headers, @@ -114,7 +114,7 @@ impl ChainSource { node_metrics, ); let kind = ChainSourceKind::Esplora(esplora_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, None) } pub(crate) fn new_electrum( @@ -122,7 +122,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> (Self, Option) { let electrum_chain_source = ElectrumChainSource::new( server_url, sync_config, @@ -133,15 +133,15 @@ impl ChainSource { node_metrics, ); let kind = ChainSourceKind::Electrum(electrum_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, None) } - pub(crate) fn new_bitcoind_rpc( + pub(crate) async fn new_bitcoind_rpc( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rpc( rpc_host, rpc_port, @@ -153,16 +153,17 @@ impl ChainSource { Arc::clone(&logger), node_metrics, ); + let best_block = bitcoind_chain_source.poll_best_block().await.ok(); let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, best_block) } - pub(crate) fn new_bitcoind_rest( + pub(crate) async fn new_bitcoind_rest( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rest( rpc_host, rpc_port, @@ -175,8 +176,9 @@ impl ChainSource { Arc::clone(&logger), node_metrics, ); + let best_block = bitcoind_chain_source.poll_best_block().await.ok(); let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, best_block) } pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> {