Skip to content

Commit 3eae4d4

Browse files
authored
Merge 21dc789 into 91af79e
2 parents 91af79e + 21dc789 commit 3eae4d4

File tree

2 files changed

+71
-130
lines changed

2 files changed

+71
-130
lines changed

src/store/fs.rs

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,16 @@ impl entity_manager::Params for EmParams {
218218
type EntityState = Slot;
219219

220220
async fn on_shutdown(
221-
_state: entity_manager::ActiveEntityState<Self>,
222-
_cause: entity_manager::ShutdownCause,
221+
state: entity_manager::ActiveEntityState<Self>,
222+
cause: entity_manager::ShutdownCause,
223223
) {
224+
// this isn't strictly necessary. Drop will run anyway as soon as the
225+
// state is reset to it's default value. Doing it here means that we
226+
// have exact control over where it happens.
227+
if let Some(mut handle) = state.state.0.lock().await.take() {
228+
trace!("shutting down hash: {}, cause: {cause:?}", state.id);
229+
handle.persist(&state);
230+
}
224231
}
225232
}
226233

@@ -291,7 +298,7 @@ impl HashContext {
291298
.get_or_create(|| async {
292299
let res = self.db().get(hash).await.map_err(io::Error::other)?;
293300
let res = match res {
294-
Some(state) => open_bao_file(&hash, state, &self.global).await,
301+
Some(state) => open_bao_file(state, self).await,
295302
None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")),
296303
};
297304
Ok((res?, ()))
@@ -311,11 +318,8 @@ impl HashContext {
311318
.get_or_create(|| async {
312319
let res = self.db().get(hash).await.map_err(io::Error::other)?;
313320
let res = match res {
314-
Some(state) => open_bao_file(&hash, state, &self.global).await,
315-
None => Ok(BaoFileHandle::new_partial_mem(
316-
hash,
317-
self.global.options.clone(),
318-
)),
321+
Some(state) => open_bao_file(state, self).await,
322+
None => Ok(BaoFileHandle::new_partial_mem()),
319323
};
320324
Ok((res?, ()))
321325
})
@@ -327,12 +331,9 @@ impl HashContext {
327331
}
328332
}
329333

330-
async fn open_bao_file(
331-
hash: &Hash,
332-
state: EntryState<Bytes>,
333-
ctx: &TaskContext,
334-
) -> io::Result<BaoFileHandle> {
335-
let options = &ctx.options;
334+
async fn open_bao_file(state: EntryState<Bytes>, ctx: &HashContext) -> io::Result<BaoFileHandle> {
335+
let hash = &ctx.id;
336+
let options = &ctx.global.options;
336337
Ok(match state {
337338
EntryState::Complete {
338339
data_location,
@@ -362,9 +363,9 @@ async fn open_bao_file(
362363
MemOrFile::File(file)
363364
}
364365
};
365-
BaoFileHandle::new_complete(*hash, data, outboard, options.clone())
366+
BaoFileHandle::new_complete(data, outboard)
366367
}
367-
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?,
368+
EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?,
368369
})
369370
}
370371

@@ -618,12 +619,7 @@ impl Actor {
618619
options: options.clone(),
619620
db: meta::Db::new(db_send),
620621
internal_cmd_tx: fs_commands_tx,
621-
empty: BaoFileHandle::new_complete(
622-
Hash::EMPTY,
623-
MemOrFile::empty(),
624-
MemOrFile::empty(),
625-
options,
626-
),
622+
empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()),
627623
protect,
628624
});
629625
rt.spawn(db_actor.run());
@@ -925,18 +921,14 @@ async fn import_bao_impl(
925921
handle: BaoFileHandle,
926922
ctx: HashContext,
927923
) -> api::Result<()> {
928-
trace!(
929-
"importing bao: {} {} bytes",
930-
handle.hash().fmt_short(),
931-
size
932-
);
924+
trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size);
933925
let mut batch = Vec::<BaoContentItem>::new();
934926
let mut ranges = ChunkRanges::empty();
935927
while let Some(item) = rx.recv().await? {
936928
// if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch
937929
if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() {
938930
let bitfield = Bitfield::new_unchecked(ranges, size.into());
939-
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
931+
handle.write_batch(&batch, &bitfield, &ctx).await?;
940932
batch.clear();
941933
ranges = ChunkRanges::empty();
942934
}
@@ -952,7 +944,7 @@ async fn import_bao_impl(
952944
}
953945
if !batch.is_empty() {
954946
let bitfield = Bitfield::new_unchecked(ranges, size.into());
955-
handle.write_batch(&batch, &bitfield, &ctx.global).await?;
947+
handle.write_batch(&batch, &bitfield, &ctx).await?;
956948
}
957949
Ok(())
958950
}
@@ -992,7 +984,6 @@ async fn export_ranges_impl(
992984
"export_ranges: exporting ranges: {hash} {ranges:?} size={}",
993985
handle.current_size()?
994986
);
995-
debug_assert!(handle.hash() == hash, "hash mismatch");
996987
let bitfield = handle.bitfield()?;
997988
let data = handle.data_reader();
998989
let size = bitfield.size();
@@ -1051,8 +1042,7 @@ async fn export_bao_impl(
10511042
handle: BaoFileHandle,
10521043
) -> anyhow::Result<()> {
10531044
let ExportBaoRequest { ranges, hash, .. } = cmd;
1054-
debug_assert!(handle.hash() == hash, "hash mismatch");
1055-
let outboard = handle.outboard()?;
1045+
let outboard = handle.outboard(&hash)?;
10561046
let size = outboard.tree.size();
10571047
if size == 0 && hash != Hash::EMPTY {
10581048
// we have no data whatsoever, so we stop here

0 commit comments

Comments
 (0)