Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ license-file = "LICENSE.txt"
[dependencies]
anyhow = "1.0"
async-trait = "0.1"
base64 = "0.13"
crossbeam = "0.8"
dashmap = "4.0"
derive_more = "0.99"
displaydoc = "0.1"
env_logger = "0.8"
futures = "0.3"
log = "0.4"
itertools = "0.10"
once_cell = "1.5"
opentelemetry-jaeger = "0.11"
opentelemetry = "0.12"
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,18 @@ Any error which is returned from a public interface should be well-typed, and we

Errors returned from things only used in testing are free to use
[anyhow](https://github.com/dtolnay/anyhow) for less verbosity.


## Debugging
The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable
it for a test, insert the below snippet at the start of the test. By default, tracing data is output
to stdout in a (reasonably) pretty manner, and to a Jaeger instance if one exists.

```rust
core_tracing::tracing_init();
let s = info_span!("Test start");
let _enter = s.enter();
```

To run the Jaeger instance:
`docker run --rm -p6831:6831/udp -p6832:6832/udp -p16686:16686 jaegertracing/all-in-one:latest`
67 changes: 50 additions & 17 deletions src/core_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,57 @@
use itertools::Itertools;
use once_cell::sync::OnceCell;
use opentelemetry_jaeger::Uninstall;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use std::collections::VecDeque;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

static TRACING_INIT: OnceCell<Uninstall> = OnceCell::new();
const TRACING_ENABLE_ENV_VAR: &str = "TEMPORAL_CORE_TRACING";

pub(crate) fn tracing_init() {
let _ = env_logger::try_init();
if std::env::var(TRACING_ENABLE_ENV_VAR).is_ok() {
TRACING_INIT.get_or_init(|| {
let (tracer, uninstall) = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install()
.unwrap();
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(opentelemetry)
.try_init()
.unwrap();
uninstall
});
/// Initialize tracing subscribers and output. Core will not call this itself, it exists here so
/// that consumers and tests have an easy way to initialize tracing.
pub fn tracing_init() {
TRACING_INIT.get_or_init(|| {
let (tracer, uninstall) = opentelemetry_jaeger::new_pipeline()
.with_service_name("coresdk")
.install()
.unwrap();
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let filter_layer = EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("info"))
.unwrap();
let fmt_layer = tracing_subscriber::fmt::layer().with_target(false).pretty();

tracing_subscriber::registry()
.with(opentelemetry)
.with(filter_layer)
.with(fmt_layer)
.try_init()
.unwrap();
uninstall
});
}

/// A trait for using [Display] on the contents of vecs, etc, which don't implement it.
///
/// Dislike this, but, there doesn't seem to be a great alternative. Calling itertools format
/// inline in an `event!` macro can panic because it gets evaluated twice somehow.
pub(crate) trait VecDisplayer {
fn display(&self) -> String;
}

impl<T> VecDisplayer for Vec<T>
where
T: std::fmt::Display,
{
fn display(&self) -> String {
format!("[{}]", self.iter().format(","))
}
}

impl<T> VecDisplayer for VecDeque<T>
where
T: std::fmt::Display,
{
fn display(&self) -> String {
format!("[{}]", self.iter().format(","))
}
}
65 changes: 44 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,26 @@ mod workflow;
#[cfg(test)]
mod test_help;

pub use core_tracing::tracing_init;
pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use url::Url;

use crate::machines::WFMachinesError;
use crate::workflow::WorkflowManager;
use crate::{
machines::{InconvertibleCommandError, WFCommand},
machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
protos::{
coresdk::{
task_completion, wf_activation_completion::Status, Task, TaskCompletion,
WfActivationCompletion, WfActivationSuccess,
},
temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
},
protosext::HistoryInfoError,
protosext::{fmt_task_token, HistoryInfoError},
workflow::{NextWfActivation, WorkflowConcurrencyManager},
};
use crossbeam::queue::SegQueue;
use dashmap::DashMap;
use std::fmt::{Display, Formatter};
use std::{
convert::TryInto,
fmt::Debug,
Expand All @@ -49,7 +51,7 @@ use std::{
};
use tokio::runtime::Runtime;
use tonic::codegen::http::uri::InvalidUri;
use tracing::Level;
use tracing::Span;

/// A result alias having [CoreError] as the error type
pub type Result<T, E = CoreError> = std::result::Result<T, E>;
Expand Down Expand Up @@ -94,7 +96,6 @@ pub struct CoreInitOptions {
/// * Will panic if called from within an async context, as it will construct a runtime and you
/// cannot construct a runtime from within a runtime.
pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
core_tracing::tracing_init();
let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
// Initialize server client
let work_provider = runtime.block_on(opts.gateway_opts.connect())?;
Expand Down Expand Up @@ -143,19 +144,30 @@ struct PendingActivation {
task_token: Vec<u8>,
}

impl Display for PendingActivation {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PendingActivation(run_id: {}, task_token: {})",
&self.run_id,
fmt_task_token(&self.task_token)
)
}
}

impl<WP> Core for CoreSDK<WP>
where
WP: ServerGatewayApis + Send + Sync,
{
#[instrument(skip(self))]
#[instrument(skip(self), fields(pending_activation))]
fn poll_task(&self, task_queue: &str) -> Result<Task> {
// We must first check if there are pending workflow tasks for workflows that are currently
// replaying, and issue those tasks before bothering the server.
if let Some(pa) = self.pending_activations.pop() {
event!(Level::DEBUG, msg = "Applying pending activations", ?pa);
let next_activation = self
.workflow_machines
.access(&pa.run_id, |mgr| mgr.get_next_activation())?;
Span::current().record("pending_activation", &format!("{}", &pa).as_str());

let next_activation =
self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())?;
let task_token = pa.task_token.clone();
if next_activation.more_activations_needed {
self.pending_activations.push(pa);
Expand All @@ -175,10 +187,9 @@ where
.runtime
.block_on(self.server_gateway.poll_workflow_task(task_queue))?;
let task_token = work.task_token.clone();
event!(
Level::DEBUG,
msg = "Received workflow task from server",
?task_token
debug!(
task_token = %fmt_task_token(&task_token),
"Received workflow task from server"
);

let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?;
Expand Down Expand Up @@ -214,9 +225,9 @@ where
match wfstatus {
Status::Successful(success) => {
self.push_lang_commands(&run_id, success)?;
let commands = self
.workflow_machines
.access(&run_id, |mgr| Ok(mgr.machines.get_commands()))?;
let commands = self.access_wf_machine(&run_id, move |mgr| {
Ok(mgr.machines.get_commands())
})?;
self.runtime.block_on(
self.server_gateway
.complete_workflow_task(task_token, commands),
Expand Down Expand Up @@ -285,13 +296,28 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
.into_iter()
.map(|c| c.try_into().map_err(Into::into))
.collect::<Result<Vec<_>>>()?;
self.workflow_machines.access(run_id, |mgr| {
self.access_wf_machine(run_id, move |mgr| {
mgr.command_sink.send(cmds)?;
mgr.machines.iterate_machines()?;
Ok(())
})?;
Ok(())
}

/// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing
/// span to the wf machines thread.
fn access_wf_machine<F, Fout>(&self, run_id: &str, mutator: F) -> Result<Fout>
where
F: FnOnce(&mut WorkflowManager) -> Result<Fout> + Send + 'static,
Fout: Send + Debug + 'static,
{
let curspan = Span::current();
let mutator = move |wfm: &mut WorkflowManager| {
let _e = curspan.enter();
mutator(wfm)
};
self.workflow_machines.access(run_id, mutator)
}
}

/// The error type returned by interactions with [Core]
Expand Down Expand Up @@ -537,9 +563,6 @@ mod test {

#[test]
fn workflow_update_random_seed_on_workflow_reset() {
let s = span!(Level::DEBUG, "Test start", t = "bridge");
let _enter = s.enter();

let wfid = "fake_wf_id";
let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B";
let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
Expand Down
2 changes: 1 addition & 1 deletion src/machines/complete_workflow_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fsm! {
--> CompleteWorkflowCommandRecorded;
}

#[derive(Debug)]
#[derive(Debug, derive_more::Display)]
pub(super) enum CompleteWFCommand {
AddCommand(Command),
}
Expand Down
25 changes: 12 additions & 13 deletions src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub(crate) mod test_help;
pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};

use crate::{
core_tracing::VecDisplayer,
machines::workflow_machines::MachineResponse,
protos::{
coresdk::{self, command::Variant, wf_activation_job},
Expand All @@ -57,7 +58,6 @@ use std::{
convert::{TryFrom, TryInto},
fmt::{Debug, Display},
};
use tracing::Level;

pub(crate) type ProtoCommand = Command;

Expand Down Expand Up @@ -155,7 +155,7 @@ where
<SM as StateMachine>::Event: TryFrom<HistoryEvent>,
<SM as StateMachine>::Event: TryFrom<CommandType>,
WFMachinesError: From<<<SM as StateMachine>::Event as TryFrom<HistoryEvent>>::Error>,
<SM as StateMachine>::Command: Debug,
<SM as StateMachine>::Command: Debug + Display,
<SM as StateMachine>::State: Display,
<SM as StateMachine>::Error: Into<WFMachinesError> + 'static + Send + Sync,
{
Expand All @@ -164,12 +164,11 @@ where
}

fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError> {
event!(
Level::DEBUG,
msg = "handling command",
?command_type,
debug!(
command_type = ?command_type,
machine_name = %self.name(),
state = %self.state()
state = %self.state(),
"handling command"
);
if let Ok(converted_command) = command_type.try_into() {
match self.on_event_mut(converted_command) {
Expand All @@ -189,18 +188,18 @@ where
event: &HistoryEvent,
has_next_event: bool,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
event!(
Level::DEBUG,
msg = "handling event",
%event,
debug!(
event = %event,
machine_name = %self.name(),
state = %self.state()
state = %self.state(),
"handling event"
);
let converted_event = event.clone().try_into()?;
match self.on_event_mut(converted_event) {
Ok(c) => {
if !c.is_empty() {
event!(Level::DEBUG, msg = "Machine produced commands", ?c, state = %self.state());
debug!(commands = %c.display(), state = %self.state(),
"Machine produced commands");
}
let mut machine_responses = vec![];
for cmd in c {
Expand Down
5 changes: 2 additions & 3 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::{
mpsc::{self, Receiver, Sender},
Arc,
};
use tracing::Level;

/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control
/// over when commands are returned than a normal workflow would.
Expand Down Expand Up @@ -81,7 +80,7 @@ where
Fut: Future<Output = ()>,
{
fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) {
event!(Level::DEBUG, msg = "Test WF driver start called");
debug!("Test WF driver start called");
}

fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
Expand Down Expand Up @@ -121,7 +120,7 @@ where
}
}

event!(Level::DEBUG, msg = "Test wf driver emitting", ?emit_these);
debug!(emit_these = ?emit_these, "Test wf driver emitting");

emit_these
}
Expand Down
2 changes: 1 addition & 1 deletion src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fsm! {
CancelTimerCommandSent --(TimerCanceled) --> Canceled;
}

#[derive(Debug)]
#[derive(Debug, derive_more::Display)]
pub(super) enum TimerMachineCommand {
Complete,
Canceled,
Expand Down
Loading