diff --git a/lightning-liquidity/src/lsps2/event.rs b/lightning-liquidity/src/lsps2/event.rs index 29cc577f293..502429b79ec 100644 --- a/lightning-liquidity/src/lsps2/event.rs +++ b/lightning-liquidity/src/lsps2/event.rs @@ -163,7 +163,7 @@ pub enum LSPS2ServiceEvent { /// /// **Note: ** As this event is persisted and might get replayed after restart, you'll need to /// ensure channel creation idempotency. I.e., please check if you already created a - /// corresponding channel based on the given `their_network_key` and `intercept_scid` and + /// corresponding channel based on the given `their_network_key` and `user_channel_id` and /// ignore this event in case you did. /// /// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index 2d727acf16e..e9013cfe8a8 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -593,6 +593,7 @@ where peer_by_channel_id: RwLock>, total_pending_requests: AtomicUsize, config: LSPS2ServiceConfig, + persistence_in_flight: AtomicUsize, } impl LSPS2ServiceHandler @@ -640,6 +641,7 @@ where peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid), peer_by_channel_id: RwLock::new(peer_by_channel_id), total_pending_requests: AtomicUsize::new(0), + persistence_in_flight: AtomicUsize::new(0), channel_manager, kv_store, config, @@ -1603,7 +1605,7 @@ where ) -> Result<(), lightning::io::Error> { let fut = { let outer_state_lock = self.per_peer_state.read().unwrap(); - let encoded = match outer_state_lock.get(&counterparty_node_id) { + match outer_state_lock.get(&counterparty_node_id) { None => { // We dropped the peer state by now. return Ok(()); @@ -1615,18 +1617,19 @@ where return Ok(()); } else { peer_state_lock.needs_persist = false; - peer_state_lock.encode() + let key = counterparty_node_id.to_string(); + let encoded = peer_state_lock.encode(); + // Begin the write with the entry lock held. This avoids racing with + // potentially-in-flight `persist` calls writing state for the same peer. + self.kv_store.write( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + encoded, + ) } }, - }; - let key = counterparty_node_id.to_string(); - - self.kv_store.write( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - encoded, - ) + } }; fut.await.map_err(|e| { @@ -1644,38 +1647,80 @@ where // introduce some batching to upper-bound the number of requests inflight at any given // time. - let mut need_remove = Vec::new(); - let mut need_persist = Vec::new(); + if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return Ok(()); + } - { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - outer_state_lock.retain(|counterparty_node_id, inner_state_lock| { - let mut peer_state_lock = inner_state_lock.lock().unwrap(); - peer_state_lock.prune_expired_request_state(); - let is_prunable = peer_state_lock.is_prunable(); - if is_prunable { - need_remove.push(*counterparty_node_id); - } else if peer_state_lock.needs_persist { - need_persist.push(*counterparty_node_id); + loop { + let mut need_remove = Vec::new(); + let mut need_persist = Vec::new(); + + { + // First build a list of peers to persist and prune with the read lock. This allows + // us to avoid the write lock unless we actually need to remove a node. + let outer_state_lock = self.per_peer_state.read().unwrap(); + for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() { + let mut peer_state_lock = inner_state_lock.lock().unwrap(); + peer_state_lock.prune_expired_request_state(); + let is_prunable = peer_state_lock.is_prunable(); + if is_prunable { + need_remove.push(*counterparty_node_id); + } else if peer_state_lock.needs_persist { + need_persist.push(*counterparty_node_id); + } } - !is_prunable - }); - } + } - for counterparty_node_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&counterparty_node_id)); - self.persist_peer_state(counterparty_node_id).await?; - } + for counterparty_node_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&counterparty_node_id)); + self.persist_peer_state(counterparty_node_id).await?; + } - for counterparty_node_id in need_remove { - let key = counterparty_node_id.to_string(); - self.kv_store - .remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .await?; + for counterparty_node_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) { + let state = entry.get_mut().get_mut().unwrap(); + if state.is_prunable() { + entry.remove(); + let key = counterparty_node_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer got new state, force a re-persist of the current state. + state.needs_persist = true; + } + } else { + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); + } + } + if let Some(future) = future_opt { + future.await?; + } else { + self.persist_peer_state(counterparty_node_id).await?; + } + } + + if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.persistence_in_flight.store(1, Ordering::Release); + continue; + } + break; } Ok(()) diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 4439130a056..1111c682fbc 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -20,6 +20,7 @@ use crate::message_queue::MessageQueue; use crate::persist::{ LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, }; +use crate::prelude::hash_map::Entry; use crate::prelude::*; use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; use crate::utils::time::TimeProvider; @@ -35,6 +36,7 @@ use lightning::util::persist::KVStore; use lightning::util::ser::Writeable; use core::ops::Deref; +use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; use alloc::string::String; @@ -139,6 +141,7 @@ where node_signer: NS, kv_store: K, last_pruning: Mutex>, + persistence_in_flight: AtomicUsize, } impl LSPS5ServiceHandler @@ -166,6 +169,7 @@ where node_signer, kv_store, last_pruning: Mutex::new(None), + persistence_in_flight: AtomicUsize::new(0), } } @@ -220,6 +224,8 @@ where let key = counterparty_node_id.to_string(); + // Begin the write with the `per_peer_state` write lock held to avoid racing with + // potentially-in-flight `persist` calls writing state for the same peer. self.kv_store.write( LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, @@ -242,38 +248,80 @@ where // TODO: We should eventually persist in parallel, however, when we do, we probably want to // introduce some batching to upper-bound the number of requests inflight at any given // time. - let mut need_remove = Vec::new(); - let mut need_persist = Vec::new(); - { - let mut outer_state_lock = self.per_peer_state.write().unwrap(); - self.check_prune_stale_webhooks(&mut outer_state_lock); - - outer_state_lock.retain(|client_id, peer_state| { - let is_prunable = peer_state.is_prunable(); - let has_open_channel = self.client_has_open_channel(client_id); - if is_prunable && !has_open_channel { - need_remove.push(*client_id); - } else if peer_state.needs_persist { - need_persist.push(*client_id); - } - !is_prunable || has_open_channel - }); - }; - for counterparty_node_id in need_persist.into_iter() { - debug_assert!(!need_remove.contains(&counterparty_node_id)); - self.persist_peer_state(counterparty_node_id).await?; + if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 { + // If we're not the first event processor to get here, just return early, the increment + // we just did will be treated as "go around again" at the end. + return Ok(()); } - for counterparty_node_id in need_remove { - let key = counterparty_node_id.to_string(); - self.kv_store - .remove( - LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .await?; + loop { + let mut need_remove = Vec::new(); + let mut need_persist = Vec::new(); + + self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap()); + { + let outer_state_lock = self.per_peer_state.read().unwrap(); + + for (client_id, peer_state) in outer_state_lock.iter() { + let is_prunable = peer_state.is_prunable(); + let has_open_channel = self.client_has_open_channel(client_id); + if is_prunable && !has_open_channel { + need_remove.push(*client_id); + } else if peer_state.needs_persist { + need_persist.push(*client_id); + } + } + } + + for client_id in need_persist.into_iter() { + debug_assert!(!need_remove.contains(&client_id)); + self.persist_peer_state(client_id).await?; + } + + for client_id in need_remove { + let mut future_opt = None; + { + // We need to take the `per_peer_state` write lock to remove an entry, but also + // have to hold it until after the `remove` call returns (but not through + // future completion) to ensure that writes for the peer's state are + // well-ordered with other `persist_peer_state` calls even across the removal + // itself. + let mut per_peer_state = self.per_peer_state.write().unwrap(); + if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) { + let state = entry.get_mut(); + if state.is_prunable() && !self.client_has_open_channel(&client_id) { + entry.remove(); + let key = client_id.to_string(); + future_opt = Some(self.kv_store.remove( + LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, + &key, + )); + } else { + // If the peer was re-added, force a re-persist of the current state. + state.needs_persist = true; + } + } else { + // This should never happen, we can only have one `persist` call + // in-progress at once and map entries are only removed by it. + debug_assert!(false); + } + } + if let Some(future) = future_opt { + future.await?; + } else { + self.persist_peer_state(client_id).await?; + } + } + + if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 { + // If another thread incremented the state while we were running we should go + // around again, but only once. + self.persistence_in_flight.store(1, Ordering::Release); + continue; + } + break; } Ok(()) @@ -761,7 +809,7 @@ impl PeerState { }); } - fn is_prunable(&mut self) -> bool { + fn is_prunable(&self) -> bool { self.webhooks.is_empty() } }