Skip to content

Commit 8f7d916

Browse files
authored
Merge 5d6c759 into 71bf60b
2 parents 71bf60b + 5d6c759 commit 8f7d916

File tree

3 files changed

+168
-166
lines changed

3 files changed

+168
-166
lines changed

src/store/fs.rs

Lines changed: 46 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ use crate::{
117117
BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
118118
OutboardReader,
119119
},
120-
util::entity_manager::{self, ActiveEntityState},
120+
util::entity_manager,
121121
},
122122
util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
123123
Hash, IROH_BLOCK_SIZE,
@@ -211,21 +211,41 @@ impl TaskContext {
211211
}
212212
}
213213

214-
#[derive(Debug)]
215-
struct EmParams;
216-
217-
impl entity_manager::Params for EmParams {
214+
impl entity_manager::Params for HashContext {
218215
type EntityId = Hash;
219216

220217
type GlobalState = Arc<TaskContext>;
221218

222-
type EntityState = BaoFileHandle;
219+
fn id(&self) -> &Self::EntityId {
220+
&self.id
221+
}
222+
223+
fn global(&self) -> &Self::GlobalState {
224+
&self.global
225+
}
223226

224-
async fn on_shutdown(
225-
state: entity_manager::ActiveEntityState<Self>,
226-
_cause: entity_manager::ShutdownCause,
227-
) {
228-
state.persist().await;
227+
fn ref_count(&self) -> usize {
228+
self.state.sender_count() + self.state.receiver_count()
229+
}
230+
231+
fn new(id: &Self::EntityId, global: &Self::GlobalState) -> Self {
232+
Self {
233+
id: *id,
234+
global: global.clone(),
235+
state: BaoFileHandle::default(),
236+
}
237+
}
238+
239+
fn reset(&mut self, id: &Self::EntityId, global: &Self::GlobalState) {
240+
self.id = *id;
241+
self.global = global.clone();
242+
// this is identical to self.state = BaoFileHandle::default(),
243+
// but does not allocate a new handle.
244+
self.state.send_replace(BaoFileStorage::Initial);
245+
}
246+
247+
async fn on_shutdown(&self, _cause: entity_manager::ShutdownCause) {
248+
self.persist().await;
229249
}
230250
}
231251

@@ -240,7 +260,7 @@ struct Actor {
240260
// Tasks for import and export operations.
241261
tasks: JoinSet<()>,
242262
// Entity manager that handles concurrency for entities.
243-
handles: EntityManagerState<EmParams>,
263+
handles: EntityManagerState<HashContext>,
244264
// temp tags
245265
temp_tags: TempTags,
246266
// waiters for idle state.
@@ -249,7 +269,12 @@ struct Actor {
249269
_rt: RtWrapper,
250270
}
251271

252-
type HashContext = ActiveEntityState<EmParams>;
272+
#[derive(Debug, Clone)]
273+
struct HashContext {
274+
id: Hash,
275+
global: Arc<TaskContext>,
276+
state: BaoFileHandle,
277+
}
253278

254279
impl SyncEntityApi for HashContext {
255280
/// Load the state from the database.
@@ -677,11 +702,11 @@ trait HashSpecificCommand: HashSpecific + Send + 'static {
677702

678703
/// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
679704
/// or dead (e.g. panic in one of the running tasks).
680-
fn on_error(self, arg: SpawnArg<EmParams>) -> impl Future<Output = ()> + Send + 'static;
705+
fn on_error(self, arg: SpawnArg<HashContext>) -> impl Future<Output = ()> + Send + 'static;
681706

682707
async fn spawn(
683708
self,
684-
manager: &mut entity_manager::EntityManagerState<EmParams>,
709+
manager: &mut entity_manager::EntityManagerState<HashContext>,
685710
tasks: &mut JoinSet<()>,
686711
) where
687712
Self: Sized,
@@ -715,13 +740,13 @@ impl HashSpecificCommand for ObserveMsg {
715740
async fn handle(self, ctx: HashContext) {
716741
ctx.observe(self).await
717742
}
718-
async fn on_error(self, _arg: SpawnArg<EmParams>) {}
743+
async fn on_error(self, _arg: SpawnArg<HashContext>) {}
719744
}
720745
impl HashSpecificCommand for ExportPathMsg {
721746
async fn handle(self, ctx: HashContext) {
722747
ctx.export_path(self).await
723748
}
724-
async fn on_error(self, arg: SpawnArg<EmParams>) {
749+
async fn on_error(self, arg: SpawnArg<HashContext>) {
725750
let err = match arg {
726751
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
727752
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -737,7 +762,7 @@ impl HashSpecificCommand for ExportBaoMsg {
737762
async fn handle(self, ctx: HashContext) {
738763
ctx.export_bao(self).await
739764
}
740-
async fn on_error(self, arg: SpawnArg<EmParams>) {
765+
async fn on_error(self, arg: SpawnArg<HashContext>) {
741766
let err = match arg {
742767
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
743768
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -753,7 +778,7 @@ impl HashSpecificCommand for ExportRangesMsg {
753778
async fn handle(self, ctx: HashContext) {
754779
ctx.export_ranges(self).await
755780
}
756-
async fn on_error(self, arg: SpawnArg<EmParams>) {
781+
async fn on_error(self, arg: SpawnArg<HashContext>) {
757782
let err = match arg {
758783
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
759784
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -769,7 +794,7 @@ impl HashSpecificCommand for ImportBaoMsg {
769794
async fn handle(self, ctx: HashContext) {
770795
ctx.import_bao(self).await
771796
}
772-
async fn on_error(self, arg: SpawnArg<EmParams>) {
797+
async fn on_error(self, arg: SpawnArg<HashContext>) {
773798
let err = match arg {
774799
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
775800
SpawnArg::Dead => io::Error::other("entity is dead"),
@@ -788,7 +813,7 @@ impl HashSpecificCommand for (TempTag, ImportEntryMsg) {
788813
let (tt, cmd) = self;
789814
ctx.finish_import(cmd, tt).await
790815
}
791-
async fn on_error(self, arg: SpawnArg<EmParams>) {
816+
async fn on_error(self, arg: SpawnArg<HashContext>) {
792817
let err = match arg {
793818
SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(),
794819
SpawnArg::Dead => io::Error::other("entity is dead"),

src/store/fs/bao_file.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use super::{
3030
use crate::{
3131
api::blobs::Bitfield,
3232
store::{
33-
fs::{meta::raw_outboard_size, util::entity_manager, HashContext},
33+
fs::{meta::raw_outboard_size, HashContext},
3434
util::{
3535
read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile,
3636
PartialMemStorage, DD,
@@ -523,16 +523,6 @@ impl BaoFileStorage {
523523
#[derive(Debug, Clone, Default, derive_more::Deref)]
524524
pub(crate) struct BaoFileHandle(pub(super) watch::Sender<BaoFileStorage>);
525525

526-
impl entity_manager::Reset for BaoFileHandle {
527-
fn reset(&mut self) {
528-
self.send_replace(BaoFileStorage::Initial);
529-
}
530-
531-
fn ref_count(&self) -> usize {
532-
self.0.receiver_count() + self.0.sender_count()
533-
}
534-
}
535-
536526
/// A reader for a bao file, reading just the data.
537527
#[derive(Debug)]
538528
pub struct DataReader(pub(super) BaoFileHandle);

0 commit comments

Comments
 (0)