Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pending activations interrupt long poll #89

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ let _enter = s.enter();

To run the Jaeger instance:
`docker run --rm -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest`
Jaeger collection is off by default, you must set `TEMPORAL_ENABLE_OPENTELEMENTRY=true` in the
environment to enable it.

To show logs in the console, set the `RUST_LOG` environment variable to `temporal_sdk_core=DEBUG`
or whatever level you desire. The env var is parsed according to tracing's
Expand Down
54 changes: 32 additions & 22 deletions src/core_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,49 @@
use itertools::Itertools;
use std::{collections::VecDeque, sync::Once};
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

const ENABLE_OPENTELEM_ENV_VAR: &str = "TEMPORAL_ENABLE_OPENTELEMETRY";
static TRACING_INIT: Once = Once::new();

/// Initialize tracing subscribers and output. Core will not call this itself, it exists here so
/// that consumers and tests have an easy way to initialize tracing.
pub fn tracing_init() {
TRACING_INIT.call_once(|| {
// Not all low-down unit tests use Tokio
#[cfg(test)]
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install_simple()
.unwrap();

#[cfg(not(test))]
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let opentelem_on = std::env::var(ENABLE_OPENTELEM_ENV_VAR).is_ok();

let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();
let fmt_layer = tracing_subscriber::fmt::layer().with_target(false).pretty();

tracing_subscriber::registry()
.with(opentelemetry)
.with(filter_layer)
.with(fmt_layer)
.try_init()
.unwrap();
if opentelem_on {
// Not all low-down unit tests use Tokio
#[cfg(test)]
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install_simple()
.unwrap();

#[cfg(not(test))]
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();

let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);

tracing_subscriber::registry()
.with(opentelemetry)
.with(filter_layer)
.with(tracing_subscriber::fmt::layer().with_target(false).pretty())
.try_init()
.unwrap();
} else {
// Because these types don't compose nicely we must repeat ourselves in these branches
let reg = tracing_subscriber::registry()
.with(filter_layer)
.with(tracing_subscriber::fmt::layer().with_target(false).pretty());
tracing::subscriber::set_global_default(reg).unwrap();
};
});
}

Expand Down
94 changes: 67 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ use crate::{
workflow::{NextWfActivation, WorkflowConcurrencyManager, WorkflowError, WorkflowManager},
};
use dashmap::DashMap;
use futures::TryFutureExt;
use std::future::Future;
use std::{
convert::TryInto,
fmt::Debug,
Expand All @@ -71,6 +73,10 @@ pub trait Core: Send + Sync {
/// responsibility to call the appropriate workflow code with the provided inputs. Blocks
/// indefinitely until such work is available or [Core::shutdown] is called.
///
/// Note that it is possible to receive work from a task queue that isn't the argument you
/// passed if you previously polled that task queue and there is remaining work that the lang
/// side must do before core can respond to the server.
///
/// TODO: Examples
async fn poll_workflow_task(&self, task_queue: &str) -> Result<WfActivation, PollWfError>;

Expand Down Expand Up @@ -145,28 +151,8 @@ struct CoreSDK<WP> {
shutdown_requested: AtomicBool,
/// Used to wake up future which checks shutdown state
shutdown_notify: Notify,
}

/// Can be used inside the CoreSDK impl to block on any method that polls the server until it
/// responds, or until the shutdown flag is set (aborting the poll)
macro_rules! abort_on_shutdown {
($self:ident, $gateway_fn:tt, $poll_arg:expr) => {{
let shutdownfut = async {
loop {
$self.shutdown_notify.notified().await;
if $self.shutdown_requested.load(Ordering::SeqCst) {
break;
}
}
};
let poll_result_future = $self.server_gateway.$gateway_fn($poll_arg);
tokio::select! {
_ = shutdownfut => {
Err(ShutdownErr.into())
}
r = poll_result_future => r.map_err(Into::into)
}
}};
/// Used to interrupt workflow task polling if there is a new pending activation
pending_activation_notify: Notify,
}

#[async_trait::async_trait]
Expand All @@ -193,7 +179,22 @@ where
return Err(PollWfError::ShutDown);
}

match abort_on_shutdown!(self, poll_workflow_task, task_queue.to_owned()) {
let pa_fut = self.pending_activation_notify.notified();
let poll_result_future = self.shutdownable_fut(
self.server_gateway
.poll_workflow_task(task_queue.to_owned())
.map_err(Into::into),
);
let selected_f = tokio::select! {
// If a pending activation comes in while we are waiting on polling, we need to
// restart the loop right away to provide it
_ = pa_fut => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens to the poll request in this case?
Is it cancelled? Is it ignored?
We probably don't want to create a new request if we got interrputed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's a good question, was thinking about it after I wrote this. I'll see if I can figure out a way to test that while I finish this. Ideally, it doesn't really matter, because we spit out the PA so fast that when we go back to requesting it'll kinda be like nothing ever happened... but perhaps we need to keep the polling future around in order to ensure that behavior. I'll play with it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this opens up too big of a can of worms to deal with in this PR. Ideally the request should actually be cancelled I think. Keeping it around raises too many questions like "what if the next poll is for a different task queue" or "how long should we buffer the response" and so on. I opened #91 so we can come back to this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, we should discard initial poll as from the lang's perspective it polled and got an activation, it shouldn't care if core made a request to the server or not in order to get it.

continue;
}
r = poll_result_future => r
};

match selected_f {
Ok(work) => {
if let Some(activation) = self.prepare_new_activation(work)? {
return Ok(activation);
Expand All @@ -215,7 +216,14 @@ where
return Err(PollActivityError::ShutDown);
}

match abort_on_shutdown!(self, poll_activity_task, task_queue.to_owned()) {
match self
.shutdownable_fut(
self.server_gateway
.poll_activity_task(task_queue.to_owned())
.map_err(Into::into),
)
.await
{
Ok(work) => {
let task_token = work.task_token.clone();
Ok(ActivityTask::start_from_poll_resp(work, task_token))
Expand Down Expand Up @@ -319,6 +327,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
pending_activations: Default::default(),
shutdown_requested: AtomicBool::new(false),
shutdown_notify: Notify::new(),
pending_activation_notify: Notify::new(),
}
}

Expand Down Expand Up @@ -354,10 +363,10 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
task_token: Vec<u8>,
) -> WfActivation {
if next_a.more_activations_needed {
self.pending_activations.push(PendingActivation {
self.add_pending_activation(PendingActivation {
run_id: next_a.get_run_id().to_owned(),
task_token: task_token.clone(),
})
});
}
next_a.finalize(task_token)
}
Expand Down Expand Up @@ -406,7 +415,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
})?;
let push_result = self.push_lang_commands(&run_id, cmds)?;
if push_result.has_new_lang_jobs {
self.pending_activations.push(PendingActivation {
self.add_pending_activation(PendingActivation {
run_id: run_id.to_string(),
task_token: task_token.clone(),
});
Expand Down Expand Up @@ -518,6 +527,37 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
run_id: run_id.to_owned(),
})
}

/// Wrap a future, making it return early with a shutdown error in the event the shutdown
/// flag has been set
async fn shutdownable_fut<FOut, FErr>(
&self,
wrap_this: impl Future<Output = Result<FOut, FErr>>,
) -> Result<FOut, FErr>
where
FErr: From<ShutdownErr>,
{
let shutdownfut = async {
loop {
self.shutdown_notify.notified().await;
if self.shutdown_requested.load(Ordering::SeqCst) {
break;
}
}
};
tokio::select! {
_ = shutdownfut => {
Err(ShutdownErr.into())
}
r = wrap_this => r
}
}

/// Enqueue a new pending activation, and notify ourselves
fn add_pending_activation(&self, pa: PendingActivation) {
self.pending_activations.push(pa);
self.pending_activation_notify.notify_one();
}
}

