Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/vss-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ jobs:
run: |
cd ldk-node
export TEST_VSS_BASE_URL="http://localhost:8080/vss"
RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always
RUSTFLAGS="--cfg vss_test" cargo test io::vss_store
RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss
13 changes: 10 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,7 @@ fn build_with_store_internal(
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(RwLock::new(NodeMetrics::default()))
} else {
log_error!(logger, "Failed to read node metrics from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand Down Expand Up @@ -1201,7 +1202,8 @@ fn build_with_store_internal(
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(_) => {
Err(e) => {
log_error!(logger, "Failed to read payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};
Expand Down Expand Up @@ -1334,7 +1336,7 @@ fn build_with_store_internal(
if e.kind() == lightning::io::ErrorKind::NotFound {
Vec::new()
} else {
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
log_error!(logger, "Failed to read channel monitors from store: {}", e.to_string());
return Err(BuildError::ReadFailed);
}
},
Expand All @@ -1359,6 +1361,7 @@ fn build_with_store_internal(
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(Graph::new(config.network.into(), Arc::clone(&logger)))
} else {
log_error!(logger, "Failed to read network graph from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand All @@ -1379,6 +1382,7 @@ fn build_with_store_internal(
Arc::clone(&logger),
)))
} else {
log_error!(logger, "Failed to read scoring data from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand Down Expand Up @@ -1448,7 +1452,7 @@ fn build_with_store_internal(
);
let (_hash, channel_manager) =
<(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| {
log_error!(logger, "Failed to read channel manager from KVStore: {}", e);
log_error!(logger, "Failed to read channel manager from store: {}", e);
BuildError::ReadFailed
})?;
channel_manager
Expand Down Expand Up @@ -1677,6 +1681,7 @@ fn build_with_store_internal(
Arc::clone(&logger),
))
} else {
log_error!(logger, "Failed to read output sweeper data from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand All @@ -1689,6 +1694,7 @@ fn build_with_store_internal(
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger)))
} else {
log_error!(logger, "Failed to read event queue from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand All @@ -1700,6 +1706,7 @@ fn build_with_store_internal(
if e.kind() == std::io::ErrorKind::NotFound {
Arc::new(PeerStore::new(Arc::clone(&kv_store), Arc::clone(&logger)))
} else {
log_error!(logger, "Failed to read peer data from store: {}", e);
return Err(BuildError::ReadFailed);
}
},
Expand Down
48 changes: 4 additions & 44 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::future::Future;
use core::task::{Poll, Waker};
use std::collections::VecDeque;
use std::ops::Deref;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::{Arc, Mutex};

use bitcoin::blockdata::locktime::absolute::LockTime;
use bitcoin::secp256k1::PublicKey;
Expand Down Expand Up @@ -287,7 +287,6 @@ where
{
queue: Arc<Mutex<VecDeque<Event>>>,
waker: Arc<Mutex<Option<Waker>>>,
notifier: Condvar,
kv_store: Arc<DynStore>,
logger: L,
}
Expand All @@ -299,8 +298,7 @@ where
pub(crate) fn new(kv_store: Arc<DynStore>, logger: L) -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Self { queue, waker, notifier, kv_store, logger }
Self { queue, waker, kv_store, logger }
}

pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> {
Expand All @@ -310,8 +308,6 @@ where
self.persist_queue(&locked_queue)?;
}

self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}
Expand All @@ -327,19 +323,12 @@ where
EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
}

pub(crate) fn wait_next_event(&self) -> Event {
let locked_queue =
self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap();
locked_queue.front().unwrap().clone()
}

pub(crate) fn event_handled(&self) -> Result<(), Error> {
{
let mut locked_queue = self.queue.lock().unwrap();
locked_queue.pop_front();
self.persist_queue(&locked_queue)?;
}
self.notifier.notify_one();

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
Expand Down Expand Up @@ -383,8 +372,7 @@ where
let read_queue: EventQueueDeserWrapper = Readable::read(reader)?;
let queue = Arc::new(Mutex::new(read_queue.0));
let waker = Arc::new(Mutex::new(None));
let notifier = Condvar::new();
Ok(Self { queue, waker, notifier, kv_store, logger })
Ok(Self { queue, waker, kv_store, logger })
}
}

Expand Down Expand Up @@ -1637,7 +1625,6 @@ mod tests {

// Check we get the expected event and that it is returned until we mark it handled.
for _ in 0..5 {
assert_eq!(event_queue.wait_next_event(), expected_event);
assert_eq!(event_queue.next_event_async().await, expected_event);
assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
}
Expand All @@ -1652,7 +1639,7 @@ mod tests {
.unwrap();
let deser_event_queue =
EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap();
assert_eq!(deser_event_queue.wait_next_event(), expected_event);
assert_eq!(deser_event_queue.next_event_async().await, expected_event);

event_queue.event_handled().unwrap();
assert_eq!(event_queue.next_event(), None);
Expand Down Expand Up @@ -1721,32 +1708,5 @@ mod tests {
}
}
assert_eq!(event_queue.next_event(), None);

// Check we operate correctly, even when mixing and matching blocking and async API calls.
let (tx, mut rx) = tokio::sync::watch::channel(());
let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
let e = thread_queue.wait_next_event();
assert_eq!(e, thread_event);
thread_queue.event_handled().unwrap();
tx.send(()).unwrap();
});

let thread_queue = Arc::clone(&event_queue);
let thread_event = expected_event.clone();
std::thread::spawn(move || {
// Sleep a bit before we enqueue the events everybody is waiting for.
std::thread::sleep(Duration::from_millis(20));
thread_queue.add_event(thread_event.clone()).unwrap();
thread_queue.add_event(thread_event.clone()).unwrap();
});

let e = event_queue.next_event_async().await;
assert_eq!(e, expected_event.clone());
event_queue.event_handled().unwrap();

rx.changed().await.unwrap();
assert_eq!(event_queue.next_event(), None);
}
}
Loading
Loading