Skip to content

Commit

Permalink
Add "addsearchattributes" worker (#1518)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed May 6, 2021
1 parent 22e5045 commit 7962111
Show file tree
Hide file tree
Showing 21 changed files with 426 additions and 38 deletions.
6 changes: 3 additions & 3 deletions client/admin/metricClient.go
Expand Up @@ -54,14 +54,14 @@ func (c *metricClient) AddSearchAttribute(
opts ...grpc.CallOption,
) (*adminservice.AddSearchAttributeResponse, error) {

c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributeScope, metrics.ClientRequests)
c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributesScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.AdminClientAddSearchAttributeScope, metrics.ClientLatency)
sw := c.metricsClient.StartTimer(metrics.AdminClientAddSearchAttributesScope, metrics.ClientLatency)
resp, err := c.client.AddSearchAttribute(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributeScope, metrics.ClientFailures)
c.metricsClient.IncCounter(metrics.AdminClientAddSearchAttributesScope, metrics.ClientFailures)
}
return resp, err
}
Expand Down
1 change: 1 addition & 0 deletions common/cache/namespaceCache_test.go
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber-go/tally"
namespacepb "go.temporal.io/api/namespace/v1"
"go.temporal.io/api/serviceerror"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/log"
Expand Down
12 changes: 12 additions & 0 deletions common/log/tag/tags.go
Expand Up @@ -609,6 +609,18 @@ func ESConfig(c interface{}) ZapTag {
return NewAnyTag("es-config", c)
}

func ESIndex(index string) ZapTag {
return NewStringTag("es-index", index)
}

func ESMapping(mapping map[string]string) ZapTag {
return NewAnyTag("es-mapping", mapping)
}

func ESClusterStatus(status string) ZapTag {
return NewStringTag("es-cluster-status", status)
}

