diff --git a/src/builder.rs b/src/builder.rs index b45f03f6d..183c7513b 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1178,54 +1178,6 @@ fn build_with_store_internal( } }, }; - - // Initialize the on-chain wallet and chain access - let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { - log_error!(logger, "Failed to derive master secret: {}", e); - BuildError::InvalidSeedBytes - })?; - - let descriptor = Bip84(xprv, KeychainKind::External); - let change_descriptor = Bip84(xprv, KeychainKind::Internal); - let mut wallet_persister = - KVStoreWalletPersister::new(Arc::clone(&kv_store), Arc::clone(&logger)); - let wallet_opt = BdkWallet::load() - .descriptor(KeychainKind::External, Some(descriptor.clone())) - .descriptor(KeychainKind::Internal, Some(change_descriptor.clone())) - .extract_keys() - .check_network(config.network) - .load_wallet(&mut wallet_persister) - .map_err(|e| match e { - bdk_wallet::LoadWithPersistError::InvalidChangeSet( - bdk_wallet::LoadError::Mismatch(bdk_wallet::LoadMismatch::Network { - loaded, - expected, - }), - ) => { - log_error!( - logger, - "Failed to setup wallet: Networks do not match. Expected {} but got {}", - expected, - loaded - ); - BuildError::NetworkMismatch - }, - _ => { - log_error!(logger, "Failed to set up wallet: {}", e); - BuildError::WalletSetupFailed - }, - })?; - let bdk_wallet = match wallet_opt { - Some(wallet) => wallet, - None => BdkWallet::create(descriptor, change_descriptor) - .network(config.network) - .create_wallet(&mut wallet_persister) - .map_err(|e| { - log_error!(logger, "Failed to set up wallet: {}", e); - BuildError::WalletSetupFailed - })?, - }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); let fee_estimator = Arc::new(OnchainFeeEstimator::new()); @@ -1243,45 +1195,33 @@ fn build_with_store_internal( }, }; - let wallet = Arc::new(Wallet::new( - bdk_wallet, - wallet_persister, - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&payment_store), - Arc::clone(&config), - Arc::clone(&logger), - )); - - let chain_source = match chain_data_source_config { + let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default()); - Arc::new(ChainSource::new_esplora( + ChainSource::new_esplora( server_url.clone(), headers.clone(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, Some(ChainDataSourceConfig::Electrum { server_url, sync_config }) => { let sync_config = sync_config.unwrap_or(ElectrumSyncConfig::default()); - Arc::new(ChainSource::new_electrum( + ChainSource::new_electrum( server_url.clone(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, Some(ChainDataSourceConfig::Bitcoind { rpc_host, @@ -1290,53 +1230,132 @@ fn build_with_store_internal( rpc_password, rest_client_config, }) => match rest_client_config { - Some(rest_client_config) => Arc::new(ChainSource::new_bitcoind_rest( - rpc_host.clone(), - *rpc_port, - rpc_user.clone(), - rpc_password.clone(), - Arc::clone(&wallet), - Arc::clone(&fee_estimator), - Arc::clone(&tx_broadcaster), - Arc::clone(&kv_store), - Arc::clone(&config), - rest_client_config.clone(), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )), - None => Arc::new(ChainSource::new_bitcoind_rpc( - rpc_host.clone(), - *rpc_port, - rpc_user.clone(), - rpc_password.clone(), - Arc::clone(&wallet), - Arc::clone(&fee_estimator), - Arc::clone(&tx_broadcaster), - Arc::clone(&kv_store), - Arc::clone(&config), - Arc::clone(&logger), - Arc::clone(&node_metrics), - )), + Some(rest_client_config) => runtime.block_on(async { + ChainSource::new_bitcoind_rest( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + rest_client_config.clone(), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .await + }), + None => runtime.block_on(async { + ChainSource::new_bitcoind_rpc( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + ) + .await + }), }, None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); let sync_config = EsploraSyncConfig::default(); - Arc::new(ChainSource::new_esplora( + ChainSource::new_esplora( server_url.clone(), HashMap::new(), sync_config, - Arc::clone(&wallet), Arc::clone(&fee_estimator), Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), Arc::clone(&config), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + ) }, }; + let chain_source = Arc::new(chain_source); + + // Initialize the on-chain wallet and chain access + let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| { + log_error!(logger, "Failed to derive master secret: {}", e); + BuildError::InvalidSeedBytes + })?; + + let descriptor = Bip84(xprv, KeychainKind::External); + let change_descriptor = Bip84(xprv, KeychainKind::Internal); + let mut wallet_persister = + KVStoreWalletPersister::new(Arc::clone(&kv_store), Arc::clone(&logger)); + let wallet_opt = BdkWallet::load() + .descriptor(KeychainKind::External, Some(descriptor.clone())) + .descriptor(KeychainKind::Internal, Some(change_descriptor.clone())) + .extract_keys() + .check_network(config.network) + .load_wallet(&mut wallet_persister) + .map_err(|e| match e { + bdk_wallet::LoadWithPersistError::InvalidChangeSet( + bdk_wallet::LoadError::Mismatch(bdk_wallet::LoadMismatch::Network { + loaded, + expected, + }), + ) => { + log_error!( + logger, + "Failed to setup wallet: Networks do not match. Expected {} but got {}", + expected, + loaded + ); + BuildError::NetworkMismatch + }, + _ => { + log_error!(logger, "Failed to set up wallet: {}", e); + BuildError::WalletSetupFailed + }, + })?; + let bdk_wallet = match wallet_opt { + Some(wallet) => wallet, + None => { + let mut wallet = BdkWallet::create(descriptor, change_descriptor) + .network(config.network) + .create_wallet(&mut wallet_persister) + .map_err(|e| { + log_error!(logger, "Failed to set up wallet: {}", e); + BuildError::WalletSetupFailed + })?; + + if let Some(best_block) = chain_tip_opt { + // Insert the first checkpoint if we have it, to avoid resyncing from genesis. + // TODO: Use a proper wallet birthday once BDK supports it. + let mut latest_checkpoint = wallet.latest_checkpoint(); + let block_id = + bdk_chain::BlockId { height: best_block.height, hash: best_block.block_hash }; + latest_checkpoint = latest_checkpoint.insert(block_id); + let update = + bdk_wallet::Update { chain: Some(latest_checkpoint), ..Default::default() }; + wallet.apply_update(update).map_err(|e| { + log_error!(logger, "Failed to apply checkpoint during wallet setup: {}", e); + BuildError::WalletSetupFailed + })?; + } + wallet + }, + }; + + let wallet = Arc::new(Wallet::new( + bdk_wallet, + wallet_persister, + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&payment_store), + Arc::clone(&config), + Arc::clone(&logger), + )); // Initialize the KeysManager let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| { @@ -1505,13 +1524,10 @@ fn build_with_store_internal( channel_manager } else { // We're starting a fresh node. - let genesis_block_hash = - bitcoin::blockdata::constants::genesis_block(config.network).block_hash(); + let best_block = + chain_tip_opt.unwrap_or_else(|| BestBlock::from_network(config.network)); - let chain_params = ChainParameters { - network: config.network.into(), - best_block: BestBlock::new(genesis_block_hash, 0), - }; + let chain_params = ChainParameters { network: config.network.into(), best_block }; channelmanager::ChannelManager::new( Arc::clone(&fee_estimator), Arc::clone(&chain_monitor), diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 4b7cd588f..b3d7880d6 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -14,7 +14,7 @@ use base64::prelude::BASE64_STANDARD; use base64::Engine; use bitcoin::{BlockHash, FeeRate, Network, Transaction, Txid}; use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; -use lightning::chain::Listen; +use lightning::chain::{BestBlock, Listen}; use lightning::util::ser::Writeable; use lightning_block_sync::gossip::UtxoSource; use lightning_block_sync::http::{HttpEndpoint, JsonResponse}; @@ -42,12 +42,12 @@ use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; +const CHAIN_POLLING_TIMEOUT_SECS: u64 = 10; pub(super) struct BitcoindChainSource { api_client: Arc, header_cache: tokio::sync::Mutex, latest_chain_tip: RwLock>, - onchain_wallet: Arc, wallet_polling_status: Mutex, fee_estimator: Arc, kv_store: Arc, @@ -59,9 +59,8 @@ pub(super) struct BitcoindChainSource { impl BitcoindChainSource { pub(crate) fn new_rpc( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, logger: Arc, - node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, ) -> Self { let api_client = Arc::new(BitcoindClient::new_rpc( rpc_host.clone(), @@ -77,7 +76,6 @@ impl BitcoindChainSource { api_client, header_cache, latest_chain_tip, - onchain_wallet, wallet_polling_status, fee_estimator, kv_store, @@ -89,9 +87,9 @@ impl BitcoindChainSource { pub(crate) fn new_rest( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, - logger: Arc, node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + rest_client_config: BitcoindRestClientConfig, logger: Arc, + node_metrics: Arc>, ) -> Self { let api_client = Arc::new(BitcoindClient::new_rest( rest_client_config.rest_host, @@ -111,7 +109,6 @@ impl BitcoindChainSource { header_cache, latest_chain_tip, wallet_polling_status, - onchain_wallet, fee_estimator, kv_store, config, @@ -126,8 +123,8 @@ impl BitcoindChainSource { pub(super) async fn continuously_sync_wallets( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, - channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) { // First register for the wallet polling status to make sure `Node::sync_wallets` calls // wait on the result before proceeding. @@ -155,14 +152,10 @@ impl BitcoindChainSource { let channel_manager_best_block_hash = channel_manager.current_best_block().block_hash; let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; - let onchain_wallet_best_block_hash = - self.onchain_wallet.current_best_block().block_hash; + let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash; let mut chain_listeners = vec![ - ( - onchain_wallet_best_block_hash, - &*self.onchain_wallet as &(dyn Listen + Send + Sync), - ), + (onchain_wallet_best_block_hash, &*onchain_wallet as &(dyn Listen + Send + Sync)), (channel_manager_best_block_hash, &*channel_manager as &(dyn Listen + Send + Sync)), (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), ]; @@ -307,6 +300,7 @@ impl BitcoindChainSource { return; } _ = self.poll_and_update_listeners( + Arc::clone(&onchain_wallet), Arc::clone(&channel_manager), Arc::clone(&chain_monitor), Arc::clone(&output_sweeper) @@ -336,9 +330,36 @@ impl BitcoindChainSource { } } + pub(super) async fn poll_best_block(&self) -> Result { + self.poll_chain_tip().await.map(|tip| tip.to_best_block()) + } + + async fn poll_chain_tip(&self) -> Result { + let validate_res = tokio::time::timeout( + Duration::from_secs(CHAIN_POLLING_TIMEOUT_SECS), + validate_best_block_header(self.api_client.as_ref()), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + Error::TxSyncTimeout + })?; + + match validate_res { + Ok(tip) => { + *self.latest_chain_tip.write().unwrap() = Some(tip); + Ok(tip) + }, + Err(e) => { + log_error!(self.logger, "Failed to poll for chain data: {:?}", e); + return Err(Error::TxSyncFailed); + }, + } + } + pub(super) async fn poll_and_update_listeners( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.wallet_polling_status.lock().unwrap(); @@ -355,7 +376,12 @@ impl BitcoindChainSource { } let res = self - .poll_and_update_listeners_inner(channel_manager, chain_monitor, output_sweeper) + .poll_and_update_listeners_inner( + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + ) .await; self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); @@ -364,29 +390,17 @@ impl BitcoindChainSource { } async fn poll_and_update_listeners_inner( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); - let chain_tip = if let Some(tip) = latest_chain_tip_opt { - tip - } else { - match validate_best_block_header(self.api_client.as_ref()).await { - Ok(tip) => { - *self.latest_chain_tip.write().unwrap() = Some(tip); - tip - }, - Err(e) => { - log_error!(self.logger, "Failed to poll for chain data: {:?}", e); - return Err(Error::TxSyncFailed); - }, - } - }; + let chain_tip = + if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? }; let mut locked_header_cache = self.header_cache.lock().await; let chain_poller = ChainPoller::new(Arc::clone(&self.api_client), self.config.network); let chain_listener = ChainListener { - onchain_wallet: Arc::clone(&self.onchain_wallet), + onchain_wallet: Arc::clone(&onchain_wallet), channel_manager: Arc::clone(&channel_manager), chain_monitor: Arc::clone(&chain_monitor), output_sweeper, @@ -422,7 +436,7 @@ impl BitcoindChainSource { let cur_height = channel_manager.current_best_block().height; let now = SystemTime::now(); - let bdk_unconfirmed_txids = self.onchain_wallet.get_unconfirmed_txids(); + let bdk_unconfirmed_txids = onchain_wallet.get_unconfirmed_txids(); match self .api_client .get_updated_mempool_transactions(cur_height, bdk_unconfirmed_txids) @@ -436,11 +450,11 @@ impl BitcoindChainSource { evicted_txids.len(), now.elapsed().unwrap().as_millis() ); - self.onchain_wallet - .apply_mempool_txs(unconfirmed_txs, evicted_txids) - .unwrap_or_else(|e| { + onchain_wallet.apply_mempool_txs(unconfirmed_txs, evicted_txids).unwrap_or_else( + |e| { log_error!(self.logger, "Failed to apply mempool transactions: {:?}", e); - }); + }, + ); }, Err(e) => { log_error!(self.logger, "Failed to poll for mempool transactions: {:?}", e); diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index dbd0d9f7f..9e05dfaee 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -47,7 +47,6 @@ pub(super) struct ElectrumChainSource { server_url: String, pub(super) sync_config: ElectrumSyncConfig, electrum_runtime_status: RwLock, - onchain_wallet: Arc, onchain_wallet_sync_status: Mutex, lightning_wallet_sync_status: Mutex, fee_estimator: Arc, @@ -59,7 +58,7 @@ pub(super) struct ElectrumChainSource { impl ElectrumChainSource { pub(super) fn new( - server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc, + server_url: String, sync_config: ElectrumSyncConfig, fee_estimator: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { @@ -70,7 +69,6 @@ impl ElectrumChainSource { server_url, sync_config, electrum_runtime_status, - onchain_wallet, onchain_wallet_sync_status, lightning_wallet_sync_status, fee_estimator, @@ -94,7 +92,9 @@ impl ElectrumChainSource { self.electrum_runtime_status.write().unwrap().stop(); } - pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -108,14 +108,14 @@ impl ElectrumChainSource { })?; } - let res = self.sync_onchain_wallet_inner().await; + let res = self.sync_onchain_wallet_inner(onchain_wallet).await; self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); res } - async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { let electrum_client: Arc = if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { Arc::clone(client) @@ -133,7 +133,7 @@ impl ElectrumChainSource { let apply_wallet_update = |update_res: Result, now: Instant| match update_res { - Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(update) => match onchain_wallet.apply_update(update) { Ok(()) => { log_info!( self.logger, @@ -160,10 +160,10 @@ impl ElectrumChainSource { Err(e) => Err(e), }; - let cached_txs = self.onchain_wallet.get_cached_txs(); + let cached_txs = onchain_wallet.get_cached_txs(); let res = if incremental_sync { - let incremental_sync_request = self.onchain_wallet.get_incremental_sync_request(); + let incremental_sync_request = onchain_wallet.get_incremental_sync_request(); let incremental_sync_fut = electrum_client .get_incremental_sync_wallet_update(incremental_sync_request, cached_txs); @@ -171,7 +171,7 @@ impl ElectrumChainSource { let update_res = incremental_sync_fut.await.map(|u| u.into()); apply_wallet_update(update_res, now) } else { - let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let full_scan_request = onchain_wallet.get_full_scan_request(); let full_scan_fut = electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs); let now = Instant::now(); diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index be6f2fb86..f6f313955 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -34,7 +34,6 @@ use crate::{Error, NodeMetrics}; pub(super) struct EsploraChainSource { pub(super) sync_config: EsploraSyncConfig, esplora_client: EsploraAsyncClient, - onchain_wallet: Arc, onchain_wallet_sync_status: Mutex, tx_sync: Arc>>, lightning_wallet_sync_status: Mutex, @@ -48,9 +47,8 @@ pub(super) struct EsploraChainSource { impl EsploraChainSource { pub(crate) fn new( server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, - onchain_wallet: Arc, fee_estimator: Arc, - kv_store: Arc, config: Arc, logger: Arc, - node_metrics: Arc>, + fee_estimator: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, ) -> Self { let mut client_builder = esplora_client::Builder::new(&server_url); client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS); @@ -68,7 +66,6 @@ impl EsploraChainSource { Self { sync_config, esplora_client, - onchain_wallet, onchain_wallet_sync_status, tx_sync, lightning_wallet_sync_status, @@ -80,7 +77,9 @@ impl EsploraChainSource { } } - pub(super) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(super) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { let receiver_res = { let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); status_lock.register_or_subscribe_pending_sync() @@ -94,14 +93,14 @@ impl EsploraChainSource { })?; } - let res = self.sync_onchain_wallet_inner().await; + let res = self.sync_onchain_wallet_inner(onchain_wallet).await; self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); res } - async fn sync_onchain_wallet_inner(&self) -> Result<(), Error> { + async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = @@ -112,7 +111,7 @@ impl EsploraChainSource { let now = Instant::now(); match $sync_future.await { Ok(res) => match res { - Ok(update) => match self.onchain_wallet.apply_update(update) { + Ok(update) => match onchain_wallet.apply_update(update) { Ok(()) => { log_info!( self.logger, @@ -182,14 +181,14 @@ impl EsploraChainSource { } if incremental_sync { - let sync_request = self.onchain_wallet.get_incremental_sync_request(); + let sync_request = onchain_wallet.get_incremental_sync_request(); let wallet_sync_timeout_fut = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), ); get_and_apply_wallet_update!(wallet_sync_timeout_fut) } else { - let full_scan_request = self.onchain_wallet.get_full_scan_request(); + let full_scan_request = onchain_wallet.get_full_scan_request(); let wallet_sync_timeout_fut = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), self.esplora_client.full_scan( diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 309d60eab..2cd98e20d 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -14,7 +14,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use bitcoin::{Script, Txid}; -use lightning::chain::Filter; +use lightning::chain::{BestBlock, Filter}; use lightning_block_sync::gossip::UtxoSource; use crate::chain::bitcoind::BitcoindChainSource; @@ -99,15 +99,14 @@ enum ChainSourceKind { impl ChainSource { pub(crate) fn new_esplora( server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - logger: Arc, node_metrics: Arc>, - ) -> Self { + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> (Self, Option) { let esplora_chain_source = EsploraChainSource::new( server_url, headers, sync_config, - onchain_wallet, fee_estimator, kv_store, config, @@ -115,19 +114,18 @@ impl ChainSource { node_metrics, ); let kind = ChainSourceKind::Esplora(esplora_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, None) } pub(crate) fn new_electrum( - server_url: String, sync_config: ElectrumSyncConfig, onchain_wallet: Arc, + server_url: String, sync_config: ElectrumSyncConfig, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> (Self, Option) { let electrum_chain_source = ElectrumChainSource::new( server_url, sync_config, - onchain_wallet, fee_estimator, kv_store, config, @@ -135,44 +133,42 @@ impl ChainSource { node_metrics, ); let kind = ChainSourceKind::Electrum(electrum_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, None) } - pub(crate) fn new_bitcoind_rpc( + pub(crate) async fn new_bitcoind_rpc( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - logger: Arc, node_metrics: Arc>, - ) -> Self { + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, logger: Arc, + node_metrics: Arc>, + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rpc( rpc_host, rpc_port, rpc_user, rpc_password, - onchain_wallet, fee_estimator, kv_store, config, Arc::clone(&logger), node_metrics, ); + let best_block = bitcoind_chain_source.poll_best_block().await.ok(); let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, best_block) } - pub(crate) fn new_bitcoind_rest( + pub(crate) async fn new_bitcoind_rest( rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, - onchain_wallet: Arc, fee_estimator: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, - rest_client_config: BitcoindRestClientConfig, logger: Arc, - node_metrics: Arc>, - ) -> Self { + fee_estimator: Arc, tx_broadcaster: Arc, + kv_store: Arc, config: Arc, rest_client_config: BitcoindRestClientConfig, + logger: Arc, node_metrics: Arc>, + ) -> (Self, Option) { let bitcoind_chain_source = BitcoindChainSource::new_rest( rpc_host, rpc_port, rpc_user, rpc_password, - onchain_wallet, fee_estimator, kv_store, config, @@ -180,8 +176,9 @@ impl ChainSource { Arc::clone(&logger), node_metrics, ); + let best_block = bitcoind_chain_source.poll_best_block().await.ok(); let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source); - Self { kind, tx_broadcaster, logger } + (Self { kind, tx_broadcaster, logger }, best_block) } pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { @@ -223,7 +220,7 @@ impl ChainSource { } pub(crate) async fn continuously_sync_wallets( - &self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, + &self, stop_sync_receiver: tokio::sync::watch::Receiver<()>, onchain_wallet: Arc, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) { @@ -234,6 +231,7 @@ impl ChainSource { { self.start_tx_based_sync_loop( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -256,6 +254,7 @@ impl ChainSource { { self.start_tx_based_sync_loop( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -276,6 +275,7 @@ impl ChainSource { bitcoind_chain_source .continuously_sync_wallets( stop_sync_receiver, + onchain_wallet, channel_manager, chain_monitor, output_sweeper, @@ -287,9 +287,9 @@ impl ChainSource { async fn start_tx_based_sync_loop( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, - channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, background_sync_config: &BackgroundSyncConfig, - logger: Arc, + onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, + background_sync_config: &BackgroundSyncConfig, logger: Arc, ) { // Setup syncing intervals let onchain_wallet_sync_interval_secs = background_sync_config @@ -328,7 +328,7 @@ impl ChainSource { return; } _ = onchain_wallet_sync_interval.tick() => { - let _ = self.sync_onchain_wallet().await; + let _ = self.sync_onchain_wallet(Arc::clone(&onchain_wallet)).await; } _ = fee_rate_update_interval.tick() => { let _ = self.update_fee_rate_estimates().await; @@ -346,13 +346,15 @@ impl ChainSource { // Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum, // etc.) - pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { + pub(crate) async fn sync_onchain_wallet( + &self, onchain_wallet: Arc, + ) -> Result<(), Error> { match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { - esplora_chain_source.sync_onchain_wallet().await + esplora_chain_source.sync_onchain_wallet(onchain_wallet).await }, ChainSourceKind::Electrum(electrum_chain_source) => { - electrum_chain_source.sync_onchain_wallet().await + electrum_chain_source.sync_onchain_wallet(onchain_wallet).await }, ChainSourceKind::Bitcoind { .. } => { // In BitcoindRpc mode we sync lightning and onchain wallet in one go via @@ -388,8 +390,8 @@ impl ChainSource { } pub(crate) async fn poll_and_update_listeners( - &self, channel_manager: Arc, chain_monitor: Arc, - output_sweeper: Arc, + &self, onchain_wallet: Arc, channel_manager: Arc, + chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { match &self.kind { ChainSourceKind::Esplora { .. } => { @@ -404,7 +406,12 @@ impl ChainSource { }, ChainSourceKind::Bitcoind(bitcoind_chain_source) => { bitcoind_chain_source - .poll_and_update_listeners(channel_manager, chain_monitor, output_sweeper) + .poll_and_update_listeners( + onchain_wallet, + channel_manager, + chain_monitor, + output_sweeper, + ) .await }, } diff --git a/src/lib.rs b/src/lib.rs index 9c2a733b0..4d84c3c99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -238,12 +238,19 @@ impl Node { // Spawn background task continuously syncing onchain, lightning, and fee rate cache. let stop_sync_receiver = self.stop_sender.subscribe(); let chain_source = Arc::clone(&self.chain_source); + let sync_wallet = Arc::clone(&self.wallet); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); self.runtime.spawn_background_task(async move { chain_source - .continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper) + .continuously_sync_wallets( + stop_sync_receiver, + sync_wallet, + sync_cman, + sync_cmon, + sync_sweeper, + ) .await; }); @@ -1235,6 +1242,7 @@ impl Node { } let chain_source = Arc::clone(&self.chain_source); + let sync_wallet = Arc::clone(&self.wallet); let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_sweeper = Arc::clone(&self.output_sweeper); @@ -1244,11 +1252,16 @@ impl Node { chain_source .sync_lightning_wallet(sync_cman, sync_cmon, Arc::clone(&sync_sweeper)) .await?; - chain_source.sync_onchain_wallet().await?; + chain_source.sync_onchain_wallet(sync_wallet).await?; } else { chain_source.update_fee_rate_estimates().await?; chain_source - .poll_and_update_listeners(sync_cman, sync_cmon, Arc::clone(&sync_sweeper)) + .poll_and_update_listeners( + sync_wallet, + sync_cman, + sync_cmon, + Arc::clone(&sync_sweeper), + ) .await?; } let _ = sync_sweeper.regenerate_and_broadcast_spend_if_necessary().await;