#[cfg(test)]
Expand Down
98 changes: 98 additions & 0 deletions tests/integ_tests/polling_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use crate::integ_tests::{
create_workflow, get_integ_core, simple_wf_tests::schedule_activity_and_timer_cmds,
};
use assert_matches::assert_matches;
use rand::Rng;
use std::sync::Arc;
use std::time::Duration;
use temporal_sdk_core::protos::coresdk::{
activity_task::activity_task as act_task,
workflow_activation::{wf_activation_job, FireTimer, WfActivationJob},
workflow_commands::{
ActivityCancellationType, CompleteWorkflowExecution, RequestCancelActivity,
},
workflow_completion::WfActivationCompletion,
};
use temporal_sdk_core::Core;

#[tokio::test]
async fn long_poll_interrupted_by_new_pending_activation() {
let mut rng = rand::thread_rng();
let task_q_salt: u32 = rng.gen();
let task_q = format!("activity_cancelled_workflow_{}", task_q_salt.to_string());
let core = Arc::new(get_integ_core().await);
let workflow_id: u32 = rng.gen();
create_workflow(&*core, &task_q, &workflow_id.to_string(), None).await;
let activity_id: String = rng.gen::<u32>().to_string();
let timer_id: String = rng.gen::<u32>().to_string();
let task = core.poll_workflow_task(&task_q).await.unwrap();
// Complete workflow task and schedule activity and a timer that fires immediately
core.complete_workflow_task(schedule_activity_and_timer_cmds(
&task_q,
&activity_id,
&timer_id,
ActivityCancellationType::TryCancel,
task,
Duration::from_secs(60),
Duration::from_millis(50),
))
.await
.unwrap();
// Poll activity and verify that it's been scheduled with correct parameters, we don't expect to
// complete it in this test as activity is try-cancelled.
let activity_task = core.poll_activity_task(&task_q).await.unwrap();
assert_matches!(
activity_task.variant,
Some(act_task::Variant::Start(start_activity)) => {
assert_eq!(start_activity.activity_type, "test_activity".to_string())
}
);
// Poll workflow task and verify that activity has failed.
let task = core.poll_workflow_task(&task_q).await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[
WfActivationJob {
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t_id }
)),
},
] => {
assert_eq!(t_id, &timer_id);
}
);

// Start polling again *before* we complete the WFT
let cc = core.clone();
let jh = tokio::spawn(async move {
let task = cc.poll_workflow_task(&task_q).await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::ResolveActivity(_)),
}]
);
cc.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
vec![CompleteWorkflowExecution { result: None }.into()],
task.task_token,
))
.await
.unwrap();
});

tokio::time::sleep(Duration::from_millis(100)).await;
// Then complete the (last) WFT with a request to cancel the AT, which should produce a
// pending activation, unblocking the (already started) poll
core.complete_workflow_task(WfActivationCompletion::ok_from_cmds(
vec![RequestCancelActivity {
activity_id,
..Default::default()
}
.into()],
task.task_token,
))
.await
.unwrap();

jh.await.unwrap();
}