Skip to content

Commit

Permalink
Change update namespace to upsert custom search attributes (#4080)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigozhou committed Mar 25, 2023
1 parent 8bdfccc commit f1ef4db
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 42 deletions.
57 changes: 40 additions & 17 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,21 @@ func (adh *AdminHandler) addSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

customSearchAttributes := currentSearchAttributes.Custom()
mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
for saName, saType := range request.GetSearchAttributes() {
// check if alias is already in use
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
if _, ok := aliasToFieldMap[saName]; ok {
return serviceerror.NewAlreadyExist(
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
)
Expand All @@ -377,15 +381,18 @@ func (adh *AdminHandler) addSearchAttributesSQL(
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
)
}
fieldToAliasMap[targetFieldName] = saName
upsertFieldToAliasMap[targetFieldName] = saName
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() {
return errRaceConditionAddingSearchAttributes
}
return err
}

Expand Down Expand Up @@ -472,25 +479,28 @@ func (adh *AdminHandler) removeSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, err := mapper.GetFieldName(saName, nsName)
if err != nil {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}
delete(fieldToAliasMap, fieldName)
upsertFieldToAliasMap[fieldName] = ""
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -525,7 +535,7 @@ func (adh *AdminHandler) GetSearchAttributes(
if adh.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return adh.getSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return adh.getSearchAttributesSQL(request, searchAttributes)
return adh.getSearchAttributesSQL(ctx, request, searchAttributes)
}

func (adh *AdminHandler) getSearchAttributesElasticsearch(
Expand Down Expand Up @@ -569,23 +579,36 @@ func (adh *AdminHandler) getSearchAttributesElasticsearch(
}

func (adh *AdminHandler) getSearchAttributesSQL(
ctx context.Context,
request *adminservice.GetSearchAttributesRequest,
searchAttributes searchattribute.NameTypeMap,
) (*adminservice.GetSearchAttributesResponse, error) {
_, client, err := adh.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
}

nsName := request.GetNamespace()
if nsName == "" {
return nil, errNamespaceNotSet
}
ns, err := adh.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
)
}
mapper := ns.CustomSearchAttributesMapper()

fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
for field, tp := range searchAttributes.Custom() {
if alias, err := mapper.GetAlias(field, nsName); err == nil {
if alias, ok := fieldToAliasMap[field]; ok {
customSearchAttributes[alias] = tp
}
}
Expand Down
1 change: 1 addition & 0 deletions service/frontend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
errBatchOperationNotSet = serviceerror.NewInvalidArgument("Batch operation is not set on request.")
errCronAndStartDelaySet = serviceerror.NewInvalidArgument("CronSchedule and WorkflowStartDelay may not be used together.")
errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.")
errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.")

errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")
Expand Down
2 changes: 0 additions & 2 deletions service/frontend/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,6 @@ func OperatorHandlerProvider(
saManager searchattribute.Manager,
healthServer *health.Server,
historyClient historyservice.HistoryServiceClient,
namespaceRegistry namespace.Registry,
clusterMetadataManager persistence.ClusterMetadataManager,
clusterMetadata cluster.Metadata,
clientFactory client.Factory,
Expand All @@ -523,7 +522,6 @@ func OperatorHandlerProvider(
saManager,
healthServer,
historyClient,
namespaceRegistry,
clusterMetadataManager,
clusterMetadata,
clientFactory,
Expand Down
31 changes: 29 additions & 2 deletions service/frontend/namespace_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/util"
)

type (
Expand Down Expand Up @@ -107,6 +108,8 @@ var (
errCannotDoNamespaceFailoverAndUpdate = serviceerror.NewInvalidArgument("Cannot set active cluster to current cluster when other parameters are set.")
errInvalidRetentionPeriod = serviceerror.NewInvalidArgument("A valid retention period is not set on request.")
errInvalidNamespaceStateUpdate = serviceerror.NewInvalidArgument("Invalid namespace state update.")

errCustomSearchAttributeFieldAlreadyAllocated = serviceerror.NewInvalidArgument("Custom search attribute field name already allocated.")
)

// newNamespaceHandler create a new namespace handler
Expand Down Expand Up @@ -512,9 +515,16 @@ func (d *namespaceHandlerImpl) UpdateNamespace(
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength))
}
}
if updatedConfig.CustomSearchAttributeAliases != nil {
if len(updatedConfig.CustomSearchAttributeAliases) > 0 {
configurationChanged = true
config.CustomSearchAttributeAliases = updatedConfig.CustomSearchAttributeAliases
csaAliases, err := d.upsertCustomSearchAttributesAliases(
config.CustomSearchAttributeAliases,
updatedConfig.CustomSearchAttributeAliases,
)
if err != nil {
return nil, err
}
config.CustomSearchAttributeAliases = csaAliases
}
}

Expand Down Expand Up @@ -778,6 +788,23 @@ func (d *namespaceHandlerImpl) mergeNamespaceData(
return old
}

func (d *namespaceHandlerImpl) upsertCustomSearchAttributesAliases(
current map[string]string,
upsert map[string]string,
) (map[string]string, error) {
result := util.CloneMapNonNil(current)
for key, value := range upsert {
if value == "" {
delete(result, key)
} else if _, ok := current[key]; !ok {
result[key] = value
} else {
return nil, errCustomSearchAttributeFieldAlreadyAllocated
}
}
return result, nil
}

