From 71bf6314822328d5f995b7f8a56196fab7c98902 Mon Sep 17 00:00:00 2001 From: Anton Romanovich Date: Thu, 9 Nov 2023 13:58:31 +0100 Subject: [PATCH] Introduce flags for setting worker options such as task poller counts and concurrent execution limits (#31) --- cmd/cmdoptions/worker.go | 40 +++++++++++++++++++++++++++++++++++++ cmd/run_worker.go | 3 +++ workers/go/worker/worker.go | 18 +++++++++++------ workers/python/main.py | 35 ++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 6 deletions(-) create mode 100644 cmd/cmdoptions/worker.go diff --git a/cmd/cmdoptions/worker.go b/cmd/cmdoptions/worker.go new file mode 100644 index 0000000..9c5a2ca --- /dev/null +++ b/cmd/cmdoptions/worker.go @@ -0,0 +1,40 @@ +package cmdoptions + +import ( + "strconv" + + "github.com/spf13/pflag" +) + +// WorkerOptions for setting up worker parameters +type WorkerOptions struct { + MaxConcurrentActivityPollers int + MaxConcurrentWorkflowPollers int + MaxConcurrentActivities int + MaxConcurrentWorkflowTasks int +} + +// AddCLIFlags adds the relevant flags to populate the options struct. +func (m *WorkerOptions) AddCLIFlags(fs *pflag.FlagSet, prefix string) { + fs.IntVar(&m.MaxConcurrentActivityPollers, prefix+"max-concurrent-activity-pollers", 0, "Max concurrent activity pollers") + fs.IntVar(&m.MaxConcurrentWorkflowPollers, prefix+"max-concurrent-workflow-pollers", 0, "Max concurrent workflow pollers") + fs.IntVar(&m.MaxConcurrentActivities, prefix+"max-concurrent-activities", 0, "Max concurrent activities") + fs.IntVar(&m.MaxConcurrentWorkflowTasks, prefix+"max-concurrent-workflow-tasks", 0, "Max concurrent workflow tasks") +} + +// ToFlags converts these options to string flags. +func (m *WorkerOptions) ToFlags() (flags []string) { + if m.MaxConcurrentActivityPollers != 0 { + flags = append(flags, "--max-concurrent-activity-pollers", strconv.Itoa(m.MaxConcurrentActivityPollers)) + } + if m.MaxConcurrentWorkflowPollers != 0 { + flags = append(flags, "--max-concurrent-workflow-pollers", strconv.Itoa(m.MaxConcurrentWorkflowPollers)) + } + if m.MaxConcurrentActivities != 0 { + flags = append(flags, "--max-concurrent-activities", strconv.Itoa(m.MaxConcurrentActivities)) + } + if m.MaxConcurrentWorkflowTasks != 0 { + flags = append(flags, "--max-concurrent-workflow-tasks", strconv.Itoa(m.MaxConcurrentWorkflowTasks)) + } + return +} diff --git a/cmd/run_worker.go b/cmd/run_worker.go index ab15475..fae6e73 100644 --- a/cmd/run_worker.go +++ b/cmd/run_worker.go @@ -53,6 +53,7 @@ type workerRunner struct { taskQueueIndexSuffixEnd int clientOptions cmdoptions.ClientOptions metricsOptions cmdoptions.MetricsOptions + workerOptions cmdoptions.WorkerOptions onWorkerStarted func() } @@ -70,6 +71,7 @@ func (r *workerRunner) addCLIFlags(fs *pflag.FlagSet) { fs.IntVar(&r.taskQueueIndexSuffixEnd, "task-queue-suffix-index-end", 0, "Inclusive end for task queue suffix range") r.clientOptions.AddCLIFlags(fs) r.metricsOptions.AddCLIFlags(fs, "worker-") + r.workerOptions.AddCLIFlags(fs, "worker-") } func (r *workerRunner) run(ctx context.Context) error { @@ -164,6 +166,7 @@ func (r *workerRunner) run(ctx context.Context) error { args = append(args, r.clientOptions.ToFlags()...) args = append(args, r.metricsOptions.ToFlags()...) args = append(args, r.loggingOptions.ToFlags()...) + args = append(args, r.workerOptions.ToFlags()...) // Start the command. Do not use the context so we can send interrupt. cmd, err := prog.NewCommand(context.Background(), args...) diff --git a/workers/go/worker/worker.go b/workers/go/worker/worker.go index 52702a0..cfb3530 100644 --- a/workers/go/worker/worker.go +++ b/workers/go/worker/worker.go @@ -3,12 +3,11 @@ package worker import ( "fmt" + "github.com/spf13/cobra" + "github.com/temporalio/omes/cmd/cmdoptions" "github.com/temporalio/omes/workers/go/kitchensink" "github.com/temporalio/omes/workers/go/throughputstress" "go.temporal.io/sdk/activity" - - "github.com/spf13/cobra" - "github.com/temporalio/omes/cmd/cmdoptions" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" @@ -24,6 +23,7 @@ type App struct { loggingOptions cmdoptions.LoggingOptions clientOptions cmdoptions.ClientOptions metricsOptions cmdoptions.MetricsOptions + workerOptions cmdoptions.WorkerOptions } func (a *App) Run(cmd *cobra.Command, args []string) { @@ -44,7 +44,7 @@ func (a *App) Run(cmd *cobra.Command, args []string) { } } - if err := runWorkers(client, taskQueues); err != nil { + if err := runWorkers(client, taskQueues, a.workerOptions); err != nil { a.logger.Fatalf("Fatal worker error: %v", err) } if err := metrics.Shutdown(cmd.Context()); err != nil { @@ -52,7 +52,7 @@ func (a *App) Run(cmd *cobra.Command, args []string) { } } -func runWorkers(client client.Client, taskQueues []string) error { +func runWorkers(client client.Client, taskQueues []string, options cmdoptions.WorkerOptions) error { errCh := make(chan error, len(taskQueues)) tpsActivities := throughputstress.Activities{ Client: client, @@ -60,7 +60,12 @@ func runWorkers(client client.Client, taskQueues []string) error { for _, taskQueue := range taskQueues { taskQueue := taskQueue go func() { - w := worker.New(client, taskQueue, worker.Options{}) + w := worker.New(client, taskQueue, worker.Options{ + MaxConcurrentActivityExecutionSize: options.MaxConcurrentActivities, + MaxConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTasks, + MaxConcurrentActivityTaskPollers: options.MaxConcurrentActivityPollers, + MaxConcurrentWorkflowTaskPollers: options.MaxConcurrentWorkflowPollers, + }) w.RegisterWorkflowWithOptions(kitchensink.KitchenSinkWorkflow, workflow.RegisterOptions{Name: "kitchenSink"}) w.RegisterActivityWithOptions(kitchensink.Noop, activity.RegisterOptions{Name: "noop"}) w.RegisterWorkflowWithOptions(throughputstress.ThroughputStressWorkflow, workflow.RegisterOptions{Name: "throughputStress"}) @@ -89,6 +94,7 @@ func Main() { app.loggingOptions.AddCLIFlags(cmd.Flags()) app.clientOptions.AddCLIFlags(cmd.Flags()) app.metricsOptions.AddCLIFlags(cmd.Flags(), "") + app.workerOptions.AddCLIFlags(cmd.Flags(), "") cmd.Flags().StringVarP(&app.taskQueue, "task-queue", "q", "omes", "Task queue to use") cmd.Flags().IntVar(&app.taskQueueIndexSuffixStart, "task-queue-suffix-index-start", 0, "Inclusive start for task queue suffix range") diff --git a/workers/python/main.py b/workers/python/main.py index 98fbee3..bccbb0c 100644 --- a/workers/python/main.py +++ b/workers/python/main.py @@ -46,6 +46,24 @@ async def run(): type=int, help="Inclusive end for task queue suffix range", ) + parser.add_argument( + "--max-concurrent-activity-pollers", + type=int, + help="Max concurrent activity pollers", + ) + parser.add_argument( + "--max-concurrent-workflow-pollers", + type=int, + help="Max concurrent workflow pollers", + ) + parser.add_argument( + "--max-concurrent-activities", type=int, help="Max concurrent activities" + ) + parser.add_argument( + "--max-concurrent-workflow-tasks", + type=int, + help="Max concurrent workflow tasks", + ) # Log arguments parser.add_argument( "--log-level", default="info", help="(debug info warn error panic fatal)" @@ -141,6 +159,22 @@ def prom_app(environ, start_fn): ] logger.info("Python worker running for %s task queue(s)" % len(task_queues)) + worker_kwargs = {} + if args.max_concurrent_activity_pollers is not None: + worker_kwargs[ + "max_concurrent_activity_task_polls" + ] = args.max_concurrent_activity_pollers + if args.max_concurrent_workflow_pollers is not None: + worker_kwargs[ + "max_concurrent_workflow_task_polls" + ] = args.max_concurrent_workflow_pollers + if args.max_concurrent_activities is not None: + worker_kwargs["max_concurrent_activities"] = args.max_concurrent_activities + if args.max_concurrent_workflow_tasks is not None: + worker_kwargs[ + "max_concurrent_workflow_tasks" + ] = args.max_concurrent_workflow_tasks + # Start all workers, throwing on first exception workers = [ Worker( @@ -148,6 +182,7 @@ def prom_app(environ, start_fn): task_queue=task_queue, workflows=[KitchenSinkWorkflow], activities=[noop_activity], + **worker_kwargs, ) for task_queue in task_queues ]