From 815ef0e96ffef897d2ba69f88c8994c270ddd076 Mon Sep 17 00:00:00 2001 From: rodrigozhou Date: Mon, 6 May 2024 22:12:40 -0500 Subject: [PATCH] Change nexus operations request timeout dynamic config to filter by destination --- components/nexusoperations/config.go | 7 +++---- components/nexusoperations/executors.go | 18 ++++++++---------- components/nexusoperations/executors_test.go | 6 +++--- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/components/nexusoperations/config.go b/components/nexusoperations/config.go index 2aceffe1d1e..568ceb4a5c5 100644 --- a/components/nexusoperations/config.go +++ b/components/nexusoperations/config.go @@ -34,7 +34,7 @@ var Enabled = dynamicconfig.NewNamespaceBoolSetting( `Enabled toggles accepting of API requests and workflow commands that create or modify Nexus operations.`, ) -var RequestTimeout = dynamicconfig.NewNamespaceDurationSetting( +var RequestTimeout = dynamicconfig.NewDestinationDurationSetting( "component.nexusoperations.request.timeout", time.Second*10, `RequestTimeout is the timeout for making a single nexus start or cancel request.`, @@ -57,15 +57,14 @@ Uses Go's len() function to determine the length.`, type Config struct { Enabled dynamicconfig.BoolPropertyFnWithNamespaceFilter - RequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter } func ConfigProvider(dc *dynamicconfig.Collection) *Config { return &Config{ - Enabled: Enabled.Get(dc), - // TODO(bergundy): This should be controllable per namespace + destination. + Enabled: Enabled.Get(dc), RequestTimeout: RequestTimeout.Get(dc), MaxConcurrentOperations: MaxConcurrentOperations.Get(dc), MaxOperationNameLength: MaxOperationNameLength.Get(dc), diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 93099b170ee..ff738e4eeeb 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -131,11 +131,10 @@ func (e activeExecutor) executeInvocationTask(ctx context.Context, env hsm.Envir return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err) } - ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.WorkflowKey.NamespaceID)) - if err != nil { - return fmt.Errorf("failed to get namespace by ID: %w", err) - } - callCtx, cancel := context.WithTimeout(ctx, e.Config.RequestTimeout(ns.Name().String())) + callCtx, cancel := context.WithTimeout( + ctx, + e.Config.RequestTimeout(ref.WorkflowKey.GetNamespaceID(), task.Destination), + ) defer cancel() rawResult, callErr := client.StartOperation(callCtx, args.operationName, args.payload, nexus.StartOperationOptions{ @@ -396,11 +395,10 @@ func (e activeExecutor) executeCancelationTask(ctx context.Context, env hsm.Envi return fmt.Errorf("failed to get handle for operation: %w", err) } - ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.WorkflowKey.NamespaceID)) - if err != nil { - return fmt.Errorf("failed to get namespace by ID: %w", err) - } - callCtx, cancel := context.WithTimeout(ctx, e.Config.RequestTimeout(ns.Name().String())) + callCtx, cancel := context.WithTimeout( + ctx, + e.Config.RequestTimeout(ref.WorkflowKey.GetNamespaceID(), task.Destination), + ) defer cancel() callErr := handle.Cancel(callCtx, nexus.CancelOperationOptions{}) diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 4e3be8d926a..85c1fa4572b 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -282,7 +282,7 @@ func TestProcessInvocationTask(t *testing.T) { require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{ Config: &nexusoperations.Config{ Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true), - RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(tc.requestTimeout), + RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(tc.requestTimeout), }, CallbackTokenGenerator: commonnexus.NewCallbackTokenGenerator(), NamespaceRegistry: namespaceRegistry, @@ -478,7 +478,7 @@ func TestProcessCancelationTask(t *testing.T) { require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{ Config: &nexusoperations.Config{ Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true), - RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(tc.requestTimeout), + RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(tc.requestTimeout), }, NamespaceRegistry: namespaceRegistry, ClientProvider: func(nid queues.NamespaceIDAndDestination, spec *nexuspb.OutgoingServiceSpec) (*nexus.Client, error) { @@ -532,7 +532,7 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) { require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{ Config: &nexusoperations.Config{ Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true), - RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(time.Hour), + RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Hour), }, NamespaceRegistry: namespaceRegistry, ClientProvider: func(nid queues.NamespaceIDAndDestination, spec *nexuspb.OutgoingServiceSpec) (*nexus.Client, error) {