Skip to content

Commit

Permalink
Rewrite ES sorting to not use deprecated library either
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 17, 2024
1 parent 77c86a7 commit e53fd14
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 101 deletions.
155 changes: 78 additions & 77 deletions contactql/es/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/goflow/envs"
esq "github.com/nyaruka/goflow/utils/elastic"
)

// AssetMapper is used to map engine assets to however ES identifies them
Expand Down Expand Up @@ -51,10 +52,10 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A
}

if combination.Operator() == contactql.BoolOperatorAnd {
return All(queries...)
return esq.All(queries...)
}

return Any(queries...)
return esq.Any(queries...)
}

func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any {
Expand All @@ -73,15 +74,15 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa
func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any {
field := resolver.ResolveField(c.PropertyKey())
fieldType := field.Type()
fieldQuery := Term("fields.field", field.UUID())
fieldQuery := esq.Term("fields.field", field.UUID())

// special cases for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" {
query := Nested("fields", All(fieldQuery, Exists("fields."+string(fieldType))))
query := esq.Nested("fields", esq.All(fieldQuery, esq.Exists("fields."+string(fieldType))))

// if we are looking for unset, inverse our query
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}
Expand All @@ -91,10 +92,10 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
return Nested("fields", All(fieldQuery, Term("fields.text", value)))
return esq.Nested("fields", esq.All(fieldQuery, esq.Term("fields.text", value)))
case contactql.OpNotEqual:
query := All(fieldQuery, Term("fields.text", value), Exists("fields.text"))
return Not(Nested("fields", query))
query := esq.All(fieldQuery, esq.Term("fields.text", value), esq.Exists("fields.text"))
return esq.Not(esq.Nested("fields", query))
default:
panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator()))
}
Expand All @@ -105,26 +106,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
query = Match("fields.number", value)
query = esq.Match("fields.number", value)
case contactql.OpNotEqual:
return Not(
Nested("fields",
All(fieldQuery, Match("fields.number", value)),
return esq.Not(
esq.Nested("fields",
esq.All(fieldQuery, esq.Match("fields.number", value)),
),
)
case contactql.OpGreaterThan:
query = GreaterThan("fields.number", value)
query = esq.GreaterThan("fields.number", value)
case contactql.OpGreaterThanOrEqual:
query = GreaterThanOrEqual("fields.number", value)
query = esq.GreaterThanOrEqual("fields.number", value)
case contactql.OpLessThan:
query = LessThan("fields.number", value)
query = esq.LessThan("fields.number", value)
case contactql.OpLessThanOrEqual:
query = LessThanOrEqual("fields.number", value)
query = esq.LessThanOrEqual("fields.number", value)
default:
panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator()))
}

return Nested("fields", All(fieldQuery, query))
return esq.Nested("fields", esq.All(fieldQuery, query))

} else if fieldType == assets.FieldTypeDatetime {
value, _ := c.ValueAsDate(env)
Expand All @@ -133,38 +134,38 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
query = Between("fields.datetime", start, end)
query = esq.Between("fields.datetime", start, end)
case contactql.OpNotEqual:
return Not(
Nested("fields",
All(fieldQuery, Between("fields.datetime", start, end)),
return esq.Not(
esq.Nested("fields",
esq.All(fieldQuery, esq.Between("fields.datetime", start, end)),
),
)
case contactql.OpGreaterThan:
query = GreaterThanOrEqual("fields.datetime", end)
query = esq.GreaterThanOrEqual("fields.datetime", end)
case contactql.OpGreaterThanOrEqual:
query = GreaterThanOrEqual("fields.datetime", start)
query = esq.GreaterThanOrEqual("fields.datetime", start)
case contactql.OpLessThan:
query = LessThan("fields.datetime", start)
query = esq.LessThan("fields.datetime", start)
case contactql.OpLessThanOrEqual:
query = LessThan("fields.datetime", end)
query = esq.LessThan("fields.datetime", end)
default:
panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator()))
}

