Skip to content

Commit

Permalink
Validate search attributes values in queries to ES (#5036)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Validate search attribute values in the SQL query to ES visibility store

<!-- Tell your future self why have you made these changes -->
**Why?**
Better error handling for UX

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**
Updated unit tests.

<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**
No.

<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
No.
  • Loading branch information
rodrigozhou committed Oct 30, 2023
1 parent 9567ac9 commit 65f25a4
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 62 deletions.
24 changes: 21 additions & 3 deletions common/persistence/visibility/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,22 @@ func newVisibilityStoreFromDataStoreConfig(
logger,
)
default:
visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger)
visStore, err = newStandardVisibilityStore(
dsConfig,
persistenceResolver,
searchAttributesProvider,
searchAttributesMapperProvider,
logger,
)
}
} else if dsConfig.Cassandra != nil {
visStore, err = newStandardVisibilityStore(dsConfig, persistenceResolver, logger)
visStore, err = newStandardVisibilityStore(
dsConfig,
persistenceResolver,
searchAttributesProvider,
searchAttributesMapperProvider,
logger,
)
} else if dsConfig.Elasticsearch != nil {
visStore = newElasticsearchVisibilityStore(
dsConfig.Elasticsearch.GetVisibilityIndex(),
Expand All @@ -268,6 +280,8 @@ func newVisibilityStoreFromDataStoreConfig(
func newStandardVisibilityStore(
dsConfig config.DataStore,
persistenceResolver resolver.ServiceResolver,
searchAttributesProvider searchattribute.Provider,
searchAttributesMapperProvider searchattribute.MapperProvider,
logger log.Logger,
) (store.VisibilityStore, error) {
var (
Expand All @@ -294,7 +308,11 @@ func newStandardVisibilityStore(
logger.Fatal("invalid config: one of cassandra or sql params must be specified for visibility store")
return nil, nil
}
return standard.NewVisibilityStore(visStore), nil
return standard.NewVisibilityStore(
visStore,
searchAttributesProvider,
searchAttributesMapperProvider,
), nil
}

func newElasticsearchVisibilityStore(
Expand Down
156 changes: 120 additions & 36 deletions common/persistence/visibility/store/elasticsearch/query_interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package elasticsearch
import (
"errors"
"fmt"
"strconv"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand All @@ -45,25 +46,38 @@ type (
searchAttributesMapperProvider searchattribute.MapperProvider
seenNamespaceDivision bool
}
valuesInterceptor struct{}

valuesInterceptor struct {
namespace namespace.Name
searchAttributesTypeMap searchattribute.NameTypeMap
searchAttributesMapperProvider searchattribute.MapperProvider
}
)

func newNameInterceptor(
namespace namespace.Name,
namespaceName namespace.Name,
index string,
saTypeMap searchattribute.NameTypeMap,
searchAttributesMapperProvider searchattribute.MapperProvider,
) *nameInterceptor {
return &nameInterceptor{
namespace: namespace,
namespace: namespaceName,
index: index,
searchAttributesTypeMap: saTypeMap,
searchAttributesMapperProvider: searchAttributesMapperProvider,
}
}

func NewValuesInterceptor() *valuesInterceptor {
return &valuesInterceptor{}
func NewValuesInterceptor(
namespaceName namespace.Name,
saTypeMap searchattribute.NameTypeMap,
searchAttributesMapperProvider searchattribute.MapperProvider,
) *valuesInterceptor {
return &valuesInterceptor{
namespace: namespaceName,
searchAttributesTypeMap: saTypeMap,
searchAttributesMapperProvider: searchAttributesMapperProvider,
}
}

func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string, error) {
Expand Down Expand Up @@ -111,41 +125,36 @@ func (ni *nameInterceptor) Name(name string, usage query.FieldNameUsage) (string
return fieldName, nil
}

func (vi *valuesInterceptor) Values(name string, values ...interface{}) ([]interface{}, error) {
var result []interface{}
for _, value := range values {
func (vi *valuesInterceptor) Values(fieldName string, values ...interface{}) ([]interface{}, error) {
fieldType, err := vi.searchAttributesTypeMap.GetType(fieldName)
if err != nil {
return nil, query.NewConverterError("invalid search attribute: %s", fieldName)
}

switch name {
case searchattribute.StartTime, searchattribute.CloseTime, searchattribute.ExecutionTime:
if nanos, isNumber := value.(int64); isNumber {
value = time.Unix(0, nanos).UTC().Format(time.RFC3339Nano)
}
case searchattribute.ExecutionStatus:
if status, isNumber := value.(int64); isNumber {
value = enumspb.WorkflowExecutionStatus_name[int32(status)]
}
case searchattribute.ExecutionDuration:
if durationStr, isString := value.(string); isString {
// To support durations passed as golang durations such as "300ms", "-1.5h" or "2h45m".
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// Custom timestamp.ParseDuration also supports "d" as additional unit for days.
if duration, err := timestamp.ParseDuration(durationStr); err == nil {
value = duration.Nanoseconds()
} else {
// To support "hh:mm:ss" durations.
durationNanos, err := vi.parseHHMMSSDuration(durationStr)
var converterErr *query.ConverterError
if errors.As(err, &converterErr) {
return nil, converterErr
}
if err == nil {
value = durationNanos
}
}
name := fieldName
if searchattribute.IsMappable(fieldName) {
mapper, err := vi.searchAttributesMapperProvider.GetMapper(vi.namespace)
if err != nil {
return nil, err
}
if mapper != nil {
name, err = mapper.GetAlias(fieldName, vi.namespace.String())
if err != nil {
return nil, err
}
default:
}
}

var result []interface{}
for _, value := range values {
value, err = vi.parseSystemSearchAttributeValues(fieldName, value)
if err != nil {
return nil, err
}
value, err = validateValueType(name, value, fieldType)
if err != nil {
return nil, err
}
result = append(result, value)
}
return result, nil
Expand All @@ -169,3 +178,78 @@ func (vi *valuesInterceptor) parseHHMMSSDuration(d string) (int64, error) {

return hours*int64(time.Hour) + minutes*int64(time.Minute) + seconds*int64(time.Second) + nanos, nil
}

func (vi *valuesInterceptor) parseSystemSearchAttributeValues(name string, value any) (any, error) {
switch name {
case searchattribute.StartTime, searchattribute.CloseTime, searchattribute.ExecutionTime:
if nanos, isNumber := value.(int64); isNumber {
value = time.Unix(0, nanos).UTC().Format(time.RFC3339Nano)
}
case searchattribute.ExecutionStatus:
if status, isNumber := value.(int64); isNumber {
value = enumspb.WorkflowExecutionStatus_name[int32(status)]
}
case searchattribute.ExecutionDuration:
if durationStr, isString := value.(string); isString {
// To support durations passed as golang durations such as "300ms", "-1.5h" or "2h45m".
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// Custom timestamp.ParseDuration also supports "d" as additional unit for days.
if duration, err := timestamp.ParseDuration(durationStr); err == nil {
value = duration.Nanoseconds()
} else {
// To support "hh:mm:ss" durations.
durationNanos, err := vi.parseHHMMSSDuration(durationStr)
var converterErr *query.ConverterError
if errors.As(err, &converterErr) {
return nil, converterErr
}
if err == nil {
value = durationNanos
}
}
}
default:
}
return value, nil
}

func validateValueType(name string, value any, fieldType enumspb.IndexedValueType) (any, error) {
switch fieldType {
case enumspb.INDEXED_VALUE_TYPE_INT, enumspb.INDEXED_VALUE_TYPE_DOUBLE:
switch v := value.(type) {
case int64, float64:
// nothing to do
case string:
// ES can do implicit casting if the value is numeric
if _, err := strconv.ParseFloat(v, 64); err != nil {
return nil, query.NewConverterError(
"invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value)
}
default:
return nil, query.NewConverterError(
"invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value)
}
case enumspb.INDEXED_VALUE_TYPE_BOOL:
switch value.(type) {
case bool:
// nothing to do
default:
return nil, query.NewConverterError(
"invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value)
}
case enumspb.INDEXED_VALUE_TYPE_DATETIME:
switch v := value.(type) {
case int64:
value = time.Unix(0, v).UTC().Format(time.RFC3339Nano)
case string:
if _, err := time.Parse(time.RFC3339Nano, v); err != nil {
return nil, query.NewConverterError(
"invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value)
}
default:
return nil, query.NewConverterError(
"invalid value for search attribute %s of type %s: %#v", name, fieldType.String(), value)
}
}
return value, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func (s *QueryInterceptorSuite) TearDownTest() {
}

func (s *QueryInterceptorSuite) TestTimeProcessFunc() {
vi := NewValuesInterceptor()
vi := NewValuesInterceptor(
"test-namespace",
searchattribute.TestNameTypeMap,
searchattribute.NewTestMapperProvider(nil),
)

cases := []struct {
key string
Expand All @@ -73,7 +77,7 @@ func (s *QueryInterceptorSuite) TestTimeProcessFunc() {
}{
{value: "2018-06-07T08:04:05.123456789Z", returnErr: false},
{value: "2018-06-07T15:04:05+07:00", returnErr: false},
{value: "some invalid time string", returnErr: false},
{value: "", returnErr: true},
{value: "should not be modified", returnErr: false},
}

Expand All @@ -90,7 +94,11 @@ func (s *QueryInterceptorSuite) TestTimeProcessFunc() {
}

func (s *QueryInterceptorSuite) TestStatusProcessFunc() {
vi := NewValuesInterceptor()
vi := NewValuesInterceptor(
"test-namespace",
searchattribute.TestNameTypeMap,
searchattribute.NewTestMapperProvider(nil),
)

cases := []struct {
key string
Expand Down Expand Up @@ -130,14 +138,18 @@ func (s *QueryInterceptorSuite) TestStatusProcessFunc() {
}

func (s *QueryInterceptorSuite) TestDurationProcessFunc() {
vi := NewValuesInterceptor()
vi := NewValuesInterceptor(
"test-namespace",
searchattribute.TestNameTypeMap,
searchattribute.NewTestMapperProvider(nil),
)

cases := []struct {
key string
value interface{}
}{
{key: searchattribute.ExecutionDuration, value: "1"},
{key: searchattribute.ExecutionDuration, value: 1},
{key: searchattribute.ExecutionDuration, value: int64(1)},
{key: searchattribute.ExecutionDuration, value: "5h3m"},
{key: searchattribute.ExecutionDuration, value: "00:00:01"},
{key: searchattribute.ExecutionDuration, value: "00:00:61"},
Expand All @@ -149,11 +161,11 @@ func (s *QueryInterceptorSuite) TestDurationProcessFunc() {
returnErr bool
}{
{value: "1", returnErr: false},
{value: 1, returnErr: false},
{value: int64(1), returnErr: false},
{value: int64(18180000000000), returnErr: false},
{value: int64(1000000000), returnErr: false},
{value: nil, returnErr: true},
{value: "bad value", returnErr: false},
{value: nil, returnErr: true},
{value: "should not be modified", returnErr: false},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,10 @@ func (s *visibilityStore) convertQuery(
return nil, serviceerror.NewUnavailable(fmt.Sprintf("Unable to read search attribute types: %v", err))
}
nameInterceptor := newNameInterceptor(namespace, s.index, saTypeMap, s.searchAttributesMapperProvider)
queryConverter := newQueryConverter(nameInterceptor, NewValuesInterceptor())
queryConverter := newQueryConverter(
nameInterceptor,
NewValuesInterceptor(namespace, saTypeMap, s.searchAttributesMapperProvider),
)
queryParams, err := queryConverter.ConvertWhereOrderBy(requestQueryStr)
if err != nil {
// Convert ConverterError to InvalidArgument and pass through all other errors (which should be only mapper errors).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,10 +700,9 @@ func (s *ESVisibilitySuite) Test_convertQuery() {

query = `ExecutionTime < "unable to parse"`
queryParams, err = s.visibilityStore.convertQuery(testNamespace, testNamespaceID, query)
// Wrong dates goes directly to Elasticsearch, and it returns an error.
s.NoError(err)
s.Equal(`{"bool":{"filter":[{"term":{"NamespaceId":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}},{"bool":{"filter":{"range":{"ExecutionTime":{"from":null,"include_lower":true,"include_upper":false,"to":"unable to parse"}}}}}],"must_not":{"exists":{"field":"TemporalNamespaceDivision"}}}}`, s.queryToJSON(queryParams.Query))
s.Nil(queryParams.Sorter)
s.Error(err)
s.IsType(&serviceerror.InvalidArgument{}, err)
s.Equal(err.Error(), "invalid query: unable to convert filter expression: unable to convert values of comparison expression: invalid value for search attribute ExecutionTime of type Datetime: \"unable to parse\"")

// invalid union injection
query = `WorkflowId = 'wid' union select * from dummy`
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/visibility/store/query/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ func (c *comparisonExprConverter) Convert(expr sqlparser.Expr) (elastic.Query, e
return query, nil
}

// convertComparisonExprValue returns a string, int64, float64, bool or
// a slice with each value of one of those types.
func convertComparisonExprValue(expr sqlparser.Expr) (interface{}, error) {
switch e := expr.(type) {
case *sqlparser.SQLVal:
Expand Down Expand Up @@ -551,6 +553,7 @@ func (n *notSupportedExprConverter) Convert(expr sqlparser.Expr) (elastic.Query,
return nil, NewConverterError("%s: expression of type %T", NotSupportedErrMessage, expr)
}

// ParseSqlValue returns a string, int64 or float64 if the parsing succeeds.
func ParseSqlValue(sqlValue string) (interface{}, error) {
if sqlValue == "" {
return "", nil
Expand Down
14 changes: 12 additions & 2 deletions common/persistence/visibility/store/standard/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"github.com/xwb1989/sqlparser"
enumspb "go.temporal.io/api/enums/v1"

"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/persistence/visibility/store/query"
"go.temporal.io/server/common/searchattribute"
)

var allowedComparisonOperators = map[string]struct{}{
Expand All @@ -46,9 +48,17 @@ type (
}
)

func newQueryConverter() *converter {
func newQueryConverter(
namespaceName namespace.Name,
searchAttributesTypeMap searchattribute.NameTypeMap,
searchAttributesMapperProvider searchattribute.MapperProvider,
) *converter {
fnInterceptor := newNameInterceptor()
fvInterceptor := newValuesInterceptor()
fvInterceptor := newValuesInterceptor(
namespaceName,
searchAttributesTypeMap,
searchAttributesMapperProvider,
)

rangeCond := query.NewRangeCondConverter(fnInterceptor, fvInterceptor, false)
comparisonExpr := query.NewComparisonExprConverter(fnInterceptor, fvInterceptor, allowedComparisonOperators)
Expand Down
Loading

0 comments on commit 65f25a4

Please sign in to comment.