Skip to content

Commit 803796f

Browse files
authored
Merge 1be113a into a8441bf
2 parents a8441bf + 1be113a commit 803796f

File tree

2 files changed

+72
-130
lines changed

2 files changed

+72
-130
lines changed

src/store/fs.rs

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,13 @@ impl entity_manager::Params for EmParams {
218218
type EntityState = Slot;
219219

220220
async fn on_shutdown(
221-
_state: entity_manager::ActiveEntityState<Self>,
222-
_cause: entity_manager::ShutdownCause,
221+
state: entity_manager::ActiveEntityState<Self>,
222+
cause: entity_manager::ShutdownCause,
223223
) {
224+
if let Some(mut handle) = state.state.0.lock().await.take() {
225+
trace!("shutting down hash: {}, cause: {cause:?}", state.id);
226+
handle.persist(&state);
227+
}
224228
}
225229
}
226230

@@ -291,7 +295,7 @@ impl HashContext {
291295
.get_or_create(|| async {
292296
let res = self.db().get(hash).await.map_err(io::Error::other)?;
293297
let res = match res {
294-
Some(state) => open_bao_file(&hash, state, &self.global).await,
298+
Some(state) => open_bao_file(state, self).await,
295299
None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
296300
};
297301
Ok((res?, ()))
@@ -311,11 +315,8 @@ impl HashContext {
311315
.get_or_create(|| async {
312316
let res = self.db().get(hash).await.map_err(io::Error::other)?;
313317
let res = match res {
314-
Some(state) => open_bao_file(&hash, state, &self.global).await,
315-
None => Ok(BaoFileHandle::new_partial_mem(
316-
hash,
317-
self.global.options.clone(),
318-
)),
318+
Some(state) => open_bao_file(state, self).await,
319+
None => Ok(BaoFileHandle::new_partial_mem()),
319320
};
320321
Ok((res?, ()))
321322
})
@@ -327,12 +328,9 @@ impl HashContext {
327328
}
328329
}
329330

330-
async fn open_bao_file(
331-
hash: &Hash,
332-
state: EntryState<Bytes>,
333-
ctx: &TaskContext,
334-
) -> io::Result<BaoFileHandle> {
335-
let options = &ctx.options;
331+
async fn open_bao_file(state: EntryState<Bytes>, ctx: &HashContext) -> io::Result<BaoFileHandle> {
332+
let hash = &ctx.id;
333+
let options = &ctx.global.options;
336334
Ok(match state {
337335
EntryState::Complete {
338336
data_location,
@@ -362,9 +360,9 @@ async fn open_bao_file(
362360
MemOrFile::File(file)
363361
}
364362
};
365-
BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
363+
BaoFileHandle::new_complete(data, outboard)
366364
}
367-
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
365+
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?,
368366
})
369367
}
370368

@@ -618,12 +616,7 @@ impl Actor {
618616
options: options.clone(),
619617
db: meta::Db::new(db_send),
620618
internal_cmd_tx: fs_commands_tx,
621-
empty: BaoFileHandle::new_complete(
622-
Hash::EMPTY,
623-
MemOrFile::empty(),
624-
MemOrFile::empty(),
625-
options,
626-
),
619+
empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()),
627620
protect,
628621
});
629622
rt.spawn(db_actor.run());
@@ -925,18 +918,14 @@ async fn import_bao_impl(
925918
handle: BaoFileHandle,
926919
ctx: HashContext,
927920
) -> api::Result<()> {
928-
trace!(
929-
"importing bao: {} {} bytes",
930-
handle.hash().fmt_short(),
931-
size
932-
);
921+
trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
933922
let mut batch = Vec::<BaoContentItem>::new();
934923
let mut ranges = ChunkRanges::empty();
935924
while let Some(item) = rx.recv().await? {
936925
// if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch
937926
if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
938927
let bitfield = Bitfield::new_unchecked(ranges, size.into());
939-
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
928+
handle.write_batch(&batch, &bitfield, &ctx).await?;
940929
batch.clear();
941930
ranges = ChunkRanges::empty();
942931
}
@@ -952,7 +941,7 @@ async fn import_bao_impl(
952941
}
953942
if !batch.is_empty() {
954943
let bitfield = Bitfield::new_unchecked(ranges, size.into());
955-
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
944+
handle.write_batch(&batch, &bitfield, &ctx).await?;
956945
}
957946
Ok(())
958947
}
@@ -992,7 +981,6 @@ async fn export_ranges_impl(
992981
"export_ranges: exporting ranges: {hash} {ranges:?} size={}",
993982
handle.current_size()?
994983
);
995-
debug_assert!(handle.hash() == hash, "hash mismatch");
996984
let bitfield = handle.bitfield()?;
997985
let data = handle.data_reader();
998986
let size = bitfield.size();
@@ -1051,8 +1039,7 @@ async fn export_bao_impl(
10511039
handle: BaoFileHandle,
10521040
) -> anyhow::Result<()> {
10531041
let ExportBaoRequest { ranges, hash, .. } = cmd;
1054-
debug_assert!(handle.hash() == hash, "hash mismatch");
1055-
let outboard = handle.outboard()?;
1042+
let outboard = handle.outboard(&hash)?;
10561043
let size = outboard.tree.size();
10571044
if size == 0 && hash != Hash::EMPTY {
10581045
// we have no data whatsoever, so we stop here

0 commit comments

Comments
 (0)