Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4bdebda
WIP activity cancellation
vitarb Mar 30, 2021
752e7cf
Add notify_canceled_from_event
vitarb Mar 30, 2021
4a197a1
Address PR feedback
vitarb Mar 31, 2021
9c8db14
Add an end to end integ test for simple activity cancellation scenario
vitarb Mar 31, 2021
1dcfe12
Address lint error
vitarb Mar 31, 2021
7bb6906
Add missing cancelation handler, cleanup some unused code and improve…
vitarb Mar 31, 2021
99b996c
Use correct transition states
vitarb Apr 1, 2021
f647698
Add a test case for immediate cancellation and resolve activity job p…
vitarb Apr 1, 2021
cb4fb15
Allow returning vec of responses from Cancellable::cancel
Sushisource Apr 1, 2021
6a9dd85
push notes to share with Vitaly
Sushisource Apr 1, 2021
2275ee2
Merge branch 'master' into activity-cancel
vitarb Apr 2, 2021
d689d0e
Add cancellation failure cause for failures
vitarb Apr 2, 2021
94b8a3c
Add lib test for activity cancellation and refactor out notify_cancel…
vitarb Apr 2, 2021
3e55a8f
Allow pushing lang commands immediately and add tests for TryCancel a…
vitarb Apr 3, 2021
92e75bc
Add integ test for WaitCancellationCompleted cancellation
vitarb Apr 3, 2021
745fb27
Activity cancellation with abandon type + address code review comments
vitarb Apr 6, 2021
d16fe49
Extract common logic for timer and activity cancellation in
vitarb Apr 6, 2021
07d04c8
remove format
vitarb Apr 6, 2021
4674fbf
Fix histories / expectations and eliminate new job batch count param
Sushisource Apr 6, 2021
c83fba2
Fix comments I made
Sushisource Apr 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions protos/local/workflow_commands.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,24 @@ message ScheduleActivity {
/// configuration. Retries are happening up to schedule_to_close_timeout. To disable retries set
/// retry_policy.maximum_attempts to 1.
common.RetryPolicy retry_policy = 11;
/// Defines behaviour of the underlying workflow when activity cancellation has been requested.
ActivityCancellationType cancellation_type = 12;
}

enum ActivityCancellationType {
/// Initiate a cancellation request and immediately report cancellation to the workflow.
TRY_CANCEL = 0;
/// Wait for activity cancellation completion. Note that activity must heartbeat to receive a
/// cancellation notification. This can block the cancellation for a long time if activity doesn't
/// heartbeat or chooses to ignore the cancellation request.
WAIT_CANCELLATION_COMPLETED = 1;
/// Do not request cancellation of the activity and immediately report cancellation to the workflow
ABANDON = 2;
}

message RequestCancelActivity {
string activity_id = 1;
int64 scheduled_event_id = 2;
}

