diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index 8ab1081ad79..e33d8389272 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -252,10 +252,24 @@ func newVisibilityStoreFromDataStoreConfig( logger, ) default: - visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger, metricsHandler) + visStore, err = newStandardVisibilityStore( + dsConfig, + persistenceResolver, + searchAttributesProvider, + searchAttributesMapperProvider, + logger, + metricsHandler, + ) } } else if dsConfig.Cassandra != nil { - visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger, metricsHandler) + visStore, err = newStandardVisibilityStore( + dsConfig, + persistenceResolver, + searchAttributesProvider, + searchAttributesMapperProvider, + logger, + metricsHandler, + ) } else if dsConfig.Elasticsearch != nil { visStore = newElasticsearchVisibilityStore( dsConfig.Elasticsearch.GetVisibilityIndex(), @@ -275,6 +289,8 @@ func newVisibilityStoreFromDataStoreConfig( func newStandardVisibilityStore( dsConfig config.DataStore, persistenceResolver resolver.ServiceResolver, + searchAttributesProvider searchattribute.Provider, + searchAttributesMapperProvider searchattribute.MapperProvider, logger log.Logger, metricsHandler metrics.Handler, ) (store.VisibilityStore, error) { @@ -303,7 +319,11 @@ func newStandardVisibilityStore( logger.Fatal("invalid config: one of cassandra or sql params must be specified for visibility store") return nil, nil } - return standard.NewVisibilityStore(visStore), nil + return standard.NewVisibilityStore( + visStore, + searchAttributesProvider, + searchAttributesMapperProvider, + ), nil } func newElasticsearchVisibilityStore( diff --git a/common/persistence/visibility/store/elasticsearch/query_interceptors.go b/common/persistence/visibility/store/elasticsearch/query_interceptors.go index a482d78ba58..f223094dc55 100644 --- a/common/persistence/visibility/store/elasticsearch/query_interceptors.go +++ b/common/persistence/visibility/store/elasticsearch/query_interceptors.go @@ -25,6 +25,7 @@ package elasticsearch import ( + "strconv" "time" enumspb "go.temporal.io/api/enums/v1" @@ -43,25 +44,38 @@ type ( searchAttributesMapperProvider searchattribute.MapperProvider seenNamespaceDivision bool } - valuesInterceptor struct{} + + valuesInterceptor struct { + namespace namespace.Name + searchAttributesTypeMap searchattribute.NameTypeMap + searchAttributesMapperProvider searchattribute.MapperProvider + } ) func newNameInterceptor( - namespace namespace.Name, + namespaceName namespace.Name, index string, saTypeMap searchattribute.NameTypeMap, searchAttributesMapperProvider searchattribute.MapperProvider, ) *nameInterceptor { return &nameInterceptor{ - namespace: namespace, + namespace: namespaceName, index: index, searchAttributesTypeMap: saTypeMap, searchAttributesMapperProvider: searchAttributesMapperProvider, } } -func NewValuesInterceptor() *valuesInterceptor { - return &valuesInterceptor{} +func NewValuesInterceptor( + namespaceName namespace.Name, + saTypeMap searchattribute.NameTypeMap, + searchAttributesMapperProvider searchattribute.MapperProvider, +) *valuesInterceptor { + return &valuesInterceptor{ + namespace: namespaceName, + searchAttributesTypeMap: saTypeMap, + searchAttributesMapperProvider: searchAttributesMapperProvider, + } } func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string, error) { @@ -109,39 +123,109 @@ func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string return fieldName, nil } -func (vi *valuesInterceptor) Values(name string, values ...interface{}) ([]interface{}, error) { +func (vi *valuesInterceptor) Values(fieldName string, values ...interface{}) ([]interface{}, error) { + fieldType, err := vi.searchAttributesTypeMap.GetType(fieldName) + if err != nil { + return nil, query.NewConverterError("invalid search attribute: %s", fieldName) + } + + name := fieldName + if searchattribute.IsMappable(fieldName) { + mapper, err := vi.searchAttributesMapperProvider.GetMapper(vi.namespace) + if err != nil { + return nil, err + } + if mapper != nil { + name, err = mapper.GetAlias(fieldName, vi.namespace.String()) + if err != nil { + return nil, err + } + } + } + var result []interface{} for _, value := range values { + value, err = parseSystemSearchAttributeValues(fieldName, value) + if err != nil { + return nil, err + } + value, err = validateValueType(name, value, fieldType) + if err != nil { + return nil, err + } + result = append(result, value) + } + return result, nil +} - switch name { - case searchattribute.StartTime, searchattribute.CloseTime, searchattribute.ExecutionTime: - if nanos, isNumber := value.(int64); isNumber { - value = time.Unix(0, nanos).UTC().Format(time.RFC3339Nano) +func parseSystemSearchAttributeValues(name string, value any) (any, error) { + switch name { + case searchattribute.StartTime, searchattribute.CloseTime, searchattribute.ExecutionTime: + if nanos, isNumber := value.(int64); isNumber { + value = time.Unix(0, nanos).UTC().Format(time.RFC3339Nano) + } + case searchattribute.ExecutionStatus: + if status, isNumber := value.(int64); isNumber { + value = enumspb.WorkflowExecutionStatus_name[int32(status)] + } + case searchattribute.ExecutionDuration: + if durationStr, isString := value.(string); isString { + // To support durations passed as golang durations such as "300ms", "-1.5h" or "2h45m". + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + // Custom timestamp.ParseDuration also supports "d" as additional unit for days. + if duration, err := timestamp.ParseDuration(durationStr); err == nil { + value = duration.Nanoseconds() + } else { + // To support "hh:mm:ss" durations. + duration, err := timestamp.ParseHHMMSSDuration(durationStr) + if err != nil { + return nil, err + } + value = duration.Nanoseconds() } - case searchattribute.ExecutionStatus: - if status, isNumber := value.(int64); isNumber { - value = enumspb.WorkflowExecutionStatus_name[int32(status)] + } + default: + } + return value, nil +} + +func validateValueType(name string, value any, fieldType enumspb.IndexedValueType) (any, error) { + switch fieldType { + case enumspb.INDEXED_VALUE_TYPE_INT, enumspb.INDEXED_VALUE_TYPE_DOUBLE: + switch v := value.(type) { + case int64, float64: + // nothing to do + case string: + // ES can do implicit casting if the value is numeric + if _, err := strconv.ParseFloat(v, 64); err != nil { + return nil, query.NewConverterError( + "invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value) } - case searchattribute.ExecutionDuration: - if durationStr, isString := value.(string); isString { - // To support durations passed as golang durations such as "300ms", "-1.5h" or "2h45m". - // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". - // Custom timestamp.ParseDuration also supports "d" as additional unit for days. - if duration, err := timestamp.ParseDuration(durationStr); err == nil { - value = duration.Nanoseconds() - } else { - // To support "hh:mm:ss" durations. - duration, err := timestamp.ParseHHMMSSDuration(durationStr) - if err != nil { - return nil, err - } - value = duration.Nanoseconds() - } + default: + return nil, query.NewConverterError( + "invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value) + } + case enumspb.INDEXED_VALUE_TYPE_BOOL: + switch value.(type) { + case bool: + // nothing to do + default: + return nil, query.NewConverterError( + "invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value) + } + case enumspb.INDEXED_VALUE_TYPE_DATETIME: + switch v := value.(type) { + case int64: + value = time.Unix(0, v).UTC().Format(time.RFC3339Nano) + case string: + if _, err := time.Parse(time.RFC3339Nano, v); err != nil { + return nil, query.NewConverterError( + "invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value) } default: + return nil, query.NewConverterError( + "invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value) } - - result = append(result, value) } - return result, nil + return value, nil } diff --git a/common/persistence/visibility/store/elasticsearch/query_interceptors_test.go b/common/persistence/visibility/store/elasticsearch/query_interceptors_test.go index 7cb856d23b5..f401ac5179e 100644 --- a/common/persistence/visibility/store/elasticsearch/query_interceptors_test.go +++ b/common/persistence/visibility/store/elasticsearch/query_interceptors_test.go @@ -56,7 +56,11 @@ func (s *QueryInterceptorSuite) TearDownTest() { } func (s *QueryInterceptorSuite) TestTimeProcessFunc() { - vi := NewValuesInterceptor() + vi := NewValuesInterceptor( + "test-namespace", + searchattribute.TestNameTypeMap, + searchattribute.NewTestMapperProvider(nil), + ) cases := []struct { key string @@ -73,7 +77,7 @@ func (s *QueryInterceptorSuite) TestTimeProcessFunc() { }{ {value: "2018-06-07T08:04:05.123456789Z", returnErr: false}, {value: "2018-06-07T15:04:05+07:00", returnErr: false}, - {value: "some invalid time string", returnErr: false}, + {value: "", returnErr: true}, {value: "should not be modified", returnErr: false}, } @@ -90,7 +94,11 @@ func (s *QueryInterceptorSuite) TestTimeProcessFunc() { } func (s *QueryInterceptorSuite) TestStatusProcessFunc() { - vi := NewValuesInterceptor() + vi := NewValuesInterceptor( + "test-namespace", + searchattribute.TestNameTypeMap, + searchattribute.NewTestMapperProvider(nil), + ) cases := []struct { key string @@ -130,14 +138,18 @@ func (s *QueryInterceptorSuite) TestStatusProcessFunc() { } func (s *QueryInterceptorSuite) TestDurationProcessFunc() { - vi := NewValuesInterceptor() + vi := NewValuesInterceptor( + "test-namespace", + searchattribute.TestNameTypeMap, + searchattribute.NewTestMapperProvider(nil), + ) cases := []struct { key string value interface{} }{ {key: searchattribute.ExecutionDuration, value: "1"}, - {key: searchattribute.ExecutionDuration, value: 1}, + {key: searchattribute.ExecutionDuration, value: int64(1)}, {key: searchattribute.ExecutionDuration, value: "5h3m"}, {key: searchattribute.ExecutionDuration, value: "00:00:01"}, {key: searchattribute.ExecutionDuration, value: "00:00:61"}, @@ -149,7 +161,7 @@ func (s *QueryInterceptorSuite) TestDurationProcessFunc() { returnErr bool }{ {value: nil, returnErr: true}, - {value: 1, returnErr: false}, + {value: int64(1), returnErr: false}, {value: int64(18180000000000), returnErr: false}, {value: int64(1000000000), returnErr: false}, {value: nil, returnErr: true}, diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index e1c0682dd65..173224826f3 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -886,7 +886,10 @@ func (s *visibilityStore) convertQuery( return nil, serviceerror.NewUnavailable(fmt.Sprintf("Unable to read search attribute types: %v", err)) } nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapperProvider) - queryConverter := newQueryConverter(nameInterceptor, NewValuesInterceptor()) + queryConverter := newQueryConverter( + nameInterceptor, + NewValuesInterceptor(namespace, saTypeMap, s.searchAttributesMapperProvider), + ) queryParams, err := queryConverter.ConvertWhereOrderBy(requestQueryStr) if err != nil { // Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors). diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go index 0b933e1d3bb..4a25895fead 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go @@ -700,10 +700,9 @@ func (s *ESVisibilitySuite) Test_convertQuery() { query = `ExecutionTime < "unable to parse"` queryParams, err = s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query) - // Wrong dates goes directly to Elasticsearch, and it returns an error. - s.NoError(err) - s.Equal(`{"bool":{"filter":[{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},{"bool":{"filter":{"range":{"ExecutionTime":{"from":null,"include_lower":true,"include_upper":false,"to":"unable to parse"}}}}}],"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query)) - s.Nil(queryParams.Sorter) + s.Error(err) + s.IsType(&serviceerror.InvalidArgument{}, err) + s.Equal(err.Error(), "invalid query: unable to convert filter expression: unable to convert values of comparison expression: invalid value for search attribute ExecutionTime of type Datetime: \"unable to parse\"") // invalid union injection query = `WorkflowId = 'wid' union select * from dummy` diff --git a/common/persistence/visibility/store/query/converter.go b/common/persistence/visibility/store/query/converter.go index 14951c8c511..f1852564989 100644 --- a/common/persistence/visibility/store/query/converter.go +++ b/common/persistence/visibility/store/query/converter.go @@ -502,6 +502,8 @@ func (c *comparisonExprConverter) Convert(expr sqlparser.Expr) (elastic.Query, e return query, nil } +// convertComparisonExprValue returns a string, int64, float64, bool or +// a slice with each value of one of those types. func convertComparisonExprValue(expr sqlparser.Expr) (interface{}, error) { switch e := expr.(type) { case *sqlparser.SQLVal: @@ -551,6 +553,7 @@ func (n *notSupportedExprConverter) Convert(expr sqlparser.Expr) (elastic.Query, return nil, NewConverterError("%s: expression of type %T", NotSupportedErrMessage, expr) } +// ParseSqlValue returns a string, int64 or float64 if the parsing succeeds. func ParseSqlValue(sqlValue string) (interface{}, error) { if sqlValue == "" { return "", nil diff --git a/common/persistence/visibility/store/standard/converter.go b/common/persistence/visibility/store/standard/converter.go index b2260b26679..50131f92734 100644 --- a/common/persistence/visibility/store/standard/converter.go +++ b/common/persistence/visibility/store/standard/converter.go @@ -31,8 +31,10 @@ import ( "github.com/xwb1989/sqlparser" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/sql/sqlplugin" "go.temporal.io/server/common/persistence/visibility/store/query" + "go.temporal.io/server/common/searchattribute" ) var allowedComparisonOperators = map[string]struct{}{ @@ -46,9 +48,17 @@ type ( } ) -func newQueryConverter() *converter { +func newQueryConverter( + namespaceName namespace.Name, + searchAttributesTypeMap searchattribute.NameTypeMap, + searchAttributesMapperProvider searchattribute.MapperProvider, +) *converter { fnInterceptor := newNameInterceptor() - fvInterceptor := newValuesInterceptor() + fvInterceptor := newValuesInterceptor( + namespaceName, + searchAttributesTypeMap, + searchAttributesMapperProvider, + ) rangeCond := query.NewRangeCondConverter(fnInterceptor, fvInterceptor, false) comparisonExpr := query.NewComparisonExprConverter(fnInterceptor, fvInterceptor, allowedComparisonOperators) diff --git a/common/persistence/visibility/store/standard/converter_test.go b/common/persistence/visibility/store/standard/converter_test.go index 87f1e082f01..45b3d7374da 100644 --- a/common/persistence/visibility/store/standard/converter_test.go +++ b/common/persistence/visibility/store/standard/converter_test.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/common/convert" "go.temporal.io/server/common/persistence/sql/sqlplugin" + "go.temporal.io/server/common/searchattribute" ) var startTimeFrom = time.Now().Add(-time.Hour) @@ -67,7 +68,7 @@ var unsupportedQuery = []string{ func TestSupportedQueryFilters(t *testing.T) { for query, expectedFilter := range supportedQuery { - converter := newQueryConverter() + converter := newQueryConverter("test-namespace", searchattribute.TestNameTypeMap, nil) filter, err := converter.GetFilter(query) assert.NoError(t, err) @@ -93,7 +94,7 @@ func TestSupportedQueryFilters(t *testing.T) { func TestUnsupportedQueryFilters(t *testing.T) { for _, query := range unsupportedQuery { - converter := newQueryConverter() + converter := newQueryConverter("test-namespace", searchattribute.TestNameTypeMap, nil) _, err := converter.GetFilter(query) assert.Error(t, err) } diff --git a/common/persistence/visibility/store/standard/query_interceptors.go b/common/persistence/visibility/store/standard/query_interceptors.go index 7e75a3f1a00..21a80731827 100644 --- a/common/persistence/visibility/store/standard/query_interceptors.go +++ b/common/persistence/visibility/store/standard/query_interceptors.go @@ -27,6 +27,7 @@ package standard import ( "time" + "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/searchattribute" "go.temporal.io/api/enums/v1" @@ -56,10 +57,18 @@ func newNameInterceptor() *nameInterceptor { return &nameInterceptor{} } -func newValuesInterceptor() *valuesInterceptor { +func newValuesInterceptor( + namespaceName namespace.Name, + searchAttributesTypeMap searchattribute.NameTypeMap, + searchAttributesMapperProvider searchattribute.MapperProvider, +) *valuesInterceptor { return &valuesInterceptor{ - filter: &sqlplugin.VisibilitySelectFilter{}, - nextInterceptor: elasticsearch.NewValuesInterceptor(), + filter: &sqlplugin.VisibilitySelectFilter{}, + nextInterceptor: elasticsearch.NewValuesInterceptor( + namespaceName, + searchAttributesTypeMap, + searchAttributesMapperProvider, + ), } } diff --git a/common/persistence/visibility/store/standard/visibility_store.go b/common/persistence/visibility/store/standard/visibility_store.go index 8169dc5dd2b..a81df5680c2 100644 --- a/common/persistence/visibility/store/standard/visibility_store.go +++ b/common/persistence/visibility/store/standard/visibility_store.go @@ -34,11 +34,14 @@ import ( "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/persistence/visibility/store" + "go.temporal.io/server/common/searchattribute" ) type ( standardStore struct { - store store.VisibilityStore + store store.VisibilityStore + searchAttributesProvider searchattribute.Provider + searchAttributesMapperProvider searchattribute.MapperProvider } // We wrap the token with a boolean to indicate if it is from list open workflows or list closed workflows, @@ -59,9 +62,15 @@ type ( var _ store.VisibilityStore = (*standardStore)(nil) var _ listRequest = (*manager.ListWorkflowExecutionsRequest)(nil) -func NewVisibilityStore(store store.VisibilityStore) store.VisibilityStore { +func NewVisibilityStore( + visibilityStore store.VisibilityStore, + searchAttributesProvider searchattribute.Provider, + searchAttributesMapperProvider searchattribute.MapperProvider, +) store.VisibilityStore { return &standardStore{ - store: store, + store: visibilityStore, + searchAttributesProvider: searchAttributesProvider, + searchAttributesMapperProvider: searchAttributesMapperProvider, } } @@ -187,7 +196,12 @@ func (s *standardStore) ListWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*store.InternalListWorkflowExecutionsResponse, error) { - converter := newQueryConverter() + typeMap, err := s.searchAttributesProvider.GetSearchAttributes(s.GetIndexName(), false) + if err != nil { + return nil, err + } + + converter := newQueryConverter(request.Namespace, typeMap, s.searchAttributesMapperProvider) filter, err := converter.GetFilter(request.Query) if err != nil { return nil, err