Skip to content

Commit

Permalink
Introduce flags for setting worker options such as task poller counts…
Browse files Browse the repository at this point in the history
… and concurrent execution limits (#31)
  • Loading branch information
aromanovich committed Nov 9, 2023
1 parent 227480a commit 71bf631
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 6 deletions.
40 changes: 40 additions & 0 deletions cmd/cmdoptions/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package cmdoptions

import (
"strconv"

"github.com/spf13/pflag"
)

// WorkerOptions for setting up worker parameters
type WorkerOptions struct {
MaxConcurrentActivityPollers int
MaxConcurrentWorkflowPollers int
MaxConcurrentActivities int
MaxConcurrentWorkflowTasks int
}

// AddCLIFlags adds the relevant flags to populate the options struct.
func (m *WorkerOptions) AddCLIFlags(fs *pflag.FlagSet, prefix string) {
fs.IntVar(&m.MaxConcurrentActivityPollers, prefix+"max-concurrent-activity-pollers", 0, "Max concurrent activity pollers")
fs.IntVar(&m.MaxConcurrentWorkflowPollers, prefix+"max-concurrent-workflow-pollers", 0, "Max concurrent workflow pollers")
fs.IntVar(&m.MaxConcurrentActivities, prefix+"max-concurrent-activities", 0, "Max concurrent activities")
fs.IntVar(&m.MaxConcurrentWorkflowTasks, prefix+"max-concurrent-workflow-tasks", 0, "Max concurrent workflow tasks")
}

// ToFlags converts these options to string flags.
func (m *WorkerOptions) ToFlags() (flags []string) {
if m.MaxConcurrentActivityPollers != 0 {
flags = append(flags, "--max-concurrent-activity-pollers", strconv.Itoa(m.MaxConcurrentActivityPollers))
}
if m.MaxConcurrentWorkflowPollers != 0 {
flags = append(flags, "--max-concurrent-workflow-pollers", strconv.Itoa(m.MaxConcurrentWorkflowPollers))
}
if m.MaxConcurrentActivities != 0 {
flags = append(flags, "--max-concurrent-activities", strconv.Itoa(m.MaxConcurrentActivities))
}
if m.MaxConcurrentWorkflowTasks != 0 {
flags = append(flags, "--max-concurrent-workflow-tasks", strconv.Itoa(m.MaxConcurrentWorkflowTasks))
}
return
}
3 changes: 3 additions & 0 deletions cmd/run_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type workerRunner struct {
taskQueueIndexSuffixEnd int
clientOptions cmdoptions.ClientOptions
metricsOptions cmdoptions.MetricsOptions
workerOptions cmdoptions.WorkerOptions
onWorkerStarted func()
}

Expand All @@ -70,6 +71,7 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) {
fs.IntVar(&r.taskQueueIndexSuffixEnd, "task-queue-suffix-index-end", 0, "Inclusive end for task queue suffix range")
r.clientOptions.AddCLIFlags(fs)
r.metricsOptions.AddCLIFlags(fs, "worker-")
r.workerOptions.AddCLIFlags(fs, "worker-")
}

