Skip to content

Commit

Permalink
Operator API (#2503)
Browse files Browse the repository at this point in the history
Operator Service with Search Attributes
  • Loading branch information
Spikhalskiy committed Feb 24, 2022
1 parent 4f01355 commit 4d129d1
Show file tree
Hide file tree
Showing 13 changed files with 964 additions and 127 deletions.
78 changes: 46 additions & 32 deletions common/metrics/defs.go
Expand Up @@ -796,10 +796,22 @@ const (
NumAdminScopes
)

// -- Operation scopes for Admin service --
const (
// OperatorAddSearchAttributesScope is the metric scope for operator.AddSearchAttributes
OperatorAddSearchAttributesScope = iota + NumAdminScopes
// OperatorRemoveSearchAttributesScope is the metric scope for operator.RemoveSearchAttributes
OperatorRemoveSearchAttributesScope
// OperatorListSearchAttributesScope is the metric scope for operator.GetSearchAttributes
OperatorListSearchAttributesScope

NumOperatorScopes
)

// -- Operation scopes for Frontend service --
const (
// FrontendStartWorkflowExecutionScope is the metric scope for frontend.StartWorkflowExecution
FrontendStartWorkflowExecutionScope = iota + NumAdminScopes
FrontendStartWorkflowExecutionScope = iota + NumOperatorScopes
// FrontendPollWorkflowTaskQueueScope is the metric scope for frontend.PollWorkflowTaskQueue
FrontendPollWorkflowTaskQueueScope
// FrontendPollActivityTaskQueueScope is the metric scope for frontend.PollActivityTaskQueue
Expand Down Expand Up @@ -1500,37 +1512,39 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// Frontend Scope Names
Frontend: {
// Admin API scope co-locates with with frontend
AdminRemoveTaskScope: {operation: "AdminRemoveTask"},
AdminCloseShardScope: {operation: "AdminCloseShard"},
AdminGetShardScope: {operation: "AdminGetShard"},
AdminListTransferTasksScope: {operation: "AdminListTransferTasks"},
AdminListTimerTasksScope: {operation: "AdminListTimerTasks"},
AdminListReplicationTasksScope: {operation: "AdminListReplicationTasks"},
AdminListVisibilityTasksScope: {operation: "AdminListTimerTasks"},
AdminReadDLQMessagesScope: {operation: "AdminReadDLQMessages"},
AdminPurgeDLQMessagesScope: {operation: "AdminPurgeDLQMessages"},
AdminMergeDLQMessagesScope: {operation: "AdminMergeDLQMessages"},
AdminDescribeHistoryHostScope: {operation: "DescribeHistoryHost"},
AdminAddSearchAttributesScope: {operation: "AdminAddSearchAttributes"},
AdminRemoveSearchAttributesScope: {operation: "AdminRemoveSearchAttributes"},
AdminGetSearchAttributesScope: {operation: "AdminGetSearchAttributes"},
AdminRebuildMutableStateScope: {operation: "AdminRebuildMutableState"},
AdminDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
AdminGetWorkflowExecutionRawHistoryScope: {operation: "GetWorkflowExecutionRawHistory"},
AdminGetWorkflowExecutionRawHistoryV2Scope: {operation: "GetWorkflowExecutionRawHistoryV2"},
AdminGetReplicationMessagesScope: {operation: "GetReplicationMessages"},
AdminListClusterMembersScope: {operation: "AdminListClusterMembers"},
AdminGetNamespaceReplicationMessagesScope: {operation: "GetNamespaceReplicationMessages"},
AdminGetDLQReplicationMessagesScope: {operation: "AdminGetDLQReplicationMessages"},
AdminReapplyEventsScope: {operation: "ReapplyEvents"},
AdminRefreshWorkflowTasksScope: {operation: "RefreshWorkflowTasks"},
AdminResendReplicationTasksScope: {operation: "ResendReplicationTasks"},
AdminGetTaskQueueTasksScope: {operation: "GetTaskQueueTasks"},
AdminDescribeClusterScope: {operation: "AdminDescribeCluster"},
AdminListClustersScope: {operation: "AdminListClusters"},
AdminAddOrUpdateRemoteClusterScope: {operation: "AdminAddOrUpdateRemoteCluster"},
AdminRemoveRemoteClusterScope: {operation: "AdminRemoveRemoteCluster"},

AdminRemoveTaskScope: {operation: "AdminRemoveTask"},
AdminCloseShardScope: {operation: "AdminCloseShard"},
AdminGetShardScope: {operation: "AdminGetShard"},
AdminListTransferTasksScope: {operation: "AdminListTransferTasks"},
AdminListTimerTasksScope: {operation: "AdminListTimerTasks"},
AdminListReplicationTasksScope: {operation: "AdminListReplicationTasks"},
AdminListVisibilityTasksScope: {operation: "AdminListTimerTasks"},
AdminReadDLQMessagesScope: {operation: "AdminReadDLQMessages"},
AdminPurgeDLQMessagesScope: {operation: "AdminPurgeDLQMessages"},
AdminMergeDLQMessagesScope: {operation: "AdminMergeDLQMessages"},
AdminDescribeHistoryHostScope: {operation: "DescribeHistoryHost"},
AdminAddSearchAttributesScope: {operation: "AdminAddSearchAttributes"},
AdminRemoveSearchAttributesScope: {operation: "AdminRemoveSearchAttributes"},
AdminGetSearchAttributesScope: {operation: "AdminGetSearchAttributes"},
AdminRebuildMutableStateScope: {operation: "AdminRebuildMutableState"},
AdminDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
AdminGetWorkflowExecutionRawHistoryScope: {operation: "GetWorkflowExecutionRawHistory"},
AdminGetWorkflowExecutionRawHistoryV2Scope: {operation: "GetWorkflowExecutionRawHistoryV2"},
AdminGetReplicationMessagesScope: {operation: "GetReplicationMessages"},
AdminListClusterMembersScope: {operation: "AdminListClusterMembers"},
AdminGetNamespaceReplicationMessagesScope: {operation: "GetNamespaceReplicationMessages"},
AdminGetDLQReplicationMessagesScope: {operation: "AdminGetDLQReplicationMessages"},
AdminReapplyEventsScope: {operation: "ReapplyEvents"},
AdminRefreshWorkflowTasksScope: {operation: "RefreshWorkflowTasks"},
AdminResendReplicationTasksScope: {operation: "ResendReplicationTasks"},
AdminGetTaskQueueTasksScope: {operation: "GetTaskQueueTasks"},
AdminDescribeClusterScope: {operation: "AdminDescribeCluster"},
AdminListClustersScope: {operation: "AdminListClusters"},
AdminAddOrUpdateRemoteClusterScope: {operation: "AdminAddOrUpdateRemoteCluster"},
AdminRemoveRemoteClusterScope: {operation: "AdminRemoveRemoteCluster"},
OperatorAddSearchAttributesScope: {operation: "OperatorAddSearchAttributes"},
OperatorRemoveSearchAttributesScope: {operation: "OperatorRemoveSearchAttributes"},
OperatorListSearchAttributesScope: {operation: "OperatorListSearchAttributes"},
FrontendStartWorkflowExecutionScope: {operation: "StartWorkflowExecution"},
FrontendPollWorkflowTaskQueueScope: {operation: "PollWorkflowTaskQueue"},
FrontendPollActivityTaskQueueScope: {operation: "PollActivityTaskQueue"},
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Expand Up @@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/export/metric v0.27.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
go.temporal.io/sdk v1.13.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -113,11 +113,11 @@ require (
go.uber.org/dig v1.13.0 // indirect
golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220211171837-173942840c17 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)
12 changes: 6 additions & 6 deletions go.sum
Expand Up @@ -477,8 +477,8 @@ go.opentelemetry.io/otel/trace v1.4.0 h1:4OOUrPZdVFQkbzl/JSdvGCWIdw5ONXXxzHlaLlW
go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.6.1-0.20211110205628-60c98e9cbfe2/go.mod h1:IlUgOTGfmJuOkGrCZdptNxyXKE9CQz6oOx7/aH9bFY4=
go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b h1:VVkp66hR7QpeJ2lwgx+Wr6zXYUvhfnCybwQyRDfdebg=
go.temporal.io/api v1.7.1-0.20220211205804-a4f685c2448b/go.mod h1:HAD4ieSewx7651I9hHKNalm5GtmOyZ7MSfK7anw2pAA=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a h1:SgkeoCikBXMd/3fNNtymIfhpxk8o/E3zIZFBFkHzTtU=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a/go.mod h1:OnUq5eS+Nyx+irKb3Ws5YB7yjGFf5XmI3WcVRU9COEo=
go.temporal.io/sdk v1.13.0 h1:8PW27o/uYAf1C1u8WUd6LNa6He2nYkBhdUX3c5gif5o=
go.temporal.io/sdk v1.13.0/go.mod h1:TCof7U/xas2FyDnx/UUEv4c/O/S41Lnhva+6JVer+Jo=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down Expand Up @@ -694,8 +694,8 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c h1:sSIdNI2Dd6vGv47bKc/xArpfxVmEz2+3j0E6I484xC4=
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -891,8 +891,8 @@ google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220211171837-173942840c17 h1:2X+CNIheCutWRyKRte8szGxrE5ggtV4U+NKAbh/oLhg=
google.golang.org/genproto v0.0.0-20220211171837-173942840c17/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf h1:SVYXkUz2yZS9FWb2Gm8ivSlbNQzL2Z/NpPKE3RG2jWk=
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
24 changes: 24 additions & 0 deletions service/frontend/fx.go
Expand Up @@ -86,6 +86,7 @@ var Module = fx.Options(
fx.Provide(HandlerProvider),
fx.Provide(func(so []grpc.ServerOption) *grpc.Server { return grpc.NewServer(so...) }),
fx.Provide(AdminHandlerProvider),
fx.Provide(OperatorHandlerProvider),
fx.Provide(NewVersionChecker),
fx.Provide(ServiceResolverProvider),
fx.Provide(NewServiceProvider),
Expand All @@ -97,6 +98,7 @@ func NewServiceProvider(
server *grpc.Server,
handler Handler,
adminHandler *AdminHandler,
operatorHandler *OperatorHandlerImpl,
versionChecker *VersionChecker,
visibilityMgr manager.VisibilityManager,
logger resource.SnTaggedLogger,
Expand All @@ -109,6 +111,7 @@ func NewServiceProvider(
server,
handler,
adminHandler,
operatorHandler,
versionChecker,
visibilityMgr,
logger,
Expand Down Expand Up @@ -398,6 +401,27 @@ func AdminHandlerProvider(
return NewAdminHandler(args)
}

func OperatorHandlerProvider(
esConfig *esclient.Config,
esClient esclient.Client,
logger resource.SnTaggedLogger,
sdkSystemClient sdkclient.Client,
metricsClient metrics.Client,
saProvider searchattribute.Provider,
saManager searchattribute.Manager,
) *OperatorHandlerImpl {
args := NewOperatorHandlerImplArgs{
esConfig,
esClient,
logger,
sdkSystemClient,
metricsClient,
saProvider,
saManager,
}
return NewOperatorHandlerImpl(args)
}

func HandlerProvider(
params *resource.BootstrapParams,
serviceConfig *Config,
Expand Down
11 changes: 9 additions & 2 deletions service/frontend/interface.go
Expand Up @@ -27,6 +27,7 @@
package frontend

import (
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"

healthpb "google.golang.org/grpc/health/grpc_health_v1"
Expand All @@ -35,17 +36,23 @@ import (
)

type (
// Handler is interface wrapping frontend handler
// Handler is interface wrapping frontend workflow handler
Handler interface {
workflowservice.WorkflowServiceServer
common.Daemon

// Health is the health check method for this rpc handler
// HealthServer is the health check method for the whole frontend server
healthpb.HealthServer
// UpdateHealthStatus sets the health status for this rpc handler.
// This health status will be used within the rpc health check handler
UpdateHealthStatus(status HealthStatus)

GetConfig() *Config
}

// OperatorHandler is interface wrapping frontend workflow handler
OperatorHandler interface {
operatorservice.OperatorServiceServer
common.Daemon
}
)

0 comments on commit 4d129d1

Please sign in to comment.