Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate pending activations by returning new activations from complete #96

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions protos/local/workflow_activation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ message WFActivation {
string run_id = 3;
/// The things to do upon activating the workflow
repeated WFActivationJob jobs = 4;
/// True if this activation was internally generated rather than from the server
bool from_pending = 5;
}

message WFActivationJob {
Expand Down
141 changes: 46 additions & 95 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub mod protos;
pub(crate) mod core_tracing;
mod errors;
mod machines;
mod pending_activations;
mod pollers;
mod protosext;
mod workflow;
Expand All @@ -33,7 +32,6 @@ pub use url::Url;
use crate::{
errors::{ShutdownErr, WorkflowUpdateError},
machines::{EmptyWorkflowCommandErr, WFCommand},
pending_activations::{PendingActivation, PendingActivations},
protos::{
coresdk::{
activity_result::{self as ar, activity_result},
Expand Down Expand Up @@ -90,11 +88,12 @@ pub trait Core: Send + Sync {
async fn poll_activity_task(&self, task_queue: &str)
-> Result<ActivityTask, PollActivityError>;

/// Tell the core that a workflow activation has completed
/// Tell the core that a workflow activation has completed. The response may contain another
/// activation that the lang side will need to perform.
async fn complete_workflow_task(
&self,
completion: WfActivationCompletion,
) -> Result<(), CompleteWfError>;
) -> Result<Option<WfActivation>, CompleteWfError>;

/// Tell the core that an activity has finished executing
async fn complete_activity_task(
Expand Down Expand Up @@ -122,9 +121,8 @@ pub struct CoreInitOptions {
/// Options for the connection to the temporal server
pub gateway_opts: ServerGatewayOptions,
/// If set to true (which should be the default choice until sticky task queues are implemented)
/// workflows are evicted after they no longer have any pending activations. IE: After they
/// have sent new commands to the server.
pub evict_after_pending_cleared: bool,
/// workflows are evicted after each completion which does not return a new activation.
pub evict_after_each_workflow_task: bool,
}

/// Initializes an instance of the core sdk and establishes a connection to the temporal server.
Expand Down Expand Up @@ -153,16 +151,10 @@ struct CoreSDK<WP> {
/// Workflows (by run id) for which the last task completion we sent was a failure
workflows_last_task_failed: DashSet<String>,

/// Workflows that are currently under replay will queue here, indicating that there are more
/// workflow tasks / activations to be performed.
pending_activations: PendingActivations,

/// Has shutdown been called?
shutdown_requested: AtomicBool,
/// Used to wake up future which checks shutdown state
shutdown_notify: Notify,
/// Used to interrupt workflow task polling if there is a new pending activation
pending_activation_notify: Notify,
}

#[async_trait::async_trait]
Expand All @@ -175,38 +167,20 @@ where
// The poll needs to be in a loop because we can't guarantee tail call optimization in Rust
// (simply) and we really, really need that for long-poll retries.
loop {
// We must first check if there are pending workflow tasks for workflows that are
// currently replaying, and issue those tasks before bothering the server.
if let Some(pa) = self
.pending_activations
.pop()
.and_then(|p| self.prepare_pending_activation(p).transpose())
{
return pa;
}

if self.shutdown_requested.load(Ordering::SeqCst) {
return Err(PollWfError::ShutDown);
}

let pa_fut = self.pending_activation_notify.notified();
let poll_result_future = self.shutdownable_fut(
self.server_gateway
.poll_workflow_task(task_queue.to_owned())
.map_err(Into::into),
);
debug!("Polling server");
let selected_f = tokio::select! {
// If a pending activation comes in while we are waiting on polling, we need to
// restart the loop right away to provide it
_ = pa_fut => {
debug!("Poll interrupted by new pending activation");
continue;
}
r = poll_result_future => r
};

match selected_f {
match self
.shutdownable_fut(
self.server_gateway
.poll_workflow_task(task_queue.to_owned())
.map_err(Into::into),
)
.await
{
Ok(work) => {
if !work.next_page_token.is_empty() {
// TODO: Support history pagination
Expand All @@ -216,8 +190,7 @@ where
return Ok(activation);
}
}
// Drain pending activations in case of shutdown.
Err(PollWfError::ShutDown) => continue,
Err(PollWfError::ShutDown) => return Err(PollWfError::ShutDown),
Err(e) => return Err(e),
}
}
Expand Down Expand Up @@ -252,7 +225,7 @@ where
async fn complete_workflow_task(
&self,
completion: WfActivationCompletion,
) -> Result<(), CompleteWfError> {
) -> Result<Option<WfActivation>, CompleteWfError> {
let task_token = completion.task_token;
let wfstatus = completion.status;
let run_id = self
Expand All @@ -271,20 +244,19 @@ where
self.wf_activation_success(task_token, &run_id, success)
.await
}
Some(wf_activation_completion::Status::Failed(failure)) => {
self.wf_activation_failed(task_token, &run_id, failure)
.await
}
Some(wf_activation_completion::Status::Failed(failure)) => self
.wf_activation_failed(task_token, &run_id, failure)
.await
.map(|_| None),
None => Err(CompleteWfError::MalformedWorkflowCompletion {
reason: "Workflow completion had empty status field".to_owned(),
completion: None,
}),
};

// Blow up workflows with no more pending activations (IE: They have completed a WFT)
if self.init_options.evict_after_pending_cleared
&& !self.pending_activations.has_pending(&run_id)
{
// Evict workflows with no more activations (i.e. they have completed a WFT)
let has_another_activation = res.as_ref().map_or(false, |o| o.is_some());
if self.init_options.evict_after_each_workflow_task && !has_another_activation {
self.evict_run(&run_id);
}
res
Expand Down Expand Up @@ -347,10 +319,8 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
workflow_machines: WorkflowConcurrencyManager::new(),
workflow_task_tokens: Default::default(),
workflows_last_task_failed: Default::default(),
pending_activations: Default::default(),
shutdown_requested: AtomicBool::new(false),
shutdown_notify: Notify::new(),
pending_activation_notify: Notify::new(),
}
}

Expand All @@ -359,42 +329,34 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
/// TODO: Very likely needs to be in Core public api
pub(crate) fn evict_run(&self, run_id: &str) {
self.workflow_machines.evict(run_id);
self.pending_activations.remove_all_with_run_id(run_id);
}

/// Given a pending activation, prepare it to be sent to lang
#[instrument(skip(self))]
fn prepare_pending_activation(
fn prepare_next_activation(
&self,
pa: PendingActivation,
) -> Result<Option<WfActivation>, PollWfError> {
task_token: Vec<u8>,
run_id: &str,
) -> Result<Option<WfActivation>, CompleteWfError> {
if let Some(next_activation) =
self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?
self.access_wf_machine(run_id, move |mgr| mgr.get_next_activation())?
{
return Ok(Some(self.finalize_next_activation(
next_activation,
pa.task_token,
true,
)));
return Ok(Some(
self.finalize_next_activation(next_activation, task_token),
));
}
Ok(None)
}

/// Prepare an activation we've just pulled out of a workflow machines instance to be shipped
/// to the lang sdk
// TODO: Probably pointless now
fn finalize_next_activation(
&self,
next_a: NextWfActivation,
task_token: Vec<u8>,
from_pending: bool,
) -> WfActivation {
if next_a.more_activations_needed {
self.add_pending_activation(PendingActivation {
run_id: next_a.get_run_id().to_owned(),
task_token: task_token.clone(),
});
}
next_a.finalize(task_token, from_pending)
next_a.finalize(task_token)
}

/// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang
Expand All @@ -409,13 +371,14 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
let task_token = work.task_token.clone();
debug!(
task_token = %fmt_task_token(&task_token),
hist_len = ?work.history.as_ref().map(|h| h.events.len()),
"Received workflow task from server"
);

let next_activation = self.instantiate_or_update_workflow(work)?;

if let Some(na) = next_activation {
return Ok(Some(self.finalize_next_activation(na, task_token, false)));
return Ok(Some(self.finalize_next_activation(na, task_token)));
}
Ok(None)
}
Expand All @@ -426,7 +389,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
task_token: Vec<u8>,
run_id: &str,
success: workflow_completion::Success,
) -> Result<(), CompleteWfError> {
) -> Result<Option<WfActivation>, CompleteWfError> {
// Convert to wf commands
let cmds = success
.commands
Expand All @@ -440,16 +403,14 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
completion: None,
})?;
let push_result = self.push_lang_commands(&run_id, cmds)?;
if push_result.has_new_lang_jobs {
self.add_pending_activation(PendingActivation {
run_id: run_id.to_string(),
task_token: task_token.clone(),
});
}
// We only actually want to send commands back to the server if there are
// no more pending activations -- in other words the lang SDK has caught
// up on replay.
if !self.pending_activations.has_pending(run_id) {
if let Some(next_a) = self.prepare_next_activation(task_token.clone(), run_id)? {
Ok(Some(next_a))
// TODO: If the above returns None, we probably still need to send commands to the
// server -- verify this.
} else {
// We only actually want to send commands back to the server if there are no more
// activations to perform

// Since we're telling the server about a wft success, we can remove it from the
// last failed map (if it was present)
self.workflows_last_task_failed.remove(run_id);
Expand All @@ -465,8 +426,8 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
ts.into()
}
})?;
Ok(None)
}
Ok(())
}

