diff --git a/README.md b/README.md index 93bc66ae8..d3bbc7eb2 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,8 @@ let _enter = s.enter(); To run the Jaeger instance: `docker run --rm -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest` +Jaeger collection is off by default, you must set `TEMPORAL_ENABLE_OPENTELEMENTRY=true` in the +environment to enable it. To show logs in the console, set the `RUST_LOG` environment variable to `temporal_sdk_core=DEBUG` or whatever level you desire. The env var is parsed according to tracing's diff --git a/src/core_tracing.rs b/src/core_tracing.rs index 77f217272..250292209 100644 --- a/src/core_tracing.rs +++ b/src/core_tracing.rs @@ -1,39 +1,49 @@ use itertools::Itertools; use std::{collections::VecDeque, sync::Once}; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::{layer::SubscriberExt, EnvFilter}; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +const ENABLE_OPENTELEM_ENV_VAR: &str = "TEMPORAL_ENABLE_OPENTELEMETRY"; static TRACING_INIT: Once = Once::new(); /// Initialize tracing subscribers and output. Core will not call this itself, it exists here so /// that consumers and tests have an easy way to initialize tracing. pub fn tracing_init() { TRACING_INIT.call_once(|| { - // Not all low-down unit tests use Tokio - #[cfg(test)] - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("coresdk") - .install_simple() - .unwrap(); - - #[cfg(not(test))] - let tracer = opentelemetry_jaeger::new_pipeline() - .with_service_name("coresdk") - .install_batch(opentelemetry::runtime::Tokio) - .unwrap(); + let opentelem_on = std::env::var(ENABLE_OPENTELEM_ENV_VAR).is_ok(); - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); let filter_layer = EnvFilter::try_from_default_env() .or_else(|_| EnvFilter::try_new("info")) .unwrap(); - let fmt_layer = tracing_subscriber::fmt::layer().with_target(false).pretty(); - tracing_subscriber::registry() - .with(opentelemetry) - .with(filter_layer) - .with(fmt_layer) - .try_init() - .unwrap(); + if opentelem_on { + // Not all low-down unit tests use Tokio + #[cfg(test)] + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("coresdk") + .install_simple() + .unwrap(); + + #[cfg(not(test))] + let tracer = opentelemetry_jaeger::new_pipeline() + .with_service_name("coresdk") + .install_batch(opentelemetry::runtime::Tokio) + .unwrap(); + + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(opentelemetry) + .with(filter_layer) + .with(tracing_subscriber::fmt::layer().with_target(false).pretty()) + .try_init() + .unwrap(); + } else { + // Because these types don't compose nicely we must repeat ourselves in these branches + let reg = tracing_subscriber::registry() + .with(filter_layer) + .with(tracing_subscriber::fmt::layer().with_target(false).pretty()); + tracing::subscriber::set_global_default(reg).unwrap(); + }; }); } diff --git a/src/lib.rs b/src/lib.rs index ef6e39bab..3869cb80a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,8 @@ use crate::{ workflow::{NextWfActivation, WorkflowConcurrencyManager, WorkflowError, WorkflowManager}, }; use dashmap::DashMap; +use futures::TryFutureExt; +use std::future::Future; use std::{ convert::TryInto, fmt::Debug, @@ -71,6 +73,10 @@ pub trait Core: Send + Sync { /// responsibility to call the appropriate workflow code with the provided inputs. Blocks /// indefinitely until such work is available or [Core::shutdown] is called. /// + /// Note that it is possible to receive work from a task queue that isn't the argument you + /// passed if you previously polled that task queue and there is remaining work that the lang + /// side must do before core can respond to the server. + /// /// TODO: Examples async fn poll_workflow_task(&self, task_queue: &str) -> Result; @@ -145,28 +151,8 @@ struct CoreSDK { shutdown_requested: AtomicBool, /// Used to wake up future which checks shutdown state shutdown_notify: Notify, -} - -/// Can be used inside the CoreSDK impl to block on any method that polls the server until it -/// responds, or until the shutdown flag is set (aborting the poll) -macro_rules! abort_on_shutdown { - ($self:ident, $gateway_fn:tt, $poll_arg:expr) => {{ - let shutdownfut = async { - loop { - $self.shutdown_notify.notified().await; - if $self.shutdown_requested.load(Ordering::SeqCst) { - break; - } - } - }; - let poll_result_future = $self.server_gateway.$gateway_fn($poll_arg); - tokio::select! { - _ = shutdownfut => { - Err(ShutdownErr.into()) - } - r = poll_result_future => r.map_err(Into::into) - } - }}; + /// Used to interrupt workflow task polling if there is a new pending activation + pending_activation_notify: Notify, } #[async_trait::async_trait] @@ -193,7 +179,22 @@ where return Err(PollWfError::ShutDown); } - match abort_on_shutdown!(self, poll_workflow_task, task_queue.to_owned()) { + let pa_fut = self.pending_activation_notify.notified(); + let poll_result_future = self.shutdownable_fut( + self.server_gateway + .poll_workflow_task(task_queue.to_owned()) + .map_err(Into::into), + ); + let selected_f = tokio::select! { + // If a pending activation comes in while we are waiting on polling, we need to + // restart the loop right away to provide it + _ = pa_fut => { + continue; + } + r = poll_result_future => r + }; + + match selected_f { Ok(work) => { if let Some(activation) = self.prepare_new_activation(work)? { return Ok(activation); @@ -215,7 +216,14 @@ where return Err(PollActivityError::ShutDown); } - match abort_on_shutdown!(self, poll_activity_task, task_queue.to_owned()) { + match self + .shutdownable_fut( + self.server_gateway + .poll_activity_task(task_queue.to_owned()) + .map_err(Into::into), + ) + .await + { Ok(work) => { let task_token = work.task_token.clone(); Ok(ActivityTask::start_from_poll_resp(work, task_token)) @@ -319,6 +327,7 @@ impl CoreSDK { pending_activations: Default::default(), shutdown_requested: AtomicBool::new(false), shutdown_notify: Notify::new(), + pending_activation_notify: Notify::new(), } } @@ -354,10 +363,10 @@ impl CoreSDK { task_token: Vec, ) -> WfActivation { if next_a.more_activations_needed { - self.pending_activations.push(PendingActivation { + self.add_pending_activation(PendingActivation { run_id: next_a.get_run_id().to_owned(), task_token: task_token.clone(), - }) + }); } next_a.finalize(task_token) } @@ -406,7 +415,7 @@ impl CoreSDK { })?; let push_result = self.push_lang_commands(&run_id, cmds)?; if push_result.has_new_lang_jobs { - self.pending_activations.push(PendingActivation { + self.add_pending_activation(PendingActivation { run_id: run_id.to_string(), task_token: task_token.clone(), }); @@ -518,6 +527,37 @@ impl CoreSDK { run_id: run_id.to_owned(), }) } + + /// Wrap a future, making it return early with a shutdown error in the event the shutdown + /// flag has been set + async fn shutdownable_fut( + &self, + wrap_this: impl Future>, + ) -> Result + where + FErr: From, + { + let shutdownfut = async { + loop { + self.shutdown_notify.notified().await; + if self.shutdown_requested.load(Ordering::SeqCst) { + break; + } + } + }; + tokio::select! { + _ = shutdownfut => { + Err(ShutdownErr.into()) + } + r = wrap_this => r + } + } + + /// Enqueue a new pending activation, and notify ourselves + fn add_pending_activation(&self, pa: PendingActivation) { + self.pending_activations.push(pa); + self.pending_activation_notify.notify_one(); + } } #[cfg(test)] diff --git a/tests/integ_tests/polling_tests.rs b/tests/integ_tests/polling_tests.rs new file mode 100644 index 000000000..a28940dc0 --- /dev/null +++ b/tests/integ_tests/polling_tests.rs @@ -0,0 +1,98 @@ +use crate::integ_tests::{ + create_workflow, get_integ_core, simple_wf_tests::schedule_activity_and_timer_cmds, +}; +use assert_matches::assert_matches; +use rand::Rng; +use std::sync::Arc; +use std::time::Duration; +use temporal_sdk_core::protos::coresdk::{ + activity_task::activity_task as act_task, + workflow_activation::{wf_activation_job, FireTimer, WfActivationJob}, + workflow_commands::{ + ActivityCancellationType, CompleteWorkflowExecution, RequestCancelActivity, + }, + workflow_completion::WfActivationCompletion, +}; +use temporal_sdk_core::Core; + +#[tokio::test] +async fn long_poll_interrupted_by_new_pending_activation() { + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); + let core = Arc::new(get_integ_core().await); + let workflow_id: u32 = rng.gen(); + create_workflow(&*core, &task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(&task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + &task_q, + &activity_id, + &timer_id, + ActivityCancellationType::TryCancel, + task, + Duration::from_secs(60), + Duration::from_millis(50), + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is try-cancelled. + let activity_task = core.poll_activity_task(&task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(&task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + + // Start polling again *before* we complete the WFT + let cc = core.clone(); + let jh = tokio::spawn(async move { + let task = cc.poll_workflow_task(&task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity(_)), + }] + ); + cc.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + // Then complete the (last) WFT with a request to cancel the AT, which should produce a + // pending activation, unblocking the (already started) poll + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + + jh.await.unwrap(); +} diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index a2b1287c3..1cce07a51 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -1,10 +1,11 @@ +use crate::integ_tests::{ + create_workflow, get_integ_core, get_integ_server_options, with_gw, GwApi, NAMESPACE, +}; use assert_matches::assert_matches; use crossbeam::channel::{unbounded, RecvTimeoutError}; -use futures::{channel::mpsc::UnboundedReceiver, future, Future, SinkExt, StreamExt}; +use futures::{channel::mpsc::UnboundedReceiver, future, SinkExt, StreamExt}; use rand::{self, Rng}; -use std::{collections::HashMap, convert::TryFrom, env, sync::Arc, time::Duration}; -use temporal_sdk_core::protos::coresdk::workflow_commands::ActivityCancellationType; -use temporal_sdk_core::protos::coresdk::ActivityTaskCompletion; +use std::{collections::HashMap, sync::Arc, time::Duration}; use temporal_sdk_core::{ protos::coresdk::{ activity_result::{self, activity_result as act_res, ActivityResult}, @@ -15,13 +16,13 @@ use temporal_sdk_core::{ WfActivationJob, }, workflow_commands::{ - CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, - ScheduleActivity, StartTimer, + ActivityCancellationType, CancelTimer, CompleteWorkflowExecution, + FailWorkflowExecution, RequestCancelActivity, ScheduleActivity, StartTimer, }, workflow_completion::WfActivationCompletion, + ActivityTaskCompletion, }, - tracing_init, CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, - ServerGatewayOptions, Url, + tracing_init, CompleteWfError, Core, CoreInitOptions, PollWfError, }; // TODO: These tests can get broken permanently if they break one time and the server is not @@ -31,56 +32,6 @@ use temporal_sdk_core::{ // TODO: We should also get expected histories for these tests and confirm that the history // at the end matches. -const NAMESPACE: &str = "default"; -type GwApi = Arc; - -async fn create_workflow( - core: &dyn Core, - task_q: &str, - workflow_id: &str, - wf_type: Option<&str>, -) -> String { - with_gw(core, |gw: GwApi| async move { - gw.start_workflow( - NAMESPACE.to_owned(), - task_q.to_owned(), - workflow_id.to_owned(), - wf_type.unwrap_or("test-workflow").to_owned(), - ) - .await - .unwrap() - .run_id - }) - .await -} - -async fn with_gw Fout, Fout: Future>(core: &dyn Core, fun: F) -> Fout::Output { - let gw = core.server_gateway(); - fun(gw).await -} - -fn get_integ_server_options() -> ServerGatewayOptions { - let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { - Ok(addr) => addr, - Err(_) => "http://localhost:7233".to_owned(), - }; - let url = Url::try_from(&*temporal_server_address).unwrap(); - ServerGatewayOptions { - namespace: NAMESPACE.to_string(), - identity: "integ_tester".to_string(), - worker_binary_id: "".to_string(), - long_poll_timeout: Duration::from_secs(60), - target_url: url, - } -} - -async fn get_integ_core() -> impl Core { - let gateway_opts = get_integ_server_options(); - temporal_sdk_core::init(CoreInitOptions { gateway_opts }) - .await - .unwrap() -} - #[tokio::test] async fn timer_workflow() { let task_q = "timer_workflow"; @@ -356,7 +307,8 @@ fn schedule_activity_cmd( task.task_token, ) } -fn schedule_activity_and_timer_cmds( + +pub fn schedule_activity_and_timer_cmds( task_q: &str, activity_id: &str, timer_id: &str, @@ -392,8 +344,6 @@ fn schedule_activity_and_timer_cmds( #[tokio::test] async fn activity_cancellation_try_cancel() { - tracing_init(); - let mut rng = rand::thread_rng(); let task_q_salt: u32 = rng.gen(); let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); @@ -459,8 +409,6 @@ async fn activity_cancellation_try_cancel() { #[tokio::test] async fn started_activity_timeout() { - tracing_init(); - let mut rng = rand::thread_rng(); let task_q_salt: u32 = rng.gen(); let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); diff --git a/tests/main.rs b/tests/main.rs index 887c013d1..b849d8354 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1,4 +1,62 @@ #[cfg(test)] mod integ_tests { + use std::{convert::TryFrom, env, future::Future, sync::Arc, time::Duration}; + use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayApis, ServerGatewayOptions}; + use url::Url; + + mod polling_tests; mod simple_wf_tests; + + const NAMESPACE: &str = "default"; + type GwApi = Arc; + + pub async fn create_workflow( + core: &dyn Core, + task_q: &str, + workflow_id: &str, + wf_type: Option<&str>, + ) -> String { + with_gw(core, |gw: GwApi| async move { + gw.start_workflow( + NAMESPACE.to_owned(), + task_q.to_owned(), + workflow_id.to_owned(), + wf_type.unwrap_or("test-workflow").to_owned(), + ) + .await + .unwrap() + .run_id + }) + .await + } + + pub async fn with_gw Fout, Fout: Future>( + core: &dyn Core, + fun: F, + ) -> Fout::Output { + let gw = core.server_gateway(); + fun(gw).await + } + + pub fn get_integ_server_options() -> ServerGatewayOptions { + let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { + Ok(addr) => addr, + Err(_) => "http://localhost:7233".to_owned(), + }; + let url = Url::try_from(&*temporal_server_address).unwrap(); + ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "integ_tester".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + target_url: url, + } + } + + pub async fn get_integ_core() -> impl Core { + let gateway_opts = get_integ_server_options(); + temporal_sdk_core::init(CoreInitOptions { gateway_opts }) + .await + .unwrap() + } }