Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/sdk-core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ pub(crate) enum FailureReason {
Timeout,
NexusOperation(String),
NexusHandlerError(String),
GrpcMessageTooLarge,
}
impl Display for FailureReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand All @@ -651,6 +652,7 @@ impl Display for FailureReason {
FailureReason::Timeout => "timeout".to_owned(),
FailureReason::NexusOperation(op) => format!("operation_{op}"),
FailureReason::NexusHandlerError(op) => format!("handler_error_{op}"),
FailureReason::GrpcMessageTooLarge => "GrpcMessageTooLarge".to_owned(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can't be snake case because it was already like this in the other SDKs? Do we already have released versions with it like that? Otherwise it'd be good to make them all snake case since that's more standard

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah and the other two ones for workflows are NonDeterminismError and WorkflowError and those not snake case

Comment thread
Quinn-With-Two-Ns marked this conversation as resolved.
};
write!(f, "{str}")
}
Expand Down
17 changes: 14 additions & 3 deletions crates/sdk-core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ use crate::{
internal_flags::InternalFlags,
pollers::TrackedPermittedTqResp,
protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage},
telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread},
telemetry::{
VecDisplayer,
metrics::{self, FailureReason},
set_trace_subscriber_for_current_thread,
},
worker::{
LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution,
PostActivateHookData,
Expand Down Expand Up @@ -66,9 +70,8 @@ use temporalio_common::{
remove_from_cache::EvictionReason, workflow_activation_job,
},
workflow_commands::*,
workflow_completion,
workflow_completion::{
Failure, WorkflowActivationCompletion, workflow_activation_completion,
self, Failure, WorkflowActivationCompletion, workflow_activation_completion,
},
},
temporal::api::{
Expand Down Expand Up @@ -134,6 +137,7 @@ pub(crate) struct Workflows {
local_act_mgr: Option<Arc<LocalActivityManager>>,
ever_polled: AtomicBool,
default_versioning_behavior: Option<VersioningBehavior>,
metrics: MetricsContext,
}

pub(crate) struct WorkflowBasics {
Expand Down Expand Up @@ -176,6 +180,7 @@ impl Workflows {
let (fetch_tx, fetch_rx) = unbounded_channel();
let shutdown_tok = basics.shutdown_token.clone();
let task_queue = basics.worker_config.task_queue.clone();
let metrics = basics.metrics.clone();
let default_versioning_behavior = basics.default_versioning_behavior;
let extracted_wft_stream = WFTExtractor::build(
client.clone(),
Expand Down Expand Up @@ -267,6 +272,7 @@ impl Workflows {
local_act_mgr,
ever_polled: AtomicBool::new(false),
default_versioning_behavior,
metrics,
}
}

Expand Down Expand Up @@ -431,6 +437,11 @@ impl Workflows {
);
self.handle_activation_failed(run_id, completion_time, new_outcome)
.await;
self.metrics
.with_new_attrs([metrics::failure_reason(
FailureReason::GrpcMessageTooLarge,
)])
.wf_task_failed();
return Err(e);
}
e => {
Expand Down
26 changes: 24 additions & 2 deletions crates/sdk-core/tests/integ_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,12 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() {

#[tokio::test]
async fn oversize_grpc_message() {
use crate::common::{NAMESPACE, prom_metrics};
let wf_name = "oversize_grpc_message";
let mut starter = CoreWfStarter::new(wf_name);
// Enable Prometheus metrics for this test and capture the address
let (telemopts, addr, _aborter) = prom_metrics(None);
let runtime = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
let mut starter = CoreWfStarter::new_with_runtime(wf_name, runtime);
starter
.worker_config
.task_types(WorkerTaskTypes::workflow_only());
Expand All @@ -238,7 +242,25 @@ async fn oversize_grpc_message() {
} else {
false
}
}))
}));

// Verify the workflow task failure metric includes the GrpcMessageTooLarge reason
let tq = starter.get_task_queue();
crate::common::eventually(
|| async {
let body = crate::integ_tests::metrics_tests::get_text(format!("http://{addr}/metrics")).await;
if body.contains(&format!(
"temporal_workflow_task_execution_failed{{failure_reason=\"GrpcMessageTooLarge\",namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",task_queue=\"{tq}\"}} 1"
)) {
Ok(())
} else {
Err(())
}
},
Duration::from_secs(2),
)
.await
.unwrap();
}

#[tokio::test]
Expand Down
Loading