diff --git a/build.rs b/build.rs index 040b1f1e0..1e0a5d839 100644 --- a/build.rs +++ b/build.rs @@ -26,6 +26,10 @@ fn main() -> Result<(), Box> { "coresdk.workflow_activation.WFActivationJob.variant", "#[derive(::derive_more::From)]", ) + .type_attribute( + "coresdk.workflow_completion.WFActivationCompletion.status", + "#[derive(::derive_more::From)]", + ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") .compile( &[ diff --git a/src/lib.rs b/src/lib.rs index 0575f5d81..11e976bfe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -321,6 +321,14 @@ impl CoreSDK { } } + /// Evict a workflow from the cache by it's run id + /// + /// TODO: Very likely needs to be in Core public api + pub(crate) fn evict_run(&self, run_id: &str) { + self.workflow_machines.evict(run_id); + self.pending_activations.remove_all_with_run_id(run_id); + } + /// Given a pending activation, prepare it to be sent to lang #[instrument(skip(self))] fn prepare_pending_activation( @@ -330,15 +338,29 @@ impl CoreSDK { if let Some(next_activation) = self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())? { - let task_token = pa.task_token.clone(); - if next_activation.more_activations_needed { - self.pending_activations.push(pa); - } - return Ok(Some(next_activation.finalize(task_token))); + return Ok(Some( + self.finalize_next_activation(next_activation, pa.task_token), + )); } Ok(None) } + /// Prepare an activation we've just pulled out of a workflow machines instance to be shipped + /// to the lang sdk + fn finalize_next_activation( + &self, + next_a: NextWfActivation, + task_token: Vec, + ) -> WfActivation { + if next_a.more_activations_needed { + self.pending_activations.push(PendingActivation { + run_id: next_a.get_run_id().to_owned(), + task_token: task_token.clone(), + }) + } + next_a.finalize(task_token) + } + /// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang fn prepare_new_activation( &self, @@ -354,16 +376,10 @@ impl CoreSDK { "Received workflow task from server" ); - let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; + let next_activation = self.instantiate_or_update_workflow(work)?; if let Some(na) = next_activation { - if na.more_activations_needed { - self.pending_activations.push(PendingActivation { - run_id, - task_token: task_token.clone(), - }); - } - return Ok(Some(na.finalize(task_token))); + return Ok(Some(self.finalize_next_activation(na, task_token))); } Ok(None) } @@ -438,7 +454,7 @@ impl CoreSDK { fn instantiate_or_update_workflow( &self, poll_wf_resp: PollWorkflowTaskQueueResponse, - ) -> Result<(Option, String), PollWfError> { + ) -> Result, PollWfError> { match poll_wf_resp { PollWorkflowTaskQueueResponse { task_token, @@ -454,7 +470,7 @@ impl CoreSDK { .workflow_machines .create_or_update(&run_id, history, workflow_execution) { - Ok(activation) => Ok((activation, run_id)), + Ok(activation) => Ok(activation), Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }), } } @@ -472,11 +488,6 @@ impl CoreSDK { self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds)) } - /// Remove a workflow run from the cache entirely - fn evict_run(&self, run_id: &str) { - self.workflow_machines.evict(run_id); - } - /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing /// span to the wf machines thread. fn access_wf_machine( @@ -506,7 +517,10 @@ impl CoreSDK { mod test { use super::*; use crate::{ - machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, + machines::test_help::{ + build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore, + TestHistoryBuilder, + }, protos::{ coresdk::{ common::UserCodeFailure, @@ -525,16 +539,16 @@ mod test { test_help::canned_histories, }; use rstest::{fixture, rstest}; + use std::sync::atomic::AtomicU64; const TASK_Q: &str = "test-task-queue"; - const RUN_ID: &str = "fake_run_id"; #[fixture(hist_batches = &[])] fn single_timer_setup(hist_batches: &[usize]) -> FakeCore { let wfid = "fake_wf_id"; let mut t = canned_histories::single_timer("fake_timer"); - build_fake_core(wfid, RUN_ID, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches) } #[fixture(hist_batches = &[])] @@ -542,7 +556,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_activity("fake_activity"); - build_fake_core(wfid, RUN_ID, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches) } #[fixture(hist_batches = &[])] @@ -550,50 +564,36 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_failed_activity("fake_activity"); - build_fake_core(wfid, RUN_ID, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches) } - #[rstest(core, - case::incremental(single_timer_setup(&[1, 2])), - case::replay(single_timer_setup(&[2])) - )] + #[rstest] + #[case::incremental(single_timer_setup(&[1, 2]), false)] + #[case::replay(single_timer_setup(&[2]), false)] + #[case::incremental_evict(single_timer_setup(&[1, 2]), true)] + #[case::replay_evict(single_timer_setup(&[2, 2]), true)] #[tokio::test] - async fn single_timer_test_across_wf_bridge(core: FakeCore) { - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - assert!(core.workflow_machines.exists(RUN_ID)); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![StartTimer { - timer_id: "fake_timer".to_string(), - ..Default::default() - } - .into()], - task_tok, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer(_)), - }] - ); - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![CompleteWorkflowExecution { result: None }.into()], - task_tok, - )) - .await - .unwrap(); + async fn single_timer_test_across_wf_bridge(#[case] core: FakeCore, #[case] evict: bool) { + poll_and_reply( + &core, + TASK_Q, + evict, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![StartTimer { + timer_id: "fake_timer".to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::FireTimer(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; } #[rstest(core, @@ -604,169 +604,129 @@ mod test { )] #[tokio::test] async fn single_activity_completion(core: FakeCore) { - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - assert!(core.workflow_machines.exists(RUN_ID)); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ScheduleActivity { - activity_id: "fake_activity".to_string(), - ..Default::default() - } - .into()], - task_tok, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::ResolveActivity(_)), - }] - ); - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![CompleteWorkflowExecution { result: None }.into()], - task_tok, - )) - .await - .unwrap(); + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: "fake_activity".to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; } #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; let timer_1_id = "timer1"; let timer_2_id = "timer2"; let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id); - let core = build_fake_core(wfid, run_id, &mut t, hist_batches); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - assert!(core.workflow_machines.exists(run_id)); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ - StartTimer { - timer_id: timer_1_id.to_string(), - ..Default::default() - } - .into(), - StartTimer { - timer_id: timer_2_id.to_string(), - ..Default::default() - } - .into(), + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ + StartTimer { + timer_id: timer_1_id.to_string(), + ..Default::default() + } + .into(), + StartTimer { + timer_id: timer_2_id.to_string(), + ..Default::default() + } + .into(), + ], + ), + gen_assert_and_reply( + &|res| { + assert_matches!( + res.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t1_id } + )), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t2_id } + )), + } + ] => { + assert_eq!(t1_id, &timer_1_id); + assert_eq!(t2_id, &timer_2_id); + } + ); + }, + vec![CompleteWorkflowExecution { result: None }.into()], + ), ], - task_tok, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [ - WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer( - FireTimer { timer_id: t1_id } - )), - }, - WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer( - FireTimer { timer_id: t2_id } - )), - } - ] => { - assert_eq!(t1_id, &timer_1_id); - assert_eq!(t2_id, &timer_2_id); - } - ); - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![CompleteWorkflowExecution { result: None }.into()], - task_tok, - )) - .await - .unwrap(); + ) + .await; } #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; let timer_id = "wait_timer"; let cancel_timer_id = "cancel_timer"; let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id); - let core = build_fake_core(wfid, run_id, &mut t, hist_batches); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - assert!(core.workflow_machines.exists(run_id)); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ - StartTimer { - timer_id: cancel_timer_id.to_string(), - ..Default::default() - } - .into(), - StartTimer { - timer_id: timer_id.to_string(), - ..Default::default() - } - .into(), - ], - task_tok, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer(_)), - }] - ); - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ - CancelTimer { - timer_id: cancel_timer_id.to_string(), - } - .into(), - CompleteWorkflowExecution { result: None }.into(), + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ + StartTimer { + timer_id: cancel_timer_id.to_string(), + ..Default::default() + } + .into(), + StartTimer { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + ], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::FireTimer(_)), + vec![ + CancelTimer { + timer_id: cancel_timer_id.to_string(), + } + .into(), + CompleteWorkflowExecution { result: None }.into(), + ], + ), ], - task_tok, - )) - .await - .unwrap(); + ) + .await; } #[rstest(single_timer_setup(&[1]))] @@ -788,59 +748,58 @@ mod test { #[tokio::test] async fn workflow_update_random_seed_on_workflow_reset() { let wfid = "fake_wf_id"; - let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; - let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156"; + let new_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156"; let timer_1_id = "timer1"; - - let mut t = - canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, original_run_id); - let core = build_fake_core(wfid, run_id, &mut t, &[2]); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - let randomness_seed_from_start: u64; - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow( - StartWorkflow{randomness_seed, ..} - )), - }] => { - randomness_seed_from_start = *randomness_seed; - } - ); - assert!(core.workflow_machines.exists(run_id)); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![StartTimer { - timer_id: timer_1_id.to_string(), - ..Default::default() - } - .into()], - task_tok, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer(_),), - }, - WfActivationJob { - variant: Some(wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed{randomness_seed})), - }] => { - assert_ne!(randomness_seed_from_start, *randomness_seed) - } - ); - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![CompleteWorkflowExecution { result: None }.into()], - task_tok, - )) - .await - .unwrap(); + let randomness_seed_from_start = AtomicU64::new(0); + + let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id); + let core = build_fake_core(wfid, &mut t, &[2]); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &|res| { + assert_matches!( + res.jobs.as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow( + StartWorkflow{randomness_seed, ..} + )), + }] => { + randomness_seed_from_start.store(*randomness_seed, Ordering::SeqCst); + } + ); + }, + vec![StartTimer { + timer_id: timer_1_id.to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &|res| { + assert_matches!( + res.jobs.as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer(_),), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::UpdateRandomSeed( + UpdateRandomSeed{randomness_seed})), + }] => { + assert_ne!(randomness_seed_from_start.load(Ordering::SeqCst), + *randomness_seed) + } + ) + }, + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; } // The incremental version only does one batch here, because the workflow completes right away @@ -849,7 +808,6 @@ mod test { #[tokio::test] async fn cancel_timer_before_sent_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; let cancel_timer_id = "cancel_timer"; let mut t = TestHistoryBuilder::default(); @@ -857,43 +815,42 @@ mod test { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let core = build_fake_core(wfid, run_id, &mut t, hist_batches); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - - let task_tok = res.task_token; - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ - StartTimer { - timer_id: cancel_timer_id.to_string(), - ..Default::default() - } - .into(), - CancelTimer { - timer_id: cancel_timer_id.to_string(), - } - .into(), - CompleteWorkflowExecution { result: None }.into(), - ], - task_tok, - )) - .await - .unwrap(); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ + StartTimer { + timer_id: cancel_timer_id.to_string(), + ..Default::default() + } + .into(), + CancelTimer { + timer_id: cancel_timer_id.to_string(), + } + .into(), + CompleteWorkflowExecution { result: None }.into(), + ], + )], + ) + .await; } + #[rstest] + #[case::no_evict_inc(&[1, 2, 3], false)] + #[case::no_evict(&[2, 3], false)] + #[case::evict(&[2, 3, 3, 3, 3], true)] #[tokio::test] - async fn complete_activation_with_failure() { + async fn complete_activation_with_failure(#[case] batches: &[usize], #[case] evict: bool) { let wfid = "fake_wf_id"; let timer_id = "timer"; let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id); - let mut core = build_fake_core(wfid, RUN_ID, &mut t, &[2, 3]); + let mut core = build_fake_core(wfid, &mut t, batches); // Need to create an expectation that we will call a failure completion Arc::get_mut(&mut core.server_gateway) .unwrap() @@ -901,126 +858,102 @@ mod test { .times(1) .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {})); - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![StartTimer { - timer_id: timer_id.to_string(), - ..Default::default() - } - .into()], - res.task_token, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - core.complete_workflow_task(WfActivationCompletion::fail( - res.task_token, - UserCodeFailure { - message: "oh noooooooo".to_string(), - ..Default::default() - }, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::StartWorkflow(_)), - }] - ); - // Need to re-issue the start timer command (we are replaying) - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![StartTimer { - timer_id: timer_id.to_string(), - ..Default::default() - } - .into()], - res.task_token, - )) - .await - .unwrap(); - // Now we may complete the workflow - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::FireTimer(_)), - }] - ); - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![CompleteWorkflowExecution { result: None }.into()], - res.task_token, - )) - .await - .unwrap(); + poll_and_reply( + &core, + TASK_Q, + evict, + &[ + gen_assert_and_reply( + &|_| {}, + vec![StartTimer { + timer_id: timer_id.to_owned(), + ..Default::default() + } + .into()], + ), + gen_assert_and_fail(&|_| {}), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + // Need to re-issue the start timer command (we are replaying) + vec![StartTimer { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::FireTimer(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; } #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn simple_timer_fail_wf_execution(hist_batches: &[usize]) { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; let timer_id = "timer1"; let mut t = canned_histories::single_timer(timer_id); - let core = build_fake_core(wfid, run_id, &mut t, hist_batches); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![StartTimer { - timer_id: timer_id.to_string(), - ..Default::default() - } - .into()], - res.task_token, - )) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![FailWorkflowExecution { - failure: Some(UserCodeFailure { - message: "I'm ded".to_string(), - ..Default::default() - }), - } - .into()], - res.task_token, - )) - .await - .unwrap(); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![StartTimer { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::FireTimer(_)), + vec![FailWorkflowExecution { + failure: Some(UserCodeFailure { + message: "I'm ded".to_string(), + ..Default::default() + }), + } + .into()], + ), + ], + ) + .await; } #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn two_signals(hist_batches: &[usize]) { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; let mut t = canned_histories::two_signals("sig1", "sig2"); - let core = build_fake_core(wfid, run_id, &mut t, hist_batches); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - // Task is completed with no commands - core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(vec![], res.task_token)) - .await - .unwrap(); - - let res = core.poll_workflow_task(TASK_Q).await.unwrap(); - assert_matches!( - res.jobs.as_slice(), - [ - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }, - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - } - ] - ); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + // Task is completed with no commands + vec![], + ), + gen_assert_and_reply( + &job_assert!( + wf_activation_job::Variant::SignalWorkflow(_), + wf_activation_job::Variant::SignalWorkflow(_) + ), + vec![], + ), + ], + ) + .await; } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index ade531996..bdcdf173d 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -29,6 +29,7 @@ mod version_state_machine; mod workflow_task_state_machine; #[cfg(test)] +#[macro_use] pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 2a1c8820e..9c0874259 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -28,6 +28,7 @@ pub struct TestHistoryBuilder { workflow_task_scheduled_event_id: i64, previous_started_event_id: i64, previous_task_completed_id: i64, + original_run_id: String, } impl TestHistoryBuilder { @@ -159,6 +160,10 @@ impl TestHistoryBuilder { Ok(wf_machines.get_commands()) } + pub fn get_orig_run_id(&self) -> &str { + &self.original_run_id + } + fn handle_workflow_task( &self, wf_machines: &mut WorkflowMachines, @@ -187,6 +192,15 @@ impl TestHistoryBuilder { attributes: Some(attribs), ..Default::default() }; + if let Some(Attributes::WorkflowExecutionStartedEventAttributes( + WorkflowExecutionStartedEventAttributes { + original_execution_run_id, + .. + }, + )) = &evt.attributes + { + self.original_run_id = original_execution_run_id.to_owned(); + }; self.events.push(evt); } } diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 42965053f..49939da51 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -6,14 +6,22 @@ mod history_builder; pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver}; pub(crate) use history_builder::TestHistoryBuilder; +use crate::protos::coresdk::common::UserCodeFailure; use crate::{ pollers::MockServerGatewayApis, - protos::temporal::api::common::v1::WorkflowExecution, - protos::temporal::api::history::v1::History, - protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + protos::{ + coresdk::{ + workflow_activation::WfActivation, + workflow_commands::workflow_command, + workflow_completion::{self, wf_activation_completion, WfActivationCompletion}, + }, + temporal::api::common::v1::WorkflowExecution, + temporal::api::history::v1::History, + temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + }, }, - CoreSDK, + Core, CoreSDK, }; use rand::{thread_rng, Rng}; use std::collections::VecDeque; @@ -28,10 +36,10 @@ pub(crate) type FakeCore = CoreSDK; /// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info]. pub(crate) fn build_fake_core( wf_id: &str, - run_id: &str, t: &mut TestHistoryBuilder, response_batches: &[usize], ) -> FakeCore { + let run_id = t.get_orig_run_id(); let wf = Some(WorkflowExecution { workflow_id: wf_id.to_string(), run_id: run_id.to_string(), @@ -64,3 +72,85 @@ pub(crate) fn build_fake_core( CoreSDK::new(mock_gateway) } + +type AsserterWithReply<'a> = (&'a dyn Fn(&WfActivation), wf_activation_completion::Status); + +pub(crate) async fn poll_and_reply<'a>( + core: &'a FakeCore, + task_queue: &'a str, + evict_after_each_reply: bool, + expect_and_reply: &'a [AsserterWithReply<'a>], +) { + let mut run_id = "".to_string(); + let mut evictions = 0; + let expected_evictions = expect_and_reply.len() - 1; + + 'outer: loop { + let expect_iter = expect_and_reply.iter(); + + for interaction in expect_iter { + let (asserter, reply) = interaction; + let res = core.poll_workflow_task(task_queue).await.unwrap(); + + asserter(&res); + + let task_tok = res.task_token; + if run_id.is_empty() { + run_id = res.run_id; + } + + core.complete_workflow_task(WfActivationCompletion::from_status( + task_tok, + reply.clone(), + )) + .await + .unwrap(); + + if evict_after_each_reply && evictions < expected_evictions { + core.evict_run(&run_id); + evictions += 1; + continue 'outer; + } + } + + break; + } +} + +pub(crate) fn gen_assert_and_reply( + asserter: &dyn Fn(&WfActivation), + reply_commands: Vec, +) -> AsserterWithReply<'_> { + ( + asserter, + workflow_completion::Success::from_cmds(reply_commands).into(), + ) +} + +pub(crate) fn gen_assert_and_fail(asserter: &dyn Fn(&WfActivation)) -> AsserterWithReply<'_> { + ( + asserter, + workflow_completion::Failure { + failure: Some(UserCodeFailure { + message: "Intentional test failure".to_string(), + ..Default::default() + }), + } + .into(), + ) +} + +/// Generate asserts for [poll_and_reply] by passing patterns to match against the job list +#[macro_export] +macro_rules! job_assert { + ($($pat:pat),+) => { + |res| { + assert_matches!( + res.jobs.as_slice(), + [$(WfActivationJob { + variant: Some($pat), + }),+] + ); + } + }; +} diff --git a/src/pending_activations.rs b/src/pending_activations.rs index da4dfa3a0..de2d4d773 100644 --- a/src/pending_activations.rs +++ b/src/pending_activations.rs @@ -1,39 +1,62 @@ use crate::protosext::fmt_task_token; -use crossbeam::queue::SegQueue; -use dashmap::DashMap; -use std::fmt::{Display, Formatter}; +use parking_lot::RwLock; +use std::{ + collections::{HashMap, VecDeque}, + fmt::{Display, Formatter}, +}; -/// Tracks pending activations using an internal queue, while also allowing fast lookup of any -/// pending activations by run ID +/// Tracks pending activations using an internal queue, while also allowing lookup and removal of +/// any pending activations by run ID. #[derive(Default)] pub struct PendingActivations { - queue: SegQueue, - count_by_id: DashMap, + inner: RwLock, +} + +#[derive(Default)] +struct PaInner { + queue: VecDeque, + // Keys are run ids + count_by_id: HashMap, } impl PendingActivations { pub fn push(&self, v: PendingActivation) { - *self + let mut inner = self.inner.write(); + *inner .count_by_id .entry(v.run_id.clone()) - .or_insert_with(|| 0) - .value_mut() += 1; - self.queue.push(v); + .or_insert_with(|| 0) += 1; + inner.queue.push_back(v); } pub fn pop(&self) -> Option { - let rme = self.queue.pop(); + let mut inner = self.inner.write(); + let rme = inner.queue.pop_front(); if let Some(pa) = &rme { - if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) { - *c.value_mut() -= 1 + let do_remove = if let Some(c) = inner.count_by_id.get_mut(&pa.run_id) { + *c -= 1; + *c == 0 + } else { + false + }; + if do_remove { + inner.count_by_id.remove(&pa.run_id); } - self.count_by_id.remove_if(&pa.run_id, |_, v| v <= &0); } rme } pub fn has_pending(&self, run_id: &str) -> bool { - self.count_by_id.contains_key(run_id) + self.inner.read().count_by_id.contains_key(run_id) + } + + pub fn remove_all_with_run_id(&self, run_id: &str) { + let mut inner = self.inner.write(); + + // The perf here can obviously be improved if it ever becomes an issue, but is left for + // later since it would require some careful fiddling + inner.queue.retain(|pa| pa.run_id != run_id); + inner.count_by_id.remove(run_id); } } @@ -53,3 +76,70 @@ impl Display for PendingActivation { ) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn counts_run_ids() { + let pas = PendingActivations::default(); + let rid1 = "1".to_string(); + let rid2 = "2".to_string(); + pas.push(PendingActivation { + run_id: rid1.clone(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: rid2.clone(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: rid2.clone(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: rid2.clone(), + task_token: vec![], + }); + assert!(pas.has_pending(&rid1)); + assert!(pas.has_pending(&rid2)); + let last = pas.pop().unwrap(); + assert_eq!(&last.run_id, &rid1); + assert!(!pas.has_pending(&rid1)); + assert!(pas.has_pending(&rid2)); + for _ in 0..3 { + let last = pas.pop().unwrap(); + assert_eq!(&last.run_id, &rid2); + } + assert!(!pas.has_pending(&rid2)); + assert!(pas.pop().is_none()); + } + + #[test] + fn can_remove_all_with_id() { + let pas = PendingActivations::default(); + let remove_me = "2".to_string(); + pas.push(PendingActivation { + run_id: "1".to_owned(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: remove_me.clone(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: remove_me.clone(), + task_token: vec![], + }); + pas.push(PendingActivation { + run_id: "3".to_owned(), + task_token: vec![], + }); + pas.remove_all_with_run_id(&remove_me); + assert!(!pas.has_pending(&remove_me)); + assert_eq!(&pas.pop().unwrap().run_id, "1"); + assert_eq!(&pas.pop().unwrap().run_id, "3"); + assert!(pas.pop().is_none()); + } +} diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 3d92a1406..90a96d777 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -30,12 +30,14 @@ pub mod coresdk { include!("coresdk.workflow_commands.rs"); } - use crate::protos::coresdk::activity_result::ActivityResult; use crate::protos::{ coresdk::{ + activity_result::ActivityResult, activity_task::ActivityTask, common::{Payload, UserCodeFailure}, workflow_activation::SignalWorkflow, + workflow_commands::workflow_command::Variant, + workflow_completion::Success, }, temporal::api::{ common::v1::{Payloads, WorkflowExecution}, @@ -64,18 +66,25 @@ pub mod coresdk { } } - impl WfActivationCompletion { - pub fn ok_from_cmds(cmds: Vec, task_token: Vec) -> Self { + impl Success { + pub fn from_cmds(cmds: Vec) -> Self { let cmds: Vec<_> = cmds .into_iter() .map(|c| WorkflowCommand { variant: Some(c) }) .collect(); - let success: workflow_completion::Success = cmds.into(); + cmds.into() + } + } + + impl WfActivationCompletion { + pub fn ok_from_cmds(cmds: Vec, task_token: Vec) -> Self { + let success = Success::from_cmds(cmds); Self { task_token, status: Some(wf_activation_completion::Status::Successful(success)), } } + pub fn fail(task_token: Vec, failure: UserCodeFailure) -> Self { Self { task_token, @@ -86,6 +95,13 @@ pub mod coresdk { )), } } + + pub fn from_status(task_token: Vec, status: wf_activation_completion::Status) -> Self { + Self { + task_token, + status: Some(status), + } + } } impl ActivityResult { diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index a831819fb..a729cb32c 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -94,6 +94,10 @@ impl NextWfActivation { a.task_token = task_token; a } + + pub(crate) fn get_run_id(&self) -> &str { + &self.activation.run_id + } } impl WorkflowManager {