return Nested("fields", All(fieldQuery, query))
return esq.Nested("fields", esq.All(fieldQuery, query))

} else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard {
value := strings.ToLower(c.Value())
name := fmt.Sprintf("fields.%s_keyword", string(fieldType))

switch c.Operator() {
case contactql.OpEqual:
return Nested("fields", All(fieldQuery, Term(name, value)))
return esq.Nested("fields", esq.All(fieldQuery, esq.Term(name, value)))
case contactql.OpNotEqual:
return Not(
Nested("fields",
All(Term(name, value), Exists(name)),
return esq.Not(
esq.Nested("fields",
esq.All(esq.Term(name, value), esq.Exists(name)),
),
)
default:
Expand All @@ -183,10 +184,10 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" &&
(key == contactql.AttributeName || key == contactql.AttributeLanguage) {

query := All(Exists(key), Not(Term(fmt.Sprintf("%s.keyword", key), "")))
query := esq.All(esq.Exists(key), esq.Not(esq.Term(fmt.Sprintf("%s.keyword", key), "")))

if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}

return query
Expand All @@ -198,20 +199,20 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe
case contactql.AttributeID:
switch c.Operator() {
case contactql.OpEqual:
return Ids(value)
return esq.Ids(value)
case contactql.OpNotEqual:
return Not(Ids(value))
return esq.Not(esq.Ids(value))
default:
panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator()))
}
case contactql.AttributeName:
switch c.Operator() {
case contactql.OpEqual:
return Term("name.keyword", c.Value())
return esq.Term("name.keyword", c.Value())
case contactql.OpNotEqual:
return Not(Term("name.keyword", c.Value()))
return esq.Not(esq.Term("name.keyword", c.Value()))
case contactql.OpContains:
return Match("name", value)
return esq.Match("name", value)
default:
panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator()))
}
Expand All @@ -227,26 +228,26 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return Between("created_on", start, end)
return esq.Between("created_on", start, end)
case contactql.OpNotEqual:
return Not(Between("created_on", start, end))
return esq.Not(esq.Between("created_on", start, end))
case contactql.OpGreaterThan:
return GreaterThanOrEqual("created_on", end)
return esq.GreaterThanOrEqual("created_on", end)
case contactql.OpGreaterThanOrEqual:
return GreaterThanOrEqual("created_on", start)
return esq.GreaterThanOrEqual("created_on", start)
case contactql.OpLessThan:
return LessThan("created_on", start)
return esq.LessThan("created_on", start)
case contactql.OpLessThanOrEqual:
return LessThan("created_on", end)
return esq.LessThan("created_on", end)
default:
panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator()))
}
case contactql.AttributeLastSeenOn:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := Exists("last_seen_on")
query := esq.Exists("last_seen_on")
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}
Expand All @@ -256,17 +257,17 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return Between("last_seen_on", start, end)
return esq.Between("last_seen_on", start, end)
case contactql.OpNotEqual:
return Not(Between("last_seen_on", start, end))
return esq.Not(esq.Between("last_seen_on", start, end))
case contactql.OpGreaterThan:
return GreaterThanOrEqual("last_seen_on", end)
return esq.GreaterThanOrEqual("last_seen_on", end)
case contactql.OpGreaterThanOrEqual:
return GreaterThanOrEqual("last_seen_on", start)
return esq.GreaterThanOrEqual("last_seen_on", start)
case contactql.OpLessThan:
return LessThan("last_seen_on", start)
return esq.LessThan("last_seen_on", start)
case contactql.OpLessThanOrEqual:
return LessThan("last_seen_on", end)
return esq.LessThan("last_seen_on", end)
default:
panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator()))
}
Expand All @@ -275,29 +276,29 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := Nested("urns", Exists("urns.path"))
query := esq.Nested("urns", esq.Exists("urns.path"))
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}

switch c.Operator() {
case contactql.OpEqual:
return Nested("urns", Term("urns.path.keyword", value))
return esq.Nested("urns", esq.Term("urns.path.keyword", value))
case contactql.OpNotEqual:
return Not(Nested("urns", Term("urns.path.keyword", value)))
return esq.Not(esq.Nested("urns", esq.Term("urns.path.keyword", value)))
case contactql.OpContains:
return Nested("urns", MatchPhrase("urns.path", value))
return esq.Nested("urns", esq.MatchPhrase("urns.path", value))
default:
panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator()))
}
case contactql.AttributeGroup:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := Exists("group_ids")
query := esq.Exists("group_ids")
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}
Expand All @@ -306,9 +307,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return Term("group_ids", mapper.Group(group))
return esq.Term("group_ids", mapper.Group(group))
case contactql.OpNotEqual:
return Not(Term("group_ids", mapper.Group(group)))
return esq.Not(esq.Term("group_ids", mapper.Group(group)))
default:
panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator()))
}
Expand All @@ -320,9 +321,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := Exists(fieldName)
query := esq.Exists(fieldName)
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}
Expand All @@ -331,9 +332,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return Term(fieldName, mapper.Flow(flow))
return esq.Term(fieldName, mapper.Flow(flow))
case contactql.OpNotEqual:
return Not(Term(fieldName, mapper.Flow(flow)))
return esq.Not(esq.Term(fieldName, mapper.Flow(flow)))
default:
panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator()))
}
Expand All @@ -350,20 +351,20 @@ func schemeCondition(c *contactql.Condition) map[string]any {

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := Nested("urns", All(Term("urns.scheme", key), Exists("urns.path")))
query := esq.Nested("urns", esq.All(esq.Term("urns.scheme", key), esq.Exists("urns.path")))
if c.Operator() == contactql.OpEqual {
query = Not(query)
query = esq.Not(query)
}
return query
}

switch c.Operator() {
case contactql.OpEqual:
return Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key)))
return esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key)))
case contactql.OpNotEqual:
return Not(Nested("urns", All(Term("urns.path.keyword", value), Term("urns.scheme", key))))
return esq.Not(esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key))))
case contactql.OpContains:
return Nested("urns", All(MatchPhrase("urns.path", value), Term("urns.scheme", key)))
return esq.Nested("urns", esq.All(esq.MatchPhrase("urns.path", value), esq.Term("urns.scheme", key)))
default:
panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator()))
}
Expand All @@ -374,9 +375,9 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str

switch c.Operator() {
case contactql.OpEqual:
return Term(name, value)
return esq.Term(name, value)
case contactql.OpNotEqual:
return Not(Term(name, value))
return esq.Not(esq.Term(name, value))
default:
panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator()))
}
Expand All @@ -387,17 +388,17 @@ func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any

switch c.Operator() {
case contactql.OpEqual:
return Match(name, value)
return esq.Match(name, value)
case contactql.OpNotEqual:
return Not(Match(name, value))
return esq.Not(esq.Match(name, value))
case contactql.OpGreaterThan:
return GreaterThan(name, value)
return esq.GreaterThan(name, value)
case contactql.OpGreaterThanOrEqual:
return GreaterThanOrEqual(name, value)
return esq.GreaterThanOrEqual(name, value)
case contactql.OpLessThan:
return LessThan(name, value)
return esq.LessThan(name, value)
case contactql.OpLessThanOrEqual:
return LessThanOrEqual(name, value)
return esq.LessThanOrEqual(name, value)
default:
panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator()))
}
Expand Down

0 comments on commit e53fd14

Please sign in to comment.