From 30eab4bfa01b56c191231394ce0745e3428fbaf7 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 13:15:04 -0700
Subject: [PATCH 01/15] Fix ugly token attachment

---
 src/lib.rs          | 13 ++++---------
 src/workflow/mod.rs | 12 +++++++++++-
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 30aead89b..991ca2414 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -178,16 +178,14 @@ where
         if let Some(pa) = self.pending_activations.pop() {
             Span::current().record("pending_activation", &format!("{}", &pa).as_str());

-            if let Some(mut next_activation) =
+            if let Some(next_activation) =
                 self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?
             {
-                // TODO: What do do about task token
                 let task_token = pa.task_token.clone();
                 if next_activation.more_activations_needed {
                     self.pending_activations.push(pa);
                 }
-                next_activation.activation.task_token = task_token;
-                return Ok(next_activation.activation);
+                return Ok(next_activation.finalize(task_token));
             }
         }
@@ -209,17 +207,14 @@ where

             let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?;

-            if let Some(mut na) = next_activation {
+            if let Some(na) = next_activation {
                 if na.more_activations_needed {
                     self.pending_activations.push(PendingActivation {
                         run_id,
                         task_token: task_token.clone(),
                     });
                 }
-
-                // TODO: This sucks
-                na.activation.task_token = task_token;
-                return Ok(na.activation);
+                return Ok(na.finalize(task_token));
             }
         }
         // Drain pending activations in case of shutdown.
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 0bb82109f..344ba8d9e 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -78,10 +78,20 @@ impl WorkflowManager {

 #[derive(Debug)]
 pub(crate) struct NextWfActivation {
-    pub activation: WfActivation,
+    /// Keep this private, so we can ensure task tokens are attached via [Self::finalize]
+    activation: WfActivation,
     pub more_activations_needed: bool,
 }

+impl NextWfActivation {
+    /// Attach a task token to the activation so it can be sent out to the lang sdk
+    pub(crate) fn finalize(self, task_token: Vec<u8>) -> WfActivation {
+        let mut a = self.activation;
+        a.task_token = task_token;
+        a
+    }
+}
+
 impl WorkflowManager {
     /// Given history that was just obtained from the server, pipe it into this workflow's machines.
     ///
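With `activation` private, the only way to get a `WfActivation` out of a `NextWfActivation` is
`finalize`, so a task token can no longer be forgotten at a call site. A minimal sketch of the
resulting call-site shape (the bindings here are illustrative, not part of the patch):

    // `next_activation` came from the workflow machines; the token came from
    // either the server work item or the stored pending activation.
    let task_token: Vec<u8> = pa.task_token.clone();
    let activation: WfActivation = next_activation.finalize(task_token);
    // `activation.task_token` is now populated and the activation can be
    // handed to the lang side.
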
From bdabdb8640cefc9f4e090ea71490aed841d46107 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 16:12:50 -0700
Subject: [PATCH 02/15] Change errors to be per-api for better granularity and
 clarity

---
 src/errors.rs                        | 121 +++++++++++++++++++++++++
 src/lib.rs                           | 130 +++++++++++----------------
 tests/integ_tests/simple_wf_tests.rs |   9 +-
 3 files changed, 179 insertions(+), 81 deletions(-)
 create mode 100644 src/errors.rs

diff --git a/src/errors.rs b/src/errors.rs
new file mode 100644
index 000000000..6a44fdcdb
--- /dev/null
+++ b/src/errors.rs
@@ -0,0 +1,121 @@
+use crate::{
+    protos::coresdk::activity_result::ActivityResult,
+    protos::coresdk::workflow_completion::WfActivationCompletion,
+    protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
+    workflow::WorkflowError,
+};
+use tonic::codegen::http::uri::InvalidUri;
+
+pub(crate) struct ShutdownErr;
+pub(crate) struct WorkflowUpdateError {
+    /// Underlying workflow error
+    pub source: WorkflowError,
+    /// The run id of the erring workflow
+    pub run_id: String,
+}
+
+/// Errors thrown during initialization of [Core]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum CoreInitError {
+    /// Invalid URI: {0:?}
+    InvalidUri(#[from] InvalidUri),
+    /// Server connection error: {0:?}
+    TonicTransportError(#[from] tonic::transport::Error),
+}
+
+/// Errors thrown by [Core::poll_workflow_task]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum PollWfError {
+    /// There was an error specific to a workflow instance with id ({run_id}): {source:?}
+    WorkflowUpdateError {
+        /// Underlying workflow error
+        source: WorkflowError,
+        /// The run id of the erring workflow
+        run_id: String,
+    },
+    /// Poll workflow response from server was malformed: {0:?}
+    BadPollResponseFromServer(PollWorkflowTaskQueueResponse),
+    /** [Core::shutdown] was called, and there are no more replay tasks to be handled. You must
+    call [Core::complete_workflow_task] for any remaining tasks, and then may exit.*/
+    ShuttingDown,
+    /// Unhandled error when calling the temporal server: {0:?}
+    TonicError(#[from] tonic::Status),
+}
+
+impl From<WorkflowUpdateError> for PollWfError {
+    fn from(e: WorkflowUpdateError) -> Self {
+        Self::WorkflowUpdateError {
+            source: e.source,
+            run_id: e.run_id,
+        }
+    }
+}
+
+impl From<ShutdownErr> for PollWfError {
+    fn from(_: ShutdownErr) -> Self {
+        Self::ShuttingDown
+    }
+}
+
+/// Errors thrown by [Core::poll_activity_task]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum PollActivityError {
+    /// [Core::shutdown] was called, we will no longer fetch new activity tasks
+    ShuttingDown,
+    /// Unhandled error when calling the temporal server: {0:?}
+    TonicError(#[from] tonic::Status),
+}
+
+impl From<ShutdownErr> for PollActivityError {
+    fn from(_: ShutdownErr) -> Self {
+        Self::ShuttingDown
+    }
+}
+
+/// Errors thrown by [Core::complete_workflow_task]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum CompleteWfError {
+    /// Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}
+    MalformedWorkflowCompletion {
+        /// Reason the completion was malformed
+        reason: String,
+        /// The completion, which may not be included to avoid unnecessary copies.
+        completion: Option<WfActivationCompletion>,
+    },
+    /// There was an error specific to a workflow instance with id ({run_id}): {source:?}
+    WorkflowUpdateError {
+        /// Underlying workflow error
+        source: WorkflowError,
+        /// The run id of the erring workflow
+        run_id: String,
+    },
+    /** There exists a pending command in this workflow's history which has not yet been handled.
+    When thrown from [Core::complete_task], it means you should poll for a new task, receive a
+    new task token, and complete that new task. */
+    UnhandledCommandWhenCompleting,
+    /// Unhandled error when calling the temporal server: {0:?}
+    TonicError(#[from] tonic::Status),
+}
+
+impl From<WorkflowUpdateError> for CompleteWfError {
+    fn from(e: WorkflowUpdateError) -> Self {
+        Self::WorkflowUpdateError {
+            source: e.source,
+            run_id: e.run_id,
+        }
+    }
+}
+
+/// Errors thrown by [Core::complete_activity_task]
+#[derive(thiserror::Error, Debug, displaydoc::Display)]
+pub enum CompleteActivityError {
+    /// Lang SDK sent us a malformed activity completion ({reason}): {completion:?}
+    MalformedActivityCompletion {
+        /// Reason the completion was malformed
+        reason: String,
+        /// The completion, which may not be included to avoid unnecessary copies.
+        completion: Option<ActivityResult>,
+    },
+    /// Unhandled error when calling the temporal server: {0:?}
+    TonicError(#[from] tonic::Status),
+}
diff --git a/src/lib.rs b/src/lib.rs
index 991ca2414..a11c70a2f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@ extern crate tracing;
 pub mod protos;

 pub(crate) mod core_tracing;
+mod errors;
 mod machines;
 mod pending_activations;
 mod pollers;
@@ -22,11 +23,16 @@ mod workflow;
 #[cfg(test)]
 mod test_help;

+pub use crate::errors::{
+    CompleteActivityError, CompleteWfError, CoreInitError, PollActivityError, PollWfError,
+};
 pub use core_tracing::tracing_init;
 pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;

+use crate::errors::WorkflowUpdateError;
 use crate::{
+    errors::ShutdownErr,
     machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand},
     pending_activations::{PendingActivation, PendingActivations},
     protos::{
@@ -54,12 +60,8 @@ use std::{
     },
 };
 use tokio::sync::Notify;
-use tonic::codegen::http::uri::InvalidUri;
 use tracing::Span;

-/// A result alias having [CoreError] as the error type
-pub type Result<T, E = CoreError> = std::result::Result<T, E>;
-
 /// This trait is the primary way by which language specific SDKs interact with the core SDK. It is
 /// expected that only one instance of an implementation will exist for the lifetime of the
 /// worker(s) using it.
@@ -70,27 +72,31 @@ pub trait Core: Send + Sync {
     /// indefinitely until such work is available or [shutdown] is called.
     ///
     /// TODO: Examples
-    async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation>;
+    async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation, PollWfError>;

     /// Ask the core for some work, returning an [ActivityTask]. It is then the language SDK's
     /// responsibility to call the appropriate activity code with the provided inputs. Blocks
     /// indefinitely until such work is available or [shutdown] is called.
     ///
     /// TODO: Examples
-    async fn poll_activity_task(&self, task_queue: &str) -> Result<ActivityTask>;
+    async fn poll_activity_task(&self, task_queue: &str)
+        -> Result<ActivityTask, PollActivityError>;

     /// Tell the core that a workflow activation has completed
-    async fn complete_workflow_task(&self, completion: WfActivationCompletion) -> Result<()>;
+    async fn complete_workflow_task(
+        &self,
+        completion: WfActivationCompletion,
+    ) -> Result<(), CompleteWfError>;

     /// Tell the core that an activity has finished executing
     async fn complete_activity_task(
         &self,
         task_token: Vec<u8>,
         result: ActivityResult,
-    ) -> Result<()>;
+    ) -> Result<(), CompleteActivityError>;

     /// Indicate that a long running activity is still making progress
-    async fn send_activity_heartbeat(&self, task_token: ActivityHeartbeat) -> Result<()>;
+    async fn send_activity_heartbeat(&self, task_token: ActivityHeartbeat) -> Result<(), ()>;

     /// Returns core's instance of the [ServerGatewayApis] implementor it is using.
     fn server_gateway(&self) -> Arc<dyn ServerGatewayApis>;
@@ -156,7 +162,7 @@ macro_rules! abort_on_shutdown {
         let poll_result_future = $self.server_gateway.$gateway_fn($poll_arg);
         tokio::select! {
             _ = shutdownfut => {
-                Err(CoreError::ShuttingDown)
+                Err(ShutdownErr.into())
             }
             r = poll_result_future => r.map_err(Into::into)
         }
@@ -169,7 +175,7 @@ where
     WP: ServerGatewayApis + Send + Sync + 'static,
 {
     #[instrument(skip(self), fields(pending_activation))]
-    async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation> {
+    async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation, PollWfError> {
         // The poll needs to be in a loop because we can't guarantee tail call optimization in Rust
         // (simply) and we really, really need that for long-poll retries.
         loop {
@@ -190,7 +196,7 @@ where
             }

             if self.shutdown_requested.load(Ordering::SeqCst) {
-                return Err(CoreError::ShuttingDown);
+                return Err(PollWfError::ShuttingDown);
             }

             match abort_on_shutdown!(self, poll_workflow_task, task_queue.to_owned()) {
@@ -218,14 +224,21 @@ where
                 }
                 // Drain pending activations in case of shutdown.
-                Err(CoreError::ShuttingDown) => continue,
+                Err(PollWfError::ShuttingDown) => continue,
                 Err(e) => return Err(e),
             }
         }
     }

     #[instrument(skip(self))]
-    async fn poll_activity_task(&self, task_queue: &str) -> Result<ActivityTask> {
+    async fn poll_activity_task(
+        &self,
+        task_queue: &str,
+    ) -> Result<ActivityTask, PollActivityError> {
+        if self.shutdown_requested.load(Ordering::SeqCst) {
+            return Err(PollActivityError::ShuttingDown);
+        }
+
         match abort_on_shutdown!(self, poll_activity_task, task_queue.to_owned()) {
             Ok(work) => {
                 let task_token = work.task_token.clone();
@@ -236,14 +249,17 @@ where
     }

     #[instrument(skip(self))]
-    async fn complete_workflow_task(&self, completion: WfActivationCompletion) -> Result<()> {
+    async fn complete_workflow_task(
+        &self,
+        completion: WfActivationCompletion,
+    ) -> Result<(), CompleteWfError> {
         let task_token = completion.task_token;
         let wfstatus = completion.status;
         let run_id = self
             .workflow_task_tokens
             .get(&task_token)
             .map(|x| x.value().clone())
-            .ok_or_else(|| CoreError::MalformedWorkflowCompletion {
+            .ok_or_else(|| CompleteWfError::MalformedWorkflowCompletion {
                 reason: format!(
                     "Task token {} had no workflow run associated with it",
                     fmt_task_token(&task_token)
@@ -258,7 +274,7 @@ where
                     .into_iter()
                     .map(|c| c.try_into())
                     .collect::<Result<Vec<_>, EmptyWorkflowCommandErr>>()
-                    .map_err(|_| CoreError::MalformedWorkflowCompletion {
+                    .map_err(|_| CompleteWfError::MalformedWorkflowCompletion {
                         reason: "At least one workflow command in the completion \
                                  contained an empty variant"
                             .to_owned(),
@@ -276,7 +292,7 @@ where
                         if ts.code() == tonic::Code::InvalidArgument
                             && ts.message() == "UnhandledCommand"
                         {
-                            CoreError::UnhandledCommandWhenCompleting
+                            CompleteWfError::UnhandledCommandWhenCompleting
                         } else {
                             ts.into()
                         }
@@ -296,7 +312,7 @@ where
                     .await?;
             }
             None => {
-                return Err(CoreError::MalformedWorkflowCompletion {
+                return Err(CompleteWfError::MalformedWorkflowCompletion {
                     reason: "Workflow completion had empty status field".to_owned(),
                     completion: None,
                 })
@@ -310,11 +326,11 @@ where
         &self,
         task_token: Vec<u8>,
         result: ActivityResult,
-    ) -> Result<()> {
+    ) -> Result<(), CompleteActivityError> {
         let status = if let Some(s) = result.status {
             s
         } else {
-            return Err(CoreError::MalformedActivityCompletion {
+            return Err(CompleteActivityError::MalformedActivityCompletion {
                 reason: "Activity result had empty status field".to_owned(),
                 completion: Some(result),
             });
@@ -339,7 +355,7 @@ where
         Ok(())
     }

-    async fn send_activity_heartbeat(&self, _task_token: ActivityHeartbeat) -> Result<()> {
+    async fn send_activity_heartbeat(&self, _task_token: ActivityHeartbeat) -> Result<(), ()> {
         unimplemented!()
     }

@@ -376,7 +392,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
     fn instantiate_or_update_workflow(
         &self,
         poll_wf_resp: PollWorkflowTaskQueueResponse,
-    ) -> Result<(Option<NextWfActivation>, String)> {
+    ) -> Result<(Option<NextWfActivation>, String), PollWfError> {
         match poll_wf_resp {
             PollWorkflowTaskQueueResponse {
                 task_token,
@@ -393,16 +409,20 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
                     .create_or_update(&run_id, history, workflow_execution)
                 {
                     Ok(activation) => Ok((activation, run_id)),
-                    Err(source) => Err(CoreError::WorkflowError { source, run_id }),
+                    Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }),
                 }
             }
-            p => Err(CoreError::BadPollResponseFromServer(p)),
+            p => Err(PollWfError::BadPollResponseFromServer(p).into()),
         }
     }

     /// Feed commands from the lang sdk into appropriate workflow manager which will iterate
     /// the state machines and return commands ready to be sent to the server
-    fn push_lang_commands(&self, run_id: &str, cmds: Vec<WFCommand>) -> Result<Vec<ProtoCommand>> {
+    fn push_lang_commands(
+        &self,
+        run_id: &str,
+        cmds: Vec<WFCommand>,
+    ) -> Result<Vec<ProtoCommand>, WorkflowUpdateError> {
         self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
     }

@@ -413,7 +433,11 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {

     /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing
     /// span to the wf machines thread.
-    fn access_wf_machine<F, Fout>(&self, run_id: &str, mutator: F) -> Result<Fout>
+    fn access_wf_machine<F, Fout>(
+        &self,
+        run_id: &str,
+        mutator: F,
+    ) -> Result<Fout, WorkflowUpdateError>
     where
         F: FnOnce(&mut WorkflowManager) -> Result<Fout, WorkflowError> + Send + 'static,
         Fout: Send + Debug + 'static,
@@ -425,61 +449,13 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
         };
         self.workflow_machines
             .access(run_id, mutator)
-            .map_err(|source| CoreError::WorkflowError {
+            .map_err(|source| WorkflowUpdateError {
                 source,
                 run_id: run_id.to_owned(),
             })
     }
 }

-/// The error type returned by interactions with [Core]
-#[derive(thiserror::Error, Debug, displaydoc::Display)]
-#[allow(clippy::large_enum_variant)]
-// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc
-pub enum CoreError {
-    /** [Core::shutdown] was called, and there are no more replay tasks to be handled. You must
-    call [Core::complete_task] for any remaining tasks, and then may exit.*/
-    ShuttingDown,
-    /// Poll workflow response from server was malformed: {0:?}
-    BadPollResponseFromServer(PollWorkflowTaskQueueResponse),
-    /// Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}
-    MalformedWorkflowCompletion {
-        /// Reason the completion was malformed
-        reason: String,
-        /// The completion, which may not be included to avoid unnecessary copies.
-        completion: Option<WfActivationCompletion>,
-    },
-    /// Lang SDK sent us a malformed activity completion ({reason}): {completion:?}
-    MalformedActivityCompletion {
-        /// Reason the completion was malformed
-        reason: String,
-        /// The completion, which may not be included to avoid unnecessary copies.
-        completion: Option<ActivityResult>,
-    },
-    /// There was an error specific to a workflow instance with id ({run_id}): {source:?}
-    WorkflowError {
-        /// Underlying workflow error
-        source: WorkflowError,
-        /// The run id of the erring workflow
-        run_id: String,
-    },
-    /** There exists a pending command in this workflow's history which has not yet been handled.
-    When thrown from [Core::complete_task], it means you should poll for a new task, receive a
-    new task token, and complete that new task.
-    */
-    UnhandledCommandWhenCompleting,
-    /// Unhandled error when calling the temporal server: {0:?}
-    TonicError(#[from] tonic::Status),
-}
-
-/// Errors thrown during initialization of [Core]
-#[derive(thiserror::Error, Debug, displaydoc::Display)]
-pub enum CoreInitError {
-    /// Invalid URI: {0:?}
-    InvalidUri(#[from] InvalidUri),
-    /// Server connection error: {0:?}
-    TonicTransportError(#[from] tonic::transport::Error),
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
@@ -759,7 +735,7 @@ mod test {
                 .poll_workflow_task(TASK_Q)
                 .await
                 .unwrap_err(),
-            CoreError::ShuttingDown
+            PollWfError::ShuttingDown
         );
     }

diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs
index 61af33631..77c5e2f13 100644
--- a/tests/integ_tests/simple_wf_tests.rs
+++ b/tests/integ_tests/simple_wf_tests.rs
@@ -18,7 +18,8 @@ use temporal_sdk_core::{
         },
         workflow_completion::WfActivationCompletion,
     },
-    Core, CoreError, CoreInitOptions, ServerGatewayApis, ServerGatewayOptions, Url,
+    CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, ServerGatewayOptions,
+    Url,
 };

 // TODO: These tests can get broken permanently if they break one time and the server is not
@@ -556,14 +557,14 @@ async fn shutdown_aborts_actively_blocked_poll() {
     });
     assert_matches!(
         core.poll_workflow_task(task_q).await.unwrap_err(),
-        CoreError::ShuttingDown
+        PollWfError::ShuttingDown
     );
     handle.join().unwrap();
     // Ensure double-shutdown doesn't explode
     core.shutdown();
     assert_matches!(
         core.poll_workflow_task(task_q).await.unwrap_err(),
-        CoreError::ShuttingDown
+        PollWfError::ShuttingDown
     );
 }

@@ -772,7 +773,7 @@ async fn signal_workflow_signal_not_handled_on_workflow_completion() {
         ))
         .await
         .unwrap_err();
-    assert_matches!(unhandled, CoreError::UnhandledCommandWhenCompleting);
+    assert_matches!(unhandled, CompleteWfError::UnhandledCommandWhenCompleting);

     // We should get a new task with the signal
     let res = core.poll_workflow_task(task_q).await.unwrap();

From 7050d850fe1f91424b7933221b99d242855aac29 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 16:22:02 -0700
Subject: [PATCH 03/15] Lint fix

---
 src/errors.rs | 1 +
 src/lib.rs    | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/errors.rs b/src/errors.rs
index 6a44fdcdb..cf7988cef 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -74,6 +74,7 @@ impl From<ShutdownErr> for PollActivityError {

 /// Errors thrown by [Core::complete_workflow_task]
 #[derive(thiserror::Error, Debug, displaydoc::Display)]
+#[allow(clippy::large_enum_variant)]
 pub enum CompleteWfError {
     /// Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}
     MalformedWorkflowCompletion {
diff --git a/src/lib.rs b/src/lib.rs
index a11c70a2f..665d1a3e5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -412,7 +412,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
                     Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }),
                 }
             }
-            p => Err(PollWfError::BadPollResponseFromServer(p).into()),
+            p => Err(PollWfError::BadPollResponseFromServer(p)),
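Splitting the old `CoreError` into per-API enums lets lang bridges match exhaustively on exactly
the failures each call can produce. A sketch of a poll loop against the new `PollWfError` (the
handler functions are hypothetical stand-ins for lang-side code, not part of these patches):

    loop {
        match core.poll_workflow_task(task_q).await {
            Ok(activation) => handle_activation(activation),
            // Graceful exit: shutdown was requested and replay work is drained.
            Err(PollWfError::ShuttingDown) => break,
            // Transient server trouble; retry or surface to the lang side.
            Err(PollWfError::TonicError(status)) => log_and_retry(status),
            Err(other) => return Err(other.into()),
        }
    }
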
label: "doc" + agents: + queue: "default" + docker: "*" + env: + RUSTDOCFLAGS: -Dwarnings + command: "cargo doc --workspace --all-features --no-deps" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml - label: "lint" agents: queue: "default" From 2e66f5043d7dbcfd9bca1f68be45ae5491815f03 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 26 Mar 2021 16:35:15 -0700 Subject: [PATCH 05/15] Fix all doc problems --- protos/local/workflow_completion.proto | 2 +- src/errors.rs | 21 +++++++++++---------- src/lib.rs | 15 ++++++++------- src/pollers/mod.rs | 6 +++--- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/protos/local/workflow_completion.proto b/protos/local/workflow_completion.proto index 9c1e636f3..4fbc9beec 100644 --- a/protos/local/workflow_completion.proto +++ b/protos/local/workflow_completion.proto @@ -7,7 +7,7 @@ import "workflow_commands.proto"; /// Result of a single workflow activation, reported from lang to core message WFActivationCompletion { - // The token from the [WfActivation] you are completing + // The token from the workflow activation you are completing bytes task_token = 1; oneof status { Success successful = 2; diff --git a/src/errors.rs b/src/errors.rs index cf7988cef..dd53e5dd4 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -14,7 +14,7 @@ pub(crate) struct WorkflowUpdateError { pub run_id: String, } -/// Errors thrown during initialization of [Core] +/// Errors thrown during initialization of [crate::Core] #[derive(thiserror::Error, Debug, displaydoc::Display)] pub enum CoreInitError { /// Invalid URI: {0:?} @@ -23,7 +23,7 @@ pub enum CoreInitError { TonicTransportError(#[from] tonic::transport::Error), } -/// Errors thrown by [Core::poll_workflow_task] +/// Errors thrown by [crate::Core::poll_workflow_task] #[derive(thiserror::Error, Debug, displaydoc::Display)] pub enum PollWfError { /// There was an error specific to a workflow instance with id ({run_id}): {source:?} @@ -35,8 +35,9 @@ pub enum PollWfError { }, /// Poll workflow response from server was malformed: {0:?} BadPollResponseFromServer(PollWorkflowTaskQueueResponse), - /** [Core::shutdown] was called, and there are no more replay tasks to be handled. You must - call [Core::complete_workflow_task] for any remaining tasks, and then may exit.*/ + /** [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. 
+    /** [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. You
+     * must call [crate::Core::complete_workflow_task] for any remaining tasks, and then may
+     * exit.*/
     ShuttingDown,
     /// Unhandled error when calling the temporal server: {0:?}
     TonicError(#[from] tonic::Status),
@@ -57,10 +58,10 @@ impl From<WorkflowUpdateError> for PollWfError {
     }
 }

-/// Errors thrown by [Core::poll_activity_task]
+/// Errors thrown by [crate::Core::poll_activity_task]
 #[derive(thiserror::Error, Debug, displaydoc::Display)]
 pub enum PollActivityError {
-    /// [Core::shutdown] was called, we will no longer fetch new activity tasks
+    /// [crate::Core::shutdown] was called, we will no longer fetch new activity tasks
     ShuttingDown,
     /// Unhandled error when calling the temporal server: {0:?}
     TonicError(#[from] tonic::Status),
@@ -72,7 +73,7 @@ impl From<ShutdownErr> for PollActivityError {
     }
 }

-/// Errors thrown by [Core::complete_workflow_task]
+/// Errors thrown by [crate::Core::complete_workflow_task]
 #[derive(thiserror::Error, Debug, displaydoc::Display)]
 #[allow(clippy::large_enum_variant)]
 pub enum CompleteWfError {
@@ -91,8 +92,8 @@ pub enum CompleteWfError {
         run_id: String,
     },
     /** There exists a pending command in this workflow's history which has not yet been handled.
-    When thrown from [Core::complete_task], it means you should poll for a new task, receive a
-    new task token, and complete that new task. */
+     * When thrown from [crate::Core::complete_workflow_task], it means you should poll for a new
+     * task, receive a new task token, and complete that new task. */
     UnhandledCommandWhenCompleting,
     /// Unhandled error when calling the temporal server: {0:?}
     TonicError(#[from] tonic::Status),
@@ -107,7 +108,7 @@ impl From<WorkflowUpdateError> for CompleteWfError {
     }
 }

-/// Errors thrown by [Core::complete_activity_task]
+/// Errors thrown by [crate::Core::complete_activity_task]
 #[derive(thiserror::Error, Debug, displaydoc::Display)]
 pub enum CompleteActivityError {
     /// Lang SDK sent us a malformed activity completion ({reason}): {completion:?}
diff --git a/src/lib.rs b/src/lib.rs
index 665d1a3e5..e253b4260 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -69,14 +69,14 @@ pub trait Core: Send + Sync {
     /// Ask the core for some work, returning a [WfActivation]. It is then the language SDK's
     /// responsibility to call the appropriate workflow code with the provided inputs. Blocks
-    /// indefinitely until such work is available or [shutdown] is called.
+    /// indefinitely until such work is available or [Core::shutdown] is called.
     ///
     /// TODO: Examples
     async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation, PollWfError>;

     /// Ask the core for some work, returning an [ActivityTask]. It is then the language SDK's
     /// responsibility to call the appropriate activity code with the provided inputs. Blocks
-    /// indefinitely until such work is available or [shutdown] is called.
+    /// indefinitely until such work is available or [Core::shutdown] is called.
     ///
     /// TODO: Examples
     async fn poll_activity_task(&self, task_queue: &str)
@@ -101,11 +101,12 @@ pub trait Core: Send + Sync {
     /// Returns core's instance of the [ServerGatewayApis] implementor it is using.
     fn server_gateway(&self) -> Arc<dyn ServerGatewayApis>;

-    /// Eventually ceases all polling of the server. [Core::poll_task] should be called until it
-    /// returns [CoreError::ShuttingDown] to ensure that any workflows which are still undergoing
-    /// replay have an opportunity to finish. This means that the lang sdk will need to call
-    /// [Core::complete_task] for those workflows until they are done. At that point, the lang
-    /// SDK can end the process, or drop the [Core] instance, which will close the connection.
+    /// Eventually ceases all polling of the server. [Core::poll_workflow_task] should be called
+    /// until it returns [PollWfError::ShuttingDown] to ensure that any workflows which are still
+    /// undergoing replay have an opportunity to finish. This means that the lang sdk will need to
+    /// call [Core::complete_workflow_task] for those workflows until they are done. At that point,
+    /// the lang SDK can end the process, or drop the [Core] instance, which will close the
+    /// connection.
     fn shutdown(&self);
 }

diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs
index 09cb0e116..6ba1e32b2 100644
--- a/src/pollers/mod.rs
+++ b/src/pollers/mod.rs
@@ -115,8 +115,8 @@ pub trait ServerGatewayApis {
         -> Result<PollActivityTaskQueueResponse>;

     /// Complete a workflow activation. `task_token` is the task token that would've been received
-    /// from [crate::Core::poll_task] API. `commands` is a list of new commands to send to the
-    /// server, such as starting a timer.
+    /// from [crate::Core::poll_workflow_task] API. `commands` is a list of new commands to send to
+    /// the server, such as starting a timer.
     async fn complete_workflow_task(
         &self,
         task_token: Vec<u8>,
@@ -151,7 +151,7 @@ pub trait ServerGatewayApis {
     ) -> Result<RespondActivityTaskCanceledResponse>;

     /// Fail task by sending the failure to the server. `task_token` is the task token that would've
-    /// been received from [crate::Core::poll_task].
+    /// been received from [crate::Core::poll_workflow_task].
     async fn fail_workflow_task(
         &self,
         task_token: Vec<u8>,
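The doc lint these two patches set up can be reproduced locally; the same command and flag the CI
step uses turn rustdoc warnings (including the broken intra-doc links fixed above) into hard
errors:

    RUSTDOCFLAGS=-Dwarnings cargo doc --workspace --all-features --no-deps
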
From 35b1409abcd61d65115318c543cbb7f75fe211d5 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 16:39:01 -0700
Subject: [PATCH 06/15] Set rust doc flags properly

---
 .buildkite/pipeline.yml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index 129357459..7a20b9a56 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -13,14 +13,14 @@ steps:
     agents:
       queue: "default"
       docker: "*"
-    env:
-      RUSTDOCFLAGS: -Dwarnings
     command: "cargo doc --workspace --all-features --no-deps"
     timeout_in_minutes: 15
     plugins:
       - docker-compose#v3.0.0:
           run: unit-test
           config: .buildkite/docker/docker-compose.yaml
+          env:
+            - RUSTDOCFLAGS=-Dwarnings
   - label: "lint"
     agents:
       queue: "default"

From 65ca396f4d225724fcf60ecfad19674c113d7fdb Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 17:44:06 -0700
Subject: [PATCH 07/15] Break down functions in lib a bit for more readability

---
 src/lib.rs | 192 +++++++++++++++++++++++++++++++++--------------------
 1 file changed, 120 insertions(+), 72 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index e253b4260..c0b6d74ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -31,6 +31,7 @@ pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;

 use crate::errors::WorkflowUpdateError;
+use crate::protos::coresdk::workflow_completion;
 use crate::{
     errors::ShutdownErr,
     machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand},
@@ -175,25 +176,19 @@ where
     WP: ServerGatewayApis + Send + Sync + 'static,
 {
-    #[instrument(skip(self), fields(pending_activation))]
+    #[instrument(skip(self))]
     async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation, PollWfError> {
         // The poll needs to be in a loop because we can't guarantee tail call optimization in Rust
         // (simply) and we really, really need that for long-poll retries.
         loop {
             // We must first check if there are pending workflow tasks for workflows that are
             // currently replaying, and issue those tasks before bothering the server.
-            if let Some(pa) = self.pending_activations.pop() {
-                Span::current().record("pending_activation", &format!("{}", &pa).as_str());
-
-                if let Some(next_activation) =
-                    self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?
-                {
-                    let task_token = pa.task_token.clone();
-                    if next_activation.more_activations_needed {
-                        self.pending_activations.push(pa);
-                    }
-                    return Ok(next_activation.finalize(task_token));
-                }
+            if let Some(pa) = self
+                .pending_activations
+                .pop()
+                .and_then(|p| self.prepare_pending_activation(p).transpose())
+            {
+                return Ok(pa?);
             }

             if self.shutdown_requested.load(Ordering::SeqCst) {
@@ -202,26 +197,8 @@ where

             match abort_on_shutdown!(self, poll_workflow_task, task_queue.to_owned()) {
                 Ok(work) => {
-                    if work == PollWorkflowTaskQueueResponse::default() {
-                        // We get the default proto in the event that the long poll times out.
-                        continue;
-                    }
-                    let task_token = work.task_token.clone();
-                    debug!(
-                        task_token = %fmt_task_token(&task_token),
-                        "Received workflow task from server"
-                    );
-
-                    let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?;
-
-                    if let Some(na) = next_activation {
-                        if na.more_activations_needed {
-                            self.pending_activations.push(PendingActivation {
-                                run_id,
-                                task_token: task_token.clone(),
-                            });
-                        }
-                        return Ok(na.finalize(task_token));
+                    if let Some(activation) = self.prepare_new_activation(work)? {
+                        return Ok(activation);
                     }
                 }
                 // Drain pending activations in case of shutdown.

         match wfstatus {
             Some(wf_activation_completion::Status::Successful(success)) => {
-                // Convert to wf commands
-                let cmds = success
-                    .commands
-                    .into_iter()
-                    .map(|c| c.try_into())
-                    .collect::<Result<Vec<_>, EmptyWorkflowCommandErr>>()
-                    .map_err(|_| CompleteWfError::MalformedWorkflowCompletion {
-                        reason: "At least one workflow command in the completion \
-                                 contained an empty variant"
-                            .to_owned(),
-                        completion: None,
-                    })?;
-                let commands = self.push_lang_commands(&run_id, cmds)?;
-                // We only actually want to send commands back to the server if there are
-                // no more pending activations -- in other words the lang SDK has caught
-                // up on replay.
-                if !self.pending_activations.has_pending(&run_id) {
-                    self.server_gateway
-                        .complete_workflow_task(task_token, commands)
-                        .await
-                        .map_err(|ts| {
-                            if ts.code() == tonic::Code::InvalidArgument
-                                && ts.message() == "UnhandledCommand"
-                            {
-                                CompleteWfError::UnhandledCommandWhenCompleting
-                            } else {
-                                ts.into()
-                            }
-                        })?;
-                }
+                self.wf_completion_success(task_token, &run_id, success)
+                    .await?;
             }
             Some(wf_activation_completion::Status::Failed(failure)) => {
-                // Blow up any cached data associated with the workflow
-                self.evict_run(&run_id);
-
-                self.server_gateway
-                    .fail_workflow_task(
-                        task_token,
-                        WorkflowTaskFailedCause::Unspecified,
-                        failure.failure.map(Into::into),
-                    )
+                self.wf_completion_failed(task_token, &run_id, failure)
                     .await?;
             }
             None => {

+    /// Given a pending activation, prepare it to be sent to lang
+    #[instrument(skip(self))]
+    fn prepare_pending_activation(
+        &self,
+        pa: PendingActivation,
+    ) -> Result<Option<WfActivation>, PollWfError> {
+        if let Some(next_activation) =
+            self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?
+ { + let task_token = pa.task_token.clone(); + if next_activation.more_activations_needed { + self.pending_activations.push(pa); + } + return Ok(Some(next_activation.finalize(task_token))); + } + Ok(None) + } + + /// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang + fn prepare_new_activation( + &self, + work: PollWorkflowTaskQueueResponse, + ) -> Result, PollWfError> { + if work == PollWorkflowTaskQueueResponse::default() { + // We get the default proto in the event that the long poll times out. + return Ok(None); + } + let task_token = work.task_token.clone(); + debug!( + task_token = %fmt_task_token(&task_token), + "Received workflow task from server" + ); + + let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; + + if let Some(na) = next_activation { + if na.more_activations_needed { + self.pending_activations.push(PendingActivation { + run_id, + task_token: task_token.clone(), + }); + } + return Ok(Some(na.finalize(task_token))); + } + Ok(None) + } + + /// Handle a successful workflow completion + async fn wf_completion_success( + &self, + task_token: Vec, + run_id: &str, + success: workflow_completion::Success, + ) -> Result<(), CompleteWfError> { + // Convert to wf commands + let cmds = success + .commands + .into_iter() + .map(|c| c.try_into()) + .collect::, EmptyWorkflowCommandErr>>() + .map_err(|_| CompleteWfError::MalformedWorkflowCompletion { + reason: "At least one workflow command in the completion \ + contained an empty variant" + .to_owned(), + completion: None, + })?; + let commands = self.push_lang_commands(&run_id, cmds)?; + // We only actually want to send commands back to the server if there are + // no more pending activations -- in other words the lang SDK has caught + // up on replay. + if !self.pending_activations.has_pending(&run_id) { + self.server_gateway + .complete_workflow_task(task_token, commands) + .await + .map_err(|ts| { + if ts.code() == tonic::Code::InvalidArgument + && ts.message() == "UnhandledCommand" + { + CompleteWfError::UnhandledCommandWhenCompleting + } else { + ts.into() + } + })?; + } + Ok(()) + } + + /// Handle a failed workflow completion + async fn wf_completion_failed( + &self, + task_token: Vec, + run_id: &str, + failure: workflow_completion::Failure, + ) -> Result<(), CompleteWfError> { + // Blow up any cached data associated with the workflow + self.evict_run(&run_id); + + self.server_gateway + .fail_workflow_task( + task_token, + WorkflowTaskFailedCause::Unspecified, + failure.failure.map(Into::into), + ) + .await?; + Ok(()) + } + /// Will create a new workflow manager if needed for the workflow task, if not, it will /// feed the existing manager the updated history we received from the server. 
From f37e6a4ebd4e278e6887d0f594dab16c12e7d082 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Fri, 26 Mar 2021 17:44:44 -0700
Subject: [PATCH 08/15] Lint fix

---
 src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index c0b6d74ad..a400db9f3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -188,7 +188,7 @@ where
                 .pop()
                 .and_then(|p| self.prepare_pending_activation(p).transpose())
             {
-                return Ok(pa?);
+                return pa;
             }

             if self.shutdown_requested.load(Ordering::SeqCst) {

From b76986f6921763131ff84f509a67cff168dbc3c7 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Mon, 29 Mar 2021 14:05:13 -0700
Subject: [PATCH 09/15] A little simplification prep

---
 src/lib.rs                                | 100 +++++++++++-----------
 src/machines/test_help/history_builder.rs |  14 +++
 src/machines/test_help/mod.rs             |   2 +-
 src/pending_activations.rs                |   1 +
 src/workflow/mod.rs                       |   4 +
 5 files changed, 72 insertions(+), 49 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index a400db9f3..4591e7354 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -30,10 +30,8 @@ pub use core_tracing::tracing_init;
 pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;

-use crate::errors::WorkflowUpdateError;
-use crate::protos::coresdk::workflow_completion;
 use crate::{
-    errors::ShutdownErr,
+    errors::{ShutdownErr, WorkflowUpdateError},
     machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand},
     pending_activations::{PendingActivation, PendingActivations},
     protos::{
@@ -41,6 +39,7 @@ use crate::{
         activity_result::{self as ar, activity_result, ActivityResult},
         activity_task::ActivityTask,
         workflow_activation::WfActivation,
+        workflow_completion,
         workflow_completion::{wf_activation_completion, WfActivationCompletion},
         ActivityHeartbeat,
     },
@@ -324,6 +323,14 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
         }
     }

+    /// Evict a workflow from the cache by its run id
+    ///
+    /// TODO: Very likely needs to be in Core public api
+    pub(crate) fn evict_run(&self, run_id: &str) {
+        self.workflow_machines.evict(run_id);
+        // Blow up any pending activations for the run
+    }
+
     /// Given a pending activation, prepare it to be sent to lang
     #[instrument(skip(self))]
     fn prepare_pending_activation(
         &self,
         pa: PendingActivation,
     ) -> Result<Option<WfActivation>, PollWfError> {
         if let Some(next_activation) =
             self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?
         {
-            let task_token = pa.task_token.clone();
-            if next_activation.more_activations_needed {
-                self.pending_activations.push(pa);
-            }
-            return Ok(Some(next_activation.finalize(task_token)));
+            return Ok(Some(
+                self.finalize_next_activation(next_activation, pa.task_token),
+            ));
         }
         Ok(None)
     }

+    fn finalize_next_activation(
+        &self,
+        next_a: NextWfActivation,
+        task_token: Vec<u8>,
+    ) -> WfActivation {
+        if next_a.more_activations_needed {
+            self.pending_activations.push(PendingActivation {
+                run_id: next_a.get_run_id().to_owned(),
+                task_token: task_token.clone(),
+            })
+        }
+        next_a.finalize(task_token)
+    }
+
     /// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang
     fn prepare_new_activation(
         &self,
         work: PollWorkflowTaskQueueResponse,
     ) -> Result<Option<WfActivation>, PollWfError> {
@@ -357,16 +376,10 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
             "Received workflow task from server"
         );

-        let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?;
+        let next_activation = self.instantiate_or_update_workflow(work)?;

         if let Some(na) = next_activation {
-            if na.more_activations_needed {
-                self.pending_activations.push(PendingActivation {
-                    run_id,
-                    task_token: task_token.clone(),
-                });
-            }
-            return Ok(Some(na.finalize(task_token)));
+            return Ok(Some(self.finalize_next_activation(na, task_token)));
         }
         Ok(None)
     }
@@ -441,7 +454,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
     fn instantiate_or_update_workflow(
         &self,
         poll_wf_resp: PollWorkflowTaskQueueResponse,
-    ) -> Result<(Option<NextWfActivation>, String), PollWfError> {
+    ) -> Result<Option<NextWfActivation>, PollWfError> {
         match poll_wf_resp {
             PollWorkflowTaskQueueResponse {
                 task_token,
@@ -457,7 +470,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
                     .workflow_machines
                     .create_or_update(&run_id, history, workflow_execution)
                 {
-                    Ok(activation) => Ok((activation, run_id)),
+                    Ok(activation) => Ok(activation),
                     Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }),
                 }
             }
@@ -475,11 +488,6 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
         self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
     }

-    /// Remove a workflow run from the cache entirely
-    fn evict_run(&self, run_id: &str) {
-        self.workflow_machines.evict(run_id);
-    }
-
     /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing
     /// span to the wf machines thread.
     fn access_wf_machine<F, Fout>(
@@ -530,14 +538,13 @@ mod test {
     use rstest::{fixture, rstest};

     const TASK_Q: &str = "test-task-queue";
-    const RUN_ID: &str = "fake_run_id";

     #[fixture(hist_batches = &[])]
     fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";

         let mut t = canned_histories::single_timer("fake_timer");
-        build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
+        build_fake_core(wfid, &mut t, hist_batches)
     }

     #[fixture(hist_batches = &[])]
     fn single_activity_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";

         let mut t = canned_histories::single_activity("fake_activity");
-        build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
+        build_fake_core(wfid, &mut t, hist_batches)
     }

     #[fixture(hist_batches = &[])]
     fn single_activity_failure_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";

         let mut t = canned_histories::single_failed_activity("fake_activity");
-        build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
+        build_fake_core(wfid, &mut t, hist_batches)
     }

     #[rstest(core,
         case::incremental(single_timer_setup(&[1, 2])),
         case::replay(single_timer_setup(&[2, 2]))
     )]
     #[tokio::test]
     async fn single_timer_test_across_wf_bridge(core: FakeCore) {
+        tracing_init();
         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
             res.jobs.as_slice(),
             [WfActivationJob {
                 variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
             }]
         );
-        assert!(core.workflow_machines.exists(RUN_ID));
+        assert!(core.workflow_machines.exists(&res.run_id));

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
             vec![StartTimer {
                 timer_id: "fake_timer".to_string(),
                 ..Default::default()
             }
             .into()],
             task_tok,
         ))
         .await
         .unwrap();

+        // warn!(run_id = %&res.run_id, "Evicting");
+        // core.workflow_machines.evict(&res.run_id);
+
         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
             res.jobs.as_slice(),
             [WfActivationJob {
                 variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
             }]
         );
-        assert!(core.workflow_machines.exists(RUN_ID));

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(

     #[tokio::test]
     async fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";
         let timer_1_id = "timer1";
         let timer_2_id = "timer2";

         let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id);
-        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+        let core = build_fake_core(wfid, &mut t, hist_batches);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
             res.jobs.as_slice(),
             [WfActivationJob {
                 variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
             }]
         );
-        assert!(core.workflow_machines.exists(run_id));
+        assert!(core.workflow_machines.exists(t.get_orig_run_id()));

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(

     #[tokio::test]
     async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";
         let timer_id = "wait_timer";
         let cancel_timer_id = "cancel_timer";

         let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id);
-        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+        let core = build_fake_core(wfid, &mut t, hist_batches);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
             res.jobs.as_slice(),
             [WfActivationJob {
                 variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
             }]
         );
-        assert!(core.workflow_machines.exists(run_id));
+        assert!(core.workflow_machines.exists(t.get_orig_run_id()));

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
@@ -791,13 +799,11 @@ mod test {
     #[tokio::test]
     async fn workflow_update_random_seed_on_workflow_reset() {
         let wfid = "fake_wf_id";
-        let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B";
-        let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
+        let new_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
         let timer_1_id = "timer1";

-        let mut t =
-            canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, original_run_id);
-        let core = build_fake_core(wfid, run_id, &mut t, &[2]);
+        let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id);
+        let core = build_fake_core(wfid, &mut t, &[2]);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         let randomness_seed_from_start: u64;
@@ -811,7 +817,7 @@ mod test {
                 randomness_seed_from_start = *randomness_seed;
             }
         );
-        assert!(core.workflow_machines.exists(run_id));
+        assert!(core.workflow_machines.exists(t.get_orig_run_id()));

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
@@ -852,7 +858,6 @@ mod test {
     #[tokio::test]
     async fn cancel_timer_before_sent_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";
         let cancel_timer_id = "cancel_timer";

         let mut t = TestHistoryBuilder::default();
@@ -860,7 +865,7 @@ mod test {
         t.add_full_wf_task();
         t.add_workflow_execution_completed();

-        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+        let core = build_fake_core(wfid, &mut t, hist_batches);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
@@ -890,13 +895,14 @@ mod test {
             .unwrap();
     }

+    // TODO: This should use batches as well, probably
     #[tokio::test]
     async fn complete_activation_with_failure() {
         let wfid = "fake_wf_id";
         let timer_id = "timer";

         let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id);
-        let mut core = build_fake_core(wfid, RUN_ID, &mut t, &[2, 3]);
+        let mut core = build_fake_core(wfid, &mut t, &[2, 3]);
         // Need to create an expectation that we will call a failure completion
         Arc::get_mut(&mut core.server_gateway)
             .unwrap()
@@ -965,11 +971,10 @@ mod test {
     #[tokio::test]
     async fn simple_timer_fail_wf_execution(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";
         let timer_id = "timer1";

         let mut t = canned_histories::single_timer(timer_id);
-        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+        let core = build_fake_core(wfid, &mut t, hist_batches);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
@@ -1002,10 +1007,9 @@ mod test {
     #[tokio::test]
     async fn two_signals(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";

         let mut t = canned_histories::two_signals("sig1", "sig2");
-        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+        let core = build_fake_core(wfid, &mut t, hist_batches);

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         // Task is completed with no commands
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 2a1c8820e..9c0874259 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -28,6 +28,7 @@ pub struct TestHistoryBuilder {
     workflow_task_scheduled_event_id: i64,
     previous_started_event_id: i64,
     previous_task_completed_id: i64,
+    original_run_id: String,
 }

 impl TestHistoryBuilder {
@@ -159,6 +160,10 @@ impl TestHistoryBuilder {
         Ok(wf_machines.get_commands())
     }

+    pub fn get_orig_run_id(&self) -> &str {
+        &self.original_run_id
+    }
+
     fn handle_workflow_task(
         &self,
         wf_machines: &mut WorkflowMachines,
@@ -187,6 +192,15 @@ impl TestHistoryBuilder {
             attributes: Some(attribs),
             ..Default::default()
         };
+        if let Some(Attributes::WorkflowExecutionStartedEventAttributes(
+            WorkflowExecutionStartedEventAttributes {
+                original_execution_run_id,
+                ..
+            },
+        )) = &evt.attributes
+        {
+            self.original_run_id = original_execution_run_id.to_owned();
+        };
         self.events.push(evt);
     }
 }
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index 42965053f..04c9772d5 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -28,10 +28,10 @@ pub(crate) type FakeCore = CoreSDK<MockServerGatewayApis>;
 /// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info].
 pub(crate) fn build_fake_core(
     wf_id: &str,
-    run_id: &str,
     t: &mut TestHistoryBuilder,
     response_batches: &[usize],
 ) -> FakeCore {
+    let run_id = t.get_orig_run_id();
     let wf = Some(WorkflowExecution {
         workflow_id: wf_id.to_string(),
         run_id: run_id.to_string(),
diff --git a/src/pending_activations.rs b/src/pending_activations.rs
index da4dfa3a0..a63c57c0e 100644
--- a/src/pending_activations.rs
+++ b/src/pending_activations.rs
@@ -8,6 +8,7 @@ use std::fmt::{Display, Formatter};
 #[derive(Default)]
 pub struct PendingActivations {
     queue: SegQueue<PendingActivation>,
+    // Keys are run ids
     count_by_id: DashMap<String, usize>,
 }

diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 344ba8d9e..13e0ffd68 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -90,6 +90,10 @@ impl NextWfActivation {
         a.task_token = task_token;
         a
     }
+
+    pub(crate) fn get_run_id(&self) -> &str {
+        &self.activation.run_id
+    }
 }

 impl WorkflowManager {
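Since the run id is now read back out of the generated history, test setup no longer threads a
constant through; the new shape, as exercised by the updated tests in this patch:

    let mut t = canned_histories::single_timer("fake_timer");
    let core = build_fake_core("fake_wf_id", &mut t, &[1, 2]);
    let res = core.poll_workflow_task(TASK_Q).await.unwrap();
    // The run id used internally is whatever the history was started with:
    assert!(core.workflow_machines.exists(t.get_orig_run_id()));
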
From 6354bd52e87c0a2b22be73a0565d7e80cb5beb89 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Mon, 29 Mar 2021 14:47:03 -0700
Subject: [PATCH 10/15] PendingActivation struct upgrades to prep for needed
 changes

---
 src/lib.rs                 |   2 +
 src/pending_activations.rs | 121 ++++++++++++++++++++++++++++++++-----
 2 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 4591e7354..2485df0b5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -347,6 +347,8 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
         Ok(None)
     }

+    /// Prepare an activation we've just pulled out of a workflow machines instance to be shipped
+    /// to the lang sdk
     fn finalize_next_activation(
         &self,
         next_a: NextWfActivation,
         task_token: Vec<u8>,
diff --git a/src/pending_activations.rs b/src/pending_activations.rs
index a63c57c0e..de2d4d773 100644
--- a/src/pending_activations.rs
+++ b/src/pending_activations.rs
@@ -1,40 +1,62 @@
 use crate::protosext::fmt_task_token;
-use crossbeam::queue::SegQueue;
-use dashmap::DashMap;
-use std::fmt::{Display, Formatter};
+use parking_lot::RwLock;
+use std::{
+    collections::{HashMap, VecDeque},
+    fmt::{Display, Formatter},
+};

-/// Tracks pending activations using an internal queue, while also allowing fast lookup of any
-/// pending activations by run ID
+/// Tracks pending activations using an internal queue, while also allowing lookup and removal of
+/// any pending activations by run ID.
 #[derive(Default)]
 pub struct PendingActivations {
-    queue: SegQueue<PendingActivation>,
+    inner: RwLock<PaInner>,
+}
+
+#[derive(Default)]
+struct PaInner {
+    queue: VecDeque<PendingActivation>,
     // Keys are run ids
-    count_by_id: DashMap<String, usize>,
+    count_by_id: HashMap<String, usize>,
 }

 impl PendingActivations {
     pub fn push(&self, v: PendingActivation) {
-        *self
+        let mut inner = self.inner.write();
+        *inner
             .count_by_id
             .entry(v.run_id.clone())
-            .or_insert_with(|| 0)
-            .value_mut() += 1;
-        self.queue.push(v);
+            .or_insert_with(|| 0) += 1;
+        inner.queue.push_back(v);
     }

     pub fn pop(&self) -> Option<PendingActivation> {
-        let rme = self.queue.pop();
+        let mut inner = self.inner.write();
+        let rme = inner.queue.pop_front();
         if let Some(pa) = &rme {
-            if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) {
-                *c.value_mut() -= 1
+            let do_remove = if let Some(c) = inner.count_by_id.get_mut(&pa.run_id) {
+                *c -= 1;
+                *c == 0
+            } else {
+                false
+            };
+            if do_remove {
+                inner.count_by_id.remove(&pa.run_id);
             }
-            self.count_by_id.remove_if(&pa.run_id, |_, v| v <= &0);
         }
         rme
     }

     pub fn has_pending(&self, run_id: &str) -> bool {
-        self.count_by_id.contains_key(run_id)
+        self.inner.read().count_by_id.contains_key(run_id)
+    }
+
+    pub fn remove_all_with_run_id(&self, run_id: &str) {
+        let mut inner = self.inner.write();
+
+        // The perf here can obviously be improved if it ever becomes an issue, but is left for
+        // later since it would require some careful fiddling
+        inner.queue.retain(|pa| pa.run_id != run_id);
+        inner.count_by_id.remove(run_id);
     }
 }
@@ -54,3 +76,70 @@ impl Display for PendingActivation {
         )
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn counts_run_ids() {
+        let pas = PendingActivations::default();
+        let rid1 = "1".to_string();
+        let rid2 = "2".to_string();
+        pas.push(PendingActivation {
+            run_id: rid1.clone(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: rid2.clone(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: rid2.clone(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: rid2.clone(),
+            task_token: vec![],
+        });
+        assert!(pas.has_pending(&rid1));
+        assert!(pas.has_pending(&rid2));
+        let last = pas.pop().unwrap();
+        assert_eq!(&last.run_id, &rid1);
+        assert!(!pas.has_pending(&rid1));
+        assert!(pas.has_pending(&rid2));
+        for _ in 0..3 {
+            let last = pas.pop().unwrap();
+            assert_eq!(&last.run_id, &rid2);
+        }
+        assert!(!pas.has_pending(&rid2));
+        assert!(pas.pop().is_none());
+    }
+
+    #[test]
+    fn can_remove_all_with_id() {
+        let pas = PendingActivations::default();
+        let remove_me = "2".to_string();
+        pas.push(PendingActivation {
+            run_id: "1".to_owned(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: remove_me.clone(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: remove_me.clone(),
+            task_token: vec![],
+        });
+        pas.push(PendingActivation {
+            run_id: "3".to_owned(),
+            task_token: vec![],
+        });
+        pas.remove_all_with_run_id(&remove_me);
+        assert!(!pas.has_pending(&remove_me));
+        assert_eq!(&pas.pop().unwrap().run_id, "1");
+        assert_eq!(&pas.pop().unwrap().run_id, "3");
+        assert!(pas.pop().is_none());
+    }
+}
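The per-run counts keep `has_pending` cheap while `remove_all_with_run_id` gives eviction a hook;
in miniature (mirroring the unit tests above):

    let pas = PendingActivations::default();
    pas.push(PendingActivation { run_id: "r1".to_owned(), task_token: vec![] });
    pas.push(PendingActivation { run_id: "r1".to_owned(), task_token: vec![] });
    assert!(pas.has_pending("r1"));
    pas.remove_all_with_run_id("r1"); // e.g. when the run is evicted
    assert!(!pas.has_pending("r1") && pas.pop().is_none());
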
From 529529c3bbc820c6f3354bd9a097f423641e08b8 Mon Sep 17 00:00:00 2001
From: Spencer Judge <spencer@temporal.io>
Date: Tue, 30 Mar 2021 09:14:11 -0700
Subject: [PATCH 11/15] First pass at cleaner testing API

---
 build.rs                      |  4 ++
 src/lib.rs                    | 71 +++++++++++++++++++++++++++++++--
 src/machines/mod.rs           |  1 +
 src/machines/test_help/mod.rs | 61 +++++++++++++++++++++++++++---
 src/protos/mod.rs             | 24 ++++++++++--
 5 files changed, 148 insertions(+), 13 deletions(-)

diff --git a/build.rs b/build.rs
index 040b1f1e0..1e0a5d839 100644
--- a/build.rs
+++ b/build.rs
@@ -26,6 +26,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             "coresdk.workflow_activation.WFActivationJob.variant",
             "#[derive(::derive_more::From)]",
         )
+        .type_attribute(
+            "coresdk.workflow_completion.WFActivationCompletion.status",
+            "#[derive(::derive_more::From)]",
+        )
         .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]")
         .compile(
             &[
diff --git a/src/lib.rs b/src/lib.rs
index 2485df0b5..f824e9597 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -328,7 +328,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
     /// TODO: Very likely needs to be in Core public api
     pub(crate) fn evict_run(&self, run_id: &str) {
         self.workflow_machines.evict(run_id);
-        // Blow up any pending activations for the run
+        self.pending_activations.remove_all_with_run_id(run_id);
     }
@@ -518,6 +518,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::machines::test_help::{gen_assert_and_reply, poll_and_reply};
     use crate::{
         machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
         protos::{
@@ -572,6 +573,49 @@ mod test {
     #[tokio::test]
     async fn single_timer_test_across_wf_bridge(core: FakeCore) {
         tracing_init();
+
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            vec![
+                gen_assert_and_reply(
+                    |res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
+                            }]
+                        );
+                    },
+                    vec![StartTimer {
+                        timer_id: "fake_timer".to_string(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_reply(
+                    |res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::FireTimer(_)),
+                            }]
+                        );
+                    },
+                    vec![CompleteWorkflowExecution { result: None }.into()],
+                ),
+            ],
+        )
+        .await;
+    }
+
+    #[rstest(core,
+        case::incremental(single_timer_setup(&[1, 2])),
+        case::replay(single_timer_setup(&[2, 2]))
+    )]
+    #[tokio::test]
+    async fn single_timer_eviction_test_across_wf_bridge(core: FakeCore) {
+        tracing_init();
         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
             res.jobs.as_slice(),
             [WfActivationJob {
                 variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
             }]
         );

         let task_tok = res.task_token;
         core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
             vec![StartTimer {
                 timer_id: "fake_timer".to_string(),
                 ..Default::default()
             }
             .into()],
             task_tok,
         ))
         .await
         .unwrap();

-        // warn!(run_id = %&res.run_id, "Evicting");
-        // core.workflow_machines.evict(&res.run_id);
+        warn!(run_id = %&res.run_id, "Evicting");
+        core.evict_run(&res.run_id);

+        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
+        assert_matches!(
+            res.jobs.as_slice(),
+            [WfActivationJob {
+                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
+            }]
+        );
+
+        let task_tok = res.task_token;
+        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
+            vec![StartTimer {
+                timer_id: "fake_timer".to_string(),
+                ..Default::default()
+            }
+            .into()],
+            task_tok,
+        ))
+        .await
+        .unwrap();

         let res = core.poll_workflow_task(TASK_Q).await.unwrap();
         assert_matches!(
diff --git a/src/machines/mod.rs b/src/machines/mod.rs
index ade531996..bdcdf173d 100644
--- a/src/machines/mod.rs
+++ b/src/machines/mod.rs
@@ -29,6 +29,7 @@ mod version_state_machine;
 mod workflow_task_state_machine;

 #[cfg(test)]
+#[macro_use]
 pub(crate) mod test_help;

 pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index 04c9772d5..befccc05f 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -8,12 +8,19 @@ pub(crate) use history_builder::TestHistoryBuilder;
 
 use crate::{
     pollers::MockServerGatewayApis,
-    protos::temporal::api::common::v1::WorkflowExecution,
-    protos::temporal::api::history::v1::History,
-    protos::temporal::api::workflowservice::v1::{
-        PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
+    protos::{
+        coresdk::{
+            workflow_activation::WfActivation,
+            workflow_commands::workflow_command,
+            workflow_completion::{self, wf_activation_completion, WfActivationCompletion},
+        },
+        temporal::api::common::v1::WorkflowExecution,
+        temporal::api::history::v1::History,
+        temporal::api::workflowservice::v1::{
+            PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
+        },
     },
-    CoreSDK,
+    Core, CoreSDK,
 };
 use rand::{thread_rng, Rng};
 use std::collections::VecDeque;
@@ -64,3 +71,47 @@ pub(crate) fn build_fake_core(
 
     CoreSDK::new(mock_gateway)
 }
+
+// TODO: In reality this whole thing might be better done by just using the async wf driver --
+// then we don't need to bother with the commands being issued and assertions, but keeping this
+// probably has some value still to test without it getting in the way, too.
+
+type AsserterFn = Box<dyn Fn(&WfActivation) -> ()>;
+type AsserterWithReply = (AsserterFn, wf_activation_completion::Status);
+
+pub(crate) enum CoreInteraction {
+    EvictRunId(String),
+    AssertAndReply(AsserterWithReply),
+}
+
+pub(crate) async fn poll_and_reply<'a>(
+    core: &'a FakeCore,
+    task_queue: &'a str,
+    expect_and_reply: Vec<CoreInteraction>,
+) {
+    for interaction in expect_and_reply {
+        match interaction {
+            CoreInteraction::AssertAndReply((asserter, reply)) => {
+                let res = core.poll_workflow_task(task_queue).await.unwrap();
+                asserter(&res);
+                let task_tok = res.task_token;
+                core.complete_workflow_task(WfActivationCompletion::from_status(task_tok, reply))
+                    .await
+                    .unwrap();
+            }
+            CoreInteraction::EvictRunId(run_id) => {
+                // TODO: Start from beginning, but remove the eviction somehow
+            }
+        }
+    }
+}
+
+pub(crate) fn gen_assert_and_reply(
+    asserter: impl Fn(&WfActivation) -> () + 'static,
+    reply_commands: Vec<workflow_command::Variant>,
+) -> CoreInteraction {
+    CoreInteraction::AssertAndReply((
+        Box::new(asserter),
+        workflow_completion::Success::from_cmds(reply_commands).into(),
+    ))
+}
diff --git a/src/protos/mod.rs b/src/protos/mod.rs
index 3d92a1406..90a96d777 100644
--- a/src/protos/mod.rs
+++ b/src/protos/mod.rs
@@ -30,12 +30,14 @@ pub mod coresdk {
         include!("coresdk.workflow_commands.rs");
     }
 
-    use crate::protos::coresdk::activity_result::ActivityResult;
     use crate::protos::{
         coresdk::{
+            activity_result::ActivityResult,
            activity_task::ActivityTask,
             common::{Payload, UserCodeFailure},
             workflow_activation::SignalWorkflow,
+            workflow_commands::workflow_command::Variant,
+            workflow_completion::Success,
         },
         temporal::api::{
             common::v1::{Payloads, WorkflowExecution},
@@ -64,18 +66,25 @@
         }
     }
 
-    impl WfActivationCompletion {
-        pub fn ok_from_cmds(cmds: Vec<workflow_command::Variant>, task_token: Vec<u8>) -> Self {
+    impl Success {
+        pub fn from_cmds(cmds: Vec<Variant>) -> Self {
             let cmds: Vec<_> = cmds
                 .into_iter()
                 .map(|c| WorkflowCommand { variant: Some(c) })
                 .collect();
-            let success: workflow_completion::Success = cmds.into();
+            cmds.into()
+        }
+    }
+
+    impl WfActivationCompletion {
+        pub fn ok_from_cmds(cmds: Vec<Variant>, task_token: Vec<u8>) -> Self {
+            let success = Success::from_cmds(cmds);
             Self {
                 task_token,
                 status: Some(wf_activation_completion::Status::Successful(success)),
             }
         }
+
         pub fn fail(task_token: Vec<u8>, failure: UserCodeFailure) -> Self {
             Self {
                 task_token,
@@ -86,6 +95,13 @@ pub mod coresdk {
                 )),
             }
         }
+
+        pub fn from_status(task_token: Vec<u8>, status: wf_activation_completion::Status) -> Self {
+            Self {
+                task_token,
+                status: Some(status),
+            }
+        }
     }
 
     impl ActivityResult {

From 1229fd9515b97e885499ffb64b5bc75819de0136 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Tue, 30 Mar 2021 18:04:33 -0700
Subject: [PATCH 12/15] Eviction testing working pretty well

---
 src/lib.rs                    | 192 +++++++++++-----------------
 src/machines/test_help/mod.rs |  62 +++++++++++--
 2 files changed, 112 insertions(+), 142 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 0c764fa15..257178fd5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -516,9 +516,11 @@ impl CoreSDK {
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::machines::test_help::{gen_assert_and_reply, poll_and_reply};
+    use crate::machines::test_help::gen_assert_and_fail;
     use crate::{
-        machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
+        machines::test_help::{
+            build_fake_core, gen_assert_and_reply, poll_and_reply, FakeCore, TestHistoryBuilder,
+        },
         protos::{
             coresdk::{
                 common::UserCodeFailure,
@@ -564,17 +566,19 @@ mod test {
         build_fake_core(wfid, &mut t, hist_batches)
     }
 
-    #[rstest(core,
-        case::incremental(single_timer_setup(&[1, 2])),
-        case::replay(single_timer_setup(&[2]))
-    )]
+    #[rstest]
+    #[case::incremental(single_timer_setup(&[1, 2]), false)]
+    #[case::replay(single_timer_setup(&[2]), false)]
+    #[case::incremental_evict(single_timer_setup(&[1, 2]), true)]
+    #[case::replay_evict(single_timer_setup(&[2, 2]), true)]
     #[tokio::test]
-    async fn single_timer_test_across_wf_bridge(core: FakeCore) {
+    async fn single_timer_test_across_wf_bridge(#[case] core: FakeCore, #[case] evict: bool) {
         tracing_init();
 
         poll_and_reply(
             &core,
             TASK_Q,
+            evict,
             vec![
                 gen_assert_and_reply(
                     |res| {
@@ -607,72 +611,6 @@ mod test {
         .await;
     }
 
-    #[rstest(core,
-        case::incremental(single_timer_setup(&[1, 2])),
-        case::replay(single_timer_setup(&[2, 2]))
-    )]
-    #[tokio::test]
-    async fn single_timer_eviction_test_across_wf_bridge(core: FakeCore) {
-        tracing_init();
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: "fake_timer".to_string(),
-                ..Default::default()
-            }
-            .into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        warn!(run_id = %&res.run_id, "Evicting");
-        core.evict_run(&res.run_id);
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: "fake_timer".to_string(),
-                ..Default::default()
-            }
-            .into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::FireTimer(_)),
-            }]
-        );
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![CompleteWorkflowExecution { result: None }.into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-    }
-
     #[rstest(core,
         case::incremental(single_activity_setup(&[1, 2])),
         case::incremental_activity_failure(single_activity_failure_setup(&[1, 2])),
@@ -958,14 +896,17 @@ mod test {
         .unwrap();
     }
 
-    // TODO: This should use batches as well, probably
+    #[rstest]
+    #[case::no_evict_inc(&[1, 2, 3], false)]
+    #[case::no_evict(&[2, 3], false)]
+    #[case::evict(&[2, 3, 3, 3, 3], true)]
     #[tokio::test]
-    async fn complete_activation_with_failure() {
+    async fn complete_activation_with_failure(#[case] batches: &[usize], #[case] evict: bool) {
         let wfid = "fake_wf_id";
         let timer_id = "timer";
 
         let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id);
-        let mut core = build_fake_core(wfid, &mut t, &[2, 3]);
+        let mut core = build_fake_core(wfid, &mut t, batches);
         // Need to create an expectation that we will call a failure completion
         Arc::get_mut(&mut core.server_gateway)
             .unwrap()
             .expect_fail_workflow_task()
             .times(1)
             .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: timer_id.to_string(),
-                ..Default::default()
-            }
-            .into()],
-            res.task_token,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        core.complete_workflow_task(WfActivationCompletion::fail(
-            res.task_token,
-            UserCodeFailure {
-                message: "oh noooooooo".to_string(),
-                ..Default::default()
-            },
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-        // Need to re-issue the start timer command (we are replaying)
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: timer_id.to_string(),
-                ..Default::default()
-            }
-            .into()],
-            res.task_token,
-        ))
-        .await
-        .unwrap();
-        // Now we may complete the workflow
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::FireTimer(_)),
-            }]
-        );
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![CompleteWorkflowExecution { result: None }.into()],
-            res.task_token,
-        ))
-        .await
-        .unwrap();
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            evict,
+            vec![
+                gen_assert_and_reply(
+                    |_| {},
+                    vec![StartTimer {
+                        timer_id: timer_id.to_owned(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_fail(|_| {}),
+                gen_assert_and_reply(
+                    |res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
+                            }]
+                        );
+                    },
+                    // Need to re-issue the start timer command (we are replaying)
+                    vec![StartTimer {
+                        timer_id: timer_id.to_string(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_reply(
+                    |res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::FireTimer(_)),
+                            }]
+                        );
+                    },
+                    vec![CompleteWorkflowExecution { result: None }.into()],
+                ),
+            ],
+        )
+        .await;
     }
 
     #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index befccc05f..562f83966 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -6,6 +6,7 @@ mod history_builder;
 
 pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver};
 pub(crate) use history_builder::TestHistoryBuilder;
 
+use crate::protos::coresdk::common::UserCodeFailure;
 use crate::{
     pollers::MockServerGatewayApis,
     protos::{
@@ -80,29 +81,51 @@ type AsserterFn = Box<dyn Fn(&WfActivation) -> ()>;
 type AsserterWithReply = (AsserterFn, wf_activation_completion::Status);
 
 pub(crate) enum CoreInteraction {
-    EvictRunId(String),
     AssertAndReply(AsserterWithReply),
 }
 
 pub(crate) async fn poll_and_reply<'a>(
     core: &'a FakeCore,
     task_queue: &'a str,
+    evict_after_each_reply: bool,
     expect_and_reply: Vec<CoreInteraction>,
 ) {
-    for interaction in expect_and_reply {
-        match interaction {
-            CoreInteraction::AssertAndReply((asserter, reply)) => {
-                let res = core.poll_workflow_task(task_queue).await.unwrap();
-                asserter(&res);
-                let task_tok = res.task_token;
-                core.complete_workflow_task(WfActivationCompletion::from_status(task_tok, reply))
+    let mut run_id = "".to_string();
+    let mut evictions = 0;
+    let expected_evictions = expect_and_reply.len() - 1;
+
+    'outer: loop {
+        let expect_iter = expect_and_reply.iter();
+
+        for interaction in expect_iter {
+            match interaction {
+                CoreInteraction::AssertAndReply((asserter, reply)) => {
+                    let res = core.poll_workflow_task(task_queue).await.unwrap();
+
+                    asserter(&res);
+
+                    let task_tok = res.task_token;
+                    if run_id.is_empty() {
+                        run_id = res.run_id;
+                    }
+
+                    core.complete_workflow_task(WfActivationCompletion::from_status(
+                        task_tok,
+                        reply.clone(),
+                    ))
                     .await
                     .unwrap();
-            }
-            CoreInteraction::EvictRunId(run_id) => {
-                // TODO: Start from beginning, but remove the eviction somehow
+
+                    if evict_after_each_reply && evictions < expected_evictions {
+                        core.evict_run(&run_id);
+                        evictions += 1;
+                        continue 'outer;
+                    }
+                }
             }
         }
+
+        break;
     }
 }
 
@@ -115,3 +138,20 @@ pub(crate) fn gen_assert_and_reply(
         workflow_completion::Success::from_cmds(reply_commands).into(),
     ))
 }
+
+pub(crate) fn gen_assert_and_fail(
+    asserter: impl Fn(&WfActivation) -> () + 'static,
+) -> CoreInteraction {
+    CoreInteraction::AssertAndReply((
+        Box::new(asserter),
+        workflow_completion::Failure {
+            failure: Some(UserCodeFailure {
+                message: "Intentional test failure".to_string(),
+                ..Default::default()
+            }),
+        }
+        .into(),
+    ))
+}
+
+// TODO make simple match macro for typical assert_matches case

From 1631bc6947a09e85f86ecbec229d09e3fe1b1040 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Wed, 31 Mar 2021 09:51:49 -0700
Subject: [PATCH 13/15] Convert all tests to the new more concise syntax

Also makes it easier to capture environment from closures by using proper
lifetimes.
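A quick illustration of the lifetime point, as a stand-alone sketch (toy Asserter type, not the real helpers): with a borrowed &'a dyn Fn instead of a Box<dyn Fn> (which carries an implicit 'static bound), a test closure can borrow locals from the enclosing test — exactly what the converted randomness-seed test below does with its AtomicU64.

    use std::sync::atomic::{AtomicU64, Ordering};

    // Borrowed trait-object asserter: it only has to outlive the call, not be 'static.
    type Asserter<'a> = &'a dyn Fn(u64);

    fn run(asserter: Asserter<'_>, value: u64) {
        asserter(value);
    }

    fn main() {
        let seen = AtomicU64::new(0);
        // The closure borrows `seen` from the enclosing scope; boxing it as a
        // 'static Box<dyn Fn> would instead force a move or an Arc.
        let asserter = |v: u64| seen.store(v, Ordering::SeqCst);
        run(&asserter, 42);
        assert_eq!(seen.load(Ordering::SeqCst), 42);
    }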
---
 src/lib.rs                    | 509 +++++++++++++++-------------------
 src/machines/test_help/mod.rs |  50 ++--
 2 files changed, 250 insertions(+), 309 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 257178fd5..11e976bfe 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -516,10 +516,10 @@ impl CoreSDK {
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::machines::test_help::gen_assert_and_fail;
     use crate::{
         machines::test_help::{
-            build_fake_core, gen_assert_and_reply, poll_and_reply, FakeCore, TestHistoryBuilder,
+            build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore,
+            TestHistoryBuilder,
         },
         protos::{
             coresdk::{
@@ -539,6 +539,7 @@ mod test {
         test_help::canned_histories,
     };
     use rstest::{fixture, rstest};
+    use std::sync::atomic::AtomicU64;
 
     const TASK_Q: &str = "test-task-queue";
 
@@ -573,22 +574,13 @@ mod test {
     #[case::replay_evict(single_timer_setup(&[2, 2]), true)]
     #[tokio::test]
     async fn single_timer_test_across_wf_bridge(#[case] core: FakeCore, #[case] evict: bool) {
-        tracing_init();
-
         poll_and_reply(
             &core,
             TASK_Q,
             evict,
-            vec![
+            &[
                 gen_assert_and_reply(
-                    |res| {
-                        assert_matches!(
-                            res.jobs.as_slice(),
-                            [WfActivationJob {
-                                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-                            }]
-                        );
-                    },
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
                     vec![StartTimer {
                         timer_id: "fake_timer".to_string(),
                         ..Default::default()
                     }
                     .into()],
                 ),
                 gen_assert_and_reply(
-                    |res| {
-                        assert_matches!(
-                            res.jobs.as_slice(),
-                            [WfActivationJob {
-                                variant: Some(wf_activation_job::Variant::FireTimer(_)),
-                            }]
-                        );
-                    },
+                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
                     vec![CompleteWorkflowExecution { result: None }.into()],
                 ),
             ],
         )
@@ -619,40 +604,26 @@ mod test {
     )]
     #[tokio::test]
     async fn single_activity_completion(core: FakeCore) {
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![ScheduleActivity {
-                activity_id: "fake_activity".to_string(),
-                ..Default::default()
-            }
-            .into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::ResolveActivity(_)),
-            }]
-        );
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![CompleteWorkflowExecution { result: None }.into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                    vec![ScheduleActivity {
+                        activity_id: "fake_activity".to_string(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
+                    vec![CompleteWorkflowExecution { result: None }.into()],
+                ),
+            ],
+        )
+        .await;
     }
 
     #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
@@ -665,60 +636,52 @@ mod test {
         let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id);
         let core = build_fake_core(wfid, &mut t, hist_batches);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-        assert!(core.workflow_machines.exists(t.get_orig_run_id()));
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![
-                StartTimer {
-                    timer_id: timer_1_id.to_string(),
-                    ..Default::default()
-                }
-                .into(),
-                StartTimer {
-                    timer_id: timer_2_id.to_string(),
-                    ..Default::default()
-                }
-                .into(),
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                    vec![
+                        StartTimer {
+                            timer_id: timer_1_id.to_string(),
+                            ..Default::default()
+                        }
+                        .into(),
+                        StartTimer {
+                            timer_id: timer_2_id.to_string(),
+                            ..Default::default()
+                        }
+                        .into(),
+                    ],
+                ),
+                gen_assert_and_reply(
+                    &|res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [
+                                WfActivationJob {
+                                    variant: Some(wf_activation_job::Variant::FireTimer(
+                                        FireTimer { timer_id: t1_id }
+                                    )),
+                                },
+                                WfActivationJob {
+                                    variant: Some(wf_activation_job::Variant::FireTimer(
+                                        FireTimer { timer_id: t2_id }
+                                    )),
+                                }
+                            ] => {
+                                assert_eq!(t1_id, &timer_1_id);
+                                assert_eq!(t2_id, &timer_2_id);
+                            }
+                        );
+                    },
+                    vec![CompleteWorkflowExecution { result: None }.into()],
+                ),
             ],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [
-                WfActivationJob {
-                    variant: Some(wf_activation_job::Variant::FireTimer(
-                        FireTimer { timer_id: t1_id }
-                    )),
-                },
-                WfActivationJob {
-                    variant: Some(wf_activation_job::Variant::FireTimer(
-                        FireTimer { timer_id: t2_id }
-                    )),
-                }
-            ] => {
-                assert_eq!(t1_id, &timer_1_id);
-                assert_eq!(t2_id, &timer_2_id);
-            }
-        );
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![CompleteWorkflowExecution { result: None }.into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
+        )
+        .await;
     }
 
     #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
@@ -731,54 +694,39 @@ mod test {
         let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id);
         let core = build_fake_core(wfid, &mut t, hist_batches);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-        assert!(core.workflow_machines.exists(t.get_orig_run_id()));
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![
-                StartTimer {
-                    timer_id: cancel_timer_id.to_string(),
-                    ..Default::default()
-                }
-                .into(),
-                StartTimer {
-                    timer_id: timer_id.to_string(),
-                    ..Default::default()
-                }
-                .into(),
-            ],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::FireTimer(_)),
-            }]
-        );
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![
-                CancelTimer {
-                    timer_id: cancel_timer_id.to_string(),
-                }
-                .into(),
-                CompleteWorkflowExecution { result: None }.into(),
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                    vec![
+                        StartTimer {
+                            timer_id: cancel_timer_id.to_string(),
+                            ..Default::default()
+                        }
+                        .into(),
+                        StartTimer {
+                            timer_id: timer_id.to_string(),
+                            ..Default::default()
+                        }
+                        .into(),
+                    ],
+                ),
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
+                    vec![
+                        CancelTimer {
+                            timer_id: cancel_timer_id.to_string(),
+                        }
+                        .into(),
+                        CompleteWorkflowExecution { result: None }.into(),
+                    ],
+                ),
             ],
-            task_tok,
-        ))
-        .await
-        .unwrap();
+        )
+        .await;
     }
 
     #[rstest(single_timer_setup(&[1]))]
@@ -802,55 +750,56 @@ mod test {
         let wfid = "fake_wf_id";
         let new_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
         let timer_1_id = "timer1";
+        let randomness_seed_from_start = AtomicU64::new(0);
 
         let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id);
         let core = build_fake_core(wfid, &mut t, &[2]);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        let randomness_seed_from_start: u64;
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(
-                    StartWorkflow{randomness_seed, ..}
-                )),
-            }] => {
-                randomness_seed_from_start = *randomness_seed;
-            }
-        );
-        assert!(core.workflow_machines.exists(t.get_orig_run_id()));
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: timer_1_id.to_string(),
-                ..Default::default()
-            }
-            .into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::FireTimer(_),),
-            },
-            WfActivationJob {
-                variant: Some(wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed{randomness_seed})),
-            }] => {
-                assert_ne!(randomness_seed_from_start, *randomness_seed)
-            }
-        );
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![CompleteWorkflowExecution { result: None }.into()],
-            task_tok,
-        ))
-        .await
-        .unwrap();
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &|res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::StartWorkflow(
+                                    StartWorkflow{randomness_seed, ..}
+                                )),
+                            }] => {
+                                randomness_seed_from_start.store(*randomness_seed, Ordering::SeqCst);
+                            }
+                        );
+                    },
+                    vec![StartTimer {
+                        timer_id: timer_1_id.to_string(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_reply(
+                    &|res| {
+                        assert_matches!(
+                            res.jobs.as_slice(),
+                            [WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::FireTimer(_),),
+                            },
+                            WfActivationJob {
+                                variant: Some(wf_activation_job::Variant::UpdateRandomSeed(
+                                    UpdateRandomSeed{randomness_seed})),
+                            }] => {
+                                assert_ne!(randomness_seed_from_start.load(Ordering::SeqCst),
+                                           *randomness_seed)
+                            }
+                        )
+                    },
+                    vec![CompleteWorkflowExecution { result: None }.into()],
+                ),
+            ],
+        )
+        .await;
     }
 
     // The incremental version only does one batch here, because the workflow completes right away
@@ -868,32 +817,27 @@ mod test {
         let core = build_fake_core(wfid, &mut t, hist_batches);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [WfActivationJob {
-                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-            }]
-        );
-
-        let task_tok = res.task_token;
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![
-                StartTimer {
-                    timer_id: cancel_timer_id.to_string(),
-                    ..Default::default()
-                }
-                .into(),
-                CancelTimer {
-                    timer_id: cancel_timer_id.to_string(),
-                }
-                .into(),
-                CompleteWorkflowExecution { result: None }.into(),
-            ],
-            task_tok,
-        ))
-        .await
-        .unwrap();
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[gen_assert_and_reply(
+                &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                vec![
+                    StartTimer {
+                        timer_id: cancel_timer_id.to_string(),
+                        ..Default::default()
+                    }
+                    .into(),
+                    CancelTimer {
+                        timer_id: cancel_timer_id.to_string(),
+                    }
+                    .into(),
+                    CompleteWorkflowExecution { result: None }.into(),
+                ],
+            )],
+        )
+        .await;
    }
 
     #[rstest]
@@ -918,25 +862,18 @@ mod test {
             &core,
             TASK_Q,
             evict,
-            vec![
+            &[
                 gen_assert_and_reply(
-                    |_| {},
+                    &|_| {},
                     vec![StartTimer {
                         timer_id: timer_id.to_owned(),
                         ..Default::default()
                     }
                     .into()],
                 ),
-                gen_assert_and_fail(|_| {}),
+                gen_assert_and_fail(&|_| {}),
                 gen_assert_and_reply(
-                    |res| {
-                        assert_matches!(
-                            res.jobs.as_slice(),
-                            [WfActivationJob {
-                                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
-                            }]
-                        );
-                    },
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
                     // Need to re-issue the start timer command (we are replaying)
                     vec![StartTimer {
                         timer_id: timer_id.to_string(),
@@ -945,14 +882,7 @@ mod test {
                     .into()],
                 ),
                 gen_assert_and_reply(
-                    |res| {
-                        assert_matches!(
-                            res.jobs.as_slice(),
-                            [WfActivationJob {
-                                variant: Some(wf_activation_job::Variant::FireTimer(_)),
-                            }]
-                        );
-                    },
+                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
                     vec![CompleteWorkflowExecution { result: None }.into()],
                 ),
             ],
@@ -969,31 +899,32 @@ mod test {
         let mut t = canned_histories::single_timer(timer_id);
         let core = build_fake_core(wfid, &mut t, hist_batches);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![StartTimer {
-                timer_id: timer_id.to_string(),
-                ..Default::default()
-            }
-            .into()],
-            res.task_token,
-        ))
-        .await
-        .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
-            vec![FailWorkflowExecution {
-                failure: Some(UserCodeFailure {
-                    message: "I'm ded".to_string(),
-                    ..Default::default()
-                }),
-            }
-            .into()],
-            res.task_token,
-        ))
-        .await
-        .unwrap();
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                    vec![StartTimer {
+                        timer_id: timer_id.to_string(),
+                        ..Default::default()
+                    }
+                    .into()],
+                ),
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::FireTimer(_)),
+                    vec![FailWorkflowExecution {
+                        failure: Some(UserCodeFailure {
+                            message: "I'm ded".to_string(),
+                            ..Default::default()
+                        }),
+                    }
+                    .into()],
+                ),
+            ],
+        )
+        .await;
     }
 
     #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
@@ -1004,23 +935,25 @@ mod test {
         let mut t = canned_histories::two_signals("sig1", "sig2");
         let core = build_fake_core(wfid, &mut t, hist_batches);
 
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        // Task is completed with no commands
-        core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(vec![], res.task_token))
-            .await
-            .unwrap();
-
-        let res = core.poll_workflow_task(TASK_Q).await.unwrap();
-        assert_matches!(
-            res.jobs.as_slice(),
-            [
-                WfActivationJob {
-                    variant: Some(wf_activation_job::Variant::SignalWorkflow(_)),
-                },
-                WfActivationJob {
-                    variant: Some(wf_activation_job::Variant::SignalWorkflow(_)),
-                }
-            ]
-        );
+        poll_and_reply(
+            &core,
+            TASK_Q,
+            false,
+            &[
+                gen_assert_and_reply(
+                    &job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
+                    // Task is completed with no commands
+                    vec![],
+                ),
+                gen_assert_and_reply(
+                    &job_assert!(
+                        wf_activation_job::Variant::SignalWorkflow(_),
+                        wf_activation_job::Variant::SignalWorkflow(_)
+                    ),
+                    vec![],
+                ),
+            ],
+        )
+        .await;
     }
 }
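For reference while reading the converted tests above: the job_assert! macro they lean on is defined in the test-helper diff that follows. Hand-expanded (a sketch of the expansion, not literal compiler output), a call such as &job_assert!(wf_activation_job::Variant::StartWorkflow(_)) passes roughly:

    &|res| {
        assert_matches!(
            res.jobs.as_slice(),
            [WfActivationJob {
                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
            }]
        );
    }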
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index 562f83966..2f313d938 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -77,18 +77,13 @@ pub(crate) fn build_fake_core(
 // then we don't need to bother with the commands being issued and assertions, but keeping this
 // probably has some value still to test without it getting in the way, too.
 
-type AsserterFn = Box<dyn Fn(&WfActivation) -> ()>;
-type AsserterWithReply = (AsserterFn, wf_activation_completion::Status);
-
-pub(crate) enum CoreInteraction {
-    AssertAndReply(AsserterWithReply),
-}
+type AsserterWithReply<'a> = (&'a dyn Fn(&WfActivation), wf_activation_completion::Status);
 
 pub(crate) async fn poll_and_reply<'a>(
     core: &'a FakeCore,
     task_queue: &'a str,
     evict_after_each_reply: bool,
-    expect_and_reply: Vec<CoreInteraction>,
+    expect_and_reply: &'a [AsserterWithReply<'a>],
 ) {
     let mut run_id = "".to_string();
     let mut evictions = 0;
@@ -99,7 +94,7 @@ pub(crate) async fn poll_and_reply<'a>(
 
         for interaction in expect_iter {
             match interaction {
-                CoreInteraction::AssertAndReply((asserter, reply)) => {
+                (asserter, reply) => {
                     let res = core.poll_workflow_task(task_queue).await.unwrap();
 
                     asserter(&res);
@@ -129,21 +124,21 @@ pub(crate) async fn poll_and_reply<'a>(
     }
 }
 
-pub(crate) fn gen_assert_and_reply(
-    asserter: impl Fn(&WfActivation) -> () + 'static,
+pub(crate) fn gen_assert_and_reply<'a>(
+    asserter: &'a dyn Fn(&WfActivation),
     reply_commands: Vec<workflow_command::Variant>,
-) -> CoreInteraction {
-    CoreInteraction::AssertAndReply((
-        Box::new(asserter),
+) -> AsserterWithReply<'a> {
+    (
+        asserter,
         workflow_completion::Success::from_cmds(reply_commands).into(),
-    ))
+    )
 }
 
-pub(crate) fn gen_assert_and_fail(
-    asserter: impl Fn(&WfActivation) -> () + 'static,
-) -> CoreInteraction {
-    CoreInteraction::AssertAndReply((
-        Box::new(asserter),
+pub(crate) fn gen_assert_and_fail<'a>(
+    asserter: &'a dyn Fn(&WfActivation),
+) -> AsserterWithReply<'a> {
+    (
+        asserter,
         workflow_completion::Failure {
             failure: Some(UserCodeFailure {
                 message: "Intentional test failure".to_string(),
@@ -151,7 +146,20 @@ pub(crate) fn gen_assert_and_fail(
             }),
         }
         .into(),
-    ))
+    )
 }
 
-// TODO make simple match macro for typical assert_matches case
+/// Generate asserts for [poll_and_reply] by passing patterns to match against the job list
+#[macro_export]
+macro_rules! job_assert {
job_assert { + ($($pat:pat),+) => { + |res| { + assert_matches!( + res.jobs.as_slice(), + [$(WfActivationJob { + variant: Some($pat), + }),+] + ); + } + }; +} From fdae4095001820fedcdd31af7e0c4b7041e241d9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 31 Mar 2021 10:03:18 -0700 Subject: [PATCH 14/15] Clippy fixes --- src/machines/test_help/mod.rs | 55 ++++++++++++++++------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 2f313d938..f9290fdc8 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -93,30 +93,27 @@ pub(crate) async fn poll_and_reply<'a>( let expect_iter = expect_and_reply.iter(); for interaction in expect_iter { - match interaction { - (asserter, reply) => { - let res = core.poll_workflow_task(task_queue).await.unwrap(); - - asserter(&res); - - let task_tok = res.task_token; - if run_id.is_empty() { - run_id = res.run_id; - } - - core.complete_workflow_task(WfActivationCompletion::from_status( - task_tok, - reply.clone(), - )) - .await - .unwrap(); - - if evict_after_each_reply && evictions < expected_evictions { - core.evict_run(&run_id); - evictions += 1; - continue 'outer; - } - } + let (asserter, reply) = interaction; + let res = core.poll_workflow_task(task_queue).await.unwrap(); + + asserter(&res); + + let task_tok = res.task_token; + if run_id.is_empty() { + run_id = res.run_id; + } + + core.complete_workflow_task(WfActivationCompletion::from_status( + task_tok, + reply.clone(), + )) + .await + .unwrap(); + + if evict_after_each_reply && evictions < expected_evictions { + core.evict_run(&run_id); + evictions += 1; + continue 'outer; } } @@ -124,19 +121,17 @@ pub(crate) async fn poll_and_reply<'a>( } } -pub(crate) fn gen_assert_and_reply<'a>( - asserter: &'a dyn Fn(&WfActivation), +pub(crate) fn gen_assert_and_reply( + asserter: &dyn Fn(&WfActivation), reply_commands: Vec, -) -> AsserterWithReply<'a> { +) -> AsserterWithReply<'_> { ( asserter, workflow_completion::Success::from_cmds(reply_commands).into(), ) } -pub(crate) fn gen_assert_and_fail<'a>( - asserter: &'a dyn Fn(&WfActivation), -) -> AsserterWithReply<'a> { +pub(crate) fn gen_assert_and_fail(asserter: &dyn Fn(&WfActivation)) -> AsserterWithReply<'_> { ( asserter, workflow_completion::Failure { From 2e3988e74df9a6d12a5d79423c7f4824bf627157 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 31 Mar 2021 10:30:53 -0700 Subject: [PATCH 15/15] Remove unneeded TODO --- src/machines/test_help/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index f9290fdc8..49939da51 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -73,10 +73,6 @@ pub(crate) fn build_fake_core( CoreSDK::new(mock_gateway) } -// TODO: In reality this whole thing might be better done by just using the async wf driver -- -// then we don't need to bother with the commands being issued and assertions, but keeping this -// probably has some value still to test without it getting in the way, too. - type AsserterWithReply<'a> = (&'a dyn Fn(&WfActivation), wf_activation_completion::Status); pub(crate) async fn poll_and_reply<'a>(