Skip to content

Commit

Permalink
Change nexus operations request timeout dynamic config to filter by d…
Browse files Browse the repository at this point in the history
…estination
  • Loading branch information
rodrigozhou committed May 7, 2024
1 parent bc673d5 commit 815ef0e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
7 changes: 3 additions & 4 deletions components/nexusoperations/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var Enabled = dynamicconfig.NewNamespaceBoolSetting(
`Enabled toggles accepting of API requests and workflow commands that create or modify Nexus operations.`,
)

var RequestTimeout = dynamicconfig.NewNamespaceDurationSetting(
var RequestTimeout = dynamicconfig.NewDestinationDurationSetting(
"component.nexusoperations.request.timeout",
time.Second*10,
`RequestTimeout is the timeout for making a single nexus start or cancel request.`,
Expand All @@ -57,15 +57,14 @@ Uses Go's len() function to determine the length.`,

type Config struct {
Enabled dynamicconfig.BoolPropertyFnWithNamespaceFilter
RequestTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
RequestTimeout dynamicconfig.DurationPropertyFnWithDestinationFilter
MaxConcurrentOperations dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxOperationNameLength dynamicconfig.IntPropertyFnWithNamespaceFilter
}

func ConfigProvider(dc *dynamicconfig.Collection) *Config {
return &Config{
Enabled: Enabled.Get(dc),
// TODO(bergundy): This should be controllable per namespace + destination.
Enabled: Enabled.Get(dc),
RequestTimeout: RequestTimeout.Get(dc),
MaxConcurrentOperations: MaxConcurrentOperations.Get(dc),
MaxOperationNameLength: MaxOperationNameLength.Get(dc),
Expand Down
18 changes: 8 additions & 10 deletions components/nexusoperations/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,10 @@ func (e activeExecutor) executeInvocationTask(ctx context.Context, env hsm.Envir
return fmt.Errorf("%w: %w", queues.NewUnprocessableTaskError("failed to generate a callback token"), err)
}

ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.WorkflowKey.NamespaceID))
if err != nil {
return fmt.Errorf("failed to get namespace by ID: %w", err)
}
callCtx, cancel := context.WithTimeout(ctx, e.Config.RequestTimeout(ns.Name().String()))
callCtx, cancel := context.WithTimeout(
ctx,
e.Config.RequestTimeout(ref.WorkflowKey.GetNamespaceID(), task.Destination),
)
defer cancel()

rawResult, callErr := client.StartOperation(callCtx, args.operationName, args.payload, nexus.StartOperationOptions{
Expand Down Expand Up @@ -396,11 +395,10 @@ func (e activeExecutor) executeCancelationTask(ctx context.Context, env hsm.Envi
return fmt.Errorf("failed to get handle for operation: %w", err)
}

ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.WorkflowKey.NamespaceID))
if err != nil {
return fmt.Errorf("failed to get namespace by ID: %w", err)
}
callCtx, cancel := context.WithTimeout(ctx, e.Config.RequestTimeout(ns.Name().String()))
callCtx, cancel := context.WithTimeout(
ctx,
e.Config.RequestTimeout(ref.WorkflowKey.GetNamespaceID(), task.Destination),
)
defer cancel()

callErr := handle.Cancel(callCtx, nexus.CancelOperationOptions{})
Expand Down
6 changes: 3 additions & 3 deletions components/nexusoperations/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestProcessInvocationTask(t *testing.T) {
require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{
Config: &nexusoperations.Config{
Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(tc.requestTimeout),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(tc.requestTimeout),
},
CallbackTokenGenerator: commonnexus.NewCallbackTokenGenerator(),
NamespaceRegistry: namespaceRegistry,
Expand Down Expand Up @@ -478,7 +478,7 @@ func TestProcessCancelationTask(t *testing.T) {
require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{
Config: &nexusoperations.Config{
Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(tc.requestTimeout),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(tc.requestTimeout),
},
NamespaceRegistry: namespaceRegistry,
ClientProvider: func(nid queues.NamespaceIDAndDestination, spec *nexuspb.OutgoingServiceSpec) (*nexus.Client, error) {
Expand Down Expand Up @@ -532,7 +532,7 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) {
require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.ActiveExecutorOptions{
Config: &nexusoperations.Config{
Enabled: dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByNamespace(time.Hour),
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Hour),
},
NamespaceRegistry: namespaceRegistry,
ClientProvider: func(nid queues.NamespaceIDAndDestination, spec *nexuspb.OutgoingServiceSpec) (*nexus.Client, error) {
Expand Down

0 comments on commit 815ef0e

Please sign in to comment.