func (r *workerRunner) run(ctx context.Context) error {
Expand Down Expand Up @@ -164,6 +166,7 @@ func (r *workerRunner) run(ctx context.Context) error {
args = append(args, r.clientOptions.ToFlags()...)
args = append(args, r.metricsOptions.ToFlags()...)
args = append(args, r.loggingOptions.ToFlags()...)
args = append(args, r.workerOptions.ToFlags()...)

// Start the command. Do not use the context so we can send interrupt.
cmd, err := prog.NewCommand(context.Background(), args...)
Expand Down
18 changes: 12 additions & 6 deletions workers/go/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package worker
import (
"fmt"

"github.com/spf13/cobra"
"github.com/temporalio/omes/cmd/cmdoptions"
"github.com/temporalio/omes/workers/go/kitchensink"
"github.com/temporalio/omes/workers/go/throughputstress"
"go.temporal.io/sdk/activity"

"github.com/spf13/cobra"
"github.com/temporalio/omes/cmd/cmdoptions"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
Expand All @@ -24,6 +23,7 @@ type App struct {
loggingOptions cmdoptions.LoggingOptions
clientOptions cmdoptions.ClientOptions
metricsOptions cmdoptions.MetricsOptions
workerOptions cmdoptions.WorkerOptions
}

func (a *App) Run(cmd *cobra.Command, args []string) {
Expand All @@ -44,23 +44,28 @@ func (a *App) Run(cmd *cobra.Command, args []string) {
}
}

if err := runWorkers(client, taskQueues); err != nil {
if err := runWorkers(client, taskQueues, a.workerOptions); err != nil {
a.logger.Fatalf("Fatal worker error: %v", err)
}
if err := metrics.Shutdown(cmd.Context()); err != nil {
a.logger.Fatalf("Failed to shutdown metrics: %v", err)
}
}

func runWorkers(client client.Client, taskQueues []string) error {
func runWorkers(client client.Client, taskQueues []string, options cmdoptions.WorkerOptions) error {
errCh := make(chan error, len(taskQueues))
tpsActivities := throughputstress.Activities{
Client: client,
}
for _, taskQueue := range taskQueues {
taskQueue := taskQueue
go func() {
w := worker.New(client, taskQueue, worker.Options{})
w := worker.New(client, taskQueue, worker.Options{
MaxConcurrentActivityExecutionSize: options.MaxConcurrentActivities,
MaxConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTasks,
MaxConcurrentActivityTaskPollers: options.MaxConcurrentActivityPollers,
MaxConcurrentWorkflowTaskPollers: options.MaxConcurrentWorkflowPollers,
})
w.RegisterWorkflowWithOptions(kitchensink.KitchenSinkWorkflow, workflow.RegisterOptions{Name: "kitchenSink"})
w.RegisterActivityWithOptions(kitchensink.Noop, activity.RegisterOptions{Name: "noop"})
w.RegisterWorkflowWithOptions(throughputstress.ThroughputStressWorkflow, workflow.RegisterOptions{Name: "throughputStress"})
Expand Down Expand Up @@ -89,6 +94,7 @@ func Main() {
app.loggingOptions.AddCLIFlags(cmd.Flags())
app.clientOptions.AddCLIFlags(cmd.Flags())
app.metricsOptions.AddCLIFlags(cmd.Flags(), "")
app.workerOptions.AddCLIFlags(cmd.Flags(), "")
cmd.Flags().StringVarP(&app.taskQueue, "task-queue", "q", "omes", "Task queue to use")
cmd.Flags().IntVar(&app.taskQueueIndexSuffixStart,
"task-queue-suffix-index-start", 0, "Inclusive start for task queue suffix range")
Expand Down
35 changes: 35 additions & 0 deletions workers/python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,24 @@ async def run():
type=int,
help="Inclusive end for task queue suffix range",
)
parser.add_argument(
"--max-concurrent-activity-pollers",
type=int,
help="Max concurrent activity pollers",
)
parser.add_argument(
"--max-concurrent-workflow-pollers",
type=int,
help="Max concurrent workflow pollers",
)
parser.add_argument(
"--max-concurrent-activities", type=int, help="Max concurrent activities"
)
parser.add_argument(
"--max-concurrent-workflow-tasks",
type=int,
help="Max concurrent workflow tasks",
)
# Log arguments
parser.add_argument(
"--log-level", default="info", help="(debug info warn error panic fatal)"
Expand Down Expand Up @@ -141,13 +159,30 @@ def prom_app(environ, start_fn):
]
logger.info("Python worker running for %s task queue(s)" % len(task_queues))

worker_kwargs = {}
if args.max_concurrent_activity_pollers is not None:
worker_kwargs[
"max_concurrent_activity_task_polls"
] = args.max_concurrent_activity_pollers
if args.max_concurrent_workflow_pollers is not None:
worker_kwargs[
"max_concurrent_workflow_task_polls"
] = args.max_concurrent_workflow_pollers
if args.max_concurrent_activities is not None:
worker_kwargs["max_concurrent_activities"] = args.max_concurrent_activities
if args.max_concurrent_workflow_tasks is not None:
worker_kwargs[
"max_concurrent_workflow_tasks"
] = args.max_concurrent_workflow_tasks

# Start all workers, throwing on first exception
workers = [
Worker(
client,
task_queue=task_queue,
workflows=[KitchenSinkWorkflow],
activities=[noop_activity],
**worker_kwargs,
)
for task_queue in task_queues
]
Expand Down

0 comments on commit 71bf631

Please sign in to comment.