Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Commit

Permalink
Clean-up phase in krator fixed where a channel was closed prematurely…
Browse files Browse the repository at this point in the history
… (#557)

* Clean-up phase in krator fixed where a channel was closed prematurely

* Wait for Deleted event to drop Manifest

* Deleted event is fired after the manifest was sent

Co-authored-by: Kevin Flansburg <kevin.flansburg@gmail.com>
  • Loading branch information
siegfriedweber and kflansburg committed Apr 9, 2021
1 parent 96e4562 commit f5c176d
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion krator/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl<O: Operator> OperatorRuntime<O> {
let (sender, mut receiver) = tokio::sync::mpsc::channel::<Event<O::Manifest>>(128);

let deleted = Arc::new(Notify::new());
let deleted_event = Arc::new(Notify::new());

let (manifest, object_state) = match initial_event {
Event::Applied(manifest) => {
Expand All @@ -167,6 +168,7 @@ impl<O: Operator> OperatorRuntime<O> {

let (manifest_tx, manifest_rx) = Manifest::new(manifest);
let reflector_deleted = Arc::clone(&deleted);
let reflector_deleted_event = Arc::clone(&deleted_event);

// Two tasks are spawned for each resource. The first updates shared state (manifest and
// deleted flag) while the second awaits on the actual state machine, interrupts it on
Expand Down Expand Up @@ -213,6 +215,7 @@ impl<O: Operator> OperatorRuntime<O> {
return;
}
}
reflector_deleted_event.notify_one();
break;
}
_ => warn!("Resource got unexpected event, ignoring: {:?}", &event),
Expand All @@ -226,6 +229,7 @@ impl<O: Operator> OperatorRuntime<O> {
self.operator.shared_state().await,
object_state,
deleted,
deleted_event,
Arc::clone(&self.operator),
));

Expand Down Expand Up @@ -337,6 +341,7 @@ async fn run_object_task<O: Operator>(
shared: SharedState<<O::ObjectState as ObjectState>::SharedState>,
mut object_state: O::ObjectState,
deleted: Arc<Notify>,
deleted_event: Arc<Notify>,
operator: Arc<O>,
) {
debug!("Running registration hook.");
Expand Down Expand Up @@ -377,7 +382,7 @@ async fn run_object_task<O: Operator>(
object_state.async_drop(&mut state_writer).await;
}

match operator.deregistration_hook(manifest).await {
match operator.deregistration_hook(manifest.clone()).await {
Ok(()) => (),
Err(e) => warn!(
"Operator deregistration hook for object {} in namespace {:?} failed: {:?}",
Expand Down Expand Up @@ -413,4 +418,5 @@ async fn run_object_task<O: Operator>(
}
},
}
deleted_event.notified().await;
}

0 comments on commit f5c176d

Please sign in to comment.