Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/common/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,14 @@ pub enum CompleteWfError {
/// The run associated with the completion
run_id: String,
},
/// Workflows have not been enabled on this worker.
#[error("Workflows are not enabled on this worker")]
WorkflowNotEnabled,
}

/// Errors thrown by [crate::Worker::complete_activity_task]
#[derive(thiserror::Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CompleteActivityError {
/// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk.
#[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")]
Expand All @@ -53,6 +57,9 @@ pub enum CompleteActivityError {
/// The completion, which may not be included to avoid unnecessary copies.
completion: Option<ActivityExecutionResult>,
},
/// Activities have not been enabled on this worker.
#[error("Activities are not enabled on this worker")]
ActivityNotEnabled,
}

/// Errors thrown by [crate::Worker::complete_nexus_task]
Expand Down
4 changes: 2 additions & 2 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ pub trait Worker: Send + Sync {
/// [Worker::complete_workflow_activation] and [Worker::complete_activity_task] for those
/// workflows & activities until they are done. At that point, the lang SDK can end the process,
/// or drop the [Worker] instance via [Worker::finalize_shutdown], which will close the
/// connection and free resources. If you have set [WorkerConfig::no_remote_activities], you may
/// skip calling [Worker::poll_activity_task].
/// connection and free resources. If you have set [WorkerConfig::task_types] to exclude
/// [WorkerTaskTypes::activity_only()], you may skip calling [Worker::poll_activity_task].
///
/// Lang implementations should use [Worker::initiate_shutdown] followed by
/// [Worker::finalize_shutdown].
Expand Down
81 changes: 77 additions & 4 deletions crates/common/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,59 @@ use std::{
time::Duration,
};

/// Specifies which task types a worker will poll for.
///
/// Workers can be configured to handle any combination of workflows, activities, and nexus operations.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WorkerTaskTypes {
pub enable_workflows: bool,
pub enable_activities: bool,
pub enable_nexus: bool,
}

impl WorkerTaskTypes {
/// Check if no task types are enabled
pub fn is_empty(&self) -> bool {
!self.enable_workflows && !self.enable_activities && !self.enable_nexus
}

/// Create a config with all task types enabled
pub fn all() -> WorkerTaskTypes {
WorkerTaskTypes {
enable_workflows: true,
enable_activities: true,
enable_nexus: true,
}
}

/// Create a config with only workflow tasks enabled
pub fn workflow_only() -> WorkerTaskTypes {
WorkerTaskTypes {
enable_workflows: true,
enable_activities: false,
enable_nexus: false,
}
}

/// Create a config with only activity tasks enabled
pub fn activity_only() -> WorkerTaskTypes {
WorkerTaskTypes {
enable_workflows: false,
enable_activities: true,
enable_nexus: false,
}
}

/// Create a config with only nexus tasks enabled
pub fn nexus_only() -> WorkerTaskTypes {
WorkerTaskTypes {
enable_workflows: false,
enable_activities: false,
enable_nexus: true,
}
}
}

