diff --git a/Cargo.toml b/Cargo.toml index 7922bb99d..196233379 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,13 +11,13 @@ license-file = "LICENSE.txt" [dependencies] anyhow = "1.0" async-trait = "0.1" +base64 = "0.13" crossbeam = "0.8" dashmap = "4.0" derive_more = "0.99" displaydoc = "0.1" -env_logger = "0.8" futures = "0.3" -log = "0.4" +itertools = "0.10" once_cell = "1.5" opentelemetry-jaeger = "0.11" opentelemetry = "0.12" diff --git a/README.md b/README.md index 5857ed4c2..6d6fcd9b0 100644 --- a/README.md +++ b/README.md @@ -40,3 +40,18 @@ Any error which is returned from a public interface should be well-typed, and we Errors returned from things only used in testing are free to use [anyhow](https://github.com/dtolnay/anyhow) for less verbosity. + + +## Debugging +The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable +it for a test, insert the below snippet at the start of the test. By default, tracing data is output +to stdout in a (reasonably) pretty manner, and to a Jaeger instance if one exists. + +```rust +core_tracing::tracing_init(); +let s = info_span!("Test start"); +let _enter = s.enter(); +``` + +To run the Jaeger instance: +`docker run --rm -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest` \ No newline at end of file diff --git a/src/core_tracing.rs b/src/core_tracing.rs index 86d66a5a0..6b99c5d74 100644 --- a/src/core_tracing.rs +++ b/src/core_tracing.rs @@ -1,24 +1,57 @@ +use itertools::Itertools; use once_cell::sync::OnceCell; use opentelemetry_jaeger::Uninstall; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use std::collections::VecDeque; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; static TRACING_INIT: OnceCell = OnceCell::new(); -const TRACING_ENABLE_ENV_VAR: &str = "TEMPORAL_CORE_TRACING"; -pub(crate) fn tracing_init() { - let _ = env_logger::try_init(); - if std::env::var(TRACING_ENABLE_ENV_VAR).is_ok() { - TRACING_INIT.get_or_init(|| { - let (tracer, uninstall) = opentelemetry_jaeger::new_pipeline() - .with_service_name("coresdk") - .install() - .unwrap(); - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); - tracing_subscriber::registry() - .with(opentelemetry) - .try_init() - .unwrap(); - uninstall - }); +/// Initialize tracing subscribers and output. Core will not call this itself, it exists here so +/// that consumers and tests have an easy way to initialize tracing. +pub fn tracing_init() { + TRACING_INIT.get_or_init(|| { + let (tracer, uninstall) = opentelemetry_jaeger::new_pipeline() + .with_service_name("coresdk") + .install() + .unwrap(); + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + let filter_layer = EnvFilter::try_from_default_env() + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + let fmt_layer = tracing_subscriber::fmt::layer().with_target(false).pretty(); + + tracing_subscriber::registry() + .with(opentelemetry) + .with(filter_layer) + .with(fmt_layer) + .try_init() + .unwrap(); + uninstall + }); +} + +/// A trait for using [Display] on the contents of vecs, etc, which don't implement it. +/// +/// Dislike this, but, there doesn't seem to be a great alternative. Calling itertools format +/// inline in an `event!` macro can panic because it gets evaluated twice somehow. +pub(crate) trait VecDisplayer { + fn display(&self) -> String; +} + +impl VecDisplayer for Vec +where + T: std::fmt::Display, +{ + fn display(&self) -> String { + format!("[{}]", self.iter().format(",")) + } +} + +impl VecDisplayer for VecDeque +where + T: std::fmt::Display, +{ + fn display(&self) -> String { + format!("[{}]", self.iter().format(",")) } } diff --git a/src/lib.rs b/src/lib.rs index d872da5fb..d7a046db8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,12 +20,13 @@ mod workflow; #[cfg(test)] mod test_help; +pub use core_tracing::tracing_init; pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; -use crate::machines::WFMachinesError; +use crate::workflow::WorkflowManager; use crate::{ - machines::{InconvertibleCommandError, WFCommand}, + machines::{InconvertibleCommandError, WFCommand, WFMachinesError}, protos::{ coresdk::{ task_completion, wf_activation_completion::Status, Task, TaskCompletion, @@ -33,11 +34,12 @@ use crate::{ }, temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse, }, - protosext::HistoryInfoError, + protosext::{fmt_task_token, HistoryInfoError}, workflow::{NextWfActivation, WorkflowConcurrencyManager}, }; use crossbeam::queue::SegQueue; use dashmap::DashMap; +use std::fmt::{Display, Formatter}; use std::{ convert::TryInto, fmt::Debug, @@ -49,7 +51,7 @@ use std::{ }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; -use tracing::Level; +use tracing::Span; /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; @@ -94,7 +96,6 @@ pub struct CoreInitOptions { /// * Will panic if called from within an async context, as it will construct a runtime and you /// cannot construct a runtime from within a runtime. pub fn init(opts: CoreInitOptions) -> Result { - core_tracing::tracing_init(); let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; // Initialize server client let work_provider = runtime.block_on(opts.gateway_opts.connect())?; @@ -143,19 +144,30 @@ struct PendingActivation { task_token: Vec, } +impl Display for PendingActivation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "PendingActivation(run_id: {}, task_token: {})", + &self.run_id, + fmt_task_token(&self.task_token) + ) + } +} + impl Core for CoreSDK where WP: ServerGatewayApis + Send + Sync, { - #[instrument(skip(self))] + #[instrument(skip(self), fields(pending_activation))] fn poll_task(&self, task_queue: &str) -> Result { // We must first check if there are pending workflow tasks for workflows that are currently // replaying, and issue those tasks before bothering the server. if let Some(pa) = self.pending_activations.pop() { - event!(Level::DEBUG, msg = "Applying pending activations", ?pa); - let next_activation = self - .workflow_machines - .access(&pa.run_id, |mgr| mgr.get_next_activation())?; + Span::current().record("pending_activation", &format!("{}", &pa).as_str()); + + let next_activation = + self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?; let task_token = pa.task_token.clone(); if next_activation.more_activations_needed { self.pending_activations.push(pa); @@ -175,10 +187,9 @@ where .runtime .block_on(self.server_gateway.poll_workflow_task(task_queue))?; let task_token = work.task_token.clone(); - event!( - Level::DEBUG, - msg = "Received workflow task from server", - ?task_token + debug!( + task_token = %fmt_task_token(&task_token), + "Received workflow task from server" ); let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; @@ -214,9 +225,9 @@ where match wfstatus { Status::Successful(success) => { self.push_lang_commands(&run_id, success)?; - let commands = self - .workflow_machines - .access(&run_id, |mgr| Ok(mgr.machines.get_commands()))?; + let commands = self.access_wf_machine(&run_id, move |mgr| { + Ok(mgr.machines.get_commands()) + })?; self.runtime.block_on( self.server_gateway .complete_workflow_task(task_token, commands), @@ -285,13 +296,28 @@ impl CoreSDK { .into_iter() .map(|c| c.try_into().map_err(Into::into)) .collect::>>()?; - self.workflow_machines.access(run_id, |mgr| { + self.access_wf_machine(run_id, move |mgr| { mgr.command_sink.send(cmds)?; mgr.machines.iterate_machines()?; Ok(()) })?; Ok(()) } + + /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing + /// span to the wf machines thread. + fn access_wf_machine(&self, run_id: &str, mutator: F) -> Result + where + F: FnOnce(&mut WorkflowManager) -> Result + Send + 'static, + Fout: Send + Debug + 'static, + { + let curspan = Span::current(); + let mutator = move |wfm: &mut WorkflowManager| { + let _e = curspan.enter(); + mutator(wfm) + }; + self.workflow_machines.access(run_id, mutator) + } } /// The error type returned by interactions with [Core] @@ -537,9 +563,6 @@ mod test { #[test] fn workflow_update_random_seed_on_workflow_reset() { - let s = span!(Level::DEBUG, "Test start", t = "bridge"); - let _enter = s.enter(); - let wfid = "fake_wf_id"; let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156"; diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs index 9b31dc5a7..fdf353b54 100644 --- a/src/machines/complete_workflow_state_machine.rs +++ b/src/machines/complete_workflow_state_machine.rs @@ -27,7 +27,7 @@ fsm! { --> CompleteWorkflowCommandRecorded; } -#[derive(Debug)] +#[derive(Debug, derive_more::Display)] pub(super) enum CompleteWFCommand { AddCommand(Command), } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 2985baaac..02b6cf888 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -35,6 +35,7 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; use crate::{ + core_tracing::VecDisplayer, machines::workflow_machines::MachineResponse, protos::{ coresdk::{self, command::Variant, wf_activation_job}, @@ -57,7 +58,6 @@ use std::{ convert::{TryFrom, TryInto}, fmt::{Debug, Display}, }; -use tracing::Level; pub(crate) type ProtoCommand = Command; @@ -155,7 +155,7 @@ where ::Event: TryFrom, ::Event: TryFrom, WFMachinesError: From<<::Event as TryFrom>::Error>, - ::Command: Debug, + ::Command: Debug + Display, ::State: Display, ::Error: Into + 'static + Send + Sync, { @@ -164,12 +164,11 @@ where } fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError> { - event!( - Level::DEBUG, - msg = "handling command", - ?command_type, + debug!( + command_type = ?command_type, machine_name = %self.name(), - state = %self.state() + state = %self.state(), + "handling command" ); if let Ok(converted_command) = command_type.try_into() { match self.on_event_mut(converted_command) { @@ -189,18 +188,18 @@ where event: &HistoryEvent, has_next_event: bool, ) -> Result, WFMachinesError> { - event!( - Level::DEBUG, - msg = "handling event", - %event, + debug!( + event = %event, machine_name = %self.name(), - state = %self.state() + state = %self.state(), + "handling event" ); let converted_event = event.clone().try_into()?; match self.on_event_mut(converted_event) { Ok(c) => { if !c.is_empty() { - event!(Level::DEBUG, msg = "Machine produced commands", ?c, state = %self.state()); + debug!(commands = %c.display(), state = %self.state(), + "Machine produced commands"); } let mut machine_responses = vec![]; for cmd in c { diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 1ab783337..b29510ab5 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -17,7 +17,6 @@ use std::sync::{ mpsc::{self, Receiver, Sender}, Arc, }; -use tracing::Level; /// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control /// over when commands are returned than a normal workflow would. @@ -81,7 +80,7 @@ where Fut: Future, { fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) { - event!(Level::DEBUG, msg = "Test WF driver start called"); + debug!("Test WF driver start called"); } fn fetch_workflow_iteration_output(&mut self) -> Vec { @@ -121,7 +120,7 @@ where } } - event!(Level::DEBUG, msg = "Test wf driver emitting", ?emit_these); + debug!(emit_these = ?emit_these, "Test wf driver emitting"); emit_these } diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index b6cfef40b..8450f469c 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -39,7 +39,7 @@ fsm! { CancelTimerCommandSent --(TimerCanceled) --> Canceled; } -#[derive(Debug)] +#[derive(Debug, derive_more::Display)] pub(super) enum TimerMachineCommand { Complete, Canceled, diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1223b6875..3182799c4 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,12 +1,12 @@ -use crate::protos::coresdk::wf_activation_job; use crate::{ + core_tracing::VecDisplayer, machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, workflow_task_state_machine::WorkflowTaskMachine, DrivenWorkflow, NewMachineWithCommand, ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{StartWorkflow, UpdateRandomSeed, WfActivation}, + coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation}, temporal::api::{ enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent}, @@ -72,23 +72,27 @@ pub(crate) struct WorkflowMachines { } slotmap::new_key_type! { struct MachineKey; } -#[derive(Debug)] +#[derive(Debug, derive_more::Display)] +#[display(fmt = "Cmd&Machine({})", "command")] struct CommandAndMachine { command: ProtoCommand, machine: MachineKey, } /// Returned by [TemporalStateMachine]s when handling events -#[derive(Debug, derive_more::From)] +#[derive(Debug, derive_more::From, derive_more::Display)] #[must_use] #[allow(clippy::large_enum_variant)] pub enum MachineResponse { + #[display(fmt = "PushWFJob")] PushWFJob(#[from(forward)] wf_activation_job::Variant), IssueNewCommand(ProtoCommand), + #[display(fmt = "TriggerWFTaskStarted")] TriggerWFTaskStarted { task_started_event_id: i64, time: SystemTime, }, + #[display(fmt = "UpdateRunIdOnWorkflowReset({})", run_id)] UpdateRunIdOnWorkflowReset { run_id: String, }, @@ -157,7 +161,7 @@ impl WorkflowMachines { /// is the last event in the history. /// /// TODO: Describe what actually happens in here - #[instrument(level = "debug", skip(self))] + #[instrument(level = "debug", skip(self), fields(run_id = %self.run_id))] pub(crate) fn handle_event( &mut self, event: &HistoryEvent, @@ -187,11 +191,10 @@ impl WorkflowMachines { if let Some(sm) = maybe_machine { self.submachine_handle_event(sm, event, has_next_event)?; } else { - event!( - Level::ERROR, - msg = "During event handling, this event had an initial command ID but \ - we could not find a matching state machine! Event: {:?}", - ?event + error!( + event=?event, + "During event handling, this event had an initial command ID but we could \ + not find a matching state machine!" ); } @@ -257,7 +260,7 @@ impl WorkflowMachines { // if (handleLocalActivityMarker(event)) { // return; // } - event!(Level::DEBUG, msg = "handling command event", current_commands = ?self.commands); + debug!(current_commands = ?self.commands, "handling command event"); let consumed_cmd = loop { // handleVersionMarker can skip a marker event if the getVersion call was removed. @@ -481,11 +484,8 @@ impl WorkflowMachines { } })?; if !machine_responses.is_empty() { - event!( - Level::DEBUG, - msg = "Machine produced responses", - ?machine_responses - ); + debug!(responses = %machine_responses.display(), + "Machine produced responses"); } for response in machine_responses { match response { @@ -500,6 +500,8 @@ impl WorkflowMachines { self.task_started(task_started_event_id, time)?; } MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { + // TODO: Should this also update self.run_id? Should we track orig/current + // separately? self.outgoing_wf_activation_jobs.push_back( wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed { randomness_seed: str_to_randomness_seed(&new_run_id), @@ -566,8 +568,6 @@ impl WorkflowMachines { /// machine associated with the command. #[instrument(level = "debug", skip(self))] fn prepare_commands(&mut self) -> Result<()> { - event!(Level::DEBUG, msg = "start prepare_commands", - cur_wf_task_cmds = ?self.current_wf_task_commands); while let Some(c) = self.current_wf_task_commands.pop_front() { let cmd_type = CommandType::from_i32(c.command.command_type) .ok_or(WFMachinesError::UnknownCommandType(c.command.command_type))?; @@ -579,7 +579,7 @@ impl WorkflowMachines { } self.commands.push_back(c); } - event!(Level::DEBUG, msg = "end prepare_commands", commands = ?self.commands); + debug!(commands = %self.commands.display(), "prepared commands"); Ok(()) } diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 1fad9a517..8cfdf2c63 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -40,16 +40,16 @@ impl WorkflowTaskMachine { } } -#[derive(Debug)] +#[derive(Debug, derive_more::Display)] pub(super) enum WFTaskMachineCommand { /// Issued to (possibly) trigger the event loop + #[display(fmt = "WFTaskStartedTrigger")] WFTaskStartedTrigger { task_started_event_id: i64, time: SystemTime, }, - RunIdOnWorkflowResetUpdate { - run_id: String, - }, + #[display(fmt = "RunIdOnWorkflowResetUpdate({})", run_id)] + RunIdOnWorkflowResetUpdate { run_id: String }, } impl WFMachinesAdapter for WorkflowTaskMachine { diff --git a/src/protos/mod.rs b/src/protos/mod.rs index ed8995319..d8700378d 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -90,6 +90,7 @@ pub mod temporal { include!("temporal.api.command.v1.rs"); use crate::protos::temporal::api::enums::v1::CommandType; use command::Attributes; + use std::fmt::{Display, Formatter}; impl From for Command { fn from(c: command::Attributes) -> Self { @@ -110,6 +111,14 @@ pub mod temporal { } } } + + impl Display for Command { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let ct = CommandType::from_i32(self.command_type) + .unwrap_or(CommandType::Unspecified); + write!(f, "{:?}", ct) + } + } } } pub mod enums { diff --git a/src/protosext/mod.rs b/src/protosext/mod.rs index 91d15b0d8..8312c7b7b 100644 --- a/src/protosext/mod.rs +++ b/src/protosext/mod.rs @@ -1,3 +1,7 @@ mod history_info; pub(crate) use history_info::HistoryInfo; pub(crate) use history_info::HistoryInfoError; + +pub(crate) fn fmt_task_token(token: &[u8]) -> String { + base64::encode(token) +} diff --git a/src/workflow/bridge.rs b/src/workflow/bridge.rs index cc6f0895a..c81470186 100644 --- a/src/workflow/bridge.rs +++ b/src/workflow/bridge.rs @@ -5,7 +5,6 @@ use crate::{ protos::temporal::api::history::v1::WorkflowExecutionStartedEventAttributes, }; use std::sync::mpsc::{self, Receiver, Sender}; -use tracing::Level; /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] /// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering @@ -33,20 +32,16 @@ impl WorkflowBridge { impl DrivenWorkflow for WorkflowBridge { fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) { - event!(Level::DEBUG, msg = "Workflow bridge start called", ?attribs); + debug!(attribs = ?attribs, "wf bridge start"); self.started_attrs = Some(attribs); } fn fetch_workflow_iteration_output(&mut self) -> Vec { let in_cmds = self.incoming_commands.try_recv(); - event!( - Level::DEBUG, - msg = "Workflow bridge iteration fetch", - ?in_cmds - ); - - in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]) + let in_cmds = in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]); + debug!(in_cmds = ?in_cmds, "wf bridge iteration fetch"); + in_cmds } fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) { diff --git a/src/workflow/concurrency_manager.rs b/src/workflow/concurrency_manager.rs index a356fac83..eac1d6972 100644 --- a/src/workflow/concurrency_manager.rs +++ b/src/workflow/concurrency_manager.rs @@ -13,7 +13,7 @@ use std::{ fmt::Debug, thread::{self, JoinHandle}, }; -use tracing::Level; +use tracing::Span; /// Provides a thread-safe way to access workflow machines which live exclusively on one thread /// managed by this struct. We could make this generic for any collection of things which need @@ -33,10 +33,11 @@ type MachineMutationSender = Sender type MachineMutationReceiver = Receiver>; /// This is the message sent from the concurrency manager to the dedicated thread in order to /// instantiate a new workflow manager -type MachineCreatorMsg = ( - PollWorkflowTaskQueueResponse, - Sender, -); +struct MachineCreatorMsg { + poll_resp: PollWorkflowTaskQueueResponse, + resp_chan: Sender, + span: Span, +} /// The response to [MachineCreatorMsg], which includes the wf activation and the channel to /// send requests to the newly instantiated workflow manager. type MachineCreatorResponseMsg = Result<(NextWfActivation, MachineMutationSender)>; @@ -67,9 +68,12 @@ impl WorkflowConcurrencyManager { run_id: &str, poll_wf_resp: PollWorkflowTaskQueueResponse, ) -> Result { + let span = debug_span!("create_or_update machines", %run_id); + if self.exists(run_id) { if let Some(history) = poll_wf_resp.history { - let activation = self.access(run_id, |wfm: &mut WorkflowManager| { + let activation = self.access(run_id, move |wfm: &mut WorkflowManager| { + let _enter = span.enter(); wfm.feed_history_from_server(history) })?; Ok(activation) @@ -82,7 +86,11 @@ impl WorkflowConcurrencyManager { // the activation let (resp_send, resp_rcv) = bounded(1); self.machine_creator - .send((poll_wf_resp, resp_send)) + .send(MachineCreatorMsg { + poll_resp: poll_wf_resp, + resp_chan: resp_send, + span, + }) .expect("wfm creation channel can't be dropped if we are inside this method"); let (activation, machine_sender) = resp_rcv .recv() @@ -181,28 +189,34 @@ impl WorkflowConcurrencyManager { maybe_create_chan_msg: Result, ) -> bool { match maybe_create_chan_msg { - Ok((pwtqr, resp_chan)) => match WorkflowManager::new(pwtqr) - .and_then(|mut wfm| Ok((wfm.get_next_activation()?, wfm))) - { - Ok((activation, wfm)) => { - let (machine_sender, machine_rcv) = unbounded(); - machine_rcvs.push((machine_rcv, wfm)); - resp_chan - .send(Ok((activation, machine_sender))) - .expect("wfm create resp rx side can't be dropped"); - } - Err(e) => { - resp_chan - .send(Err(e)) - .expect("wfm create resp rx side can't be dropped"); + Ok(MachineCreatorMsg { + poll_resp, + resp_chan, + span, + }) => { + let _e = span.enter(); + match WorkflowManager::new(poll_resp) + .and_then(|mut wfm| Ok((wfm.get_next_activation()?, wfm))) + { + Ok((activation, wfm)) => { + let (machine_sender, machine_rcv) = unbounded(); + machine_rcvs.push((machine_rcv, wfm)); + resp_chan + .send(Ok((activation, machine_sender))) + .expect("wfm create resp rx side can't be dropped"); + } + Err(e) => { + resp_chan + .send(Err(e)) + .expect("wfm create resp rx side can't be dropped"); + } } - }, + } Err(TryRecvError::Disconnected) => { - event!( - Level::WARN, + warn!( "Sending side of workflow machine creator was dropped. Likely the \ - WorkflowConcurrencyManager was dropped. This indicates a failure to \ - call shutdown." + WorkflowConcurrencyManager was dropped. This indicates a failure to call \ + shutdown." ); return true; } @@ -228,11 +242,7 @@ impl WorkflowConcurrencyManager { // concurrency manager is the signal to this thread that the workflow // manager can be dropped. let wfid = &machine_rcvs[index].1.machines.workflow_id; - event!( - Level::DEBUG, - "Workflow manager thread done with workflow id {}", - wfid - ); + debug!(wfid = %wfid, "Workflow manager thread done",); machine_rcvs.remove(index); } Err(TryRecvError::Empty) => {} diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 52cacd017..45ce89737 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -20,7 +20,6 @@ use crate::{ CoreError, Result, }; use std::sync::mpsc::Sender; -use tracing::Level; /// Implementors can provide new workflow tasks to the SDK. The connection to the server is the real /// implementor. @@ -112,7 +111,6 @@ impl WorkflowManager { /// Should only be called when a workflow has caught up on replay (or is just beginning). It /// will return a workflow activation if one is needed, as well as a bool indicating if there /// are more workflow tasks that need to be performed to replay the remaining history. - #[instrument(skip(self))] pub fn feed_history_from_server(&mut self, hist: History) -> Result { let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?; let task_ct = hist.get_workflow_task_count(None)?; @@ -123,7 +121,7 @@ impl WorkflowManager { let more_activations_needed = task_ct > self.current_wf_task_num; if more_activations_needed { - event!(Level::DEBUG, msg = "More activations needed"); + debug!("More activations needed"); } self.current_wf_task_num += 1;