// ESField returns tag for ESField
func ESField(ESField string) ZapTag {
return NewStringTag("es-Field", ESField)
Expand Down
1 change: 1 addition & 0 deletions common/log/tag/values.go
Expand Up @@ -127,6 +127,7 @@ var (
ComponentWorker = component("worker")
ComponentServiceResolver = component("service-resolver")
ComponentMetadataInitializer = component("metadata-initializer")
ComponentAddSearchAttributes = component("add-search-attributes")
VersionChecker = component("version-checker")
)

Expand Down
28 changes: 22 additions & 6 deletions common/metrics/defs.go
Expand Up @@ -478,8 +478,10 @@ const (
FrontendClientGetClusterInfoScope
// FrontendClientListTaskQueuePartitionsScope tracks RPC calls to frontend service
FrontendClientListTaskQueuePartitionsScope
// AdminClientAddSearchAttributeScope tracks RPC calls to admin service
AdminClientAddSearchAttributeScope
// AdminClientAddSearchAttributesScope tracks RPC calls to admin service
AdminClientAddSearchAttributesScope
// AdminClientGetSearchAttributesScope tracks RPC calls to admin service
AdminClientGetSearchAttributesScope
// AdminClientCloseShardScope tracks RPC calls to admin service
AdminClientCloseShardScope
// AdminClientDescribeHistoryHostScope tracks RPC calls to admin service
Expand Down Expand Up @@ -689,8 +691,10 @@ const (
const (
// AdminDescribeHistoryHostScope is the metric scope for admin.AdminDescribeHistoryHostScope
AdminDescribeHistoryHostScope = iota + NumCommonScopes
// AdminAddSearchAttributeScope is the metric scope for admin.AdminAddSearchAttributeScope
AdminAddSearchAttributeScope
// AdminAddSearchAttributesScope is the metric scope for admin.AdminAddSearchAttributesScope
AdminAddSearchAttributesScope
// AdminGetSearchAttributesScope is the metric scope for admin.AdminGetSearchAttributesScope
AdminGetSearchAttributesScope
// AdminDescribeWorkflowExecutionScope is the metric scope for admin.AdminDescribeWorkflowExecutionScope
AdminDescribeWorkflowExecutionScope
// AdminGetWorkflowExecutionRawHistoryScope is the metric scope for admin.GetWorkflowExecutionRawHistoryScope
Expand Down Expand Up @@ -1101,6 +1105,8 @@ const (
HistoryScavengerScope
// ParentClosePolicyProcessorScope is scope used by all metrics emitted by worker.ParentClosePolicyProcessor
ParentClosePolicyProcessorScope
// AddSearchAttributesWorkflowScope is scope used by all metrics emitted by worker.AddSearchAttributesWorkflowScope module
AddSearchAttributesWorkflowScope

NumWorkerScopes
)
Expand Down Expand Up @@ -1291,7 +1297,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
FrontendClientReapplyEventsScope: {operation: "FrontendClientReapplyEventsScope", tags: map[string]string{ServiceRoleTagName: FrontendRoleTagValue}},
FrontendClientGetClusterInfoScope: {operation: "FrontendClientGetClusterInfoScope", tags: map[string]string{ServiceRoleTagName: FrontendRoleTagValue}},
FrontendClientListTaskQueuePartitionsScope: {operation: "FrontendClientListTaskQueuePartitions", tags: map[string]string{ServiceRoleTagName: FrontendRoleTagValue}},
AdminClientAddSearchAttributeScope: {operation: "AdminClientAddSearchAttribute", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
AdminClientAddSearchAttributesScope: {operation: "AdminClientAddSearchAttributes", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
AdminClientGetSearchAttributesScope: {operation: "AdminClientGetSearchAttributes", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
AdminClientDescribeHistoryHostScope: {operation: "AdminClientDescribeHistoryHost", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
AdminClientDescribeWorkflowMutableStateScope: {operation: "AdminClientDescribeWorkflowMutableState", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
AdminClientGetWorkflowExecutionRawHistoryScope: {operation: "AdminClientGetWorkflowExecutionRawHistory", tags: map[string]string{ServiceRoleTagName: AdminRoleTagValue}},
Expand Down Expand Up @@ -1391,7 +1398,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
AdminPurgeDLQMessagesScope: {operation: "AdminPurgeDLQMessages"},
AdminMergeDLQMessagesScope: {operation: "AdminMergeDLQMessages"},
AdminDescribeHistoryHostScope: {operation: "DescribeHistoryHost"},
AdminAddSearchAttributeScope: {operation: "AddSearchAttribute"},
AdminAddSearchAttributesScope: {operation: "AdminAddSearchAttributes"},
AdminGetSearchAttributesScope: {operation: "AdminGetSearchAttributes"},
AdminDescribeWorkflowExecutionScope: {operation: "DescribeWorkflowExecution"},
AdminGetWorkflowExecutionRawHistoryScope: {operation: "GetWorkflowExecutionRawHistory"},
AdminGetWorkflowExecutionRawHistoryV2Scope: {operation: "GetWorkflowExecutionRawHistoryV2"},
Expand Down Expand Up @@ -1595,6 +1603,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryScavengerScope: {operation: "historyscavenger"},
BatcherScope: {operation: "batcher"},
ParentClosePolicyProcessorScope: {operation: "ParentClosePolicyProcessor"},
AddSearchAttributesWorkflowScope: {operation: "AddSearchAttributesWorkflow"},
},
}

Expand Down Expand Up @@ -1741,6 +1750,8 @@ const (
ParentClosePolicyProcessorSuccess
ParentClosePolicyProcessorFailures

AddSearchAttributesWorkflowSuccessCount
AddSearchAttributesWorkflowFailuresCount
NumCommonMetrics // Needs to be last on this list for iota numbering
)

Expand Down Expand Up @@ -2018,6 +2029,7 @@ const (
NamespaceReplicationEnqueueDLQCount
ScavengerValidationRequestsCount
ScavengerValidationFailuresCount
AddSearchAttributesFailuresCount

NumWorkerMetrics
)
Expand Down Expand Up @@ -2120,6 +2132,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ParentClosePolicyProcessorSuccess: {metricName: "parent_close_policy_processor_requests", metricType: Counter},
ParentClosePolicyProcessorFailures: {metricName: "parent_close_policy_processor_errors", metricType: Counter},

AddSearchAttributesWorkflowSuccessCount: {metricName: "add_search_attributes_workflow_success", metricType: Counter},
AddSearchAttributesWorkflowFailuresCount: {metricName: "add_search_attributes_workflow_failure", metricType: Counter},

MatchingClientForwardedCounter: {metricName: "forwarded", metricType: Counter},
MatchingClientInvalidTaskQueueName: {metricName: "invalid_task_queue_name", metricType: Counter},

Expand Down Expand Up @@ -2449,6 +2464,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
NamespaceReplicationEnqueueDLQCount: {metricName: "namespace_replication_dlq_enqueue_requests", metricType: Counter},
ScavengerValidationRequestsCount: {metricName: "scavenger_validation_requests", metricType: Counter},
ScavengerValidationFailuresCount: {metricName: "scavenger_validation_failures", metricType: Counter},
AddSearchAttributesFailuresCount: {metricName: "add_search_attributes_failures", metricType: Counter},
},
}

Expand Down
5 changes: 2 additions & 3 deletions common/persistence/search_attributes_manager.go
Expand Up @@ -42,9 +42,6 @@ const (
type (
// TODO (alex): move this to searchattribute package
SearchAttributesManager struct {
searchattribute.Provider
searchattribute.Saver

timeSource clock.TimeSource
clusterMetadataManager ClusterMetadataManager

Expand All @@ -59,6 +56,8 @@ type (
}
)

var _ searchattribute.Manager = (*SearchAttributesManager)(nil)

func NewSearchAttributesManager(
timeSource clock.TimeSource,
clusterMetadataManager ClusterMetadataManager,
Expand Down
4 changes: 2 additions & 2 deletions common/resource/bootstrapParams.go
Expand Up @@ -39,7 +39,7 @@ import (
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/metrics"
persistenceClient "go.temporal.io/server/common/persistence/client"
"go.temporal.io/server/common/persistence/elasticsearch/client"
esclient "go.temporal.io/server/common/persistence/elasticsearch/client"
"go.temporal.io/server/common/resolver"
)

Expand All @@ -62,7 +62,7 @@ type (
ServerMetricsReporter metrics.Reporter
SDKMetricsReporter metrics.Reporter
MetricsClient metrics.Client
ESClient client.Client
ESClient esclient.Client
ESConfig *config.Elasticsearch
DynamicConfigClient dynamicconfig.Client
DCRedirectionPolicy config.DCRedirectionPolicy
Expand Down
2 changes: 2 additions & 0 deletions common/resource/resource.go
Expand Up @@ -29,6 +29,7 @@ import (

"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/server/common/searchattribute"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -59,6 +60,7 @@ type (
GetHostInfo() *membership.HostInfo
GetArchivalMetadata() archiver.ArchivalMetadata
GetClusterMetadata() cluster.Metadata
GetSearchAttributesProvider() searchattribute.Provider

// other common resources

Expand Down
11 changes: 11 additions & 0 deletions common/resource/resourceImpl.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/tchannel-go"
"go.temporal.io/api/workflowservice/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/server/common/searchattribute"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/historyservice/v1"
Expand Down Expand Up @@ -64,6 +65,7 @@ type (
// for visibility manager initialization
VisibilityManagerInitializer func(
persistenceBean persistenceClient.Bean,
searchAttributesProvider searchattribute.Provider,
logger log.Logger,
) (persistence.VisibilityManager, error)

Expand All @@ -79,6 +81,7 @@ type (
hostInfo *membership.HostInfo
metricsScope tally.Scope
clusterMetadata cluster.Metadata
saProvider searchattribute.Provider

// other common resources

Expand Down Expand Up @@ -235,8 +238,11 @@ func New(
return nil, err
}

saProvider := persistence.NewSearchAttributesManager(clock.NewRealTimeSource(), persistenceBean.GetClusterMetadataManager())

visibilityMgr, err := visibilityManagerInitializer(
persistenceBean,
saProvider,
logger,
)
if err != nil {
Expand Down Expand Up @@ -305,6 +311,7 @@ func New(
hostName: hostName,
metricsScope: params.MetricsScope,
clusterMetadata: clusterMetadata,
saProvider: saProvider,

// other common resources

Expand Down Expand Up @@ -615,3 +622,7 @@ func (h *Impl) GetThrottledLogger() log.Logger {
func (h *Impl) GetGRPCListener() net.Listener {
return h.grpcListener
}

func (h *Impl) GetSearchAttributesProvider() searchattribute.Provider {
return h.saProvider
}
20 changes: 15 additions & 5 deletions common/resource/resourceTest.go
Expand Up @@ -33,6 +33,8 @@ import (
"go.temporal.io/api/workflowservicemock/v1"
sdkclient "go.temporal.io/sdk/client"
sdkmocks "go.temporal.io/sdk/mocks"
esclient "go.temporal.io/server/common/persistence/elasticsearch/client"
"go.temporal.io/server/common/searchattribute"

"go.temporal.io/server/api/adminservice/v1"
"go.temporal.io/server/api/adminservicemock/v1"
Expand All @@ -57,8 +59,9 @@ import (
type (
// Test is the test implementation used for testing
Test struct {
MetricsScope tally.Scope
ClusterMetadata *cluster.MockMetadata
MetricsScope tally.Scope
ClusterMetadata *cluster.MockMetadata
SearchAttributesProvider *searchattribute.MockProvider

// other common resources

Expand All @@ -79,13 +82,14 @@ type (

// internal services clients

SDKClient sdkclient.Client
SDKClient *sdkmocks.Client
FrontendClient *workflowservicemock.MockWorkflowServiceClient
MatchingClient *matchingservicemock.MockMatchingServiceClient
HistoryClient *historyservicemock.MockHistoryServiceClient
RemoteAdminClient *adminservicemock.MockAdminServiceClient
RemoteFrontendClient *workflowservicemock.MockWorkflowServiceClient
ClientBean *client.MockBean
ESClient *esclient.MockClient

// persistence clients

Expand Down Expand Up @@ -166,8 +170,9 @@ func NewTest(
scope := tally.NewTestScope("test", nil)

return &Test{
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),
SearchAttributesProvider: searchattribute.NewMockProvider(controller),

// other common resources

Expand Down Expand Up @@ -195,6 +200,7 @@ func NewTest(
RemoteAdminClient: remoteAdminClient,
RemoteFrontendClient: remoteFrontendClient,
ClientBean: clientBean,
ESClient: esclient.NewMockClient(controller),

// persistence clients

Expand Down Expand Up @@ -430,3 +436,7 @@ func (s *Test) GetThrottledLogger() log.Logger {
func (s *Test) GetGRPCListener() net.Listener {
panic("user should implement this method for test")
}

func (h *Test) GetSearchAttributesProvider() searchattribute.Provider {
return h.SearchAttributesProvider
}
5 changes: 3 additions & 2 deletions common/searchattribute/search_attirbute.go
Expand Up @@ -45,7 +45,8 @@ type (
GetSearchAttributes(indexName string, bypassCache bool) (map[string]enumspb.IndexedValueType, error)
}

Saver interface {
Manager interface {
Provider
SaveSearchAttributes(indexName string, newCustomSearchAttributes map[string]enumspb.IndexedValueType) error
}
)
Expand Down Expand Up @@ -110,7 +111,7 @@ func ApplyTypeMap(searchAttributes *commonpb.SearchAttributes, typeMap map[strin
}
}

func GetESType(t enumspb.IndexedValueType) string {
func MapESType(t enumspb.IndexedValueType) string {
switch t {
case enumspb.INDEXED_VALUE_TYPE_STRING:
return "text"
Expand Down

0 comments on commit 7962111

Please sign in to comment.