From c202d35ff36ac37559a832bc0111b0ec4d37d6d0 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 Sep 2025 17:35:46 +0000 Subject: [PATCH 1/3] Correct idempotency key docs on `LSPS2ServiceEvent::OpenChannel` While it would be correct to use `(their_network_key, intercept_scid)` as the idempotency key when handling `LSPS2ServiceEvent::OpenChannel`, we don't actually expect anyone to do so as it would require separate storage to track the `intercept_scid` -> opened channel mappings. Thus, we update the documentation to note that the correct idempotency key is `(their_network_key, user_channel_id)`. --- lightning-liquidity/src/lsps2/event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lightning-liquidity/src/lsps2/event.rs b/lightning-liquidity/src/lsps2/event.rs index 29cc577f293..502429b79ec 100644 --- a/lightning-liquidity/src/lsps2/event.rs +++ b/lightning-liquidity/src/lsps2/event.rs @@ -163,7 +163,7 @@ pub enum LSPS2ServiceEvent { /// /// **Note: ** As this event is persisted and might get replayed after restart, you'll need to /// ensure channel creation idempotency. I.e., please check if you already created a - /// corresponding channel based on the given `their_network_key` and `intercept_scid` and + /// corresponding channel based on the given `their_network_key` and `user_channel_id` and /// ignore this event in case you did. /// /// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel From 0f237961e228b6b77844bef247ce0c470dd4cee3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 Sep 2025 17:41:29 +0000 Subject: [PATCH 2/3] Fix races when removing per-peer state from KVStore in LSPS2/5 If we note that a peer should be removed in LSPS2/5 handling, we need to make sure that the peer wasn't re-added between dropping its state in memory and going to remove its state from disk. If it is, we need to overwrite the current on-disk state instead. --- lightning-liquidity/src/lsps2/service.rs | 73 ++++++++++++++++-------- lightning-liquidity/src/lsps5/service.rs | 66 +++++++++++++++------ 2 files changed, 97 insertions(+), 42 deletions(-) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 2d727acf16e..2ded3215a91 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -1603,7 +1603,7 @@ where ) -> Result<(), lightning::io::Error> { let fut = { let outer_state_lock = self.per_peer_state.read().unwrap(); - let encoded = match outer_state_lock.get(&counterparty_node_id) { + match outer_state_lock.get(&counterparty_node_id) { None => { // We dropped the peer state by now. return Ok(()); @@ -1615,18 +1615,19 @@ where return Ok(()); } else { peer_state_lock.needs_persist = false; - peer_state_lock.encode() + let key = counterparty_node_id.to_string(); + let encoded = peer_state_lock.encode(); + // Begin the write with the entry lock held. This avoids racing with + // potentially-in-flight `persist` calls writing state for the same peer. 
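+ // (The future returned by this call is only awaited below, via `fut.await`, once the read lock and the peer's `Mutex` guard have been dropped, so no locks are held across the `.await` point.)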
+ self.kv_store.write( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + encoded, + ) } }, - }; - let key = counterparty_node_id.to_string(); - - self.kv_store.write( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - encoded, - ) + } }; fut.await.map_err(|e| { @@ -1648,8 +1649,10 @@ where let mut need_persist = Vec::new(); { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - outer_state_lock.retain(|counterparty_node_id, inner_state_lock| { + // First build a list of peers to persist and prune with the read lock. This allows + // us to avoid the write lock unless we actually need to remove a node. + let outer_state_lock = self.per_peer_state.read().unwrap(); + for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() { let mut peer_state_lock = inner_state_lock.lock().unwrap(); peer_state_lock.prune_expired_request_state(); let is_prunable = peer_state_lock.is_prunable(); @@ -1658,8 +1661,7 @@ where } else if peer_state_lock.needs_persist { need_persist.push(*counterparty_node_id); } - !is_prunable - }); + } } for counterparty_node_id in need_persist.into_iter() { @@ -1668,14 +1670,39 @@ where } for counterparty_node_id in need_remove { - let key = counterparty_node_id.to_string(); - self.kv_store - .remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .await?; + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) { + let state = entry.get_mut().get_mut().unwrap(); + if state.is_prunable() { + entry.remove(); + let key = counterparty_node_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer got new state, force a re-persist of the current state. + state.needs_persist = true; + } + } else { + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); + } + } + if let Some(future) = future_opt { + future.await?; + } else { + self.persist_peer_state(counterparty_node_id).await?; + } } Ok(()) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 4439130a056..da2a90c52a6 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -20,6 +20,7 @@ use crate::message_queue::MessageQueue; use crate::persist::{ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, }; +use crate::prelude::hash_map::Entry; use crate::prelude::*; use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; use crate::utils::time::TimeProvider; @@ -220,6 +221,8 @@ where let key = counterparty_node_id.to_string(); + // Begin the write with the `per_peer_state` write lock held to avoid racing with + // potentially-in-flight `persist` calls writing state for the same peer. 
self.kv_store.write( LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, @@ -244,11 +247,12 @@ where // time. let mut need_remove = Vec::new(); let mut need_persist = Vec::new(); + + self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); + let outer_state_lock = self.per_peer_state.read().unwrap(); - outer_state_lock.retain(|client_id, peer_state| { + for (client_id, peer_state) in outer_state_lock.iter() { let is_prunable = peer_state.is_prunable(); let has_open_channel = self.client_has_open_channel(client_id); if is_prunable && !has_open_channel { @@ -256,24 +260,48 @@ where } else if peer_state.needs_persist { need_persist.push(*client_id); } - !is_prunable || has_open_channel - }); - }; + } + } - for counterparty_node_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&counterparty_node_id)); - self.persist_peer_state(counterparty_node_id).await?; + for client_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&client_id)); + self.persist_peer_state(client_id).await?; } - for counterparty_node_id in need_remove { - let key = counterparty_node_id.to_string(); - self.kv_store - .remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .await?; + for client_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { + let state = entry.get_mut(); + if state.is_prunable() && !self.client_has_open_channel(&client_id) { + entry.remove(); + let key = client_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer was re-added, force a re-persist of the current state. + state.needs_persist = true; + } + } else { + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); + } + } + if let Some(future) = future_opt { + future.await?; + } else { + self.persist_peer_state(client_id).await?; + } } Ok(()) @@ -761,7 +789,7 @@ impl PeerState { }); } - fn is_prunable(&mut self) -> bool { + fn is_prunable(&self) -> bool { self.webhooks.is_empty() } } From a9ddf3f1c917220217e71b403702fb5b3bc797a3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 25 Sep 2025 14:01:31 +0000 Subject: [PATCH 3/3] Ensure mutual exclusion in LSPS2/5 persistence There are various race conditions between `persist` calls on the LSPS2 and LSPS5 services. Thus, we simply ensure that no two `persist` calls can be in-flight at the same time, ensuring that any new state updates between them will ultimately be persisted by re-persisting from the top in the originally-running `persist` call if another is started before it finishes. 
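Concretely, both `persist` implementations now follow the single-flight pattern sketched below. The `Persister` type, the `std::io::Error` return type, and the empty loop body are simplified stand-ins for illustration; the real methods walk the per-peer maps and persist or prune each peer inside the loop.

use core::sync::atomic::{AtomicUsize, Ordering};

struct Persister {
	persistence_in_flight: AtomicUsize,
}

impl Persister {
	async fn persist(&self) -> Result<(), std::io::Error> {
		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			// Another `persist` call is already running. It will observe our
			// increment when it finishes and run one more pass, so any state we
			// just dirtied still gets persisted.
			return Ok(());
		}
		loop {
			// ...persist dirty peers and prune stale ones here...

			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				// Someone else called `persist` while we were running. Reset the
				// counter to 1 (only we remain in flight) and go around again.
				self.persistence_in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
		Ok(())
	}
}

Any number of `persist` calls that arrive while a pass is running collapse into a single follow-up pass, so the loop terminates once callers stop arriving.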
--- lightning-liquidity/src/lsps2/service.rs | 112 ++++++++++++---------- lightning-liquidity/src/lsps5/service.rs | 114 +++++++++++++---------- 2 files changed, 132 insertions(+), 94 deletions(-) diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 2ded3215a91..e9013cfe8a8 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -593,6 +593,7 @@ where peer_by_channel_id: RwLock>, total_pending_requests: AtomicUsize, config: LSPS2ServiceConfig, + persistence_in_flight: AtomicUsize, } impl LSPS2ServiceHandler @@ -640,6 +641,7 @@ where peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid), peer_by_channel_id: RwLock::new(peer_by_channel_id), total_pending_requests: AtomicUsize::new(0), + persistence_in_flight: AtomicUsize::new(0), channel_manager, kv_store, config, @@ -1645,64 +1647,80 @@ where // introduce some batching to upper-bound the number of requests inflight at any given // time. - let mut need_remove = Vec::new(); - let mut need_persist = Vec::new(); + if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return Ok(()); + } - { - // First build a list of peers to persist and prune with the read lock. This allows - // us to avoid the write lock unless we actually need to remove a node. - let outer_state_lock = self.per_peer_state.read().unwrap(); - for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - peer_state_lock.prune_expired_request_state(); - let is_prunable = peer_state_lock.is_prunable(); - if is_prunable { - need_remove.push(*counterparty_node_id); - } else if peer_state_lock.needs_persist { - need_persist.push(*counterparty_node_id); + loop { + let mut need_remove = Vec::new(); + let mut need_persist = Vec::new(); + + { + // First build a list of peers to persist and prune with the read lock. This allows + // us to avoid the write lock unless we actually need to remove a node. + let outer_state_lock = self.per_peer_state.read().unwrap(); + for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + peer_state_lock.prune_expired_request_state(); + let is_prunable = peer_state_lock.is_prunable(); + if is_prunable { + need_remove.push(*counterparty_node_id); + } else if peer_state_lock.needs_persist { + need_persist.push(*counterparty_node_id); + } } } - } - for counterparty_node_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&counterparty_node_id)); - self.persist_peer_state(counterparty_node_id).await?; - } + for counterparty_node_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&counterparty_node_id)); + self.persist_peer_state(counterparty_node_id).await?; + } - for counterparty_node_id in need_remove { - let mut future_opt = None; - { - // We need to take the `per_peer_state` write lock to remove an entry, but also - // have to hold it until after the `remove` call returns (but not through - // future completion) to ensure that writes for the peer's state are - // well-ordered with other `persist_peer_state` calls even across the removal - // itself. 
- let mut per_peer_state = self.per_peer_state.write().unwrap(); - if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) { - let state = entry.get_mut().get_mut().unwrap(); - if state.is_prunable() { - entry.remove(); - let key = counterparty_node_id.to_string(); - future_opt = Some(self.kv_store.remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - )); + for counterparty_node_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) { + let state = entry.get_mut().get_mut().unwrap(); + if state.is_prunable() { + entry.remove(); + let key = counterparty_node_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer got new state, force a re-persist of the current state. + state.needs_persist = true; + } } else { - // If the peer got new state, force a re-persist of the current state. - state.needs_persist = true; + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); } + } + if let Some(future) = future_opt { + future.await?; } else { - // This should never happen, we can only have one `persist` call - // in-progress at once and map entries are only removed by it. - debug_assert!(false); + self.persist_peer_state(counterparty_node_id).await?; } } - if let Some(future) = future_opt { - future.await?; - } else { - self.persist_peer_state(counterparty_node_id).await?; + + if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.persistence_in_flight.store(1, Ordering::Release); + continue; } + break; } Ok(()) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index da2a90c52a6..1111c682fbc 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -36,6 +36,7 @@ use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; use core::ops::Deref; +use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use alloc::string::String; @@ -140,6 +141,7 @@ where node_signer: NS, kv_store: K, last_pruning: Mutex>, + persistence_in_flight: AtomicUsize, } impl LSPS5ServiceHandler @@ -167,6 +169,7 @@ where node_signer, kv_store, last_pruning: Mutex::new(None), + persistence_in_flight: AtomicUsize::new(0), } } @@ -245,63 +248,80 @@ where // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. 
- let mut need_remove = Vec::new(); - let mut need_persist = Vec::new(); - - self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); - { - let outer_state_lock = self.per_peer_state.read().unwrap(); - - for (client_id, peer_state) in outer_state_lock.iter() { - let is_prunable = peer_state.is_prunable(); - let has_open_channel = self.client_has_open_channel(client_id); - if is_prunable && !has_open_channel { - need_remove.push(*client_id); - } else if peer_state.needs_persist { - need_persist.push(*client_id); - } - } - } - for client_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&client_id)); - self.persist_peer_state(client_id).await?; + if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return Ok(()); } - for client_id in need_remove { - let mut future_opt = None; + loop { + let mut need_remove = Vec::new(); + let mut need_persist = Vec::new(); + + self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); { - // We need to take the `per_peer_state` write lock to remove an entry, but also - // have to hold it until after the `remove` call returns (but not through - // future completion) to ensure that writes for the peer's state are - // well-ordered with other `persist_peer_state` calls even across the removal - // itself. - let mut per_peer_state = self.per_peer_state.write().unwrap(); - if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { - let state = entry.get_mut(); - if state.is_prunable() && !self.client_has_open_channel(&client_id) { - entry.remove(); - let key = client_id.to_string(); - future_opt = Some(self.kv_store.remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - )); + let outer_state_lock = self.per_peer_state.read().unwrap(); + + for (client_id, peer_state) in outer_state_lock.iter() { + let is_prunable = peer_state.is_prunable(); + let has_open_channel = self.client_has_open_channel(client_id); + if is_prunable && !has_open_channel { + need_remove.push(*client_id); + } else if peer_state.needs_persist { + need_persist.push(*client_id); + } + } + } + + for client_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&client_id)); + self.persist_peer_state(client_id).await?; + } + + for client_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { + let state = entry.get_mut(); + if state.is_prunable() && !self.client_has_open_channel(&client_id) { + entry.remove(); + let key = client_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer was re-added, force a re-persist of the current state. + state.needs_persist = true; + } } else { - // If the peer was re-added, force a re-persist of the current state. 
- state.needs_persist = true; + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); } + } + if let Some(future) = future_opt { + future.await?; } else { - // This should never happen, we can only have one `persist` call - // in-progress at once and map entries are only removed by it. - debug_assert!(false); + self.persist_peer_state(client_id).await?; } } - if let Some(future) = future_opt { - future.await?; - } else { - self.persist_peer_state(client_id).await?; + + if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.persistence_in_flight.store(1, Ordering::Release); + continue; } + break; } Ok(())
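For reference, the remove-vs-re-add handling that both handlers now share reduces to roughly the sketch below. `Handler`, `PeerState`, `KVStoreStub`, `prune_peer`, and the `u64` peer id are simplified stand-ins invented for illustration, not the actual lightning-liquidity types.

use std::collections::{hash_map::Entry, HashMap};
use std::io;
use std::sync::RwLock;

#[derive(Default)]
struct PeerState {
	webhooks: Vec<String>,
	needs_persist: bool,
}

impl PeerState {
	fn is_prunable(&self) -> bool {
		self.webhooks.is_empty()
	}
}

// Stand-in for the async `KVStore`. Per the comments in the diffs above, the real
// store orders operations at call time, which is why `remove` is called while the
// map's write lock is still held.
struct KVStoreStub;

impl KVStoreStub {
	async fn remove(&self, _key: String) -> Result<(), io::Error> {
		Ok(())
	}
}

struct Handler {
	per_peer_state: RwLock<HashMap<u64, PeerState>>,
	kv_store: KVStoreStub,
}

impl Handler {
	async fn persist_peer_state(&self, _peer: u64) -> Result<(), io::Error> {
		// Re-serialize the peer's current state and write it out; elided here.
		Ok(())
	}

	async fn prune_peer(&self, peer: u64) -> Result<(), io::Error> {
		let mut removal_fut = None;
		{
			// Re-check under the write lock: the peer may have been re-added (or
			// gained new state) between building the removal list and getting here.
			let mut per_peer_state = self.per_peer_state.write().unwrap();
			if let Entry::Occupied(mut entry) = per_peer_state.entry(peer) {
				if entry.get().is_prunable() {
					entry.remove();
					// Start the removal while the lock is held, but await it only
					// after the guard is dropped at the end of this block.
					removal_fut = Some(self.kv_store.remove(peer.to_string()));
				} else {
					// The peer gained new state since we decided to prune it.
					entry.get_mut().needs_persist = true;
				}
			}
		}
		match removal_fut {
			Some(fut) => fut.await,
			None => self.persist_peer_state(peer).await,
		}
	}
}

The important property is that the map entry is re-checked and removed under the same `per_peer_state` lock that concurrent `persist_peer_state` calls acquire, so a peer which re-appears between the pruning pass and the removal is re-persisted rather than silently deleted from disk.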