Skip to content

Commit e0a0cca

Browse files
authored
Merge 45a02c9 into 0fdcd63
2 parents 0fdcd63 + 45a02c9 commit e0a0cca

File tree

2 files changed

+38
-5
lines changed

2 files changed

+38
-5
lines changed

src/store/fs/gc.rs

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
use std::collections::HashSet;
1+
use std::{collections::HashSet, pin::Pin, sync::Arc};
22

33
use bao_tree::ChunkRanges;
44
use genawaiter::sync::{Co, Gen};
55
use n0_future::{Stream, StreamExt};
6-
use tracing::{debug, error, warn};
6+
use tracing::{debug, error, info, warn};
77

88
use crate::{api::Store, Hash, HashAndFormat};
99

@@ -130,14 +130,32 @@ fn gc_sweep<'a>(
130130
})
131131
}
132132

133-
#[derive(Debug, Clone)]
133+
#[derive(derive_more::Debug, Clone)]
134134
pub struct GcConfig {
135135
pub interval: std::time::Duration,
136+
#[debug("ProtectCallback")]
137+
pub add_protected: Option<ProtectCb>,
136138
}
137139

140+
#[derive(Debug)]
141+
pub enum ProtectOutcome {
142+
Continue,
143+
Skip,
144+
}
145+
146+
pub type ProtectCb = Arc<
147+
dyn for<'a> Fn(
148+
&'a mut HashSet<Hash>,
149+
)
150+
-> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
151+
+ Send
152+
+ Sync
153+
+ 'static,
154+
>;
155+
138156
pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
157+
debug!(externally_protected = live.len(), "gc: start");
139158
{
140-
live.clear();
141159
store.clear_protected().await?;
142160
let mut stream = gc_mark(store, live);
143161
while let Some(ev) = stream.next().await {
@@ -155,6 +173,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
155173
}
156174
}
157175
}
176+
debug!(total_protected = live.len(), "gc: sweep");
158177
{
159178
let mut stream = gc_sweep(store, live);
160179
while let Some(ev) = stream.next().await {
@@ -172,14 +191,26 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
172191
}
173192
}
174193
}
194+
debug!("gc: done");
175195

176196
Ok(())
177197
}
178198

179199
pub async fn run_gc(store: Store, config: GcConfig) {
200+
debug!("gc enabled with interval {:?}", config.interval);
180201
let mut live = HashSet::new();
181202
loop {
203+
live.clear();
182204
tokio::time::sleep(config.interval).await;
205+
if let Some(ref cb) = config.add_protected {
206+
match (cb)(&mut live).await {
207+
ProtectOutcome::Continue => {}
208+
ProtectOutcome::Skip => {
209+
info!("Skip gc run: protect callback indicated skip");
210+
continue;
211+
}
212+
}
213+
}
183214
if let Err(e) = gc_run_once(&store, &mut live).await {
184215
error!("error during gc run: {e}");
185216
break;
@@ -284,6 +315,7 @@ mod tests {
284315
assert!(!data_path.exists());
285316
assert!(!outboard_path.exists());
286317
}
318+
live.clear();
287319
// create a large partial file and check that the data and outboard file as well as
288320
// the sizes and bitfield files are deleted by gc
289321
{

src/store/fs/options.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use std::{
44
time::Duration,
55
};
66

7-
use super::{gc::GcConfig, meta::raw_outboard_size, temp_name};
7+
pub use super::gc::{GcConfig, ProtectCb, ProtectOutcome};
8+
use super::{meta::raw_outboard_size, temp_name};
89
use crate::Hash;
910

1011
/// Options for directories used by the file store.

0 commit comments

Comments
 (0)