Skip to content

Commit

Permalink
Fail workflow if search attributes size exceed limit (#1439)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 7, 2021
1 parent a1a4de2 commit 6d17915
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 58 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -1651,6 +1651,7 @@ const (
HistorySize
HistoryCount
EventBlobSize
SearchAttributesSize

ArchivalConfigFailures

Expand Down Expand Up @@ -2061,6 +2062,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
HistorySize: {metricName: "history_size", metricType: Timer},
HistoryCount: {metricName: "history_count", metricType: Timer},
EventBlobSize: {metricName: "event_blob_size", metricType: Timer},
SearchAttributesSize: {metricName: "search_attributes_size", metricType: Timer},
ArchivalConfigFailures: {metricName: "archivalconfig_failures", metricType: Counter},
ElasticsearchRequests: {metricName: "elasticsearch_requests", metricType: Counter},
ElasticsearchFailures: {metricName: "elasticsearch_errors", metricType: Counter},
Expand Down
54 changes: 32 additions & 22 deletions common/searchattribute/validator.go
Expand Up @@ -25,6 +25,7 @@
package searchattribute

import (
"errors"
"fmt"

commonpb "go.temporal.io/api/common/v1"
Expand All @@ -36,15 +37,21 @@ import (
"go.temporal.io/server/common/payload"
)

// Validator is used to validate search attributes
type Validator struct {
logger log.Logger
type (
// Validator is used to validate search attributes
Validator struct {
logger log.Logger

validSearchAttributes dynamicconfig.MapPropertyFn
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
}
validSearchAttributes dynamicconfig.MapPropertyFn
searchAttributesNumberOfKeysLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesSizeOfValueLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
searchAttributesTotalSizeLimit dynamicconfig.IntPropertyFnWithNamespaceFilter
}
)

var (
ErrExceedSizeLimit = errors.New("exceeds size limit")
)

// NewValidator create Validator
func NewValidator(
Expand All @@ -67,31 +74,28 @@ func NewValidator(
func (v *Validator) ValidateAndLog(searchAttributes *commonpb.SearchAttributes, namespace string) error {
err := v.Validate(searchAttributes, namespace)
if err != nil {
v.logger.Error("Error while validating search attributes.", tag.Error(err), tag.WorkflowNamespace(namespace))
v.logger.Warn("Search attributes are invalid.", tag.Error(err), tag.WorkflowNamespace(namespace))
}
return err
}

// Validate validate search attributes are valid for writing and not exceed limits
// Validate validate search attributes are valid for writing.
func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namespace string) error {
if searchAttributes == nil {
return nil
}

// verify: number of keys <= limit
lengthOfFields := len(searchAttributes.GetIndexedFields())
if lengthOfFields > v.searchAttributesNumberOfKeysLimit(namespace) {
return serviceerror.NewInvalidArgument(fmt.Sprintf("number of search attributes %d exceeds limit %d", lengthOfFields, v.searchAttributesNumberOfKeysLimit(namespace)))
}

totalSize := 0
typeMap, err := BuildTypeMap(v.validSearchAttributes)
if err != nil {
return serviceerror.NewInvalidArgument(fmt.Sprintf("unable to parse search attributes from config: %v", err))
}

for saName, saPayload := range searchAttributes.GetIndexedFields() {
// verify: saName is whitelisted
saType, err := GetType(saName, typeMap)
if err != nil {
return serviceerror.NewInvalidArgument(fmt.Sprintf("%s is not a valid search attribute name", saName))
Expand All @@ -105,21 +109,27 @@ func (v *Validator) Validate(searchAttributes *commonpb.SearchAttributes, namesp
return serviceerror.NewInvalidArgument(fmt.Sprintf("%v is not a valid value for search attribute %s", invalidValue, saName))
}

// verify: saName is not system reserved
if IsReservedField(saName) {
return serviceerror.NewInvalidArgument(fmt.Sprintf("%s is Temporal reserved field name", saName))
}
// verify: size of single value <= limit
dataSize := len(saPayload.GetData())
if dataSize > v.searchAttributesSizeOfValueLimit(namespace) {
return serviceerror.NewInvalidArgument(fmt.Sprintf("search attribute %s exceeds size limit %d", saName, v.searchAttributesSizeOfValueLimit(namespace)))
}
return nil
}

// Validate validate search attributes are valid for writing and not exceed limits
func (v *Validator) ValidateSize(searchAttributes *commonpb.SearchAttributes, namespace string) error {
if searchAttributes == nil {
return nil
}

for saName, saPayload := range searchAttributes.GetIndexedFields() {
if len(saPayload.GetData()) > v.searchAttributesSizeOfValueLimit(namespace) {
return fmt.Errorf("search attribute %s value of size %d: %w %d", saName, len(saPayload.GetData()), ErrExceedSizeLimit, v.searchAttributesSizeOfValueLimit(namespace))
}
totalSize += len(saName) + dataSize
}

// verify: total size <= limit
if totalSize > v.searchAttributesTotalSizeLimit(namespace) {
return serviceerror.NewInvalidArgument(fmt.Sprintf("total search attributes size %d exceeds limit %d", totalSize, v.searchAttributesTotalSizeLimit(namespace)))
if searchAttributes.Size() > v.searchAttributesTotalSizeLimit(namespace) {
return fmt.Errorf("total size of search attributes %d: %w %d", searchAttributes.Size(), ErrExceedSizeLimit, v.searchAttributesTotalSizeLimit(namespace))
}

return nil
Expand Down
52 changes: 38 additions & 14 deletions common/searchattribute/validator_test.go
Expand Up @@ -44,12 +44,12 @@ func TestSearchAttributesValidatorSuite(t *testing.T) {
suite.Run(t, s)
}

func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
func (s *searchAttributesValidatorSuite) TestSearchAttributesValidate() {
numOfKeysLimit := 2
sizeOfValueLimit := 5
sizeOfTotalLimit := 20

validator := NewValidator(log.NewNoopLogger(),
saValidator := NewValidator(log.NewNoopLogger(),
dynamicconfig.GetMapPropertyFn(GetDefaultTypeMap()),
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
Expand All @@ -58,7 +58,7 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
namespace := "namespace"
var attr *commonpb.SearchAttributes

err := validator.Validate(attr, namespace)
err := saValidator.Validate(attr, namespace)
s.Nil(err)

intPayload, err := payload.Encode(1)
Expand All @@ -69,7 +69,7 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
attr = &commonpb.SearchAttributes{
IndexedFields: fields,
}
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.Nil(err)

fields = map[string]*commonpb.Payload{
Expand All @@ -78,22 +78,25 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
"CustomBoolField": payload.EncodeString("true"),
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("number of search attributes 3 exceeds limit 2", err.Error())

fields = map[string]*commonpb.Payload{
"InvalidKey": payload.EncodeString("1"),
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("InvalidKey is not a valid search attribute name", err.Error())

fields = map[string]*commonpb.Payload{
"CustomStringField": payload.EncodeString("1"),
"CustomBoolField": payload.EncodeString("123"),
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("123 is not a valid value for search attribute CustomBoolField", err.Error())

intArrayPayload, err := payload.Encode([]int{1, 2})
Expand All @@ -102,28 +105,49 @@ func (s *searchAttributesValidatorSuite) TestValidateSearchAttributes() {
"CustomIntField": intArrayPayload,
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.NoError(err)

fields = map[string]*commonpb.Payload{
"StartTime": intPayload,
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
err = saValidator.Validate(attr, namespace)
s.Error(err)
s.Equal("StartTime is Temporal reserved field name", err.Error())
}

fields = map[string]*commonpb.Payload{
func (s *searchAttributesValidatorSuite) TestSearchAttributesValidateSize() {
numOfKeysLimit := 2
sizeOfValueLimit := 5
sizeOfTotalLimit := 20

saValidator := NewValidator(log.NewNoopLogger(),
dynamicconfig.GetMapPropertyFn(GetDefaultTypeMap()),
dynamicconfig.GetIntPropertyFilteredByNamespace(numOfKeysLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfValueLimit),
dynamicconfig.GetIntPropertyFilteredByNamespace(sizeOfTotalLimit))

namespace := "namespace"

fields := map[string]*commonpb.Payload{
"CustomKeywordField": payload.EncodeString("123456"),
}
attr := &commonpb.SearchAttributes{
IndexedFields: fields,
}

attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
s.Equal("search attribute CustomKeywordField exceeds size limit 5", err.Error())
err := saValidator.ValidateSize(attr, namespace)
s.Error(err)
s.Equal("search attribute CustomKeywordField value of size 8: exceeds size limit 5", err.Error())

fields = map[string]*commonpb.Payload{
"CustomKeywordField": payload.EncodeString("123"),
"CustomStringField": payload.EncodeString("12"),
}
attr.IndexedFields = fields
err = validator.Validate(attr, namespace)
s.Equal("total search attributes size 44 exceeds limit 20", err.Error())
err = saValidator.ValidateSize(attr, namespace)
s.Error(err)
s.Equal("total size of search attributes 108: exceeds size limit 20", err.Error())
}
1 change: 1 addition & 0 deletions common/util.go
Expand Up @@ -552,6 +552,7 @@ func CheckEventBlobSizeLimit(
logger log.Logger,
blobSizeViolationOperationTag tag.ZapTag,
) error {

scope.RecordDistribution(metrics.EventBlobSize, actualSize)

if actualSize > warnLimit {
Expand Down
67 changes: 46 additions & 21 deletions service/history/commandChecker.go
Expand Up @@ -71,11 +71,12 @@ type (
historyCountLimitWarn int
historyCountLimitError int

completedID int64
mutableState mutableState
executionStats *persistencespb.ExecutionStats
metricsScope metrics.Scope
logger log.Logger
completedID int64
mutableState mutableState
searchAttributesValidator *searchattribute.Validator
executionStats *persistencespb.ExecutionStats
metricsScope metrics.Scope
logger log.Logger
}
)

Expand Down Expand Up @@ -108,22 +109,24 @@ func newWorkflowSizeChecker(
historyCountLimitError int,
completedID int64,
mutableState mutableState,
searchAttributesValidator *searchattribute.Validator,
executionStats *persistencespb.ExecutionStats,
metricsScope metrics.Scope,
logger log.Logger,
) *workflowSizeChecker {
return &workflowSizeChecker{
blobSizeLimitWarn: blobSizeLimitWarn,
blobSizeLimitError: blobSizeLimitError,
historySizeLimitWarn: historySizeLimitWarn,
historySizeLimitError: historySizeLimitError,
historyCountLimitWarn: historyCountLimitWarn,
historyCountLimitError: historyCountLimitError,
completedID: completedID,
mutableState: mutableState,
executionStats: executionStats,
metricsScope: metricsScope,
logger: logger,
blobSizeLimitWarn: blobSizeLimitWarn,
blobSizeLimitError: blobSizeLimitError,
historySizeLimitWarn: historySizeLimitWarn,
historySizeLimitError: historySizeLimitError,
historyCountLimitWarn: historyCountLimitWarn,
historyCountLimitError: historyCountLimitError,
completedID: completedID,
mutableState: mutableState,
searchAttributesValidator: searchAttributesValidator,
executionStats: executionStats,
metricsScope: metricsScope,
logger: logger,
}
}

Expand Down Expand Up @@ -161,6 +164,31 @@ func (c *workflowSizeChecker) failWorkflowIfPayloadSizeExceedsLimit(
return true, nil
}

func (c *workflowSizeChecker) failWorkflowIfSearchAttributesSizeExceedsLimit(
searchAttributes *commonpb.SearchAttributes,
namespace string,
commandTypeTag metrics.Tag,
) (bool, error) {
c.metricsScope.Tagged(commandTypeTag).RecordDistribution(metrics.SearchAttributesSize, searchAttributes.Size())

err := c.searchAttributesValidator.ValidateSize(searchAttributes, namespace)
if err == nil {
return false, nil
}

c.logger.Warn("Search attributes size exceeds limits. Fail workflow.", tag.Error(err), tag.WorkflowNamespace(namespace))

attributes := &commandpb.FailWorkflowExecutionCommandAttributes{
Failure: failure.NewServerFailure(err.Error(), true),
}

if _, err := c.mutableState.AddFailWorkflowEvent(c.completedID, enumspb.RETRY_STATE_NON_RETRYABLE_FAILURE, attributes); err != nil {
return false, err
}

return true, nil
}

func (v *commandAttrValidator) validateActivityScheduleAttributes(
namespaceID string,
targetNamespaceID string,
Expand Down Expand Up @@ -457,6 +485,7 @@ func (v *commandAttrValidator) validateUpsertWorkflowSearchAttributes(
}

func (v *commandAttrValidator) validateContinueAsNewWorkflowExecutionAttributes(
namespace string,
attributes *commandpb.ContinueAsNewWorkflowExecutionCommandAttributes,
executionInfo *persistencespb.WorkflowExecutionInfo,
) error {
Expand Down Expand Up @@ -501,11 +530,7 @@ func (v *commandAttrValidator) validateContinueAsNewWorkflowExecutionAttributes(
attributes.WorkflowTaskTimeout = timestamp.DurationPtr(timestamp.DurationValue(executionInfo.DefaultWorkflowTaskTimeout))
}

namespaceEntry, err := v.namespaceCache.GetNamespaceByID(executionInfo.NamespaceId)
if err != nil {
return err
}
return v.searchAttributesValidator.ValidateAndLog(attributes.GetSearchAttributes(), namespaceEntry.GetInfo().Name)
return v.searchAttributesValidator.ValidateAndLog(attributes.GetSearchAttributes(), namespace)
}

func (v *commandAttrValidator) validateStartChildExecutionAttributes(
Expand Down
7 changes: 6 additions & 1 deletion service/history/historyEngine.go
Expand Up @@ -2564,7 +2564,12 @@ func (e *historyEngineImpl) validateStartWorkflowExecutionRequest(
if err := common.ValidateRetryPolicy(request.RetryPolicy); err != nil {
return err
}
if err := e.searchAttributesValidator.ValidateAndLog(request.SearchAttributes, namespace); err != nil {
if err := e.searchAttributesValidator.Validate(request.SearchAttributes, namespace); err != nil {
e.logger.Warn("Search attributes are invalid.", tag.Error(err), tag.WorkflowNamespace(namespace))
return err
}
if err := e.searchAttributesValidator.ValidateSize(request.SearchAttributes, namespace); err != nil {
e.logger.Warn("Search attributes are invalid.", tag.Error(err), tag.WorkflowNamespace(namespace))
return err
}

Expand Down

0 comments on commit 6d17915

Please sign in to comment.