Skip to content

Commit 054b6c5

Browse files
authored
Merge 6dfdd6a into def9abc
2 parents def9abc + 6dfdd6a commit 054b6c5

File tree

5 files changed

+102
-7
lines changed

5 files changed

+102
-7
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ iroh-base = "0.90"
4444
reflink-copy = "0.1.24"
4545
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
47+
atomic_refcell = "0.1.13"
4748

4849
[dev-dependencies]
4950
clap = { version = "4.5.31", features = ["derive"] }
@@ -58,7 +59,6 @@ testresult = "0.4.1"
5859
tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
5960
tracing-test = "0.2.5"
6061
walkdir = "2.5.0"
61-
atomic_refcell = "0.1.13"
6262

6363
[features]
6464
hide-proto-docs = []

src/store/fs.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ use crate::{
106106
ApiClient,
107107
},
108108
store::{
109-
fs::util::entity_manager::{self, ActiveEntityState},
109+
fs::util::entity_manager::{self, ActiveEntityState, ShutdownCause},
110110
util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
111111
Hash,
112112
},
@@ -217,10 +217,17 @@ impl entity_manager::Params for EmParams {
217217

218218
type EntityState = Slot;
219219

220-
async fn on_shutdown(
221-
_state: entity_manager::ActiveEntityState<Self>,
222-
_cause: entity_manager::ShutdownCause,
223-
) {
220+
async fn on_shutdown(state: HashContext, cause: ShutdownCause) {
221+
// this isn't strictly necessary. Drop will run anyway as soon as the
222+
// state is reset to it's default value. Doing it here means that we
223+
// have exact control over where it happens.
224+
if let Some(handle) = state.state.0.lock().await.take() {
225+
trace!(
226+
"shutting down entity manager for hash: {}, cause: {cause:?}",
227+
state.id
228+
);
229+
drop(handle);
230+
}
224231
}
225232
}
226233

src/store/fs/bao_file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ use bao_tree::{
2020
use bytes::{Bytes, BytesMut};
2121
use derive_more::Debug;
2222
use irpc::channel::mpsc;
23-
use tokio::sync::watch;
2423
use tracing::{debug, error, info, trace};
2524

2625
use super::{
2726
entry_state::{DataLocation, EntryState, OutboardLocation},
2827
options::{Options, PathOptions},
28+
util::watch,
2929
BaoFilePart,
3030
};
3131
use crate::{

src/store/fs/util.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::future::Future;
22

33
use tokio::{select, sync::mpsc};
44
pub(crate) mod entity_manager;
5+
pub(crate) mod watch;
56

67
/// A wrapper for a tokio mpsc receiver that allows peeking at the next message.
78
#[derive(Debug)]

src/store/fs/util/watch.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use std::{ops::Deref, sync::Arc};
2+
3+
use atomic_refcell::{AtomicRef, AtomicRefCell};
4+
5+
struct State<T> {
6+
value: T,
7+
dropped: bool,
8+
}
9+
10+
struct Shared<T> {
11+
value: AtomicRefCell<State<T>>,
12+
notify: tokio::sync::Notify,
13+
}
14+
15+
pub struct Sender<T>(Arc<Shared<T>>);
16+
17+
pub struct Receiver<T>(Arc<Shared<T>>);
18+
19+
impl<T> Sender<T> {
20+
pub fn new(value: T) -> Self {
21+
Self(Arc::new(Shared {
22+
value: AtomicRefCell::new(State {
23+
value,
24+
dropped: false,
25+
}),
26+
notify: tokio::sync::Notify::new(),
27+
}))
28+
}
29+
30+
pub fn send_if_modified<F>(&self, modify: F) -> bool
31+
where
32+
F: FnOnce(&mut T) -> bool,
33+
{
34+
let mut state = self.0.value.borrow_mut();
35+
let modified = modify(&mut state.value);
36+
if modified {
37+
self.0.notify.notify_waiters();
38+
}
39+
modified
40+
}
41+
42+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
43+
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
44+
}
45+
46+
pub fn subscribe(&self) -> Receiver<T> {
47+
Receiver(self.0.clone())
48+
}
49+
}
50+
51+
impl<T> Drop for Sender<T> {
52+
fn drop(&mut self) {
53+
self.0.value.borrow_mut().dropped = true;
54+
self.0.notify.notify_waiters();
55+
}
56+
}
57+
58+
impl<T> Receiver<T> {
59+
pub async fn changed(&self) -> Result<(), error::RecvError> {
60+
self.0.notify.notified().await;
61+
if self.0.value.borrow().dropped {
62+
Err(error::RecvError(()))
63+
} else {
64+
Ok(())
65+
}
66+
}
67+
68+
pub fn borrow(&self) -> impl Deref<Target = T> + '_ {
69+
AtomicRef::map(self.0.value.borrow(), |state| &state.value)
70+
}
71+
}
72+
73+
pub mod error {
74+
use std::{error::Error, fmt};
75+
76+
/// Error produced when receiving a change notification.
77+
#[derive(Debug, Clone)]
78+
pub struct RecvError(pub(super) ());
79+
80+
impl fmt::Display for RecvError {
81+
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
82+
write!(fmt, "channel closed")
83+
}
84+
}
85+
86+
impl Error for RecvError {}
87+
}

0 commit comments

Comments
 (0)