func (d *namespaceHandlerImpl) toArchivalRegisterEvent(
state enumspb.ArchivalState,
URI string,
Expand Down
60 changes: 40 additions & 20 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type (
saManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
namespaceRegistry namespace.Registry
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand All @@ -101,7 +100,6 @@ type (
SaManager searchattribute.Manager
healthServer *health.Server
historyClient historyservice.HistoryServiceClient
namespaceRegistry namespace.Registry
clusterMetadataManager persistence.ClusterMetadataManager
clusterMetadata clustermetadata.Metadata
clientFactory svc.Factory
Expand All @@ -125,7 +123,6 @@ func NewOperatorHandlerImpl(
saManager: args.SaManager,
healthServer: args.healthServer,
historyClient: args.historyClient,
namespaceRegistry: args.namespaceRegistry,
clusterMetadataManager: args.clusterMetadataManager,
clusterMetadata: args.clusterMetadata,
clientFactory: args.clientFactory,
Expand Down Expand Up @@ -280,18 +277,22 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

dbCustomSearchAttributes := searchattribute.GetSqlDbIndexSearchAttributes().CustomSearchAttributes
cmCustomSearchAttributes := currentSearchAttributes.Custom()
mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := util.CloneMapNonNil(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
aliasToFieldMap := util.InverseMap(fieldToAliasMap)
for saName, saType := range request.GetSearchAttributes() {
// check if alias is already in use
if _, err := mapper.GetFieldName(saName, nsName); err == nil {
if _, ok := aliasToFieldMap[saName]; ok {
return serviceerror.NewAlreadyExist(
fmt.Sprintf(errSearchAttributeAlreadyExistsMessage, saName),
)
Expand All @@ -318,15 +319,18 @@ func (h *OperatorHandlerImpl) addSearchAttributesSQL(
fmt.Sprintf(errTooManySearchAttributesMessage, cntUsed, saType.String()),
)
}
fieldToAliasMap[targetFieldName] = saName
upsertFieldToAliasMap[targetFieldName] = saName
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
if err.Error() == errCustomSearchAttributeFieldAlreadyAllocated.Error() {
return errRaceConditionAddingSearchAttributes
}
return err
}

Expand Down Expand Up @@ -410,25 +414,28 @@ func (h *OperatorHandlerImpl) removeSearchAttributesSQL(
if nsName == "" {
return errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return serviceerror.NewUnavailable(fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName))
}

mapper := ns.CustomSearchAttributesMapper()
fieldToAliasMap := maps.Clone(mapper.FieldToAliasMap())
upsertFieldToAliasMap := make(map[string]string)
aliasToFieldMap := util.InverseMap(resp.Config.CustomSearchAttributeAliases)
for _, saName := range request.GetSearchAttributes() {
fieldName, err := mapper.GetFieldName(saName, nsName)
if err != nil {
fieldName, ok := aliasToFieldMap[saName]
if !ok {
return serviceerror.NewNotFound(fmt.Sprintf(errSearchAttributeDoesntExistMessage, saName))
}
delete(fieldToAliasMap, fieldName)
upsertFieldToAliasMap[fieldName] = ""
}

_, err = client.UpdateNamespace(ctx, &workflowservice.UpdateNamespaceRequest{
Namespace: nsName,
Config: &namespacepb.NamespaceConfig{
CustomSearchAttributeAliases: fieldToAliasMap,
CustomSearchAttributeAliases: upsertFieldToAliasMap,
},
})
return err
Expand Down Expand Up @@ -460,7 +467,7 @@ func (h *OperatorHandlerImpl) ListSearchAttributes(
if h.visibilityMgr.HasStoreName(elasticsearch.PersistenceName) || indexName == "" {
return h.listSearchAttributesElasticsearch(ctx, indexName, searchAttributes)
}
return h.listSearchAttributesSQL(request, searchAttributes)
return h.listSearchAttributesSQL(ctx, request, searchAttributes)
}

func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
Expand All @@ -486,23 +493,36 @@ func (h *OperatorHandlerImpl) listSearchAttributesElasticsearch(
}

func (h *OperatorHandlerImpl) listSearchAttributesSQL(
ctx context.Context,
request *operatorservice.ListSearchAttributesRequest,
searchAttributes searchattribute.NameTypeMap,
) (*operatorservice.ListSearchAttributesResponse, error) {
_, client, err := h.clientFactory.NewLocalFrontendClientWithTimeout(
frontend.DefaultTimeout,
frontend.DefaultLongPollTimeout,
)
if err != nil {
return nil, serviceerror.NewUnavailable(fmt.Sprintf(errUnableToCreateFrontendClientMessage, err))
}

nsName := request.GetNamespace()
if nsName == "" {
return nil, errNamespaceNotSet
}
ns, err := h.namespaceRegistry.GetNamespace(namespace.Name(nsName))
resp, err := client.DescribeNamespace(
ctx,
&workflowservice.DescribeNamespaceRequest{Namespace: nsName},
)
if err != nil {
return nil, serviceerror.NewUnavailable(
fmt.Sprintf(errUnableToGetNamespaceInfoMessage, nsName),
)
}
mapper := ns.CustomSearchAttributesMapper()

fieldToAliasMap := resp.Config.CustomSearchAttributeAliases
customSearchAttributes := make(map[string]enumspb.IndexedValueType)
for field, tp := range searchAttributes.Custom() {
if alias, err := mapper.GetAlias(field, nsName); err == nil {
if alias, ok := fieldToAliasMap[field]; ok {
customSearchAttributes[alias] = tp
}
}
Expand Down
1 change: 0 additions & 1 deletion service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (s *operatorHandlerSuite) SetupTest() {
s.mockResource.GetSearchAttributesManager(),
health.NewServer(),
s.mockResource.GetHistoryClient(),
s.mockResource.GetNamespaceRegistry(),
s.mockResource.GetClusterMetadataManager(),
s.mockResource.GetClusterMetadata(),
s.mockResource.GetClientFactory(),
Expand Down

0 comments on commit f1ef4db

Please sign in to comment.