From b8632c1c3e51c4a2eb8972f0c03973224e5beead Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 27 Nov 2025 18:30:30 -0800 Subject: [PATCH 1/3] Emit GrpcMessageTooLarge as failure_reason for WFT --- crates/sdk-core/src/telemetry/metrics.rs | 2 ++ crates/sdk-core/src/worker/workflow/mod.rs | 30 ++++++++----------- .../tests/integ_tests/worker_tests.rs | 26 ++++++++++++++-- 3 files changed, 39 insertions(+), 19 deletions(-) diff --git a/crates/sdk-core/src/telemetry/metrics.rs b/crates/sdk-core/src/telemetry/metrics.rs index 8c06be345..270518226 100644 --- a/crates/sdk-core/src/telemetry/metrics.rs +++ b/crates/sdk-core/src/telemetry/metrics.rs @@ -642,6 +642,7 @@ pub(crate) enum FailureReason { Timeout, NexusOperation(String), NexusHandlerError(String), + GrpcMessageTooLarge, } impl Display for FailureReason { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -651,6 +652,7 @@ impl Display for FailureReason { FailureReason::Timeout => "timeout".to_owned(), FailureReason::NexusOperation(op) => format!("operation_{op}"), FailureReason::NexusHandlerError(op) => format!("handler_error_{op}"), + FailureReason::GrpcMessageTooLarge => "GrpcMessageTooLarge".to_owned(), }; write!(f, "{str}") } diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index 4915c4e4d..f1c7ab8df 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -15,16 +15,10 @@ pub(crate) use driven_workflow::DrivenWorkflow; pub(crate) use history_update::HistoryUpdate; use crate::{ - MetricsContext, - abstractions::{ + MetricsContext, abstractions::{ MeteredPermitDealer, TrackedOwnedMeteredSemPermit, UsedMeteredSemPermit, dbg_panic, take_cell::TakeCell, - }, - internal_flags::InternalFlags, - pollers::TrackedPermittedTqResp, - protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, - telemetry::{VecDisplayer, set_trace_subscriber_for_current_thread}, - worker::{ + }, internal_flags::InternalFlags, pollers::TrackedPermittedTqResp, protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, telemetry::{VecDisplayer, metrics::{self, FailureReason}, set_trace_subscriber_for_current_thread}, worker::{ LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, PostActivateHookData, activities::{ActivitiesFromWFTsHandle, LocalActivityManager}, @@ -37,7 +31,7 @@ use crate::{ wft_poller::validate_wft, workflow_stream::{LocalInput, LocalInputs, WFStream}, }, - }, + } }; use anyhow::anyhow; use futures_util::{Stream, StreamExt, future::abortable, stream, stream::BoxStream}; @@ -57,8 +51,7 @@ use std::{ }; use temporalio_client::MESSAGE_TOO_LARGE_KEY; use temporalio_common::{ - errors::{CompleteWfError, PollError}, - protos::{ + errors::{CompleteWfError, PollError}, protos::{ TaskToken, coresdk::{ workflow_activation::{ @@ -66,10 +59,7 @@ use temporalio_common::{ remove_from_cache::EvictionReason, workflow_activation_job, }, workflow_commands::*, - workflow_completion, - workflow_completion::{ - Failure, WorkflowActivationCompletion, workflow_activation_completion, - }, + workflow_completion::{self, Failure, WorkflowActivationCompletion, workflow_activation_completion}, }, temporal::api::{ command::v1::{Command as ProtoCommand, Command, command::Attributes}, @@ -84,8 +74,7 @@ use temporalio_common::{ taskqueue::v1::StickyExecutionAttributes, workflowservice::v1::{PollActivityTaskQueueResponse, get_system_info_response}, }, - }, - worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind}, + }, worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind} }; use tokio::{ sync::{ @@ -134,6 +123,7 @@ pub(crate) struct Workflows { local_act_mgr: Option>, ever_polled: AtomicBool, default_versioning_behavior: Option, + metrics: MetricsContext, } pub(crate) struct WorkflowBasics { @@ -176,6 +166,7 @@ impl Workflows { let (fetch_tx, fetch_rx) = unbounded_channel(); let shutdown_tok = basics.shutdown_token.clone(); let task_queue = basics.worker_config.task_queue.clone(); + let metrics = basics.metrics.clone(); let default_versioning_behavior = basics.default_versioning_behavior; let extracted_wft_stream = WFTExtractor::build( client.clone(), @@ -267,6 +258,7 @@ impl Workflows { local_act_mgr, ever_polled: AtomicBool::new(false), default_versioning_behavior, + metrics, } } @@ -431,6 +423,10 @@ impl Workflows { ); self.handle_activation_failed(run_id, completion_time, new_outcome) .await; + self.metrics.with_new_attrs([metrics::failure_reason( + FailureReason::GrpcMessageTooLarge + )]) + .wf_task_failed(); return Err(e); } e => { diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index 6c9f60c95..a59cf9664 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -210,8 +210,12 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() { #[tokio::test] async fn oversize_grpc_message() { + use crate::common::{prom_metrics, ANY_PORT, NAMESPACE}; let wf_name = "oversize_grpc_message"; - let mut starter = CoreWfStarter::new(wf_name); + // Enable Prometheus metrics for this test and capture the address + let (telemopts, addr, _aborter) = prom_metrics(None); + let runtime = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); + let mut starter = CoreWfStarter::new_with_runtime(wf_name, runtime); starter .worker_config .task_types(WorkerTaskTypes::workflow_only()); @@ -238,7 +242,25 @@ async fn oversize_grpc_message() { } else { false } - })) + })); + + // Verify the workflow task failure metric includes the GrpcMessageTooLarge reason + let tq = starter.get_task_queue(); + crate::common::eventually( + || async { + let body = crate::integ_tests::metrics_tests::get_text(format!("http://{addr}/metrics")).await; + if body.contains(&format!( + "temporal_workflow_task_execution_failed{{failure_reason=\"GrpcMessageTooLarge\",namespace=\"{NAMESPACE}\",service_name=\"temporal-core-sdk\",task_queue=\"{tq}\"}} 1" + )) { + Ok(()) + } else { + Err(()) + } + }, + Duration::from_secs(2), + ) + .await + .unwrap(); } #[tokio::test] From 60bdf6c8a5674d959610857232e41dbb358e4933 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Mon, 1 Dec 2025 09:19:36 -0800 Subject: [PATCH 2/3] Run cargo fmt --- crates/sdk-core/src/worker/workflow/mod.rs | 35 +++++++++++++------ .../tests/integ_tests/worker_tests.rs | 2 +- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/crates/sdk-core/src/worker/workflow/mod.rs b/crates/sdk-core/src/worker/workflow/mod.rs index f1c7ab8df..32906e60f 100644 --- a/crates/sdk-core/src/worker/workflow/mod.rs +++ b/crates/sdk-core/src/worker/workflow/mod.rs @@ -15,10 +15,20 @@ pub(crate) use driven_workflow::DrivenWorkflow; pub(crate) use history_update::HistoryUpdate; use crate::{ - MetricsContext, abstractions::{ + MetricsContext, + abstractions::{ MeteredPermitDealer, TrackedOwnedMeteredSemPermit, UsedMeteredSemPermit, dbg_panic, take_cell::TakeCell, - }, internal_flags::InternalFlags, pollers::TrackedPermittedTqResp, protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, telemetry::{VecDisplayer, metrics::{self, FailureReason}, set_trace_subscriber_for_current_thread}, worker::{ + }, + internal_flags::InternalFlags, + pollers::TrackedPermittedTqResp, + protosext::{ValidPollWFTQResponse, protocol_messages::IncomingProtocolMessage}, + telemetry::{ + VecDisplayer, + metrics::{self, FailureReason}, + set_trace_subscriber_for_current_thread, + }, + worker::{ LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, PostActivateHookData, activities::{ActivitiesFromWFTsHandle, LocalActivityManager}, @@ -31,7 +41,7 @@ use crate::{ wft_poller::validate_wft, workflow_stream::{LocalInput, LocalInputs, WFStream}, }, - } + }, }; use anyhow::anyhow; use futures_util::{Stream, StreamExt, future::abortable, stream, stream::BoxStream}; @@ -51,7 +61,8 @@ use std::{ }; use temporalio_client::MESSAGE_TOO_LARGE_KEY; use temporalio_common::{ - errors::{CompleteWfError, PollError}, protos::{ + errors::{CompleteWfError, PollError}, + protos::{ TaskToken, coresdk::{ workflow_activation::{ @@ -59,7 +70,9 @@ use temporalio_common::{ remove_from_cache::EvictionReason, workflow_activation_job, }, workflow_commands::*, - workflow_completion::{self, Failure, WorkflowActivationCompletion, workflow_activation_completion}, + workflow_completion::{ + self, Failure, WorkflowActivationCompletion, workflow_activation_completion, + }, }, temporal::api::{ command::v1::{Command as ProtoCommand, Command, command::Attributes}, @@ -74,7 +87,8 @@ use temporalio_common::{ taskqueue::v1::StickyExecutionAttributes, workflowservice::v1::{PollActivityTaskQueueResponse, get_system_info_response}, }, - }, worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind} + }, + worker::{ActivitySlotKind, WorkerConfig, WorkflowSlotKind}, }; use tokio::{ sync::{ @@ -423,10 +437,11 @@ impl Workflows { ); self.handle_activation_failed(run_id, completion_time, new_outcome) .await; - self.metrics.with_new_attrs([metrics::failure_reason( - FailureReason::GrpcMessageTooLarge - )]) - .wf_task_failed(); + self.metrics + .with_new_attrs([metrics::failure_reason( + FailureReason::GrpcMessageTooLarge, + )]) + .wf_task_failed(); return Err(e); } e => { diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index a59cf9664..a7c02d8c2 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -210,7 +210,7 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() { #[tokio::test] async fn oversize_grpc_message() { - use crate::common::{prom_metrics, ANY_PORT, NAMESPACE}; + use crate::common::{ANY_PORT, NAMESPACE, prom_metrics}; let wf_name = "oversize_grpc_message"; // Enable Prometheus metrics for this test and capture the address let (telemopts, addr, _aborter) = prom_metrics(None); From cdf0c95b9500940c340d9730221b0761f549d895 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 2 Dec 2025 10:29:51 -0800 Subject: [PATCH 3/3] Fix other lint errors --- crates/sdk-core/tests/integ_tests/worker_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sdk-core/tests/integ_tests/worker_tests.rs b/crates/sdk-core/tests/integ_tests/worker_tests.rs index a7c02d8c2..cb780aeb1 100644 --- a/crates/sdk-core/tests/integ_tests/worker_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_tests.rs @@ -210,7 +210,7 @@ async fn resource_based_few_pollers_guarantees_non_sticky_poll() { #[tokio::test] async fn oversize_grpc_message() { - use crate::common::{ANY_PORT, NAMESPACE, prom_metrics}; + use crate::common::{NAMESPACE, prom_metrics}; let wf_name = "oversize_grpc_message"; // Enable Prometheus metrics for this test and capture the address let (telemopts, addr, _aborter) = prom_metrics(None);