/// Handle a failed workflow completion
Expand Down Expand Up @@ -585,12 +546,6 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
r = wrap_this => r
}
}

/// Enqueue a new pending activation, and notify ourselves
fn add_pending_activation(&self, pa: PendingActivation) {
self.pending_activations.push(pa);
self.pending_activation_notify.notify_waiters();
}
}

#[cfg(test)]
Expand Down Expand Up @@ -656,8 +611,6 @@ mod test {
#[rstest]
#[case::incremental(single_timer_setup(&[1, 2]), EvictionMode::NotSticky)]
#[case::replay(single_timer_setup(&[2]), EvictionMode::NotSticky)]
#[case::incremental_evict(single_timer_setup(&[1, 2]), EvictionMode::AfterEveryReply)]
#[case::replay_evict(single_timer_setup(&[2, 2]), EvictionMode::AfterEveryReply)]
#[tokio::test]
async fn single_timer_test_across_wf_bridge(
#[case] core: FakeCore,
Expand Down Expand Up @@ -1350,9 +1303,8 @@ mod test {
}

#[rstest]
#[case::no_evict_inc(&[1, 2, 2], EvictionMode::NotSticky)]
#[case::no_evict(&[2, 2], EvictionMode::NotSticky)]
#[case::evict(&[1, 2, 2, 2], EvictionMode::AfterEveryReply)]
#[case::incremental(&[1, 2, 2], EvictionMode::NotSticky)]
#[case::replay(&[2, 2], EvictionMode::NotSticky)]
#[tokio::test]
async fn complete_activation_with_failure(
#[case] batches: &[usize],
Expand Down Expand Up @@ -1462,7 +1414,6 @@ mod test {

#[tokio::test]
async fn workflow_failures_only_reported_once() {
tracing_init();
let wfid = "fake_wf_id";
let timer_1 = "timer1";
let timer_2 = "timer2";
Expand Down
Loading