From 64cc6d85b3d8ae7a1b91506daa2244a1eb015244 Mon Sep 17 00:00:00 2001 From: Ilya Ozherelyev Date: Fri, 5 Apr 2024 00:07:07 +0200 Subject: [PATCH] stop redirection for deprecated domains (#5863) * [frontend] Stop redirecting deprecated domains --- common/taskTokenSerializerInterfaces.go | 8 ++ .../templates/clusterredirection.tmpl | 46 ++---- .../clusterredirection/api_generated.go | 132 ++++++++---------- .../wrappers/clusterredirection/api_test.go | 6 - .../clusterredirection/callwrappers.go | 77 ++++++++++ .../wrappers/clusterredirection/policy.go | 6 + .../clusterredirection/policy_test.go | 103 +++++++++++++- 7 files changed, 258 insertions(+), 120 deletions(-) create mode 100644 service/frontend/wrappers/clusterredirection/callwrappers.go diff --git a/common/taskTokenSerializerInterfaces.go b/common/taskTokenSerializerInterfaces.go index 107a35b164..6afeb67ae6 100644 --- a/common/taskTokenSerializerInterfaces.go +++ b/common/taskTokenSerializerInterfaces.go @@ -48,3 +48,11 @@ type ( TaskID string `json:"taskId"` } ) + +func (t TaskToken) GetDomainID() string { + return t.DomainID +} + +func (t QueryTaskToken) GetDomainID() string { + return t.DomainID +} diff --git a/service/frontend/templates/clusterredirection.tmpl b/service/frontend/templates/clusterredirection.tmpl index 91313c69b3..5adb51dd02 100644 --- a/service/frontend/templates/clusterredirection.tmpl +++ b/service/frontend/templates/clusterredirection.tmpl @@ -1,12 +1,10 @@ import ( "context" - "time" "go.uber.org/yarpc" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" - "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/types" @@ -65,20 +63,27 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} { var apiName = "{{$method.Name}}" var cluster string + {{$policyMethod := "WithDomainNameRedirect"}} + {{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}} + {{- if has $method.Name $domainIDAPIs}} + token := domainIDGetter(noopdomainIDGetter{}) + {{- end}} scope, startTime := handler.beforeCall(metrics.DCRedirection{{$method.Name}}Scope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + {{- if has $method.Name $domainIDAPIs}} + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) + {{- else}} + handler.afterCall(recover(), scope, startTime, {{$domain}}, "", cluster, &err) + {{- end}} }() - {{$policyMethod := "WithDomainNameRedirect"}} - {{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}} {{if has $method.Name $domainIDAPIs}} {{$policyMethod = "WithDomainIDRedirect"}} - {{$domain = "token.DomainID"}} + {{$domain = "token.GetDomainID()"}} {{if has $method.Name $queryTaskTokenAPIs}} - token, err := handler.tokenSerializer.DeserializeQueryTaskToken({{(index $method.Params 1).Name}}.TaskToken) + token, err = handler.tokenSerializer.DeserializeQueryTaskToken({{(index $method.Params 1).Name}}.TaskToken) {{- else}} - token, err := handler.tokenSerializer.Deserialize({{(index $method.Params 1).Name}}.TaskToken) + token, err = handler.tokenSerializer.Deserialize({{(index $method.Params 1).Name}}.TaskToken) {{- end}} if err != nil { {{- if eq (len $method.Results) 1}} @@ -123,7 +128,7 @@ func (handler *clusterRedirectionHandler) QueryWorkflow( } scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &retError) + handler.afterCall(recover(), scope, startTime, request.GetDomain(), "", cluster, &retError) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error { @@ -140,26 +145,3 @@ func (handler *clusterRedirectionHandler) QueryWorkflow( return resp, err } - -func (handler *clusterRedirectionHandler) beforeCall( - scope int, -) (metrics.Scope, time.Time) { - return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now() -} - -func (handler *clusterRedirectionHandler) afterCall( - recovered interface{}, - scope metrics.Scope, - startTime time.Time, - cluster string, - retError *error, -) { - log.CapturePanic(recovered, handler.GetLogger(), retError) - - scope = scope.Tagged(metrics.TargetClusterTag(cluster)) - scope.IncCounter(metrics.CadenceDcRedirectionClientRequests) - scope.RecordTimer(metrics.CadenceDcRedirectionClientLatency, handler.GetTimeSource().Now().Sub(startTime)) - if *retError != nil { - scope.IncCounter(metrics.CadenceDcRedirectionClientFailures) - } -} diff --git a/service/frontend/wrappers/clusterredirection/api_generated.go b/service/frontend/wrappers/clusterredirection/api_generated.go index 4c43e378d8..68374a096d 100644 --- a/service/frontend/wrappers/clusterredirection/api_generated.go +++ b/service/frontend/wrappers/clusterredirection/api_generated.go @@ -28,13 +28,11 @@ package clusterredirection import ( "context" - "time" "go.uber.org/yarpc" "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" - "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/types" @@ -85,7 +83,7 @@ func (handler *clusterRedirectionHandler) CountWorkflowExecutions(ctx context.Co scope, startTime := handler.beforeCall(metrics.DCRedirectionCountWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, cp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, cp1.GetDomain(), apiName, func(targetDC string) error { @@ -117,7 +115,7 @@ func (handler *clusterRedirectionHandler) DescribeTaskList(ctx context.Context, scope, startTime := handler.beforeCall(metrics.DCRedirectionDescribeTaskListScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, dp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, dp1.GetDomain(), apiName, func(targetDC string) error { @@ -141,7 +139,7 @@ func (handler *clusterRedirectionHandler) DescribeWorkflowExecution(ctx context. scope, startTime := handler.beforeCall(metrics.DCRedirectionDescribeWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, dp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, dp1.GetDomain(), apiName, func(targetDC string) error { @@ -173,7 +171,7 @@ func (handler *clusterRedirectionHandler) GetTaskListsByDomain(ctx context.Conte scope, startTime := handler.beforeCall(metrics.DCRedirectionGetTaskListsByDomainScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, gp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, gp1.GetDomain(), apiName, func(targetDC string) error { @@ -197,7 +195,7 @@ func (handler *clusterRedirectionHandler) GetWorkflowExecutionHistory(ctx contex scope, startTime := handler.beforeCall(metrics.DCRedirectionGetWorkflowExecutionHistoryScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, gp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, gp1.GetDomain(), apiName, func(targetDC string) error { @@ -225,7 +223,7 @@ func (handler *clusterRedirectionHandler) ListArchivedWorkflowExecutions(ctx con scope, startTime := handler.beforeCall(metrics.DCRedirectionListArchivedWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -249,7 +247,7 @@ func (handler *clusterRedirectionHandler) ListClosedWorkflowExecutions(ctx conte scope, startTime := handler.beforeCall(metrics.DCRedirectionListClosedWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -277,7 +275,7 @@ func (handler *clusterRedirectionHandler) ListOpenWorkflowExecutions(ctx context scope, startTime := handler.beforeCall(metrics.DCRedirectionListOpenWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -301,7 +299,7 @@ func (handler *clusterRedirectionHandler) ListTaskListPartitions(ctx context.Con scope, startTime := handler.beforeCall(metrics.DCRedirectionListTaskListPartitionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -325,7 +323,7 @@ func (handler *clusterRedirectionHandler) ListWorkflowExecutions(ctx context.Con scope, startTime := handler.beforeCall(metrics.DCRedirectionListWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -349,7 +347,7 @@ func (handler *clusterRedirectionHandler) PollForActivityTask(ctx context.Contex scope, startTime := handler.beforeCall(metrics.DCRedirectionPollForActivityTaskScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, pp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, pp1.GetDomain(), apiName, func(targetDC string) error { @@ -373,7 +371,7 @@ func (handler *clusterRedirectionHandler) PollForDecisionTask(ctx context.Contex scope, startTime := handler.beforeCall(metrics.DCRedirectionPollForDecisionTaskScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, pp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, pp1.GetDomain(), apiName, func(targetDC string) error { @@ -395,17 +393,18 @@ func (handler *clusterRedirectionHandler) RecordActivityTaskHeartbeat(ctx contex var apiName = "RecordActivityTaskHeartbeat" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRecordActivityTaskHeartbeatScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return nil, err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -426,7 +425,7 @@ func (handler *clusterRedirectionHandler) RecordActivityTaskHeartbeatByID(ctx co scope, startTime := handler.beforeCall(metrics.DCRedirectionRecordActivityTaskHeartbeatByIDScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -450,7 +449,7 @@ func (handler *clusterRedirectionHandler) RefreshWorkflowTasks(ctx context.Conte scope, startTime := handler.beforeCall(metrics.DCRedirectionRefreshWorkflowTasksScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -478,7 +477,7 @@ func (handler *clusterRedirectionHandler) RequestCancelWorkflowExecution(ctx con scope, startTime := handler.beforeCall(metrics.DCRedirectionRequestCancelWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -502,7 +501,7 @@ func (handler *clusterRedirectionHandler) ResetStickyTaskList(ctx context.Contex scope, startTime := handler.beforeCall(metrics.DCRedirectionResetStickyTaskListScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -526,7 +525,7 @@ func (handler *clusterRedirectionHandler) ResetWorkflowExecution(ctx context.Con scope, startTime := handler.beforeCall(metrics.DCRedirectionResetWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -548,17 +547,18 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskCanceled(ctx contex var apiName = "RespondActivityTaskCanceled" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCanceledScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -579,7 +579,7 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskCanceledByID(ctx co scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCanceledByIDScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -601,17 +601,18 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskCompleted(ctx conte var apiName = "RespondActivityTaskCompleted" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCompletedScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -632,7 +633,7 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskCompletedByID(ctx c scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskCompletedByIDScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -654,17 +655,18 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskFailed(ctx context. var apiName = "RespondActivityTaskFailed" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskFailedScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -685,7 +687,7 @@ func (handler *clusterRedirectionHandler) RespondActivityTaskFailedByID(ctx cont scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondActivityTaskFailedByIDScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -707,17 +709,18 @@ func (handler *clusterRedirectionHandler) RespondDecisionTaskCompleted(ctx conte var apiName = "RespondDecisionTaskCompleted" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondDecisionTaskCompletedScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return nil, err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -736,17 +739,18 @@ func (handler *clusterRedirectionHandler) RespondDecisionTaskFailed(ctx context. var apiName = "RespondDecisionTaskFailed" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondDecisionTaskFailedScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.Deserialize(rp1.TaskToken) + token, err = handler.tokenSerializer.Deserialize(rp1.TaskToken) if err != nil { return err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -765,17 +769,18 @@ func (handler *clusterRedirectionHandler) RespondQueryTaskCompleted(ctx context. var apiName = "RespondQueryTaskCompleted" var cluster string + token := domainIDGetter(noopdomainIDGetter{}) scope, startTime := handler.beforeCall(metrics.DCRedirectionRespondQueryTaskCompletedScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err) }() - token, err := handler.tokenSerializer.DeserializeQueryTaskToken(rp1.TaskToken) + token, err = handler.tokenSerializer.DeserializeQueryTaskToken(rp1.TaskToken) if err != nil { return err } - err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.DomainID, apiName, func(targetDC string) error { + err = handler.redirectionPolicy.WithDomainIDRedirect(ctx, token.GetDomainID(), apiName, func(targetDC string) error { cluster = targetDC switch { case targetDC == handler.currentClusterName: @@ -796,7 +801,7 @@ func (handler *clusterRedirectionHandler) RestartWorkflowExecution(ctx context.C scope, startTime := handler.beforeCall(metrics.DCRedirectionRestartWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, rp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, rp1.GetDomain(), apiName, func(targetDC string) error { @@ -820,7 +825,7 @@ func (handler *clusterRedirectionHandler) ScanWorkflowExecutions(ctx context.Con scope, startTime := handler.beforeCall(metrics.DCRedirectionScanWorkflowExecutionsScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, lp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, lp1.GetDomain(), apiName, func(targetDC string) error { @@ -844,7 +849,7 @@ func (handler *clusterRedirectionHandler) SignalWithStartWorkflowExecution(ctx c scope, startTime := handler.beforeCall(metrics.DCRedirectionSignalWithStartWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, sp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, sp1.GetDomain(), apiName, func(targetDC string) error { @@ -868,7 +873,7 @@ func (handler *clusterRedirectionHandler) SignalWithStartWorkflowExecutionAsync( scope, startTime := handler.beforeCall(metrics.DCRedirectionSignalWithStartWorkflowExecutionAsyncScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, sp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, sp1.GetDomain(), apiName, func(targetDC string) error { @@ -892,7 +897,7 @@ func (handler *clusterRedirectionHandler) SignalWorkflowExecution(ctx context.Co scope, startTime := handler.beforeCall(metrics.DCRedirectionSignalWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, sp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, sp1.GetDomain(), apiName, func(targetDC string) error { @@ -916,7 +921,7 @@ func (handler *clusterRedirectionHandler) StartWorkflowExecution(ctx context.Con scope, startTime := handler.beforeCall(metrics.DCRedirectionStartWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, sp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, sp1.GetDomain(), apiName, func(targetDC string) error { @@ -940,7 +945,7 @@ func (handler *clusterRedirectionHandler) StartWorkflowExecutionAsync(ctx contex scope, startTime := handler.beforeCall(metrics.DCRedirectionStartWorkflowExecutionAsyncScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, sp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, sp1.GetDomain(), apiName, func(targetDC string) error { @@ -964,7 +969,7 @@ func (handler *clusterRedirectionHandler) TerminateWorkflowExecution(ctx context scope, startTime := handler.beforeCall(metrics.DCRedirectionTerminateWorkflowExecutionScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &err) + handler.afterCall(recover(), scope, startTime, tp1.GetDomain(), "", cluster, &err) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, tp1.GetDomain(), apiName, func(targetDC string) error { @@ -1002,7 +1007,7 @@ func (handler *clusterRedirectionHandler) QueryWorkflow( } scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope) defer func() { - handler.afterCall(recover(), scope, startTime, cluster, &retError) + handler.afterCall(recover(), scope, startTime, request.GetDomain(), "", cluster, &retError) }() err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error { @@ -1019,26 +1024,3 @@ func (handler *clusterRedirectionHandler) QueryWorkflow( return resp, err } - -func (handler *clusterRedirectionHandler) beforeCall( - scope int, -) (metrics.Scope, time.Time) { - return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now() -} - -func (handler *clusterRedirectionHandler) afterCall( - recovered interface{}, - scope metrics.Scope, - startTime time.Time, - cluster string, - retError *error, -) { - log.CapturePanic(recovered, handler.GetLogger(), retError) - - scope = scope.Tagged(metrics.TargetClusterTag(cluster)) - scope.IncCounter(metrics.CadenceDcRedirectionClientRequests) - scope.RecordTimer(metrics.CadenceDcRedirectionClientLatency, handler.GetTimeSource().Now().Sub(startTime)) - if *retError != nil { - scope.IncCounter(metrics.CadenceDcRedirectionClientFailures) - } -} diff --git a/service/frontend/wrappers/clusterredirection/api_test.go b/service/frontend/wrappers/clusterredirection/api_test.go index 56db199c10..4273da5af5 100644 --- a/service/frontend/wrappers/clusterredirection/api_test.go +++ b/service/frontend/wrappers/clusterredirection/api_test.go @@ -78,12 +78,6 @@ func TestClusterRedirectionHandlerSuite(t *testing.T) { suite.Run(t, s) } -func (s *clusterRedirectionHandlerSuite) SetupSuite() { -} - -func (s *clusterRedirectionHandlerSuite) TearDownSuite() { -} - func (s *clusterRedirectionHandlerSuite) SetupTest() { s.Assertions = require.New(s.T()) diff --git a/service/frontend/wrappers/clusterredirection/callwrappers.go b/service/frontend/wrappers/clusterredirection/callwrappers.go new file mode 100644 index 0000000000..0aec075951 --- /dev/null +++ b/service/frontend/wrappers/clusterredirection/callwrappers.go @@ -0,0 +1,77 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package clusterredirection + +import ( + "time" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" +) + +type ( + domainIDGetter interface { + GetDomainID() string + } +) + +func (handler *clusterRedirectionHandler) beforeCall( + scope int, +) (metrics.Scope, time.Time) { + return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now() +} + +func (handler *clusterRedirectionHandler) afterCall( + recovered interface{}, + scope metrics.Scope, + startTime time.Time, + domainName string, + domainID string, + cluster string, + retError *error, +) { + var extraTags []tag.Tag + if domainName != "" { + extraTags = append(extraTags, tag.WorkflowDomainName(domainName)) + } + if domainID != "" { + extraTags = append(extraTags, tag.WorkflowDomainID(domainID)) + } + log.CapturePanic(recovered, handler.GetLogger().WithTags(extraTags...), retError) + + scope = scope.Tagged(metrics.TargetClusterTag(cluster)) + scope.IncCounter(metrics.CadenceDcRedirectionClientRequests) + scope.RecordTimer(metrics.CadenceDcRedirectionClientLatency, handler.GetTimeSource().Now().Sub(startTime)) + if *retError != nil { + scope.IncCounter(metrics.CadenceDcRedirectionClientFailures) + } +} + +// noopdomainIDGetter is a domainIDGetter that always returns empty string. +// it is used for extraction of domainID from domainIDGetter in case of token extraction failure. +type noopdomainIDGetter struct{} + +func (noopdomainIDGetter) GetDomainID() string { + return "" +} diff --git a/service/frontend/wrappers/clusterredirection/policy.go b/service/frontend/wrappers/clusterredirection/policy.go index 227d36bbc6..dde78e4b9b 100644 --- a/service/frontend/wrappers/clusterredirection/policy.go +++ b/service/frontend/wrappers/clusterredirection/policy.go @@ -194,6 +194,9 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) WithDomainIDRedirect if err != nil { return err } + if domainEntry.IsDeprecatedOrDeleted() { + return fmt.Errorf("domain %v is deprecated or deleted", domainEntry.GetInfo().Name) + } return policy.withRedirect(ctx, domainEntry, apiName, call) } @@ -203,6 +206,9 @@ func (policy *selectedOrAllAPIsForwardingRedirectionPolicy) WithDomainNameRedire if err != nil { return err } + if domainEntry.IsDeprecatedOrDeleted() { + return fmt.Errorf("domain %v is deprecated or deleted", domainName) + } return policy.withRedirect(ctx, domainEntry, apiName, call) } diff --git a/service/frontend/wrappers/clusterredirection/policy_test.go b/service/frontend/wrappers/clusterredirection/policy_test.go index f7733ce2c9..aa3d1d280e 100644 --- a/service/frontend/wrappers/clusterredirection/policy_test.go +++ b/service/frontend/wrappers/clusterredirection/policy_test.go @@ -69,13 +69,6 @@ func TestNoopDCRedirectionPolicySuite(t *testing.T) { suite.Run(t, s) } -func (s *noopDCRedirectionPolicySuite) SetupSuite() { -} - -func (s *noopDCRedirectionPolicySuite) TearDownSuite() { - -} - func (s *noopDCRedirectionPolicySuite) SetupTest() { s.Assertions = require.New(s.T()) @@ -343,6 +336,79 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) TestGetTargetDataCenter_G s.Equal(2*len(selectedAPIsForwardingRedirectionPolicyAPIAllowlist), alternativeClustercallCount) } +func (s *selectedAPIsForwardingRedirectionPolicySuite) TestGetTargetDataCenter_GlobalDomain_NoDomainInCache() { + currentClustercallCount := 0 + alternativeClustercallCount := 0 + callFn := func(targetCluster string) error { + switch targetCluster { + case s.currentClusterName: + currentClustercallCount++ + return nil + case s.alternativeClusterName: + alternativeClustercallCount++ + return &types.DomainNotActiveError{ + CurrentCluster: s.alternativeClusterName, + ActiveCluster: s.currentClusterName, + } + default: + panic(fmt.Sprintf("unknown cluster name %v", targetCluster)) + } + } + + expectedErr := fmt.Errorf("some random error") + s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(nil, expectedErr).Times(len(selectedAPIsForwardingRedirectionPolicyAPIAllowlist)) + s.mockDomainCache.EXPECT().GetDomain(s.domainName).Return(nil, expectedErr).Times(len(selectedAPIsForwardingRedirectionPolicyAPIAllowlist)) + + for apiName := range selectedAPIsForwardingRedirectionPolicyAPIAllowlist { + err := s.policy.WithDomainIDRedirect(context.Background(), s.domainID, apiName, callFn) + s.Error(err) + s.Equal(expectedErr.Error(), err.Error()) + + err = s.policy.WithDomainNameRedirect(context.Background(), s.domainName, apiName, callFn) + s.Error(err) + s.Equal(expectedErr.Error(), err.Error()) + } + + // Ensure there were no calls to the target clusters + s.Equal(0, currentClustercallCount) + s.Equal(0, alternativeClustercallCount) +} + +func (s *selectedAPIsForwardingRedirectionPolicySuite) TestGetTargetDataCenter_GlobalDomain_Forwarding_DeprecatedDomain() { + s.setupGlobalDeprecatedDomainWithTwoReplicationCluster(true, false) + + currentClustercallCount := 0 + alternativeClustercallCount := 0 + callFn := func(targetCluster string) error { + switch targetCluster { + case s.currentClusterName: + currentClustercallCount++ + return nil + case s.alternativeClusterName: + alternativeClustercallCount++ + return &types.DomainNotActiveError{ + CurrentCluster: s.alternativeClusterName, + ActiveCluster: s.currentClusterName, + } + default: + panic(fmt.Sprintf("unknown cluster name %v", targetCluster)) + } + } + + for apiName := range selectedAPIsForwardingRedirectionPolicyAPIAllowlist { + err := s.policy.WithDomainIDRedirect(context.Background(), s.domainID, apiName, callFn) + s.Error(err) + s.Equal(fmt.Sprintf("domain %v is deprecated or deleted", s.domainName), err.Error()) + + err = s.policy.WithDomainNameRedirect(context.Background(), s.domainName, apiName, callFn) + s.Error(err) + s.Equal(fmt.Sprintf("domain %v is deprecated or deleted", s.domainName), err.Error()) + } + + s.Equal(0, currentClustercallCount) + s.Equal(0, alternativeClustercallCount) +} + func (s *selectedAPIsForwardingRedirectionPolicySuite) setupLocalDomain() { domainEntry := cache.NewLocalDomainCacheEntryForTest( &persistence.DomainInfo{ID: s.domainID, Name: s.domainName}, @@ -376,3 +442,26 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) setupGlobalDomainWithTwoR s.mockDomainCache.EXPECT().GetDomain(s.domainName).Return(domainEntry, nil).AnyTimes() s.mockConfig.EnableDomainNotActiveAutoForwarding = dynamicconfig.GetBoolPropertyFnFilteredByDomain(forwardingEnabled) } + +func (s *selectedAPIsForwardingRedirectionPolicySuite) setupGlobalDeprecatedDomainWithTwoReplicationCluster(forwardingEnabled bool, isRecordActive bool) { + activeCluster := s.alternativeClusterName + if isRecordActive { + activeCluster = s.currentClusterName + } + domainEntry := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: s.domainID, Name: s.domainName, Status: persistence.DomainStatusDeprecated}, + &persistence.DomainConfig{Retention: 1}, + &persistence.DomainReplicationConfig{ + ActiveClusterName: activeCluster, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + 1234, // not used + ) + + s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(domainEntry, nil).AnyTimes() + s.mockDomainCache.EXPECT().GetDomain(s.domainName).Return(domainEntry, nil).AnyTimes() + s.mockConfig.EnableDomainNotActiveAutoForwarding = dynamicconfig.GetBoolPropertyFnFilteredByDomain(forwardingEnabled) +}