/// Defines per-worker configuration options
#[derive(Clone, derive_builder::Builder)]
#[builder(setter(into), build_fn(validate = "Self::validate"))]
Expand Down Expand Up @@ -64,10 +117,10 @@ pub struct WorkerConfig {
/// worker's task queue
#[builder(default = "PollerBehavior::SimpleMaximum(5)")]
pub nexus_task_poller_behavior: PollerBehavior,
/// If set to true this worker will only handle workflow tasks and local activities, it will not
/// poll for activity tasks.
#[builder(default = "false")]
pub no_remote_activities: bool,
/// Specifies which task types this worker will poll for.
///
/// Note: At least one task type must be specified or the worker will fail validation.
pub task_types: WorkerTaskTypes,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figure just three bools is simplest, even if you have to make a hash internally, but I guess not a big deal. Also at this point it could be argued it shouldn't have a default and should be required to be set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanna say having default be all by default matches existing behavior today, but then again not sure it matters much since it's not user facing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No strong opinion on whether we should have a default since every SDK needs to start setting this anyways and the default therefore will never be used if done properly by SDKs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point, I can remove the default to force SDKs to be explicit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, I'm ok w/ defaulted too, just that I hope SDKs will ignore that and always set.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there somewhere in the client code that has to be altered to properly account for this as part of the worker dedupe check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Planning on keeping that PR separate, #1059, since that feature introduces different behavior, whereas this PR introduces a new feature, but maintains existing behavior, and to make review easier

/// How long a workflow task is allowed to sit on the sticky queue before it is timed out
/// and moved to the non-sticky queue where it may be picked up by any worker.
#[builder(default = "Duration::from_secs(10)")]
Expand Down Expand Up @@ -218,6 +271,15 @@ impl WorkerConfigBuilder {
}

fn validate(&self) -> Result<(), String> {
let task_types = self
.task_types
.as_ref()
.cloned()
.unwrap_or_else(WorkerTaskTypes::all);
if task_types.is_empty() {
return Err("At least one task type must be enabled in `task_types`".to_owned());
}

if let Some(b) = self.workflow_task_poller_behavior.as_ref() {
b.validate()?
}
Expand Down Expand Up @@ -249,6 +311,17 @@ impl WorkerConfigBuilder {
return Err("`max_outstanding_nexus_tasks` must be > 0".to_owned());
}

// Validate workflow cache is consistent with task_types
if !task_types.enable_workflows
&& let Some(cache) = self.max_cached_workflows.as_ref()
&& *cache > 0
{
return Err(
"Cannot have `max_cached_workflows` > 0 when workflows are not enabled in `task_types`"
.to_owned(),
);
}

if let Some(cache) = self.max_cached_workflows.as_ref()
&& *cache > 0
{
Expand Down
121 changes: 121 additions & 0 deletions crates/common/tests/worker_task_types_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use temporalio_common::worker::{WorkerConfigBuilder, WorkerTaskTypes, WorkerVersioningStrategy};

fn default_versioning_strategy() -> WorkerVersioningStrategy {
WorkerVersioningStrategy::None {
build_id: String::new(),
}
}

#[test]
fn test_default_configuration_polls_all_types() {
let config = WorkerConfigBuilder::default()
.namespace("default")
.task_queue("test-queue")
.versioning_strategy(default_versioning_strategy())
.task_types(WorkerTaskTypes::all())
.build()
.expect("Failed to build default config");

let effective = &config.task_types;
assert!(
effective.enable_workflows,
"Should poll workflows by default"
);
assert!(
effective.enable_activities,
"Should poll activities by default"
);
assert!(effective.enable_nexus, "Should poll nexus by default");
}

#[test]
fn test_empty_task_types_fails_validation() {
let result = WorkerConfigBuilder::default()
.namespace("default")
.task_queue("test-queue")
.versioning_strategy(default_versioning_strategy())
.task_types(WorkerTaskTypes {
enable_workflows: false,
enable_activities: false,
enable_nexus: false,
})
.build();

assert!(result.is_err(), "Empty task_types should fail validation");
let err = result.err().unwrap().to_string();
assert!(
err.contains("At least one task type"),
"Error should mention task types: {err}",
);
}

#[test]
fn test_workflow_cache_without_workflows_fails() {
let result = WorkerConfigBuilder::default()
.namespace("default")
.task_queue("test-queue")
.versioning_strategy(default_versioning_strategy())
.task_types(WorkerTaskTypes::activity_only())
.max_cached_workflows(10usize)
.build();

assert!(
result.is_err(),
"Workflow cache > 0 without workflows should fail"
);
let err = result.err().unwrap().to_string();
assert!(
err.contains("max_cached_workflows"),
"Error should mention max_cached_workflows: {err}",
);
}

#[test]
fn test_all_combinations() {
let combinations = [
(WorkerTaskTypes::workflow_only(), "workflows only"),
(WorkerTaskTypes::activity_only(), "activities only"),
(WorkerTaskTypes::nexus_only(), "nexus only"),
(
WorkerTaskTypes {
enable_workflows: true,
enable_activities: true,
enable_nexus: false,
},
"workflows + activities",
),
(
WorkerTaskTypes {
enable_workflows: true,
enable_activities: false,
enable_nexus: true,
},
"workflows + nexus",
),
(
WorkerTaskTypes {
enable_workflows: false,
enable_activities: true,
enable_nexus: true,
},
"activities + nexus",
),
(WorkerTaskTypes::all(), "all types"),
];

for (task_types, description) in combinations {
let config = WorkerConfigBuilder::default()
.namespace("default")
.task_queue("test-queue")
.versioning_strategy(default_versioning_strategy())
.task_types(task_types)
.build()
.unwrap_or_else(|e| panic!("Failed to build config for {description}: {e:?}"));

let effective = config.task_types;
assert_eq!(
effective, task_types,
"Effective types should match for {description}",
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,12 @@ typedef struct TemporalCoreTunerHolder {
struct TemporalCoreSlotSupplier nexus_task_slot_supplier;
} TemporalCoreTunerHolder;

typedef struct TemporalCoreWorkerTaskTypes {
bool enable_workflows;
bool enable_activities;
bool enable_nexus;
} TemporalCoreWorkerTaskTypes;

typedef struct TemporalCorePollerBehaviorSimpleMaximum {
uintptr_t simple_maximum;
} TemporalCorePollerBehaviorSimpleMaximum;
Expand Down Expand Up @@ -765,7 +771,7 @@ typedef struct TemporalCoreWorkerOptions {
struct TemporalCoreByteArrayRef identity_override;
uint32_t max_cached_workflows;
struct TemporalCoreTunerHolder tuner;
bool no_remote_activities;
struct TemporalCoreWorkerTaskTypes task_types;
Copy link
Member

@cretz cretz Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't need to be a whole separate struct IMO just for the few fields within, but it's mostly harmless

uint64_t sticky_queue_schedule_to_start_timeout_millis;
uint64_t max_heartbeat_throttle_interval_millis;
uint64_t default_heartbeat_throttle_interval_millis;
Expand Down
23 changes: 21 additions & 2 deletions crates/sdk-core-c-bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub struct WorkerOptions {
pub identity_override: ByteArrayRef,
pub max_cached_workflows: u32,
pub tuner: TunerHolder,
pub no_remote_activities: bool,
pub task_types: WorkerTaskTypes,
pub sticky_queue_schedule_to_start_timeout_millis: u64,
pub max_heartbeat_throttle_interval_millis: u64,
pub default_heartbeat_throttle_interval_millis: u64,
Expand All @@ -58,6 +58,23 @@ pub struct WorkerOptions {
pub nondeterminism_as_workflow_fail_for_types: ByteArrayRefArray,
}

#[repr(C)]
pub struct WorkerTaskTypes {
pub enable_workflows: bool,
pub enable_activities: bool,
pub enable_nexus: bool,
}

impl From<&WorkerTaskTypes> for temporalio_common::worker::WorkerTaskTypes {
fn from(t: &WorkerTaskTypes) -> Self {
Self {
enable_workflows: t.enable_workflows,
enable_activities: t.enable_activities,
enable_nexus: t.enable_nexus,
}
}
}

#[repr(C)]
pub struct PollerBehaviorSimpleMaximum {
pub simple_maximum: usize,
Expand Down Expand Up @@ -1183,7 +1200,9 @@ impl TryFrom<&WorkerOptions> for temporalio_sdk_core::WorkerConfig {
.client_identity_override(opt.identity_override.to_option_string())
.max_cached_workflows(opt.max_cached_workflows as usize)
.tuner(Arc::new(converted_tuner))
.no_remote_activities(opt.no_remote_activities)
.task_types(temporalio_common::worker::WorkerTaskTypes::from(
&opt.task_types,
))
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
opt.sticky_queue_schedule_to_start_timeout_millis,
))
Expand Down
4 changes: 2 additions & 2 deletions crates/sdk-core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use temporalio_common::{
},
test_utils::start_timer_cmd,
},
worker::PollerBehavior,
worker::{PollerBehavior, WorkerTaskTypes},
};
use tokio::{join, time::sleep};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -725,7 +725,7 @@ async fn no_eager_activities_requested_when_worker_options_disable_it(
mock.worker_cfg(|wc| {
wc.max_cached_workflows = 2;
if reason == "no_remote" {
wc.no_remote_activities = true;
wc.task_types = WorkerTaskTypes::workflow_only();
} else {
wc.max_task_queue_activities_per_second = Some(1.0);
}
Expand Down
Loading
Loading