message QueryResult {
Expand All @@ -87,12 +101,6 @@ message QuerySuccess {
common.Payload response = 1;
}

/// Request cancellation of an activity from a workflow
message RequestActivityCancellation {
string activity_id = 1;
string reason = 2;
}

/// Issued when the workflow completes successfully
message CompleteWorkflowExecution {
common.Payload result = 1;
Expand Down
157 changes: 152 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ pub use core_tracing::tracing_init;
pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use url::Url;

use crate::workflow::PushCommandsResult;
use crate::{
errors::{ShutdownErr, WorkflowUpdateError},
machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand},
machines::{EmptyWorkflowCommandErr, WFCommand},
pending_activations::{PendingActivation, PendingActivations},
protos::{
coresdk::{
Expand Down Expand Up @@ -403,13 +404,19 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
.to_owned(),
completion: None,
})?;
let commands = self.push_lang_commands(&run_id, cmds)?;
let push_result = self.push_lang_commands(&run_id, cmds)?;
if push_result.has_new_lang_jobs {
self.pending_activations.push(PendingActivation {
run_id: run_id.to_string(),
task_token: task_token.clone(),
});
}
// We only actually want to send commands back to the server if there are
// no more pending activations -- in other words the lang SDK has caught
// up on replay.
if !self.pending_activations.has_pending(&run_id) {
self.server_gateway
.complete_workflow_task(task_token, commands)
.complete_workflow_task(task_token, push_result.server_commands)
.await
.map_err(|ts| {
if ts.code() == tonic::Code::InvalidArgument
Expand Down Expand Up @@ -484,7 +491,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
&self,
run_id: &str,
cmds: Vec<WFCommand>,
) -> Result<Vec<ProtoCommand>, WorkflowUpdateError> {
) -> Result<PushCommandsResult, WorkflowUpdateError> {
self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
}

Expand Down Expand Up @@ -516,6 +523,9 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
#[cfg(test)]
mod test {
use super::*;
use crate::protos::coresdk::workflow_commands::{
ActivityCancellationType, RequestCancelActivity,
};
use crate::{
machines::test_help::{
build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore,
Expand Down Expand Up @@ -684,7 +694,7 @@ mod test {
.await;
}

#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
#[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))]
#[tokio::test]
async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
Expand Down Expand Up @@ -729,6 +739,143 @@ mod test {
.await;
}

// TODO: History doesn't go all the way through execution completed -- which is expected in
// real life anyway, but testing that might still be desirable
#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
#[tokio::test]
async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let activity_id = "fake_activity";
let signal_id = "signal";

let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id);
let core = build_fake_core(wfid, &mut t, hist_batches);

poll_and_reply(
&core,
TASK_Q,
false,
&[
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
vec![ScheduleActivity {
activity_id: activity_id.to_string(),
cancellation_type: ActivityCancellationType::TryCancel as i32,
..Default::default()
}
.into()],
),
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
vec![RequestCancelActivity {
activity_id: activity_id.to_string(),
..Default::default()
}
.into()],
),
// Activity is getting resolved right away as we are in the TryCancel mode.
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
vec![CompleteWorkflowExecution { result: None }.into()],
),
],
)
.await;
}

#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
#[tokio::test]
async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let activity_id = "fake_activity";
let signal_id = "signal";

let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id);
let core = build_fake_core(wfid, &mut t, hist_batches);

poll_and_reply(
&core,
TASK_Q,
false,
&[
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
vec![ScheduleActivity {
activity_id: activity_id.to_string(),
cancellation_type: ActivityCancellationType::Abandon as i32,
..Default::default()
}
.into()],
),
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
vec![RequestCancelActivity {
activity_id: activity_id.to_string(),
..Default::default()
}
.into()],
),
// Activity is getting resolved right away as we are in the Abandon mode.
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
vec![CompleteWorkflowExecution { result: None }.into()],
),
],
)
.await;
}

#[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))]
#[tokio::test]
async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let activity_id = "fake_activity";
let signal_id = "signal";

let mut t = canned_histories::cancel_scheduled_activity_with_activity_task_cancel(
activity_id,
signal_id,
);
let core = build_fake_core(wfid, &mut t, hist_batches);

poll_and_reply(
&core,
TASK_Q,
false,
&[
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::StartWorkflow(_)),
vec![ScheduleActivity {
activity_id: activity_id.to_string(),
cancellation_type: ActivityCancellationType::WaitCancellationCompleted
as i32,
..Default::default()
}
.into()],
),
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
vec![RequestCancelActivity {
activity_id: activity_id.to_string(),
..Default::default()
}
.into()],
),
// Making sure that activity is not resolved until it's cancelled.
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::SignalWorkflow(_)),
vec![],
),
// Now ActivityTaskCanceled has been processed and activity can be resolved.
gen_assert_and_reply(
&job_assert!(wf_activation_job::Variant::ResolveActivity(_)),
vec![CompleteWorkflowExecution { result: None }.into()],
),
],
)
.await;
}

#[rstest(single_timer_setup(&[1]))]
#[tokio::test]
async fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) {
Expand Down
Loading