fix date RACE of mappings
hengfeiyang committed May 23, 2022
1 parent 45e933a commit 669d48b
Showing 15 changed files with 106 additions and 61 deletions.
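The changes below fix a data race on the shared mappings: direct reads and writes of Mappings.Properties are routed through new RWMutex-guarded helpers (Len, GetProperty, SetProperty, ListProperty) added in pkg/meta/mappings.go. As a minimal, self-contained sketch of the kind of unsynchronized access being removed (simplified types and a hypothetical "date" field, not part of the diff), the race detector flags the following:

// Two goroutines touching the same Properties map without a lock; `go run -race`
// reports a data race here. Types are simplified stand-ins for pkg/meta.
package main

import "sync"

type Property struct{ Type string }

type Mappings struct {
    Properties map[string]Property
}

func main() {
    m := &Mappings{Properties: map[string]Property{}}
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            m.Properties["date"] = Property{Type: "date"} // unsynchronized write
            _ = m.Properties["date"].Type                 // unsynchronized read
        }()
    }
    wg.Wait()
}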
2 changes: 1 addition & 1 deletion go.mod
@@ -41,6 +41,6 @@ require (

replace github.com/blugelabs/bluge => ../bluge

-replace github.com/blugelabs/ice => github.com/zinclabs/ice v0.2.1-0.20220523030934-f7c3d139aa14
+replace github.com/blugelabs/ice => github.com/zinclabs/ice v0.2.1-0.20220523154843-772e1ae38b48

replace github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v0.2.1-0.20220523030708-2e8f9721fa17
4 changes: 2 additions & 2 deletions go.sum
@@ -463,8 +463,8 @@ github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPR
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zinclabs/bluge_segment_api v0.2.1-0.20220523030708-2e8f9721fa17 h1:6nZipqKn4MY+Jb3uHtzKvQCbHCWJWVbLy7LwoA9rZq0=
github.com/zinclabs/bluge_segment_api v0.2.1-0.20220523030708-2e8f9721fa17/go.mod h1:w+BFZs3+UsRmNSvE/Wxl7TSsZQBcso7yEwyYQ4Ovw+M=
-github.com/zinclabs/ice v0.2.1-0.20220523030934-f7c3d139aa14 h1:SdjjwUggvbNxx8l5lp1gCp9+bnjfSxq8eWndJjgUlkU=
-github.com/zinclabs/ice v0.2.1-0.20220523030934-f7c3d139aa14/go.mod h1:b7FG0NtjT+8iOsP57UWzG4V4WlNVrl0VG2YnOk1o82k=
+github.com/zinclabs/ice v0.2.1-0.20220523154843-772e1ae38b48 h1:Ze5gzqhD2rhLsy+OUjr00Q6ZseJwcq8/Ku5JSNDnZe0=
+github.com/zinclabs/ice v0.2.1-0.20220523154843-772e1ae38b48/go.mod h1:b7FG0NtjT+8iOsP57UWzG4V4WlNVrl0VG2YnOk1o82k=
github.com/zsais/go-gin-prometheus v0.0.0-20200217150448-2199a42d96c1 h1:wEkwpwzLhU3VEZRV8kS3eeYnyps4udKe14RQOmbjsZI=
github.com/zsais/go-gin-prometheus v0.0.0-20200217150448-2199a42d96c1/go.mod h1:Slirjzuz8uM8Cw0jmPNqbneoqcUtY2GGjn2bEd4NRLY=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
2 changes: 1 addition & 1 deletion multiarch.sh
@@ -4,6 +4,6 @@ aws ecr-public get-login-password --region us-east-1 | docker login --username A

# docker buildx build --push --platform linux/arm/v7,linux/arm64/v8,linux/amd64 --tag public.ecr.aws/zinclabs/zinc:v0.1.3-s3test .

-docker buildx build --push --platform linux/amd64 --tag public.ecr.aws/zinclabs/zinc:v0.1.3-s3test .
+docker buildx build --push --platform linux/amd64 --tag public.ecr.aws/zinclabs/zinc:test .


42 changes: 22 additions & 20 deletions pkg/core/index.go
@@ -56,25 +56,25 @@ func (index *Index) BuildBlugeDocumentFromJSON(docID string, doc map[string]inte
continue
}

-if _, ok := mappings.Properties[key]; !ok {
+if _, ok := mappings.GetProperty(key); !ok {
// try to find the type of the value and use it to define default mapping
switch value.(type) {
case string:
-mappings.Properties[key] = meta.NewProperty("text")
+mappings.SetProperty(key, meta.NewProperty("text"))
case float64:
-mappings.Properties[key] = meta.NewProperty("numeric")
+mappings.SetProperty(key, meta.NewProperty("numeric"))
case bool:
-mappings.Properties[key] = meta.NewProperty("bool")
+mappings.SetProperty(key, meta.NewProperty("bool"))
case []interface{}:
if v, ok := value.([]interface{}); ok {
for _, vv := range v {
switch vv.(type) {
case string:
-mappings.Properties[key] = meta.NewProperty("text")
+mappings.SetProperty(key, meta.NewProperty("text"))
case float64:
-mappings.Properties[key] = meta.NewProperty("numeric")
+mappings.SetProperty(key, meta.NewProperty("numeric"))
case bool:
-mappings.Properties[key] = meta.NewProperty("bool")
+mappings.SetProperty(key, meta.NewProperty("bool"))
}
break
}
@@ -84,7 +84,7 @@ func (index *Index) BuildBlugeDocumentFromJSON(docID string, doc map[string]inte
mappingsNeedsUpdate = true
}

-if !mappings.Properties[key].Index {
+if prop, ok := mappings.GetProperty(key); ok && !prop.Index {
continue // not index, skip
}

@@ -138,7 +138,8 @@ func (index *Index) BuildBlugeDocumentFromJSON(docID string, doc map[string]inte

func (index *Index) buildField(mappings *meta.Mappings, bdoc *bluge.Document, key string, value interface{}) error {
var field *bluge.TermField
-switch mappings.Properties[key].Type {
+prop, _ := mappings.GetProperty(key)
+switch prop.Type {
case "text":
v, ok := value.(string)
if !ok {
@@ -175,8 +176,8 @@ func (index *Index) buildField(mappings *meta.Mappings, bdoc *bluge.Document, ke
switch v := value.(type) {
case string:
format := time.RFC3339
-if mappings.Properties[key].Format != "" {
-format = mappings.Properties[key].Format
+if prop.Format != "" {
+format = prop.Format
}
var tim time.Time
var err error
@@ -195,19 +196,19 @@ func (index *Index) buildField(mappings *meta.Mappings, bdoc *bluge.Document, ke
}
}
}
-if mappings.Properties[key].Store {
+if prop.Store {
field.StoreValue()
}
-if mappings.Properties[key].Sortable {
+if prop.Sortable {
field.Sortable()
}
-if mappings.Properties[key].Aggregatable {
+if prop.Aggregatable {
field.Aggregatable()
}
-if mappings.Properties[key].Highlightable {
+if prop.Highlightable {
field.HighlightMatches()
}
-if mappings.Properties[key].TermPositions {
+if prop.TermPositions {
field.SearchTermPositions()
}
bdoc.AddField(field)
@@ -257,22 +258,23 @@ func (index *Index) SetAnalyzers(analyzers map[string]*analysis.Analyzer) error
}

func (index *Index) SetMappings(mappings *meta.Mappings) error {
-if mappings == nil || len(mappings.Properties) == 0 {
+if mappings == nil || mappings.Len() == 0 {
return nil
}

// custom analyzer just for text field
-for _, prop := range mappings.Properties {
+for field, prop := range mappings.ListProperty() {
if prop.Type != "text" {
prop.Analyzer = ""
prop.SearchAnalyzer = ""
mappings.SetProperty(field, prop)
}
}

-mappings.Properties["_id"] = meta.NewProperty("keyword")
+mappings.SetProperty("_id", meta.NewProperty("keyword"))

// @timestamp need date_range/date_histogram aggregation, and mappings used for type check in aggregation
-mappings.Properties["@timestamp"] = meta.NewProperty("date")
+mappings.SetProperty("@timestamp", meta.NewProperty("date"))

// update in the cache
index.CachedMappings = mappings
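One detail worth noting in the SetMappings hunk above: the loop now iterates the copy returned by ListProperty and stores the cleared analyzers back with SetProperty. Ranging over a map of struct values yields copies, so without the explicit write-back the reset would not reach the stored mapping. A minimal, self-contained illustration of that copy semantics (simplified types, hypothetical field name):

// Mutating the loop variable changes only a copy; the map entry is untouched.
package main

import "fmt"

type Property struct{ Analyzer string }

func main() {
    props := map[string]Property{"age": {Analyzer: "standard"}}
    for field, prop := range props {
        prop.Analyzer = "" // modifies the local copy only
        _ = field          // a write-back like SetProperty(field, prop) is needed
    }
    fmt.Println(props["age"].Analyzer) // prints "standard"
}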
14 changes: 7 additions & 7 deletions pkg/core/index_test.go
@@ -104,11 +104,11 @@ func TestIndex_BuildBlugeDocumentFromJSON(t *testing.T) {
},
},
init: func() {
-index.CachedMappings.Properties["time"] = meta.Property{
+index.CachedMappings.SetProperty("time", meta.Property{
Type: "time",
Index: true,
Format: "2006-01-02 15:04:05.000",
-}
+})
},
want: &bluge.Document{},
wantErr: false,
@@ -126,17 +126,17 @@ func TestIndex_BuildBlugeDocumentFromJSON(t *testing.T) {
},
},
init: func() {
-index.CachedMappings.Properties["id"] = meta.Property{
+index.CachedMappings.SetProperty("id", meta.Property{
Type: "keyword",
Index: true,
Store: true,
Highlightable: true,
-}
-index.CachedMappings.Properties["name"] = meta.Property{
+})
+index.CachedMappings.SetProperty("name", meta.Property{
Type: "text",
Index: true,
Analyzer: "analyzer_1",
-}
+})
index.CachedAnalyzers["analyzer_1"] = analyzer.NewStandardAnalyzer()
},
want: &bluge.Document{},
@@ -228,7 +228,7 @@ func TestIndex_BuildBlugeDocumentFromJSON(t *testing.T) {

err = StoreIndex(index)
assert.NoError(t, err)
-index.CachedMappings.Properties["time"] = meta.NewProperty("time")
+index.CachedMappings.SetProperty("time", meta.NewProperty("date"))
})

for _, tt := range tests {
4 changes: 2 additions & 2 deletions pkg/core/search_test.go
@@ -290,12 +290,12 @@ func TestIndex_Search(t *testing.T) {
if (index.CachedMappings) == nil {
index.CachedMappings = meta.NewMappings()
}
-index.CachedMappings.Properties["address.city"] = meta.Property{
+index.CachedMappings.SetProperty("address.city", meta.Property{
Type: "text",
Index: true,
Store: true,
Highlightable: true,
-}
+})

for _, d := range tt.data {
rand.Seed(time.Now().UnixNano())
4 changes: 2 additions & 2 deletions pkg/handlers/index/analyzer.go
@@ -46,8 +46,8 @@ func Analyze(c *gin.Context) {
return
}
if query.Filed != "" && query.Analyzer == "" {
-if index.CachedMappings != nil && index.CachedMappings.Properties != nil {
-if prop, ok := index.CachedMappings.Properties[query.Filed]; ok {
+if index.CachedMappings != nil && index.CachedMappings.Len() > 0 {
+if prop, ok := index.CachedMappings.GetProperty(query.Filed); ok {
if query.Analyzer == "" && prop.SearchAnalyzer != "" {
query.Analyzer = prop.SearchAnalyzer
}
4 changes: 2 additions & 2 deletions pkg/handlers/index/mapping.go
@@ -58,7 +58,7 @@ func SetMapping(c *gin.Context) {
index, exists := core.GetIndex(indexName)
if exists {
// check if mapping is empty
-if index.CachedMappings != nil && len(index.CachedMappings.Properties) > 0 {
+if index.CachedMappings != nil && index.CachedMappings.Len() > 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "index [" + indexName + "] already exists"})
return
}
@@ -77,7 +77,7 @@
}

// update mappings
-if mappings != nil && len(mappings.Properties) > 0 {
+if mappings != nil && mappings.Len() > 0 {
_ = index.SetMappings(mappings)
}

33 changes: 33 additions & 0 deletions pkg/meta/mappings.go
@@ -15,8 +15,11 @@

package meta

import "sync"

type Mappings struct {
Properties map[string]Property `json:"properties,omitempty"`
lock sync.RWMutex
}

type Property struct {
@@ -59,3 +62,33 @@ func NewProperty(typ string) Property {

return p
}

func (t *Mappings) Len() int {
t.lock.RLock()
n := len(t.Properties)
t.lock.RUnlock()
return n
}

func (t *Mappings) SetProperty(field string, prop Property) {
t.lock.Lock()
t.Properties[field] = prop
t.lock.Unlock()
}

func (t *Mappings) GetProperty(field string) (Property, bool) {
t.lock.RLock()
prop, ok := t.Properties[field]
t.lock.RUnlock()
return prop, ok
}

func (t *Mappings) ListProperty() map[string]Property {
m := make(map[string]Property)
t.lock.RLock()
for k, v := range t.Properties {
m[k] = v
}
t.lock.RUnlock()
return m
}
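A sketch of a regression test for the accessors above; it could sit next to mappings.go in package meta and passes `go test -race` because every Properties access now goes through the RWMutex. It assumes NewMappings returns a *Mappings with the map initialized, matching how meta.NewMappings is used in pkg/core/search_test.go.

package meta

import (
    "sync"
    "testing"
)

func TestMappingsConcurrentAccess(t *testing.T) {
    m := NewMappings() // assumed to return an initialized *Mappings
    var wg sync.WaitGroup
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            m.SetProperty("@timestamp", NewProperty("date")) // concurrent writes
            if _, ok := m.GetProperty("@timestamp"); !ok {   // concurrent reads
                t.Error("expected @timestamp to be present")
            }
            _ = m.Len()
            for range m.ListProperty() {
            }
        }()
    }
    wg.Wait()
}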
28 changes: 17 additions & 11 deletions pkg/uquery/aggregation/aggregation.go
@@ -62,15 +62,16 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
agg.Terms.Size = config.Global.AggregationTermsSize
}
var subreq *zincaggregation.TermsAggregation
-switch mappings.Properties[agg.Terms.Field].Type {
+prop, _ := mappings.GetProperty(agg.Terms.Field)
+switch prop.Type {
case "text", "keyword":
subreq = zincaggregation.NewTermsAggregation(search.Field(agg.Terms.Field), zincaggregation.TextValueSource, agg.Terms.Size)
case "numeric":
subreq = zincaggregation.NewTermsAggregation(search.Field(agg.Terms.Field), zincaggregation.NumericValueSource, agg.Terms.Size)
default:
return errors.New(
errors.ErrorTypeParsingException,
-fmt.Sprintf("[terms] aggregation doesn't support values of type: [%s:[%s]]", agg.Terms.Field, mappings.Properties[agg.Terms.Field].Type),
+fmt.Sprintf("[terms] aggregation doesn't support values of type: [%s:[%s]]", agg.Terms.Field, prop.Type),
)
}
if len(agg.Aggregations) > 0 {
@@ -84,7 +85,8 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
return errors.New(errors.ErrorTypeParsingException, "[range] aggregation needs ranges")
}
var subreq *aggregations.RangeAggregation
-switch mappings.Properties[agg.Range.Field].Type {
+prop, _ := mappings.GetProperty(agg.Range.Field)
+switch prop.Type {
case "numeric":
subreq = aggregations.Ranges(search.Field(agg.Range.Field))
for _, v := range agg.Range.Ranges {
@@ -100,7 +102,8 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
}
var subreq *aggregations.DateRangeAggregation
format := time.RFC3339
-if prop, ok := mappings.Properties[agg.DateRange.Field]; ok {
+prop, ok := mappings.GetProperty(agg.DateRange.Field)
+if ok {
if prop.Format != "" {
format = prop.Format
}
@@ -115,7 +118,7 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
return errors.New(errors.ErrorTypeXContentParseException, fmt.Sprintf("[date_range] time_zone parse err %s", err.Error()))
}
}
-switch mappings.Properties[agg.DateRange.Field].Type {
+switch prop.Type {
case "date", "time":
subreq = aggregations.DateRanges(search.Field(agg.DateRange.Field))
for _, v := range agg.DateRange.Ranges {
@@ -150,7 +153,8 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
return errors.New(errors.ErrorTypeParsingException, "[histogram] aggregation offset must be in [0, interval)")
}
var subreq *zincaggregation.HistogramAggregation
-switch mappings.Properties[agg.Histogram.Field].Type {
+prop, _ := mappings.GetProperty(agg.Histogram.Field)
+switch prop.Type {
case "numeric":
subreq = zincaggregation.NewHistogramAggregation(
search.Field(agg.Histogram.Field),
@@ -164,7 +168,7 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
default:
return errors.New(
errors.ErrorTypeParsingException,
-fmt.Sprintf("[histogram] aggregation doesn't support values of type: [%s:[%s]]", agg.Histogram.Field, mappings.Properties[agg.Histogram.Field].Type),
+fmt.Sprintf("[histogram] aggregation doesn't support values of type: [%s:[%s]]", agg.Histogram.Field, prop.Type),
)
}
if len(agg.Aggregations) > 0 {
@@ -227,7 +231,8 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
agg.DateHistogram.Format = time.RFC3339
}
var subreq *zincaggregation.DateHistogramAggregation
-switch mappings.Properties[agg.DateHistogram.Field].Type {
+prop, _ := mappings.GetProperty(agg.DateHistogram.Field)
+switch prop.Type {
case "date", "time":
subreq = zincaggregation.NewDateHistogramAggregation(
search.Field(agg.DateHistogram.Field),
@@ -246,7 +251,7 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
fmt.Sprintf(
"[date_histogram] aggregation doesn't support values of type: [%s:[%s]]",
agg.DateHistogram.Field,
-mappings.Properties[agg.DateHistogram.Field].Type,
+prop.Type,
),
)
}
@@ -286,7 +291,8 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
agg.AutoDateHistogram.Format = time.RFC3339
}
var subreq *zincaggregation.AutoDateHistogramAggregation
-switch mappings.Properties[agg.AutoDateHistogram.Field].Type {
+prop, _ := mappings.GetProperty(agg.AutoDateHistogram.Field)
+switch prop.Type {
case "date", "time":
subreq = zincaggregation.NewAutoDateHistogramAggregation(
search.Field(agg.AutoDateHistogram.Field),
@@ -301,7 +307,7 @@ func Request(req zincaggregation.SearchAggregation, aggs map[string]meta.Aggrega
fmt.Sprintf(
"[auto_date_histogram] aggregation doesn't support values of type: [%s:[%s]]",
agg.AutoDateHistogram.Field,
-mappings.Properties[agg.AutoDateHistogram.Field].Type,
+prop.Type,
),
)
}
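Throughout the aggregation branches above, the property is now fetched once with GetProperty and the ok flag is often ignored: when a field has no mapping, the zero-value Property comes back with an empty Type, so the switch falls into the default error branch just as the old direct map index did. A small self-contained illustration of that zero-value fallback (simplified types, hypothetical fields):

// A missing key yields Property{} with Type == "", which hits the default case.
package main

import "fmt"

type Property struct{ Type string }

func main() {
    props := map[string]Property{"price": {Type: "numeric"}}
    for _, field := range []string{"price", "missing"} {
        prop := props[field] // like GetProperty with the ok flag dropped
        switch prop.Type {
        case "numeric":
            fmt.Println(field, "-> numeric terms aggregation")
        default:
            fmt.Printf("[terms] aggregation doesn't support values of type: [%s:[%s]]\n", field, prop.Type)
        }
    }
}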
4 changes: 2 additions & 2 deletions pkg/uquery/analysis/analyzer.go
@@ -258,8 +258,8 @@ func QueryAnalyzerForField(data map[string]*analysis.Analyzer, mappings *meta.Ma

analyzerName := ""
searchAnalyzerName := ""
-if mappings != nil && len(mappings.Properties) > 0 {
-if v, ok := mappings.Properties[field]; ok {
+if mappings != nil && mappings.Len() > 0 {
+if v, ok := mappings.GetProperty(field); ok {
if v.Type != "text" {
return nil, nil
}