Skip to content

Commit

Permalink
Add docstrings and import as elastic
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed May 17, 2024
1 parent e53fd14 commit 7114ff3
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 85 deletions.
156 changes: 78 additions & 78 deletions contactql/es/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/contactql"
"github.com/nyaruka/goflow/envs"
esq "github.com/nyaruka/goflow/utils/elastic"
"github.com/nyaruka/goflow/utils/elastic"
)

// AssetMapper is used to map engine assets to however ES identifies them
Expand Down Expand Up @@ -52,10 +52,10 @@ func boolCombination(env envs.Environment, resolver contactql.Resolver, mapper A
}

if combination.Operator() == contactql.BoolOperatorAnd {
return esq.All(queries...)
return elastic.All(queries...)
}

return esq.Any(queries...)
return elastic.Any(queries...)
}

func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMapper, c *contactql.Condition) map[string]any {
Expand All @@ -74,15 +74,15 @@ func condition(env envs.Environment, resolver contactql.Resolver, mapper AssetMa
func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contactql.Condition) map[string]any {
field := resolver.ResolveField(c.PropertyKey())
fieldType := field.Type()
fieldQuery := esq.Term("fields.field", field.UUID())
fieldQuery := elastic.Term("fields.field", field.UUID())

// special cases for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && c.Value() == "" {
query := esq.Nested("fields", esq.All(fieldQuery, esq.Exists("fields."+string(fieldType))))
query := elastic.Nested("fields", elastic.All(fieldQuery, elastic.Exists("fields."+string(fieldType))))

// if we are looking for unset, inverse our query
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}
Expand All @@ -92,10 +92,10 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
return esq.Nested("fields", esq.All(fieldQuery, esq.Term("fields.text", value)))
return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term("fields.text", value)))
case contactql.OpNotEqual:
query := esq.All(fieldQuery, esq.Term("fields.text", value), esq.Exists("fields.text"))
return esq.Not(esq.Nested("fields", query))
query := elastic.All(fieldQuery, elastic.Term("fields.text", value), elastic.Exists("fields.text"))
return elastic.Not(elastic.Nested("fields", query))
default:
panic(fmt.Sprintf("unsupported text field operator: %s", c.Operator()))
}
Expand All @@ -106,26 +106,26 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
query = esq.Match("fields.number", value)
query = elastic.Match("fields.number", value)
case contactql.OpNotEqual:
return esq.Not(
esq.Nested("fields",
esq.All(fieldQuery, esq.Match("fields.number", value)),
return elastic.Not(
elastic.Nested("fields",
elastic.All(fieldQuery, elastic.Match("fields.number", value)),
),
)
case contactql.OpGreaterThan:
query = esq.GreaterThan("fields.number", value)
query = elastic.GreaterThan("fields.number", value)
case contactql.OpGreaterThanOrEqual:
query = esq.GreaterThanOrEqual("fields.number", value)
query = elastic.GreaterThanOrEqual("fields.number", value)
case contactql.OpLessThan:
query = esq.LessThan("fields.number", value)
query = elastic.LessThan("fields.number", value)
case contactql.OpLessThanOrEqual:
query = esq.LessThanOrEqual("fields.number", value)
query = elastic.LessThanOrEqual("fields.number", value)
default:
panic(fmt.Sprintf("unsupported number field operator: %s", c.Operator()))
}

return esq.Nested("fields", esq.All(fieldQuery, query))
return elastic.Nested("fields", elastic.All(fieldQuery, query))

} else if fieldType == assets.FieldTypeDatetime {
value, _ := c.ValueAsDate(env)
Expand All @@ -134,38 +134,38 @@ func fieldCondition(env envs.Environment, resolver contactql.Resolver, c *contac

switch c.Operator() {
case contactql.OpEqual:
query = esq.Between("fields.datetime", start, end)
query = elastic.Between("fields.datetime", start, end)
case contactql.OpNotEqual:
return esq.Not(
esq.Nested("fields",
esq.All(fieldQuery, esq.Between("fields.datetime", start, end)),
return elastic.Not(
elastic.Nested("fields",
elastic.All(fieldQuery, elastic.Between("fields.datetime", start, end)),
),
)
case contactql.OpGreaterThan:
query = esq.GreaterThanOrEqual("fields.datetime", end)
query = elastic.GreaterThanOrEqual("fields.datetime", end)
case contactql.OpGreaterThanOrEqual:
query = esq.GreaterThanOrEqual("fields.datetime", start)
query = elastic.GreaterThanOrEqual("fields.datetime", start)
case contactql.OpLessThan:
query = esq.LessThan("fields.datetime", start)
query = elastic.LessThan("fields.datetime", start)
case contactql.OpLessThanOrEqual:
query = esq.LessThan("fields.datetime", end)
query = elastic.LessThan("fields.datetime", end)
default:
panic(fmt.Sprintf("unsupported datetime field operator: %s", c.Operator()))
}

return esq.Nested("fields", esq.All(fieldQuery, query))
return elastic.Nested("fields", elastic.All(fieldQuery, query))

} else if fieldType == assets.FieldTypeState || fieldType == assets.FieldTypeDistrict || fieldType == assets.FieldTypeWard {
value := strings.ToLower(c.Value())
name := fmt.Sprintf("fields.%s_keyword", string(fieldType))

switch c.Operator() {
case contactql.OpEqual:
return esq.Nested("fields", esq.All(fieldQuery, esq.Term(name, value)))
return elastic.Nested("fields", elastic.All(fieldQuery, elastic.Term(name, value)))
case contactql.OpNotEqual:
return esq.Not(
esq.Nested("fields",
esq.All(esq.Term(name, value), esq.Exists(name)),
return elastic.Not(
elastic.Nested("fields",
elastic.All(elastic.Term(name, value), elastic.Exists(name)),
),
)
default:
Expand All @@ -184,10 +184,10 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" &&
(key == contactql.AttributeName || key == contactql.AttributeLanguage) {

query := esq.All(esq.Exists(key), esq.Not(esq.Term(fmt.Sprintf("%s.keyword", key), "")))
query := elastic.All(elastic.Exists(key), elastic.Not(elastic.Term(fmt.Sprintf("%s.keyword", key), "")))

if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}

return query
Expand All @@ -199,20 +199,20 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe
case contactql.AttributeID:
switch c.Operator() {
case contactql.OpEqual:
return esq.Ids(value)
return elastic.Ids(value)
case contactql.OpNotEqual:
return esq.Not(esq.Ids(value))
return elastic.Not(elastic.Ids(value))
default:
panic(fmt.Sprintf("unsupported ID attribute operator: %s", c.Operator()))
}
case contactql.AttributeName:
switch c.Operator() {
case contactql.OpEqual:
return esq.Term("name.keyword", c.Value())
return elastic.Term("name.keyword", c.Value())
case contactql.OpNotEqual:
return esq.Not(esq.Term("name.keyword", c.Value()))
return elastic.Not(elastic.Term("name.keyword", c.Value()))
case contactql.OpContains:
return esq.Match("name", value)
return elastic.Match("name", value)
default:
panic(fmt.Sprintf("unsupported name attribute operator: %s", c.Operator()))
}
Expand All @@ -228,26 +228,26 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return esq.Between("created_on", start, end)
return elastic.Between("created_on", start, end)
case contactql.OpNotEqual:
return esq.Not(esq.Between("created_on", start, end))
return elastic.Not(elastic.Between("created_on", start, end))
case contactql.OpGreaterThan:
return esq.GreaterThanOrEqual("created_on", end)
return elastic.GreaterThanOrEqual("created_on", end)
case contactql.OpGreaterThanOrEqual:
return esq.GreaterThanOrEqual("created_on", start)
return elastic.GreaterThanOrEqual("created_on", start)
case contactql.OpLessThan:
return esq.LessThan("created_on", start)
return elastic.LessThan("created_on", start)
case contactql.OpLessThanOrEqual:
return esq.LessThan("created_on", end)
return elastic.LessThan("created_on", end)
default:
panic(fmt.Sprintf("unsupported created_on attribute operator: %s", c.Operator()))
}
case contactql.AttributeLastSeenOn:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := esq.Exists("last_seen_on")
query := elastic.Exists("last_seen_on")
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}
Expand All @@ -257,17 +257,17 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return esq.Between("last_seen_on", start, end)
return elastic.Between("last_seen_on", start, end)
case contactql.OpNotEqual:
return esq.Not(esq.Between("last_seen_on", start, end))
return elastic.Not(elastic.Between("last_seen_on", start, end))
case contactql.OpGreaterThan:
return esq.GreaterThanOrEqual("last_seen_on", end)
return elastic.GreaterThanOrEqual("last_seen_on", end)
case contactql.OpGreaterThanOrEqual:
return esq.GreaterThanOrEqual("last_seen_on", start)
return elastic.GreaterThanOrEqual("last_seen_on", start)
case contactql.OpLessThan:
return esq.LessThan("last_seen_on", start)
return elastic.LessThan("last_seen_on", start)
case contactql.OpLessThanOrEqual:
return esq.LessThan("last_seen_on", end)
return elastic.LessThan("last_seen_on", end)
default:
panic(fmt.Sprintf("unsupported last_seen_on attribute operator: %s", c.Operator()))
}
Expand All @@ -276,29 +276,29 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := esq.Nested("urns", esq.Exists("urns.path"))
query := elastic.Nested("urns", elastic.Exists("urns.path"))
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}

switch c.Operator() {
case contactql.OpEqual:
return esq.Nested("urns", esq.Term("urns.path.keyword", value))
return elastic.Nested("urns", elastic.Term("urns.path.keyword", value))
case contactql.OpNotEqual:
return esq.Not(esq.Nested("urns", esq.Term("urns.path.keyword", value)))
return elastic.Not(elastic.Nested("urns", elastic.Term("urns.path.keyword", value)))
case contactql.OpContains:
return esq.Nested("urns", esq.MatchPhrase("urns.path", value))
return elastic.Nested("urns", elastic.MatchPhrase("urns.path", value))
default:
panic(fmt.Sprintf("unsupported URN attribute operator: %s", c.Operator()))
}
case contactql.AttributeGroup:
// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := esq.Exists("group_ids")
query := elastic.Exists("group_ids")
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}
Expand All @@ -307,9 +307,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return esq.Term("group_ids", mapper.Group(group))
return elastic.Term("group_ids", mapper.Group(group))
case contactql.OpNotEqual:
return esq.Not(esq.Term("group_ids", mapper.Group(group)))
return elastic.Not(elastic.Term("group_ids", mapper.Group(group)))
default:
panic(fmt.Sprintf("unsupported group attribute operator: %s", c.Operator()))
}
Expand All @@ -321,9 +321,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := esq.Exists(fieldName)
query := elastic.Exists(fieldName)
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}
Expand All @@ -332,9 +332,9 @@ func attributeCondition(env envs.Environment, resolver contactql.Resolver, mappe

switch c.Operator() {
case contactql.OpEqual:
return esq.Term(fieldName, mapper.Flow(flow))
return elastic.Term(fieldName, mapper.Flow(flow))
case contactql.OpNotEqual:
return esq.Not(esq.Term(fieldName, mapper.Flow(flow)))
return elastic.Not(elastic.Term(fieldName, mapper.Flow(flow)))
default:
panic(fmt.Sprintf("unsupported flow attribute operator: %s", c.Operator()))
}
Expand All @@ -351,20 +351,20 @@ func schemeCondition(c *contactql.Condition) map[string]any {

// special case for set/unset
if (c.Operator() == contactql.OpEqual || c.Operator() == contactql.OpNotEqual) && value == "" {
query := esq.Nested("urns", esq.All(esq.Term("urns.scheme", key), esq.Exists("urns.path")))
query := elastic.Nested("urns", elastic.All(elastic.Term("urns.scheme", key), elastic.Exists("urns.path")))
if c.Operator() == contactql.OpEqual {
query = esq.Not(query)
query = elastic.Not(query)
}
return query
}

switch c.Operator() {
case contactql.OpEqual:
return esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key)))
return elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key)))
case contactql.OpNotEqual:
return esq.Not(esq.Nested("urns", esq.All(esq.Term("urns.path.keyword", value), esq.Term("urns.scheme", key))))
return elastic.Not(elastic.Nested("urns", elastic.All(elastic.Term("urns.path.keyword", value), elastic.Term("urns.scheme", key))))
case contactql.OpContains:
return esq.Nested("urns", esq.All(esq.MatchPhrase("urns.path", value), esq.Term("urns.scheme", key)))
return elastic.Nested("urns", elastic.All(elastic.MatchPhrase("urns.path", value), elastic.Term("urns.scheme", key)))
default:
panic(fmt.Sprintf("unsupported scheme operator: %s", c.Operator()))
}
Expand All @@ -375,9 +375,9 @@ func textAttributeQuery(c *contactql.Condition, name string, tx func(string) str

switch c.Operator() {
case contactql.OpEqual:
return esq.Term(name, value)
return elastic.Term(name, value)
case contactql.OpNotEqual:
return esq.Not(esq.Term(name, value))
return elastic.Not(elastic.Term(name, value))
default:
panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator()))
}
Expand All @@ -388,17 +388,17 @@ func numericalAttributeQuery(c *contactql.Condition, name string) map[string]any

switch c.Operator() {
case contactql.OpEqual:
return esq.Match(name, value)
return elastic.Match(name, value)
case contactql.OpNotEqual:
return esq.Not(esq.Match(name, value))
return elastic.Not(elastic.Match(name, value))
case contactql.OpGreaterThan:
return esq.GreaterThan(name, value)
return elastic.GreaterThan(name, value)
case contactql.OpGreaterThanOrEqual:
return esq.GreaterThanOrEqual(name, value)
return elastic.GreaterThanOrEqual(name, value)
case contactql.OpLessThan:
return esq.LessThan(name, value)
return elastic.LessThan(name, value)
case contactql.OpLessThanOrEqual:
return esq.LessThanOrEqual(name, value)
return elastic.LessThanOrEqual(name, value)
default:
panic(fmt.Sprintf("unsupported %s attribute operator: %s", name, c.Operator()))
}
Expand Down

0 comments on commit 7114ff3

Please sign in to comment.