Merged
2 changes: 1 addition & 1 deletion lightning-liquidity/src/lsps2/event.rs
@@ -163,7 +163,7 @@ pub enum LSPS2ServiceEvent {
///
/// **Note: ** As this event is persisted and might get replayed after restart, you'll need to
/// ensure channel creation idempotency. I.e., please check if you already created a
/// corresponding channel based on the given `their_network_key` and `intercept_scid` and
/// corresponding channel based on the given `their_network_key` and `user_channel_id` and
/// ignore this event in case you did.
///
/// [`ChannelManager::create_channel`]: lightning::ln::channelmanager::ChannelManager::create_channel
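The idempotency note above boils down to deduplicating on the (their_network_key, user_channel_id) pair before calling ChannelManager::create_channel. A minimal, hypothetical sketch of such a check follows; the OpenChannelEvent and ChannelOpener types are illustrative stand-ins, not part of lightning-liquidity, and the real event carries additional fields (amounts, intercept SCID, etc.).

use std::collections::HashSet;

/// Illustrative stand-in for the fields of `LSPS2ServiceEvent::OpenChannel` that matter
/// for deduplication; the real event carries more data.
struct OpenChannelEvent {
	their_network_key: [u8; 33],
	user_channel_id: u128,
}

/// Tracks which (peer, user_channel_id) pairs have already been acted on, so a replayed
/// event after restart does not open a second channel. A real node would persist this
/// set (or derive it from its channel store) rather than keep it only in memory.
struct ChannelOpener {
	already_opened: HashSet<([u8; 33], u128)>,
}

impl ChannelOpener {
	fn handle_open_channel(&mut self, event: &OpenChannelEvent) {
		let key = (event.their_network_key, event.user_channel_id);
		// `insert` returns `false` if the key was already present, i.e. we already
		// created a channel for this request and should ignore the replayed event.
		if !self.already_opened.insert(key) {
			return;
		}
		// ... call `ChannelManager::create_channel` here ...
	}
}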
123 changes: 84 additions & 39 deletions lightning-liquidity/src/lsps2/service.rs
@@ -593,6 +593,7 @@ where
peer_by_channel_id: RwLock<HashMap<ChannelId, PublicKey>>,
total_pending_requests: AtomicUsize,
config: LSPS2ServiceConfig,
persistence_in_flight: AtomicUsize,
}

impl<CM: Deref, K: Deref + Clone> LSPS2ServiceHandler<CM, K>
@@ -640,6 +641,7 @@ where
peer_by_intercept_scid: RwLock::new(peer_by_intercept_scid),
peer_by_channel_id: RwLock::new(peer_by_channel_id),
total_pending_requests: AtomicUsize::new(0),
persistence_in_flight: AtomicUsize::new(0),
channel_manager,
kv_store,
config,
@@ -1603,7 +1605,7 @@ where
) -> Result<(), lightning::io::Error> {
let fut = {
let outer_state_lock = self.per_peer_state.read().unwrap();
let encoded = match outer_state_lock.get(&counterparty_node_id) {
match outer_state_lock.get(&counterparty_node_id) {
None => {
// We dropped the peer state by now.
return Ok(());
@@ -1615,18 +1617,19 @@
return Ok(());
} else {
peer_state_lock.needs_persist = false;
peer_state_lock.encode()
let key = counterparty_node_id.to_string();
let encoded = peer_state_lock.encode();
// Begin the write with the entry lock held. This avoids racing with
// potentially-in-flight `persist` calls writing state for the same peer.
self.kv_store.write(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
encoded,
)
}
},
};
let key = counterparty_node_id.to_string();

self.kv_store.write(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
encoded,
)
}
};

fut.await.map_err(|e| {
@@ -1644,38 +1647,80 @@
// introduce some batching to upper-bound the number of requests inflight at any given
// time.

let mut need_remove = Vec::new();
let mut need_persist = Vec::new();
if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
}

{
let mut outer_state_lock = self.per_peer_state.write().unwrap();
outer_state_lock.retain(|counterparty_node_id, inner_state_lock| {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
peer_state_lock.prune_expired_request_state();
let is_prunable = peer_state_lock.is_prunable();
if is_prunable {
need_remove.push(*counterparty_node_id);
} else if peer_state_lock.needs_persist {
need_persist.push(*counterparty_node_id);
loop {
let mut need_remove = Vec::new();
let mut need_persist = Vec::new();

{
// First build a list of peers to persist and prune with the read lock. This allows
// us to avoid the write lock unless we actually need to remove a node.
let outer_state_lock = self.per_peer_state.read().unwrap();
for (counterparty_node_id, inner_state_lock) in outer_state_lock.iter() {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
peer_state_lock.prune_expired_request_state();
let is_prunable = peer_state_lock.is_prunable();
if is_prunable {
need_remove.push(*counterparty_node_id);
} else if peer_state_lock.needs_persist {
need_persist.push(*counterparty_node_id);
}
}
!is_prunable
});
}
}

for counterparty_node_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&counterparty_node_id));
self.persist_peer_state(counterparty_node_id).await?;
}
for counterparty_node_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&counterparty_node_id));
self.persist_peer_state(counterparty_node_id).await?;
}

for counterparty_node_id in need_remove {
let key = counterparty_node_id.to_string();
self.kv_store
.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
)
.await?;
for counterparty_node_id in need_remove {
let mut future_opt = None;
{
// We need to take the `per_peer_state` write lock to remove an entry, but also
// have to hold it until after the `remove` call returns (but not through
// future completion) to ensure that writes for the peer's state are
// well-ordered with other `persist_peer_state` calls even across the removal
// itself.
let mut per_peer_state = self.per_peer_state.write().unwrap();
if let Entry::Occupied(mut entry) = per_peer_state.entry(counterparty_node_id) {
let state = entry.get_mut().get_mut().unwrap();
if state.is_prunable() {
entry.remove();
let key = counterparty_node_id.to_string();
future_opt = Some(self.kv_store.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
));
} else {
// If the peer got new state, force a re-persist of the current state.
state.needs_persist = true;
}
} else {
// This should never happen, we can only have one `persist` call
// in-progress at once and map entries are only removed by it.
debug_assert!(false);
}
}
if let Some(future) = future_opt {
future.await?;
} else {
self.persist_peer_state(counterparty_node_id).await?;
}
}

if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
Contributor: Use fetch_update to avoid the race here?

Collaborator (author): Hmm? Not sure which race you're referring to. I copied this logic from PeerManager.

Contributor: Isn't there a potential race between fetch_sub and store below? Although, granted, I don't believe it would have much consequence; doing both in one operation just seemed cleaner.

Collaborator (author): Hmmm, I actually think it's correct. If we do a fetch_sub and the number we get back is greater than 1, we still "own" the "mutex" - no other persist call should be in the loop. We can thus write 1 to reset the counter so that we won't loop too many times. fetch_update isn't atomic; it's actually a compare_and_swap loop, which I think doesn't quite work.

// If another thread incremented the state while we were running we should go
// around again, but only once.
self.persistence_in_flight.store(1, Ordering::Release);
continue;
}
break;
}

Ok(())
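For reference, a standalone sketch of the persistence_in_flight pattern debated in the review thread above (the same scheme PeerManager uses for its event-processing lock). The Persister type and the empty loop body are placeholders; only the counter handling mirrors the diff: the first caller to bump the counter runs the loop, later callers just bump it and return, and the fetch_sub/store pair at the bottom collapses any number of queued bumps into exactly one extra pass. In the diff the loop body is async, but the counter logic is identical.

use std::sync::atomic::{AtomicUsize, Ordering};

struct Persister {
	persistence_in_flight: AtomicUsize,
}

impl Persister {
	fn persist_all(&self) {
		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			// Another call is already running the loop below; our increment tells it to
			// go around again, so we can return immediately.
			return;
		}

		loop {
			// ... persist and prune peer state here (placeholder for the real work) ...

			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				// One or more calls arrived while we were persisting. Only the first
				// caller ever gets past the guard above, so we still "own" the loop and
				// can safely reset the counter to 1 before taking one more pass.
				self.persistence_in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
	}
}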
108 changes: 78 additions & 30 deletions lightning-liquidity/src/lsps5/service.rs
@@ -20,6 +20,7 @@ use crate::message_queue::MessageQueue;
use crate::persist::{
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::prelude::hash_map::Entry;
use crate::prelude::*;
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use crate::utils::time::TimeProvider;
@@ -35,6 +36,7 @@ use lightning::util::persist::KVStore;
use lightning::util::ser::Writeable;

use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::time::Duration;

use alloc::string::String;
@@ -139,6 +141,7 @@ where
node_signer: NS,
kv_store: K,
last_pruning: Mutex<Option<LSPSDateTime>>,
persistence_in_flight: AtomicUsize,
}

impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
@@ -166,6 +169,7 @@ where
node_signer,
kv_store,
last_pruning: Mutex::new(None),
persistence_in_flight: AtomicUsize::new(0),
}
}

@@ -220,6 +224,8 @@ where

let key = counterparty_node_id.to_string();

// Begin the write with the `per_peer_state` write lock held to avoid racing with
// potentially-in-flight `persist` calls writing state for the same peer.
self.kv_store.write(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
Expand All @@ -242,38 +248,80 @@ where
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
// introduce some batching to upper-bound the number of requests inflight at any given
// time.
let mut need_remove = Vec::new();
let mut need_persist = Vec::new();
{
let mut outer_state_lock = self.per_peer_state.write().unwrap();
self.check_prune_stale_webhooks(&mut outer_state_lock);

outer_state_lock.retain(|client_id, peer_state| {
let is_prunable = peer_state.is_prunable();
let has_open_channel = self.client_has_open_channel(client_id);
if is_prunable && !has_open_channel {
need_remove.push(*client_id);
} else if peer_state.needs_persist {
need_persist.push(*client_id);
}
!is_prunable || has_open_channel
});
};

for counterparty_node_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&counterparty_node_id));
self.persist_peer_state(counterparty_node_id).await?;
if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
}

for counterparty_node_id in need_remove {
let key = counterparty_node_id.to_string();
self.kv_store
.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
)
.await?;
loop {
let mut need_remove = Vec::new();
let mut need_persist = Vec::new();

self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
{
let outer_state_lock = self.per_peer_state.read().unwrap();

for (client_id, peer_state) in outer_state_lock.iter() {
let is_prunable = peer_state.is_prunable();
let has_open_channel = self.client_has_open_channel(client_id);
if is_prunable && !has_open_channel {
need_remove.push(*client_id);
} else if peer_state.needs_persist {
need_persist.push(*client_id);
}
}
}

for client_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&client_id));
self.persist_peer_state(client_id).await?;
}

for client_id in need_remove {
let mut future_opt = None;
{
// We need to take the `per_peer_state` write lock to remove an entry, but also
// have to hold it until after the `remove` call returns (but not through
// future completion) to ensure that writes for the peer's state are
// well-ordered with other `persist_peer_state` calls even across the removal
// itself.
let mut per_peer_state = self.per_peer_state.write().unwrap();
if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
let state = entry.get_mut();
if state.is_prunable() && !self.client_has_open_channel(&client_id) {
entry.remove();
let key = client_id.to_string();
future_opt = Some(self.kv_store.remove(
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
));
} else {
// If the peer was re-added, force a re-persist of the current state.
state.needs_persist = true;
}
} else {
// This should never happen, we can only have one `persist` call
// in-progress at once and map entries are only removed by it.
debug_assert!(false);
}
}
if let Some(future) = future_opt {
future.await?;
} else {
self.persist_peer_state(client_id).await?;
}
}

if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
// If another thread incremented the state while we were running we should go
// around again, but only once.
self.persistence_in_flight.store(1, Ordering::Release);
continue;
}
break;
}

Ok(())
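A minimal sketch of the ordering trick described in the removal comments above (and in the analogous LSPS2 path): the per_peer_state write lock is held while the entry is removed and while the removal future is constructed, but is dropped before that future is awaited. The Store and Service types below are simplified stand-ins, not LDK's actual KVStore API.

use std::collections::HashMap;
use std::sync::RwLock;

/// Simplified stand-in for an async key-value store such as LDK's `KVStore`.
struct Store;

impl Store {
	async fn remove(&self, _key: &str) -> Result<(), ()> {
		// A real store would delete the persisted peer state here.
		Ok(())
	}
}

struct Service {
	per_peer_state: RwLock<HashMap<String, u32>>,
	store: Store,
}

impl Service {
	async fn prune_peer(&self, key: &str) -> Result<(), ()> {
		let removal_fut = {
			// Hold the write lock while the entry is removed *and* while the removal
			// future is created, so the removal is ordered against any concurrent
			// `persist_peer_state`-style writes for the same peer...
			let mut state = self.per_peer_state.write().unwrap();
			state.remove(key);
			self.store.remove(key)
		};
		// ...but never hold the lock across the await itself.
		removal_fut.await
	}
}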
@@ -761,7 +809,7 @@ impl PeerState {
});
}

fn is_prunable(&mut self) -> bool {
fn is_prunable(&self) -> bool {
self.webhooks.is_empty()
}
}