Skip to content

Commit

Permalink
Merge branch 'master' into xbowen_pinot_triple_manager_test00
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenxia committed Apr 4, 2024
2 parents e1da072 + 64cc6d8 commit 4dc1edd
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 120 deletions.
8 changes: 8 additions & 0 deletions common/taskTokenSerializerInterfaces.go
Expand Up @@ -48,3 +48,11 @@ type (
TaskID string `json:"taskId"`
}
)

func (t TaskToken) GetDomainID() string {
return t.DomainID
}

func (t QueryTaskToken) GetDomainID() string {
return t.DomainID
}
46 changes: 14 additions & 32 deletions service/frontend/templates/clusterredirection.tmpl
@@ -1,12 +1,10 @@
import (
"context"
"time"

"go.uber.org/yarpc"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -65,20 +63,27 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
var apiName = "{{$method.Name}}"
var cluster string

{{$policyMethod := "WithDomainNameRedirect"}}
{{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}}
{{- if has $method.Name $domainIDAPIs}}
token := domainIDGetter(noopdomainIDGetter{})
{{- end}}
scope, startTime := handler.beforeCall(metrics.DCRedirection{{$method.Name}}Scope)
defer func() {
handler.afterCall(recover(), scope, startTime, cluster, &err)
{{- if has $method.Name $domainIDAPIs}}
handler.afterCall(recover(), scope, startTime, "", token.GetDomainID(), cluster, &err)
{{- else}}
handler.afterCall(recover(), scope, startTime, {{$domain}}, "", cluster, &err)
{{- end}}
}()

{{$policyMethod := "WithDomainNameRedirect"}}
{{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}}
{{if has $method.Name $domainIDAPIs}}
{{$policyMethod = "WithDomainIDRedirect"}}
{{$domain = "token.DomainID"}}
{{$domain = "token.GetDomainID()"}}
{{if has $method.Name $queryTaskTokenAPIs}}
token, err := handler.tokenSerializer.DeserializeQueryTaskToken({{(index $method.Params 1).Name}}.TaskToken)
token, err = handler.tokenSerializer.DeserializeQueryTaskToken({{(index $method.Params 1).Name}}.TaskToken)
{{- else}}
token, err := handler.tokenSerializer.Deserialize({{(index $method.Params 1).Name}}.TaskToken)
token, err = handler.tokenSerializer.Deserialize({{(index $method.Params 1).Name}}.TaskToken)
{{- end}}
if err != nil {
{{- if eq (len $method.Results) 1}}
Expand Down Expand Up @@ -123,7 +128,7 @@ func (handler *clusterRedirectionHandler) QueryWorkflow(
}
scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope)
defer func() {
handler.afterCall(recover(), scope, startTime, cluster, &retError)
handler.afterCall(recover(), scope, startTime, request.GetDomain(), "", cluster, &retError)
}()

err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
Expand All @@ -140,26 +145,3 @@ func (handler *clusterRedirectionHandler) QueryWorkflow(

return resp, err
}

func (handler *clusterRedirectionHandler) beforeCall(
scope int,
) (metrics.Scope, time.Time) {
return handler.GetMetricsClient().Scope(scope), handler.GetTimeSource().Now()
}

func (handler *clusterRedirectionHandler) afterCall(
recovered interface{},
scope metrics.Scope,
startTime time.Time,
cluster string,
retError *error,
) {
log.CapturePanic(recovered, handler.GetLogger(), retError)

scope = scope.Tagged(metrics.TargetClusterTag(cluster))
scope.IncCounter(metrics.CadenceDcRedirectionClientRequests)
scope.RecordTimer(metrics.CadenceDcRedirectionClientLatency, handler.GetTimeSource().Now().Sub(startTime))
if *retError != nil {
scope.IncCounter(metrics.CadenceDcRedirectionClientFailures)
}
}

0 comments on commit 4dc1edd

Please sign in to comment.