src/lib.rs (151 additions, 45 deletions)
@@ -13,6 +13,7 @@ pub mod protos;

pub(crate) mod core_tracing;
mod machines;
mod pending_activations;
mod pollers;
mod protosext;
mod workflow;
@@ -24,22 +25,22 @@ pub use core_tracing::tracing_init;
pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use url::Url;

use crate::workflow::WorkflowManager;
use crate::{
machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
pending_activations::{PendingActivation, PendingActivations},
protos::{
coresdk::{
task_completion, wf_activation_completion::Status, Task, TaskCompletion,
WfActivationCompletion, WfActivationSuccess,
},
temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
temporal::api::{
enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse,
},
},
protosext::{fmt_task_token, HistoryInfoError},
workflow::{NextWfActivation, WorkflowConcurrencyManager},
workflow::{NextWfActivation, WorkflowConcurrencyManager, WorkflowManager},
};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use std::fmt::{Display, Formatter};
use std::{
convert::TryInto,
fmt::Debug,
@@ -110,14 +111,6 @@ pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
})
}

/// Type of task queue to poll.
pub enum TaskQueue {
/// Workflow task
Workflow(String),
/// Activity task
_Activity(String),
}

struct CoreSDK<WP>
where
WP: ServerGatewayApis + 'static,
@@ -130,31 +123,14 @@ where
/// Maps task tokens to workflow run ids
workflow_task_tokens: DashMap<Vec<u8>, String>,

/// Workflows that are currently under replay will queue their run ID here, indicating that
/// there are more workflow tasks / activations to be performed.
pending_activations: SegQueue<PendingActivation>,
/// Workflows that are currently under replay will queue here, indicating that there are more
/// workflow tasks / activations to be performed.
pending_activations: PendingActivations,

/// Has shutdown been called?
shutdown_requested: AtomicBool,
}

#[derive(Debug)]
struct PendingActivation {
run_id: String,
task_token: Vec<u8>,
}

impl Display for PendingActivation {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PendingActivation(run_id: {}, task_token: {})",
&self.run_id,
fmt_task_token(&self.task_token)
)
}
}

impl<WP> Core for CoreSDK<WP>
where
WP: ServerGatewayApis + Send + Sync,
@@ -228,12 +204,29 @@ where
let commands = self.access_wf_machine(&run_id, move |mgr| {
Ok(mgr.machines.get_commands())
})?;
// We only want to send commands back to the server if there are no more
// pending activations -- in other words, the lang SDK has caught up on
// replay.
if !self.pending_activations.has_pending(&run_id) {
self.runtime.block_on(
self.server_gateway
.complete_workflow_task(task_token, commands),
)?;
}
}
Status::Failed(failure) => {
// Blow up any cached data associated with the workflow
self.evict_run(&run_id);

self.runtime.block_on(
self.server_gateway
.complete_workflow_task(task_token, commands),
self.server_gateway.fail_workflow_task(
task_token,
WorkflowTaskFailedCause::from_i32(failure.cause)
.unwrap_or(WorkflowTaskFailedCause::Unspecified),
failure.failure,
),
)?;
}
Status::Failed(_) => {}
}
Ok(())
}
@@ -304,6 +297,11 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
Ok(())
}

/// Remove a workflow run from the cache entirely
fn evict_run(&self, run_id: &str) {
self.workflow_machines.evict(run_id);
}

/// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing
/// span to the wf machines thread.
fn access_wf_machine<F, Fout>(&self, run_id: &str, mutator: F) -> Result<Fout>
@@ -338,7 +336,7 @@ pub enum CoreError {
UninterpretableCommand(#[from] InconvertibleCommandError),
/// Underlying error in history processing
UnderlyingHistError(#[from] HistoryInfoError),
/// Underlying error in state machines
/// Underlying error in state machines: {0:?}
UnderlyingMachinesError(#[from] WFMachinesError),
/// Task token had nothing associated with it: {0:?}
NothingFoundForTaskToken(Vec<u8>),
@@ -357,18 +355,23 @@ pub enum CoreError {
#[cfg(test)]
mod test {
use super::*;
use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes;
use crate::{
machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
protos::{
coresdk::{
wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed,
WfActivationJob,
},
temporal::api::command::v1::{
CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes,
temporal::api::{
command::v1::{
CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes,
},
enums::v1::{EventType, WorkflowTaskFailedCause},
failure::v1::Failure,
workflowservice::v1::RespondWorkflowTaskFailedResponse,
},
temporal::api::enums::v1::EventType,
},
test_help::canned_histories,
};
@@ -377,7 +380,7 @@ mod test {
const TASK_Q: &str = "test-task-queue";
const RUN_ID: &str = "fake_run_id";

#[fixture(hist_batches=&[])]
#[fixture(hist_batches = &[])]
fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
let wfid = "fake_wf_id";

@@ -386,8 +389,8 @@ }
}

#[rstest(core,
case::incremental(single_timer_setup(&[1, 2])),
case::replay(single_timer_setup(&[2]))
case::incremental(single_timer_setup(&[1, 2])),
case::replay(single_timer_setup(&[2]))
)]
fn single_timer_test_across_wf_bridge(core: FakeCore) {
let res = core.poll_task(TASK_Q).unwrap();
@@ -569,7 +572,8 @@ mod test {
let timer_1_id = "timer1";
let task_queue = "test-task-queue";

let mut t = canned_histories::workflow_fails_after_timer(timer_1_id, original_run_id);
let mut t =
canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, original_run_id);
let core = build_fake_core(wfid, run_id, &mut t, &[2]);

let res = core.poll_task(task_queue).unwrap();
@@ -660,4 +664,106 @@
core.poll_task(task_queue).unwrap();
}
}

#[test]
fn complete_activation_with_failure() {
let wfid = "fake_wf_id";
let timer_id = "timer";

let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id);
let mut core = build_fake_core(wfid, RUN_ID, &mut t, &[2, 3]);
// Need to create an expectation that we will call a failure completion
Arc::get_mut(&mut core.server_gateway)
.unwrap()
.expect_fail_workflow_task()
.times(1)
.returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {}));

let res = core.poll_task(TASK_Q).unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![StartTimerCommandAttributes {
timer_id: timer_id.to_string(),
..Default::default()
}
.into()],
res.task_token,
))
.unwrap();

let res = core.poll_task(TASK_Q).unwrap();
core.complete_task(TaskCompletion::fail(
res.task_token,
WorkflowTaskFailedCause::BadBinary,
Failure {
message: "oh noooooooo".to_string(),
..Default::default()
},
))
.unwrap();

let res = core.poll_task(TASK_Q).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);
// Need to re-issue the start timer command (we are replaying)
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![StartTimerCommandAttributes {
timer_id: timer_id.to_string(),
..Default::default()
}
.into()],
res.task_token,
))
.unwrap();
// Now we may complete the workflow
let res = core.poll_task(TASK_Q).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::FireTimer(_)),
}]
);
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
res.task_token,
))
.unwrap();
}

#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
fn simple_timer_fail_wf_execution(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let run_id = "fake_run_id";
let timer_id = "timer1";

let mut t = canned_histories::single_timer(timer_id);
let core = build_fake_core(wfid, run_id, &mut t, hist_batches);

let res = core.poll_task(TASK_Q).unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![StartTimerCommandAttributes {
timer_id: timer_id.to_string(),
..Default::default()
}
.into()],
res.task_token,
))
.unwrap();

let res = core.poll_task(TASK_Q).unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![FailWorkflowExecutionCommandAttributes {
failure: Some(Failure {
message: "I'm ded".to_string(),
..Default::default()
}),
}
.into()],
res.task_token,
))
.unwrap();
}
}
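
The new `pending_activations` module itself is not part of this diff. Based on its call sites above (queueing a run during replay, and `has_pending(&run_id)` gating whether commands are sent back to the server), a minimal sketch might look like the following. The `SegQueue`/`DashMap` choice mirrors the imports the old inline version used; everything beyond the `PendingActivation` fields and the `has_pending` method name is an assumption:

```rust
use crossbeam::queue::SegQueue;
use dashmap::DashMap;

/// An activation waiting to be handed out by a future `poll_task` call.
#[derive(Debug)]
pub struct PendingActivation {
    pub run_id: String,
    pub task_token: Vec<u8>,
}

/// FIFO of pending activations, plus a per-run counter so `has_pending`
/// can answer without scanning the whole queue.
#[derive(Default)]
pub struct PendingActivations {
    queue: SegQueue<PendingActivation>,
    count_by_run: DashMap<String, usize>,
}

impl PendingActivations {
    pub fn push(&self, pa: PendingActivation) {
        *self.count_by_run.entry(pa.run_id.clone()).or_insert(0) += 1;
        self.queue.push(pa);
    }

    /// Assumes crossbeam 0.8, where `SegQueue::pop` returns an `Option`.
    pub fn pop(&self) -> Option<PendingActivation> {
        let popped = self.queue.pop();
        if let Some(pa) = &popped {
            if let Some(mut count) = self.count_by_run.get_mut(&pa.run_id) {
                *count = count.saturating_sub(1);
            }
        }
        popped
    }

    pub fn has_pending(&self, run_id: &str) -> bool {
        self.count_by_run.get(run_id).map(|c| *c > 0).unwrap_or(false)
    }
}
```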
src/machines/complete_workflow_state_machine.rs (2 additions, 2 deletions)
@@ -54,11 +54,11 @@ impl CompleteWorkflowMachine {
};
let cmd = match s
.on_event_mut(CompleteWorkflowMachineEvents::Schedule)
.expect("Scheduling timers doesn't fail")
.expect("Scheduling complete wf machines doesn't fail")
.pop()
{
Some(CompleteWFCommand::AddCommand(c)) => c,
_ => panic!("Timer on_schedule must produce command"),
_ => panic!("complete wf machine on_schedule must produce command"),
};
(s, cmd)
}
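
For context on the two strings fixed above: every machine in this module is driven the same way, and these expect/panic messages are the main clue to which machine failed, so messages copy-pasted from the timer machine pointed debugging at the wrong place. The shared driving interface is roughly the following; this is a hypothetical reconstruction of its shape, not the project's actual trait:

```rust
/// Hypothetical sketch of the interface the machines in this module
/// share; the real definition may differ in detail.
trait StateMachine {
    type Event;
    type Command;
    type Error;

    /// Drive the machine with an event. On a valid transition, returns
    /// the commands it produced (for CompleteWorkflowMachine, a single
    /// AddCommand carrying the command to complete the workflow); for an
    /// event that is invalid in the current state, returns an error.
    fn on_event_mut(&mut self, event: Self::Event) -> Result<Vec<Self::Command>, Self::Error>;
}
```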