Skip to content

Commit

Permalink
Worker poller and ratio options (#200)
Browse files Browse the repository at this point in the history
Fixes #196
  • Loading branch information
cretz committed Mar 7, 2024
1 parent 666ab45 commit ce3f195
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,12 @@ internal partial struct WorkerOptions
public byte use_worker_versioning;

[NativeTypeName("uint32_t")]
public uint max_concurrent_wft_polls;
public uint max_concurrent_workflow_task_polls;

public float nonsticky_to_sticky_poll_ratio;

[NativeTypeName("uint32_t")]
public uint max_concurrent_activity_task_polls;
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
Expand Down
9 changes: 6 additions & 3 deletions src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,9 @@ internal static class OptionsExtensions
graceful_shutdown_period_millis =
(ulong)options.GracefulShutdownTimeout.TotalMilliseconds,
use_worker_versioning = (byte)(options.UseWorkerVersioning ? 1 : 0),
// TODO: Expose to user
max_concurrent_wft_polls = 5,
max_concurrent_workflow_task_polls = (uint)options.MaxConcurrentWorkflowTaskPolls,
nonsticky_to_sticky_poll_ratio = options.NonStickyToStickyPollRatio,
max_concurrent_activity_task_polls = (uint)options.MaxConcurrentActivityTaskPolls,
};
}

Expand Down Expand Up @@ -448,7 +449,6 @@ internal static class OptionsExtensions
identity_override = scope.ByteArray(options.Identity),
max_cached_workflows = 2,
max_outstanding_workflow_tasks = 2,
max_concurrent_wft_polls = 1,
max_outstanding_activities = 1,
max_outstanding_local_activities = 1,
no_remote_activities = 1,
Expand All @@ -458,6 +458,9 @@ internal static class OptionsExtensions
max_activities_per_second = 0,
max_task_queue_activities_per_second = 0,
graceful_shutdown_period_millis = 0,
max_concurrent_workflow_task_polls = 1,
nonsticky_to_sticky_poll_ratio = 1,
max_concurrent_activity_task_polls = 1,
};
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,9 @@ typedef struct WorkerOptions {
double max_task_queue_activities_per_second;
uint64_t graceful_shutdown_period_millis;
bool use_worker_versioning;
uint32_t max_concurrent_wft_polls;
uint32_t max_concurrent_workflow_task_polls;
float nonsticky_to_sticky_poll_ratio;
uint32_t max_concurrent_activity_task_polls;
} WorkerOptions;

/**
Expand Down
8 changes: 6 additions & 2 deletions src/Temporalio/Bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ pub struct WorkerOptions {
max_task_queue_activities_per_second: f64,
graceful_shutdown_period_millis: u64,
use_worker_versioning: bool,
max_concurrent_wft_polls: u32,
max_concurrent_workflow_task_polls: u32,
nonsticky_to_sticky_poll_ratio: f32,
max_concurrent_activity_task_polls: u32,
}

#[derive(Clone)]
Expand Down Expand Up @@ -498,7 +500,9 @@ impl TryFrom<&WorkerOptions> for temporal_sdk_core::WorkerConfig {
// auto-cancel-activity behavior or shutdown will not occur, so we
// always set it even if 0.
.graceful_shutdown_period(Duration::from_millis(opt.graceful_shutdown_period_millis))
.max_concurrent_wft_polls(opt.max_concurrent_wft_polls as usize)
.max_concurrent_wft_polls(opt.max_concurrent_workflow_task_polls as usize)
.nonsticky_to_sticky_poll_ratio(opt.nonsticky_to_sticky_poll_ratio)
.max_concurrent_at_polls(opt.max_concurrent_activity_task_polls as usize)
.build()
.map_err(|err| anyhow::anyhow!(err))
}
Expand Down
22 changes: 22 additions & 0 deletions src/Temporalio/Worker/TemporalWorkerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,28 @@ public TemporalWorkerOptions()
/// </summary>
public TimeSpan StickyQueueScheduleToStartTimeout { get; set; } = TimeSpan.FromSeconds(10);

/// <summary>
/// Gets or sets the maximum number of concurrent poll workflow task requests we will
/// perform at a time on this worker's task queue. Default is 5.
/// </summary>
public int MaxConcurrentWorkflowTaskPolls { get; set; } = 5;

/// <summary>
/// Gets or sets the sticky poll ratio. <see cref="MaxConcurrentWorkflowTaskPolls" /> times
/// this value will be the number of max pollers that will be allowed for the non-sticky
/// queue when sticky tasks are enabled. If both defaults are used, the sticky queue will
/// allow 4 max pollers while the non-sticky queue will allow 1. The minimum for either
/// poller is 1, so if <see cref="MaxConcurrentWorkflowTaskPolls" /> is 1 and sticky queues
/// are enabled, there will be 2 concurrent polls. Default is 0.2.
/// </summary>
public float NonStickyToStickyPollRatio { get; set; } = 0.2F;

/// <summary>
/// Gets or sets the maximum number of concurrent poll activity task requests we will
/// perform at a time on this worker's task queue. Default is 5.
/// </summary>
public int MaxConcurrentActivityTaskPolls { get; set; } = 5;

/// <summary>
/// Gets or sets a value indicating whether deadlock detection will be disabled for all
/// workflows. If unset, this value defaults to true only if the <c>TEMPORAL_DEBUG</c>
Expand Down

0 comments on commit ce3f195

Please sign in to comment.