diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2cc480628..eb7a892da 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -138,4 +138,3 @@ docker buildx build --platform linux/amd64 --tag zinc:latest-linux-amd64 . -f Do 1. Make the changes to code. 1. Push the code to your repo. 1. Create a PR - diff --git a/go.mod b/go.mod index 37e04177e..259ff2901 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.4 go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/text v0.3.7 golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335 // indirect diff --git a/go.sum b/go.sum index 0e5e786e3..68f9d6256 100644 --- a/go.sum +++ b/go.sum @@ -691,6 +691,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/bluge/directory/disk.go b/pkg/bluge/directory/disk.go new file mode 100644 index 000000000..550a54f59 --- /dev/null +++ b/pkg/bluge/directory/disk.go @@ -0,0 +1,36 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package directory + +import ( + "path" + + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/index" +) + +// GetDiskConfig returns a bluge config that will store index data in local disk +// rootPath: the root path of data +// indexName: the name of the index to use. +func GetDiskConfig(rootPath string, indexName string, timeRange ...int64) bluge.Config { + config := index.DefaultConfig(path.Join(rootPath, indexName)) + if len(timeRange) == 2 { + if timeRange[0] <= timeRange[1] { + config = config.WithTimeRange(timeRange[0], timeRange[1]) + } + } + return bluge.DefaultConfigWithIndexConfig(config) +} diff --git a/pkg/bluge/directory/minio.go b/pkg/bluge/directory/minio.go index 5723f58a6..2536a30fb 100644 --- a/pkg/bluge/directory/minio.go +++ b/pkg/bluge/directory/minio.go @@ -38,10 +38,16 @@ import ( // GetMinIOConfig returns a bluge config that will store index data in MinIO // bucket: the MinIO bucket to use // indexName: the name of the index to use. It will be an MinIO prefix (folder) -func GetMinIOConfig(bucket string, indexName string) bluge.Config { - return bluge.DefaultConfigWithDirectory(func() index.Directory { +func GetMinIOConfig(bucket string, indexName string, timeRange ...int64) bluge.Config { + config := index.DefaultConfigWithDirectory(func() index.Directory { return NewMinIODirectory(bucket, indexName) }) + if len(timeRange) == 2 { + if timeRange[0] <= timeRange[1] { + config = config.WithTimeRange(timeRange[0], timeRange[1]) + } + } + return bluge.DefaultConfigWithIndexConfig(config) } type MinIODirectory struct { diff --git a/pkg/bluge/directory/s3.go b/pkg/bluge/directory/s3.go index 670fb911e..9348ca873 100644 --- a/pkg/bluge/directory/s3.go +++ b/pkg/bluge/directory/s3.go @@ -35,10 +35,16 @@ import ( // GetS3Config returns a bluge config that will store index data in S3 // bucket: the S3 bucket to use // indexName: the name of the index to use. It will be an s3 prefix (folder) -func GetS3Config(bucket string, indexName string) bluge.Config { - return bluge.DefaultConfigWithDirectory(func() index.Directory { +func GetS3Config(bucket string, indexName string, timeRange ...int64) bluge.Config { + config := index.DefaultConfigWithDirectory(func() index.Directory { return NewS3Directory(bucket, indexName) }) + if len(timeRange) == 2 { + if timeRange[0] <= timeRange[1] { + config = config.WithTimeRange(timeRange[0], timeRange[1]) + } + } + return bluge.DefaultConfigWithIndexConfig(config) } type S3Directory struct { diff --git a/pkg/config/config.go b/pkg/config/config.go index f7f25a039..17fc2e351 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" + "github.com/gin-gonic/gin" "github.com/joho/godotenv" "github.com/rs/zerolog/log" @@ -46,12 +47,26 @@ type config struct { BatchSize int `env:"ZINC_BATCH_SIZE,default=1024"` MaxResults int `env:"ZINC_MAX_RESULTS,default=10000"` AggregationTermsSize int `env:"ZINC_AGGREGATION_TERMS_SIZE,default=1000"` + Shard shard Etcd etcd S3 s3 MinIO minIO Plugin plugin } +// 1073741824 1g +// 536870912 512m +// 268435456 256m +// 134217728 128m +// 67108864 64m +// 33554432 32m +// 16777216 16m + +type shard struct { + // MaxShards is the maximum number of shards to create. + MaxSize uint64 `env:"ZINC_SHARD_MAX_SIZE,default=1073741824"` +} + type etcd struct { Endpoints []string `env:"ZINC_ETCD_ENDPOINTS"` Username string `env:"ZINC_ETCD_USERNAME"` @@ -91,6 +106,11 @@ func init() { rv := reflect.ValueOf(Global).Elem() loadConfig(rv) + // configure gin + if Global.GinMode == "release" { + gin.SetMode(gin.ReleaseMode) + } + // check data path testPath := path.Join(Global.DataPath, "_test_") if err := os.MkdirAll(testPath, 0755); err != nil { @@ -145,12 +165,18 @@ func setField(field reflect.Value, tag string) { return } switch field.Kind() { - case reflect.Int: - vi, err := strconv.Atoi(v) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + vi, err := strconv.ParseInt(v, 10, 64) if err != nil { log.Fatal().Err(err).Msgf("env %s is not int", tag) } field.SetInt(int64(vi)) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + vi, err := strconv.ParseUint(v, 10, 64) + if err != nil { + log.Fatal().Err(err).Msgf("env %s is not uint", tag) + } + field.SetUint(uint64(vi)) case reflect.Bool: vi, err := strconv.ParseBool(v) if err != nil { diff --git a/pkg/core/deleteindex.go b/pkg/core/deleteindex.go index 379132fbf..c3a1ec7da 100644 --- a/pkg/core/deleteindex.go +++ b/pkg/core/deleteindex.go @@ -46,20 +46,17 @@ func DeleteIndex(name string) error { dataPath := config.Global.DataPath err := os.RemoveAll(dataPath + "/" + index.Name) if err != nil { - log.Error().Msgf("failed to delete index: %s", err.Error()) - return err + log.Error().Err(err).Msg("failed to delete index") } } else if index.StorageType == "s3" { err := deleteFilesForIndexFromS3(index.Name) if err != nil { - log.Error().Msgf("failed to delete index from S3: %s", err.Error()) - return err + log.Error().Err(err).Msg("failed to delete index from S3") } } else if index.StorageType == "minio" { err := deleteFilesForIndexFromMinIO(index.Name) if err != nil { - log.Error().Msgf("failed to delete index from minIO: %s", err.Error()) - return err + log.Error().Err(err).Msg("failed to delete index from minIO") } } diff --git a/pkg/core/deleteindex_test.go b/pkg/core/deleteindex_test.go index 54eb09619..41aff39e8 100644 --- a/pkg/core/deleteindex_test.go +++ b/pkg/core/deleteindex_test.go @@ -52,14 +52,14 @@ func TestDeleteIndex(t *testing.T) { args: args{ name: indexNameS3, }, - wantErr: true, + wantErr: false, }, { name: "minio", args: args{ name: indexNameMinIO, }, - wantErr: true, + wantErr: false, }, } diff --git a/pkg/core/index.go b/pkg/core/index.go index eeaf4bf2c..989c61929 100644 --- a/pkg/core/index.go +++ b/pkg/core/index.go @@ -16,202 +16,19 @@ package core import ( - "fmt" - "strconv" - "time" + "sync" + "sync/atomic" - "github.com/blugelabs/bluge" "github.com/blugelabs/bluge/analysis" - "github.com/goccy/go-json" "github.com/zinclabs/zinc/pkg/meta" - zincanalysis "github.com/zinclabs/zinc/pkg/uquery/analysis" - "github.com/zinclabs/zinc/pkg/zutils" - "github.com/zinclabs/zinc/pkg/zutils/flatten" + "github.com/zinclabs/zinc/pkg/metadata" ) type Index struct { meta.Index Analyzers map[string]*analysis.Analyzer `json:"-"` - Writer *bluge.Writer `json:"-"` -} - -// BuildBlugeDocumentFromJSON returns the bluge document for the json document. It also updates the mapping for the fields if not found. -// If no mappings are found, it creates te mapping for all the encountered fields. If mapping for some fields is found but not for others -// then it creates the mapping for the missing fields. -func (index *Index) BuildBlugeDocumentFromJSON(docID string, doc map[string]interface{}) (*bluge.Document, error) { - // Pick the index mapping from the cache if it already exists - mappings := index.Mappings - if mappings == nil { - mappings = meta.NewMappings() - } - - mappingsNeedsUpdate := false - - // Create a new bluge document - bdoc := bluge.NewDocument(docID) - flatDoc, _ := flatten.Flatten(doc, "") - // Iterate through each field and add it to the bluge document - for key, value := range flatDoc { - if value == nil || key == "@timestamp" { - continue - } - - if _, ok := mappings.GetProperty(key); !ok { - // try to find the type of the value and use it to define default mapping - switch value.(type) { - case string: - mappings.SetProperty(key, meta.NewProperty("text")) - case float64: - mappings.SetProperty(key, meta.NewProperty("numeric")) - case bool: - mappings.SetProperty(key, meta.NewProperty("bool")) - case []interface{}: - if v, ok := value.([]interface{}); ok { - for _, vv := range v { - switch vv.(type) { - case string: - mappings.SetProperty(key, meta.NewProperty("text")) - case float64: - mappings.SetProperty(key, meta.NewProperty("numeric")) - case bool: - mappings.SetProperty(key, meta.NewProperty("bool")) - } - break - } - } - } - - mappingsNeedsUpdate = true - } - - if prop, ok := mappings.GetProperty(key); ok && !prop.Index { - continue // not index, skip - } - - switch v := value.(type) { - case []interface{}: - for _, v := range v { - if err := index.buildField(mappings, bdoc, key, v); err != nil { - return nil, err - } - } - default: - if err := index.buildField(mappings, bdoc, key, v); err != nil { - return nil, err - } - } - } - - if mappingsNeedsUpdate { - _ = index.SetMappings(mappings) - _ = StoreIndex(index) - } - - timestamp := time.Now() - if v, ok := flatDoc["@timestamp"]; ok { - switch v := v.(type) { - case string: - if t, err := time.Parse(time.RFC3339, v); err == nil && !t.IsZero() { - timestamp = t - delete(doc, "@timestamp") - } - case float64: - if t := zutils.Unix(int64(v)); !t.IsZero() { - timestamp = t - delete(doc, "@timestamp") - } - default: - // noop - } - } - docByteVal, _ := json.Marshal(doc) - bdoc.AddField(bluge.NewDateTimeField("@timestamp", timestamp).StoreValue().Sortable().Aggregatable()) - bdoc.AddField(bluge.NewStoredOnlyField("_index", []byte(index.Name))) - bdoc.AddField(bluge.NewStoredOnlyField("_source", docByteVal)) - bdoc.AddField(bluge.NewCompositeFieldExcluding("_all", []string{"_id", "_index", "_source", "@timestamp"})) - - // test for add time index - bdoc.SetTimestamp(timestamp.UnixNano()) - - return bdoc, nil -} - -func (index *Index) buildField(mappings *meta.Mappings, bdoc *bluge.Document, key string, value interface{}) error { - var field *bluge.TermField - prop, _ := mappings.GetProperty(key) - switch prop.Type { - case "text": - v, ok := value.(string) - if !ok { - return fmt.Errorf("field [%s] was set type to [text] but got a %T value", key, value) - } - field = bluge.NewTextField(key, v).SearchTermPositions() - fieldAnalyzer, _ := zincanalysis.QueryAnalyzerForField(index.Analyzers, index.Mappings, key) - if fieldAnalyzer != nil { - field.WithAnalyzer(fieldAnalyzer) - } - case "numeric": - v, ok := value.(float64) - if !ok { - return fmt.Errorf("field [%s] was set type to [numeric] but got a %T value", key, value) - } - field = bluge.NewNumericField(key, v) - case "keyword": - switch v := value.(type) { - case string: - field = bluge.NewKeywordField(key, v) - case float64: - field = bluge.NewKeywordField(key, strconv.FormatFloat(v, 'f', -1, 64)) - case int: - field = bluge.NewKeywordField(key, strconv.FormatInt(int64(v), 10)) - case bool: - field = bluge.NewKeywordField(key, strconv.FormatBool(v)) - default: - field = bluge.NewKeywordField(key, fmt.Sprintf("%v", v)) - } - case "bool": - value := value.(bool) - field = bluge.NewKeywordField(key, strconv.FormatBool(value)) - case "date", "time": - switch v := value.(type) { - case string: - format := time.RFC3339 - if prop.Format != "" { - format = prop.Format - } - var tim time.Time - var err error - if format == "epoch_millis" { - tim = time.UnixMilli(int64(value.(float64))) - } else { - tim, err = time.Parse(format, value.(string)) - } - if err != nil { - return err - } - field = bluge.NewDateTimeField(key, tim) - case float64: - if t := zutils.Unix(int64(v)); !t.IsZero() { - field = bluge.NewDateTimeField(key, t) - } - } - } - if prop.Store || prop.Highlightable { - field.StoreValue() - } - if prop.Highlightable { - field.HighlightMatches() - } - if prop.Sortable { - field.Sortable() - } - if prop.Aggregatable { - field.Aggregatable() - } - bdoc.AddField(field) - - return nil + lock sync.RWMutex `json:"-"` } func (index *Index) UseTemplate() error { @@ -280,30 +97,93 @@ func (index *Index) SetMappings(mappings *meta.Mappings) error { return nil } -func (index *Index) UpdateMetadata() { - w := index.Writer - if w == nil { - return +func (index *Index) SetTimestamp(t int64) { + if index.DocTimeMin == 0 { + atomic.StoreInt64(&index.DocTimeMin, t) + } + if index.DocTimeMax == 0 { + atomic.StoreInt64(&index.DocTimeMax, t) + } + if t < index.DocTimeMin { + atomic.StoreInt64(&index.DocTimeMin, t) + } + if t > index.DocTimeMax { + atomic.StoreInt64(&index.DocTimeMax, t) + } +} + +func (index *Index) UpdateMetadata() error { + var totalDocNum, totalSize uint64 + // update docNum and storageSize + for i := 0; i < index.ShardNum; i++ { + index.UpdateMetadataByShard(i) + } + index.lock.RLock() + for i := 0; i < index.ShardNum; i++ { + totalDocNum += index.Shards[i].DocNum + totalSize += index.Shards[i].StorageSize + } + if totalDocNum > 0 && totalSize > 0 { + index.DocNum = totalDocNum + index.StorageSize = totalSize } - _, index.StorageSize = w.DirectoryStats() + // update docTime + index.Shards[index.ShardNum-1].DocTimeMin = index.DocTimeMin + index.Shards[index.ShardNum-1].DocTimeMax = index.DocTimeMax + index.lock.RUnlock() - if r, err := w.Reader(); err == nil { + return metadata.Index.Set(index.Name, index.Index) +} + +func (index *Index) UpdateMetadataByShard(n int) { + index.lock.RLock() + shard := index.Shards[n] + index.lock.RUnlock() + if shard.Writer == nil { + return + } + var docNum, storageSize uint64 + _, storageSize = shard.Writer.DirectoryStats() + if r, err := shard.Writer.Reader(); err == nil { if n, err := r.Count(); err == nil { - index.DocNum = n + docNum = n } + _ = r.Close() + } + if docNum > 0 { + shard.DocNum = docNum + } + if storageSize > 0 { + shard.StorageSize = storageSize } } -func (index *Index) Close() error { - if index.Writer == nil { - return nil +func (index *Index) Reopen() error { + if err := index.Close(); err != nil { + return err } - // update metadata before close - index.UpdateMetadata() - // close writer - if err := index.Writer.Close(); err != nil { + if _, err := index.GetWriter(); err != nil { return err } - index.Writer = nil return nil } + +func (index *Index) Close() error { + var err error + // update metadata before close + if err = index.UpdateMetadata(); err != nil { + return err + } + index.lock.Lock() + for _, shard := range index.Shards { + if shard.Writer == nil { + continue + } + if e := shard.Writer.Close(); e != nil { + err = e + } + shard.Writer = nil + } + index.lock.Unlock() + return err +} diff --git a/pkg/core/index_document.go b/pkg/core/index_document.go new file mode 100644 index 000000000..b0b63ff21 --- /dev/null +++ b/pkg/core/index_document.go @@ -0,0 +1,277 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package core + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/blugelabs/bluge" + "github.com/goccy/go-json" + + "github.com/zinclabs/zinc/pkg/meta" + zincanalysis "github.com/zinclabs/zinc/pkg/uquery/analysis" + "github.com/zinclabs/zinc/pkg/zutils" + "github.com/zinclabs/zinc/pkg/zutils/flatten" +) + +// BuildBlugeDocumentFromJSON returns the bluge document for the json document. It also updates the mapping for the fields if not found. +// If no mappings are found, it creates te mapping for all the encountered fields. If mapping for some fields is found but not for others +// then it creates the mapping for the missing fields. +func (index *Index) BuildBlugeDocumentFromJSON(docID string, doc map[string]interface{}) (*bluge.Document, error) { + // Pick the index mapping from the cache if it already exists + mappings := index.Mappings + if mappings == nil { + mappings = meta.NewMappings() + } + + mappingsNeedsUpdate := false + + // Create a new bluge document + bdoc := bluge.NewDocument(docID) + flatDoc, _ := flatten.Flatten(doc, "") + // Iterate through each field and add it to the bluge document + for key, value := range flatDoc { + if value == nil || key == "@timestamp" { + continue + } + + if _, ok := mappings.GetProperty(key); !ok { + // try to find the type of the value and use it to define default mapping + switch value.(type) { + case string: + mappings.SetProperty(key, meta.NewProperty("text")) + case int, int64, float64: + mappings.SetProperty(key, meta.NewProperty("numeric")) + case bool: + mappings.SetProperty(key, meta.NewProperty("bool")) + case []interface{}: + if v, ok := value.([]interface{}); ok { + for _, vv := range v { + switch vv.(type) { + case string: + mappings.SetProperty(key, meta.NewProperty("text")) + case float64: + mappings.SetProperty(key, meta.NewProperty("numeric")) + case bool: + mappings.SetProperty(key, meta.NewProperty("bool")) + } + break + } + } + } + + mappingsNeedsUpdate = true + } + + if prop, ok := mappings.GetProperty(key); ok && !prop.Index { + continue // not index, skip + } + + switch v := value.(type) { + case []interface{}: + for _, v := range v { + if err := index.buildField(mappings, bdoc, key, v); err != nil { + return nil, err + } + } + default: + if err := index.buildField(mappings, bdoc, key, v); err != nil { + return nil, err + } + } + } + + if mappingsNeedsUpdate { + _ = index.SetMappings(mappings) + _ = StoreIndex(index) + } + + timestamp := time.Now() + if v, ok := flatDoc["@timestamp"]; ok { + switch v := v.(type) { + case string: + if t, err := time.Parse(time.RFC3339, v); err == nil && !t.IsZero() { + timestamp = t + delete(doc, "@timestamp") + } + case float64: + if t := zutils.Unix(int64(v)); !t.IsZero() { + timestamp = t + delete(doc, "@timestamp") + } + default: + // noop + } + } + docByteVal, _ := json.Marshal(doc) + bdoc.AddField(bluge.NewDateTimeField("@timestamp", timestamp).StoreValue().Sortable().Aggregatable()) + bdoc.AddField(bluge.NewStoredOnlyField("_index", []byte(index.Name))) + bdoc.AddField(bluge.NewStoredOnlyField("_source", docByteVal)) + bdoc.AddField(bluge.NewCompositeFieldExcluding("_all", []string{"_id", "_index", "_source", "@timestamp"})) + + // Add time for index + bdoc.SetTimestamp(timestamp.UnixNano()) + // Upate metadata + index.SetTimestamp(timestamp.UnixNano()) + + return bdoc, nil +} + +func (index *Index) buildField(mappings *meta.Mappings, bdoc *bluge.Document, key string, value interface{}) error { + var field *bluge.TermField + prop, _ := mappings.GetProperty(key) + switch prop.Type { + case "text": + v, ok := value.(string) + if !ok { + return fmt.Errorf("field [%s] was set type to [text] but got a %T value", key, value) + } + field = bluge.NewTextField(key, v).SearchTermPositions() + fieldAnalyzer, _ := zincanalysis.QueryAnalyzerForField(index.Analyzers, index.Mappings, key) + if fieldAnalyzer != nil { + field.WithAnalyzer(fieldAnalyzer) + } + case "numeric": + switch v := value.(type) { + case float64: + field = bluge.NewNumericField(key, float64(v)) + case int64: + field = bluge.NewNumericField(key, float64(v)) + case int: + field = bluge.NewNumericField(key, float64(v)) + default: + return fmt.Errorf("field [%s] was set type to [numeric] but got a %T value", key, value) + } + case "keyword": + switch v := value.(type) { + case string: + field = bluge.NewKeywordField(key, v) + case float64: + field = bluge.NewKeywordField(key, strconv.FormatFloat(v, 'f', -1, 64)) + case int: + field = bluge.NewKeywordField(key, strconv.FormatInt(int64(v), 10)) + case bool: + field = bluge.NewKeywordField(key, strconv.FormatBool(v)) + default: + field = bluge.NewKeywordField(key, fmt.Sprintf("%v", v)) + } + case "bool": + value := value.(bool) + field = bluge.NewKeywordField(key, strconv.FormatBool(value)) + case "date", "time": + switch v := value.(type) { + case string: + format := time.RFC3339 + if prop.Format != "" { + format = prop.Format + } + tim, err := time.Parse(format, v) + if err != nil { + return err + } + field = bluge.NewDateTimeField(key, tim) + case float64: + if t := zutils.Unix(int64(v)); !t.IsZero() { + field = bluge.NewDateTimeField(key, t) + } else { + return fmt.Errorf("value is not a valid timestamp") + } + default: + return fmt.Errorf("value type of date must be string or float64") + } + } + if prop.Store || prop.Highlightable { + field.StoreValue() + } + if prop.Highlightable { + field.HighlightMatches() + } + if prop.Sortable { + field.Sortable() + } + if prop.Aggregatable { + field.Aggregatable() + } + bdoc.AddField(field) + + return nil +} + +// CreateDocument inserts or updates a document in the zinc index +func (index *Index) CreateDocument(docID string, doc map[string]interface{}, update bool) error { + bdoc, err := index.BuildBlugeDocumentFromJSON(docID, doc) + if err != nil { + return err + } + + // Finally update the document on disk + writer, err := index.GetWriter() + if err != nil { + return err + } + if update { + err = writer.Update(bdoc.ID(), bdoc) + } else { + err = writer.Insert(bdoc) + } + return err +} + +// UpdateDocument updates a document in the zinc index +func (index *Index) UpdateDocument(docID string, doc map[string]interface{}) error { + bdoc, err := index.BuildBlugeDocumentFromJSON(docID, doc) + if err != nil { + return err + } + writer, err := index.FindID(docID) + if err != nil { + return err + } + return writer.Update(bdoc.ID(), bdoc) +} + +func (index *Index) FindID(id string) (*bluge.Writer, error) { + query := bluge.NewBooleanQuery() + query.AddMust(bluge.NewTermQuery(id).SetField("_id")) + request := bluge.NewTopNSearch(1, query).WithStandardAggregations() + ctx := context.Background() + + // check id store by which shard + writers, err := index.GetWriters() + if err != nil { + return nil, err + } + + for _, w := range writers { + r, err := w.Reader() + if err != nil { + return nil, err + } + defer r.Close() + dmi, err := r.Search(ctx, request) + if err != nil { + return nil, err + } + if dmi.Aggregations().Count() > 0 { + return w, nil + } + } + return nil, errors.New("id not found") +} diff --git a/pkg/core/updatedocument_test.go b/pkg/core/index_document_test.go similarity index 57% rename from pkg/core/updatedocument_test.go rename to pkg/core/index_document_test.go index efeb781a0..4b31f2158 100644 --- a/pkg/core/updatedocument_test.go +++ b/pkg/core/index_document_test.go @@ -23,14 +23,14 @@ import ( "github.com/zinclabs/zinc/pkg/meta" ) -func TestIndex_UpdateDocument(t *testing.T) { +func TestIndex_CreateUpdateDocument(t *testing.T) { type fields struct { Name string } type args struct { - docID string - doc map[string]interface{} - mintedID bool + docID string + doc map[string]interface{} + update bool } tests := []struct { name string @@ -39,39 +39,39 @@ func TestIndex_UpdateDocument(t *testing.T) { wantErr bool }{ { - name: "UpdateDocument with generated ID", + name: "Document with generated ID", args: args{ docID: "test1", doc: map[string]interface{}{ "name": "Hello", }, - mintedID: true, + update: false, }, }, { - name: "UpdateDocument with provided ID", + name: "Document with provided ID", args: args{ docID: "test1", doc: map[string]interface{}{ "test": "Hello", }, - mintedID: false, + update: true, }, }, { - name: "UpdateDocument with type conflict", + name: "Document with type conflict", args: args{ docID: "test1", doc: map[string]interface{}{ "test": true, }, - mintedID: false, + update: true, }, wantErr: true, }, } - indexName := "TestUpdateDocument.index_1" + indexName := "TestDocument.index_1" var index *Index var err error t.Run("prepare", func(t *testing.T) { @@ -82,7 +82,7 @@ func TestIndex_UpdateDocument(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := index.UpdateDocument(tt.args.docID, tt.args.doc, tt.args.mintedID) + err := index.CreateDocument(tt.args.docID, tt.args.doc, tt.args.update) if tt.wantErr { assert.Error(t, err) return @@ -109,3 +109,54 @@ func TestIndex_UpdateDocument(t *testing.T) { assert.NoError(t, err) }) } + +func TestIndex_UpdateDocument(t *testing.T) { + type args struct { + docID string + doc map[string]interface{} + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "update", + args: args{ + docID: "1", + doc: map[string]interface{}{ + "name": "HelloUpdate", + "time": float64(1579098983), + }, + }, + wantErr: false, + }, + } + + var index *Index + var err error + t.Run("prepare", func(t *testing.T) { + index, err = NewIndex("TestIndex_UpdateDocument.index_1", "disk", nil) + assert.NoError(t, err) + assert.NotNil(t, index) + + err = StoreIndex(index) + assert.NoError(t, err) + prop := meta.NewProperty("date") + index.Mappings.SetProperty("time", prop) + + err = index.CreateDocument("1", map[string]interface{}{ + "name": "Hello", + "time": float64(1579098983), + }, false) + assert.NoError(t, err) + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := index.UpdateDocument(tt.args.docID, tt.args.doc); (err != nil) != tt.wantErr { + t.Errorf("Index.UpdateDocument() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/core/index_shards.go b/pkg/core/index_shards.go new file mode 100644 index 000000000..e88517e49 --- /dev/null +++ b/pkg/core/index_shards.go @@ -0,0 +1,158 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package core + +import ( + "fmt" + + "github.com/blugelabs/bluge" + "github.com/blugelabs/bluge/analysis" + "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" + + "github.com/zinclabs/zinc/pkg/config" + "github.com/zinclabs/zinc/pkg/errors" + "github.com/zinclabs/zinc/pkg/meta" + "github.com/zinclabs/zinc/pkg/metadata" +) + +// CheckShards if current shard reach the maximum shard size, create a new shard +func (index *Index) CheckShards() error { + w, err := index.GetWriter() + if err != nil { + return err + } + _, size := w.DirectoryStats() + if size > config.Global.Shard.MaxSize { + return index.NewShard() + } + return nil +} + +func (index *Index) NewShard() error { + log.Info().Str("index", index.Name).Int("shard", index.ShardNum).Msg("init new shard") + // update current shard + index.UpdateMetadataByShard(index.ShardNum - 1) + index.lock.Lock() + index.Shards[index.ShardNum-1].DocTimeMin = index.DocTimeMin + index.Shards[index.ShardNum-1].DocTimeMax = index.DocTimeMax + index.DocTimeMin = 0 + index.DocTimeMax = 0 + // create new shard + index.ShardNum++ + index.Shards = append(index.Shards, &meta.IndexShard{ID: index.ShardNum - 1}) + index.lock.Unlock() + // store update + metadata.Index.Set(index.Name, index.Index) + return index.openWriter(index.ShardNum - 1) +} + +// GetWriter return the newest shard writer or special shard writer +func (index *Index) GetWriter(shards ...int) (*bluge.Writer, error) { + var shard int + if len(shards) == 1 { + shard = shards[0] + } else { + shard = index.ShardNum - 1 + } + if shard >= index.ShardNum || shard < 0 { + return nil, errors.New(errors.ErrorTypeRuntimeException, "shard not found") + } + index.lock.RLock() + w := index.Shards[shard].Writer + index.lock.RUnlock() + if w != nil { + return w, nil + } + + // open writer + if err := index.openWriter(shard); err != nil { + return nil, err + } + + index.lock.RLock() + w = index.Shards[shard].Writer + index.lock.RUnlock() + + return w, nil +} + +// GetWriters return all shard writers +func (index *Index) GetWriters() ([]*bluge.Writer, error) { + ws := make([]*bluge.Writer, 0, index.ShardNum) + for i := index.ShardNum - 1; i >= 0; i-- { + w, err := index.GetWriter(i) + if err != nil { + return nil, err + } + ws = append(ws, w) + } + return ws, nil +} + +// GetReaders return all shard readers +func (index *Index) GetReaders(timeMin, timeMax int64) ([]*bluge.Reader, error) { + rs := make([]*bluge.Reader, 0, 1) + chs := make(chan *bluge.Reader, index.ShardNum) + egLimit := make(chan struct{}, 10) + eg := errgroup.Group{} + for i := index.ShardNum - 1; i >= 0; i-- { + if (timeMin > 0 && index.Shards[i].DocTimeMax > 0 && index.Shards[i].DocTimeMax < timeMin) || + (timeMax > 0 && index.Shards[i].DocTimeMin > 0 && index.Shards[i].DocTimeMin > timeMax) { + continue + } + var i = i + egLimit <- struct{}{} + eg.Go(func() error { + w, err := index.GetWriter(i) + if err != nil { + return err + } + r, err := w.Reader() + if err != nil { + return err + } + chs <- r + return nil + }) + <-egLimit + if index.Shards[i].DocTimeMin < timeMin { + break + } + } + if err := eg.Wait(); err != nil { + return nil, err + } + close(chs) + for r := range chs { + rs = append(rs, r) + } + return rs, nil +} + +func (index *Index) openWriter(shard int) error { + var defaultSearchAnalyzer *analysis.Analyzer + if index.Analyzers != nil { + defaultSearchAnalyzer = index.Analyzers["default"] + } + + indexName := fmt.Sprintf("%s/%06x", index.Name, shard) + var err error + index.lock.Lock() + index.Shards[shard].Writer, err = OpenIndexWriter(indexName, index.StorageType, defaultSearchAnalyzer, 0, 0) + index.lock.Unlock() + return err +} diff --git a/pkg/core/index_shards_test.go b/pkg/core/index_shards_test.go new file mode 100644 index 000000000..9da58cdb8 --- /dev/null +++ b/pkg/core/index_shards_test.go @@ -0,0 +1,83 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package core + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/zinclabs/zinc/pkg/config" +) + +func TestIndex_CheckShards(t *testing.T) { + type args struct { + docID string + doc map[string]interface{} + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "document", + args: args{ + docID: "", + doc: map[string]interface{}{ + "name": "Hello1", + "time": float64(time.Now().UnixNano()), + }, + }, + wantErr: false, + }, + { + name: "document", + args: args{ + docID: "", + doc: map[string]interface{}{ + "name": "Hello2", + "time": float64(time.Now().UnixNano()), + }, + }, + wantErr: false, + }, + } + + var index *Index + var err error + t.Run("perpare", func(t *testing.T) { + config.Global.Shard.MaxSize = 1024 + + index, err = NewIndex("TestIndex_CheckShards.index_1", "disk", nil) + assert.NoError(t, err) + assert.NotNil(t, index) + + err = StoreIndex(index) + assert.NoError(t, err) + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := index.CreateDocument(tt.args.docID, tt.args.doc, false) + assert.NoError(t, err) + if err := index.CheckShards(); (err != nil) != tt.wantErr { + t.Errorf("Index.CheckShards() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/core/indexlist.go b/pkg/core/indexlist.go index ab1451851..1f60ec90a 100644 --- a/pkg/core/indexlist.go +++ b/pkg/core/indexlist.go @@ -19,6 +19,9 @@ import ( "sync" "github.com/rs/zerolog/log" + + "github.com/zinclabs/zinc/pkg/meta" + "github.com/zinclabs/zinc/pkg/metadata" ) var ZINC_INDEX_LIST IndexList @@ -29,9 +32,25 @@ type IndexList struct { } func init() { + log.Info().Msgf("Starting Zinc %s", meta.Version) + // check version + version, _ := metadata.KV.Get("version") + if version == nil { + metadata.KV.Set("version", []byte(meta.Version)) + // } else { + // version := string(version) + // if version != meta.Version { + // log.Error().Msgf("Version mismatch, loading data from old version %s", version) + // if err := upgrade.Do(version); err != nil { + // log.Fatal().Err(err).Msg("Failed to upgrade") + // } + // } + } + + // start loading index ZINC_INDEX_LIST.Indexes = make(map[string]*Index) if err := LoadZincIndexesFromMetadata(); err != nil { - log.Error().Err(err).Msgf("Error loading index") + log.Error().Err(err).Msg("Error loading index") } } diff --git a/pkg/core/loadindexes.go b/pkg/core/loadindexes.go index cb226fecd..d03991465 100644 --- a/pkg/core/loadindexes.go +++ b/pkg/core/loadindexes.go @@ -16,11 +16,12 @@ package core import ( - "github.com/blugelabs/bluge/analysis" "github.com/rs/zerolog/log" "github.com/zinclabs/zinc/pkg/errors" + "github.com/zinclabs/zinc/pkg/meta" "github.com/zinclabs/zinc/pkg/metadata" + "github.com/zinclabs/zinc/pkg/upgrade" zincanalysis "github.com/zinclabs/zinc/pkg/uquery/analysis" ) @@ -35,10 +36,31 @@ func LoadZincIndexesFromMetadata() error { index := new(Index) index.Name = indexes[i].Name index.StorageType = indexes[i].StorageType + index.StorageSize = indexes[i].StorageSize + index.DocTimeMin = indexes[i].DocTimeMin + index.DocTimeMax = indexes[i].DocTimeMax + index.DocNum = indexes[i].DocNum + index.ShardNum = indexes[i].ShardNum + index.Shards = append(index.Shards, indexes[i].Shards...) index.Settings = indexes[i].Settings index.Mappings = indexes[i].Mappings - index.Mappings = indexes[i].Mappings - log.Info().Msgf("Loading index... [%s:%s]", index.Name, index.StorageType) + index.CreateAt = indexes[i].CreateAt + index.UpdateAt = indexes[i].UpdateAt + log.Info().Msgf("Loading index... [%s:%s] shards[%d]", index.Name, index.StorageType, index.ShardNum) + + // upgrade from version <= 0.2.4 + if index.ShardNum == 0 { + index.ShardNum = 1 + index.Shards = append(index.Shards, &meta.IndexShard{}) + //upgrade data + if index.StorageType != "disk" { + log.Panic().Msgf("Only disk storage type support upgrade from version <= 0.2.4, Please manual upgrade\n# mv %s %s_bak\n# mkdir %s\n# mv %s_bak %s/000000\n# restart zinc", index.Name, index.Name, index.Name, index.Name, index.Name) + } else { + if err := upgrade.UpgradeFromV024Index(index.Name); err != nil { + log.Panic().Err(err).Msgf("Automatic upgrade from version <= 0.2.4 failed, Please manual upgrade\n# mv %s %s_bak\n# mkdir %s\n# mv %s_bak %s/000000\n# restart zinc", index.Name, index.Name, index.Name, index.Name, index.Name) + } + } + } // load index analysis if index.Settings != nil && index.Settings.Analysis != nil { @@ -47,39 +69,9 @@ func LoadZincIndexesFromMetadata() error { return errors.New(errors.ErrorTypeRuntimeException, "parse stored analysis error").Cause(err) } } - - // load index data - if err := OpenIndexWriter(index); err != nil { - return err - } - // load in memory ZINC_INDEX_LIST.Add(index) } return nil } - -func ReopenIndex(indexName string) error { - index, ok := ZINC_INDEX_LIST.Get(indexName) - if !ok { - return errors.New(errors.ErrorTypeRuntimeException, "index not found") - } - if err := index.Close(); err != nil { - return err - } - return OpenIndexWriter(index) -} - -func OpenIndexWriter(index *Index) error { - var err error - var defaultSearchAnalyzer *analysis.Analyzer - if index.Analyzers != nil { - defaultSearchAnalyzer = index.Analyzers["default"] - } - index.Writer, err = LoadIndexWriter(index.Name, index.StorageType, defaultSearchAnalyzer) - if err != nil { - return errors.New(errors.ErrorTypeRuntimeException, "load index writer error").Cause(err) - } - return nil -} diff --git a/pkg/core/multi_search.go b/pkg/core/multi_search.go index a076cba42..ddf03f76d 100644 --- a/pkg/core/multi_search.go +++ b/pkg/core/multi_search.go @@ -27,29 +27,34 @@ import ( "github.com/zinclabs/zinc/pkg/meta" "github.com/zinclabs/zinc/pkg/uquery" + "github.com/zinclabs/zinc/pkg/uquery/timerange" ) func MultiSearch(indexNames []string, query *meta.ZincQuery) (*meta.SearchResponse, error) { var mappings *meta.Mappings var analyzers map[string]*analysis.Analyzer var readers []*bluge.Reader - readerMap := make(map[string]struct{}) + var shardNum int + indexMap := make(map[string]struct{}) + + timeMin, timeMax := timerange.Query(query.Query) for _, index := range ZINC_INDEX_LIST.List() { for _, indexName := range indexNames { - if _, ok := readerMap[index.Name]; ok { + if _, ok := indexMap[index.Name]; ok { continue } if indexName == "" || (indexName != "" && strings.HasPrefix(index.Name, indexName[:len(indexName)-1])) { - reader, err := index.Writer.Reader() + reader, err := index.GetReaders(timeMin, timeMax) if err != nil { return nil, err } - readers = append(readers, reader) + readers = append(readers, reader...) + shardNum += index.ShardNum if mappings == nil { mappings = index.Mappings analyzers = index.Analyzers } - readerMap[index.Name] = struct{}{} + indexMap[index.Name] = struct{}{} } } } @@ -88,5 +93,5 @@ func MultiSearch(indexNames []string, query *meta.ZincQuery) (*meta.SearchRespon return nil, err } - return searchV2(dmi, query, mappings) + return searchV2(shardNum, len(readers), dmi, query, mappings) } diff --git a/pkg/core/newindex.go b/pkg/core/newindex.go index 6889e0709..c96a53103 100644 --- a/pkg/core/newindex.go +++ b/pkg/core/newindex.go @@ -38,65 +38,55 @@ func NewIndex(name, storageType string, defaultSearchAnalyzer *analysis.Analyzer return nil, fmt.Errorf("core.NewIndex: index name cannot start with _") } - var dataPath string - var cfg bluge.Config - switch storageType { - case "s3": - dataPath = config.Global.S3.Bucket - cfg = directory.GetS3Config(dataPath, name) - case "minio": - dataPath = config.Global.MinIO.Bucket - cfg = directory.GetMinIOConfig(dataPath, name) - default: - storageType = "disk" - dataPath = config.Global.DataPath - cfg = bluge.DefaultConfig(dataPath + "/" + name) - } - - if defaultSearchAnalyzer != nil { - cfg.DefaultSearchAnalyzer = defaultSearchAnalyzer - } - - writer, err := bluge.OpenWriter(cfg) - if err != nil { - return nil, err - } - index := new(Index) index.Name = name index.StorageType = storageType - index.Writer = writer + index.ShardNum = 1 index.CreateAt = time.Now() // use template - if err = index.UseTemplate(); err != nil { + if err := index.UseTemplate(); err != nil { return nil, err } + // init shards writer + for i := 0; i < index.ShardNum; i++ { + index.Shards = append(index.Shards, &meta.IndexShard{ID: i}) + } + return index, nil } // LoadIndexWriter load the index writer from the storage -func LoadIndexWriter(name string, storageType string, defaultSearchAnalyzer *analysis.Analyzer) (*bluge.Writer, error) { +func OpenIndexWriter(name string, storageType string, defaultSearchAnalyzer *analysis.Analyzer, timeRange ...int64) (*bluge.Writer, error) { + cfg := getOpenConfig(name, storageType, defaultSearchAnalyzer, timeRange...) + return bluge.OpenWriter(cfg) +} + +// OpenIndexReader load the index reader from the storage +func OpenIndexReader(name string, storageType string, defaultSearchAnalyzer *analysis.Analyzer, timeRange ...int64) (*bluge.Reader, error) { + cfg := getOpenConfig(name, storageType, defaultSearchAnalyzer, timeRange...) + return bluge.OpenReader(cfg) +} + +func getOpenConfig(name string, storageType string, defaultSearchAnalyzer *analysis.Analyzer, timeRange ...int64) bluge.Config { var dataPath string var cfg bluge.Config switch storageType { case "s3": dataPath = config.Global.S3.Bucket - cfg = directory.GetS3Config(dataPath, name) + cfg = directory.GetS3Config(dataPath, name, timeRange...) case "minio": dataPath = config.Global.MinIO.Bucket - cfg = directory.GetMinIOConfig(dataPath, name) + cfg = directory.GetMinIOConfig(dataPath, name, timeRange...) default: dataPath = config.Global.DataPath - cfg = bluge.DefaultConfig(dataPath + "/" + name) + cfg = directory.GetDiskConfig(dataPath, name, timeRange...) } - if defaultSearchAnalyzer != nil { cfg.DefaultSearchAnalyzer = defaultSearchAnalyzer } - - return bluge.OpenWriter(cfg) + return cfg } // storeIndex stores the index to metadata diff --git a/pkg/core/search.go b/pkg/core/search.go index 2a1315844..894c163c3 100644 --- a/pkg/core/search.go +++ b/pkg/core/search.go @@ -28,6 +28,7 @@ import ( "github.com/zinclabs/zinc/pkg/uquery" "github.com/zinclabs/zinc/pkg/uquery/fields" "github.com/zinclabs/zinc/pkg/uquery/source" + "github.com/zinclabs/zinc/pkg/uquery/timerange" ) func (index *Index) Search(query *meta.ZincQuery) (*meta.SearchResponse, error) { @@ -36,12 +37,17 @@ func (index *Index) Search(query *meta.ZincQuery) (*meta.SearchResponse, error) return nil, err } - reader, err := index.Writer.Reader() + timeMin, timeMax := timerange.Query(query.Query) + readers, err := index.GetReaders(timeMin, timeMax) if err != nil { log.Printf("index.SearchV2: error accessing reader: %s", err.Error()) return nil, err } - defer reader.Close() + defer func() { + for _, reader := range readers { + reader.Close() + } + }() ctx := context.Background() var cancel context.CancelFunc @@ -50,7 +56,7 @@ func (index *Index) Search(query *meta.ZincQuery) (*meta.SearchResponse, error) defer cancel() } - dmi, err := reader.Search(ctx, searchRequest) + dmi, err := bluge.MultiSearch(ctx, searchRequest, readers...) if err != nil { log.Printf("index.SearchV2: error executing search: %s", err.Error()) if err == context.DeadlineExceeded { @@ -63,10 +69,10 @@ func (index *Index) Search(query *meta.ZincQuery) (*meta.SearchResponse, error) return nil, err } - return searchV2(dmi, query, index.Mappings) + return searchV2(index.ShardNum, len(readers), dmi, query, index.Mappings) } -func searchV2(dmi search.DocumentMatchIterator, query *meta.ZincQuery, mappings *meta.Mappings) (*meta.SearchResponse, error) { +func searchV2(shardNum, readerNum int, dmi search.DocumentMatchIterator, query *meta.ZincQuery, mappings *meta.Mappings) (*meta.SearchResponse, error) { resp := &meta.SearchResponse{ Hits: meta.Hits{Hits: []meta.Hit{}}, } @@ -148,11 +154,9 @@ func searchV2(dmi search.DocumentMatchIterator, query *meta.ZincQuery, mappings } resp.Took = int(dmi.Aggregations().Duration().Milliseconds()) - resp.Shards = meta.Shards{Total: 1, Successful: 1} + resp.Shards = meta.Shards{Total: shardNum, Successful: readerNum, Skipped: shardNum - readerNum} resp.Hits = meta.Hits{ - Total: meta.Total{ - Value: int(dmi.Aggregations().Count()), - }, + Total: meta.Total{Value: int(dmi.Aggregations().Count())}, MaxScore: dmi.Aggregations().Metric("max_score"), Hits: Hits, } diff --git a/pkg/core/search/v1/search.go b/pkg/core/search/v1/search.go index b447d5dde..32f4c0d00 100644 --- a/pkg/core/search/v1/search.go +++ b/pkg/core/search/v1/search.go @@ -24,6 +24,7 @@ import ( "github.com/zinclabs/zinc/pkg/config" "github.com/zinclabs/zinc/pkg/core" + "github.com/zinclabs/zinc/pkg/uquery/timerange" ) func Search(index *core.Index, iQuery *ZincQuery) (*SearchResponse, error) { @@ -89,13 +90,18 @@ func Search(index *core.Index, iQuery *ZincQuery) (*SearchResponse, error) { }, err } - reader, err := index.Writer.Reader() + timeMin, timeMax := timerange.Query(iQuery.Query) + readers, err := index.GetReaders(timeMin, timeMax) if err != nil { log.Printf("error accessing reader: %s", err.Error()) } - defer reader.Close() + defer func() { + for _, reader := range readers { + reader.Close() + } + }() - dmi, err := reader.Search(context.Background(), searchRequest) + dmi, err := bluge.MultiSearch(context.Background(), searchRequest, readers...) if err != nil { log.Printf("error executing search: %s", err.Error()) } diff --git a/pkg/core/search/v1/search_test.go b/pkg/core/search/v1/search_test.go index ca6c2ac49..bf9963fce 100644 --- a/pkg/core/search/v1/search_test.go +++ b/pkg/core/search/v1/search_test.go @@ -353,7 +353,7 @@ func TestSearch(t *testing.T) { for _, d := range tt.data { rand.Seed(time.Now().UnixNano()) docId := rand.Intn(1000) - err := index.UpdateDocument(strconv.Itoa(docId), d, true) + err := index.CreateDocument(strconv.Itoa(docId), d, true) assert.NoError(t, err) } got, err := Search(index, tt.args.iQuery) diff --git a/pkg/core/search_test.go b/pkg/core/search_test.go index 1041f32eb..e6fe3a60b 100644 --- a/pkg/core/search_test.go +++ b/pkg/core/search_test.go @@ -307,7 +307,7 @@ func TestIndex_Search(t *testing.T) { for _, d := range tt.data { rand.Seed(time.Now().UnixNano()) docId := rand.Intn(1000) - err := index.UpdateDocument(strconv.Itoa(docId), d, true) + err := index.CreateDocument(strconv.Itoa(docId), d, true) assert.NoError(t, err) } got, err := index.Search(tt.args.iQuery) diff --git a/pkg/handlers/document/bulk.go b/pkg/handlers/document/bulk.go index 05b5e956b..5dcc47f78 100644 --- a/pkg/handlers/document/bulk.go +++ b/pkg/handlers/document/bulk.go @@ -38,11 +38,21 @@ func Bulk(c *gin.Context) { defer c.Request.Body.Close() - ret, err := BulkWorker(target, c.Request.Body) + indexes, ret, err := BulkWorker(target, c.Request.Body) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } + + // check shards + for name := range indexes { + index, ok := core.ZINC_INDEX_LIST.Get(name) + if !ok { + continue + } + _ = index.CheckShards() + } + c.JSON(http.StatusOK, gin.H{"message": "bulk data inserted", "record_count": ret.Count}) } @@ -52,16 +62,27 @@ func ESBulk(c *gin.Context) { defer c.Request.Body.Close() startTime := time.Now() - ret, err := BulkWorker(target, c.Request.Body) + indexes, ret, err := BulkWorker(target, c.Request.Body) ret.Took = int(time.Since(startTime) / time.Millisecond) if err != nil { ret.Error = err.Error() } + + // check shards + for name := range indexes { + index, ok := core.ZINC_INDEX_LIST.Get(name) + if !ok { + continue + } + _ = index.CheckShards() + } + c.JSON(http.StatusOK, ret) } -func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { +func BulkWorker(target string, body io.Reader) (map[string]struct{}, *BulkResponse, error) { bulkRes := &BulkResponse{Items: []map[string]*BulkResponseItem{}} + bulkIndexes := make(map[string]struct{}) // Prepare to read the entire raw text of the body scanner := bufio.NewScanner(body) @@ -97,7 +118,7 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { if nextLineIsData { bulkRes.Count++ nextLineIsData = false - mintedID := false + update := false var docID = "" if val, ok := lastLineMetaData["_id"]; ok && val != nil { @@ -105,7 +126,8 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { } if docID == "" { docID = ider.Generate() - mintedID = true + } else { + update = true } indexName := lastLineMetaData["_index"].(string) @@ -130,13 +152,14 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { if !exists { // If the requested indexName does not exist then create it newIndex, err := core.NewIndex(indexName, "disk", nil) if err != nil { - return bulkRes, err + return bulkIndexes, bulkRes, err } // store index if err := core.StoreIndex(newIndex); err != nil { - return bulkRes, err + return bulkIndexes, bulkRes, err } } + bulkIndexes[indexName] = struct{}{} // Since this is a bulk request, we need to check if we already created a new batch for this index. We need to create 1 batch per index. if DoesExistInThisRequest(indexesInThisBatch, indexName) == -1 { // Add the list of indexes to the batch if it's not already there @@ -144,15 +167,15 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { batch[indexName] = index.NewBatch() } - index, _ := core.GetIndex(indexName) - bdoc, err := index.BuildBlugeDocumentFromJSON(docID, doc) + newIndex, _ := core.GetIndex(indexName) + bdoc, err := newIndex.BuildBlugeDocumentFromJSON(docID, doc) if err != nil { - return bulkRes, err + return bulkIndexes, bulkRes, err } // Add the documen to the batch. We will persist the batch to the index // when we have processed all documents in the request - if !mintedID { + if update { batch[indexName].Update(bdoc.ID(), bdoc) } else { batch[indexName].Insert(bdoc) @@ -163,9 +186,15 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { if documentsInBatch >= batchSize { for _, indexName := range indexesInThisBatch { // Persist the batch to the index - if err := index.Writer.Batch(batch[indexName]); err != nil { + newIndex, _ := core.GetIndex(indexName) + writer, err := newIndex.GetWriter() + if err != nil { log.Error().Msgf("bulk: index updating batch err %s", err.Error()) - return bulkRes, err + return bulkIndexes, bulkRes, err + } + if err := writer.Batch(batch[indexName]); err != nil { + log.Error().Msgf("bulk: index updating batch err %s", err.Error()) + return bulkIndexes, bulkRes, err } batch[indexName].Reset() } @@ -181,7 +210,7 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { lastLineMetaData["operation"] = k if _, ok := v.(map[string]interface{}); !ok { - return nil, errors.New("bulk index data format error") + return nil, nil, errors.New("bulk index data format error") } if v.(map[string]interface{})["_index"] != "" { // if index is specified in metadata then it overtakes the index in the query path @@ -190,7 +219,7 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { lastLineMetaData["_index"] = target } if lastLineMetaData["_index"] == "" { - return nil, errors.New("bulk index data format error") + return nil, nil, errors.New("bulk index data format error") } lastLineMetaData["_id"] = v.(map[string]interface{})["_id"] @@ -205,7 +234,7 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { lastLineMetaData["_index"] = target } if lastLineMetaData["_index"] == "" { - return nil, errors.New("bulk index data format error") + return nil, nil, errors.New("bulk index data format error") } // delete @@ -227,19 +256,24 @@ func BulkWorker(target string, body io.Reader) (*BulkResponse, error) { } if err := scanner.Err(); err != nil { - return bulkRes, err + return bulkIndexes, bulkRes, err } for _, indexName := range indexesInThisBatch { // Persist the batch to the index - index, _ := core.GetIndex(indexName) - if err := index.Writer.Batch(batch[indexName]); err != nil { + newIndex, _ := core.GetIndex(indexName) + writer, err := newIndex.GetWriter() + if err != nil { + log.Error().Msgf("bulk: index updating batch err %s", err.Error()) + return bulkIndexes, bulkRes, err + } + if err := writer.Batch(batch[indexName]); err != nil { log.Printf("bulk: index updating batch err %s", err.Error()) - return bulkRes, err + return bulkIndexes, bulkRes, err } } - return bulkRes, nil + return bulkIndexes, bulkRes, nil } // DoesExistInThisRequest takes a slice and looks for an element in it. If found it will diff --git a/pkg/handlers/document/create_update.go b/pkg/handlers/document/create_update.go index c441597c0..a8bcb7a6b 100644 --- a/pkg/handlers/document/create_update.go +++ b/pkg/handlers/document/create_update.go @@ -35,15 +35,15 @@ func CreateUpdate(c *gin.Context) { return } - mintedID := false - + update := false // If id field is present then use it, else create a new UUID and use it if id, ok := doc["_id"]; ok { docID = id.(string) } if docID == "" { docID = ider.Generate() - mintedID = true + } else { + update = true } // If the index does not exist, then create it @@ -59,11 +59,14 @@ func CreateUpdate(c *gin.Context) { _ = core.StoreIndex(index) } - err = index.UpdateDocument(docID, doc, mintedID) + err = index.CreateDocument(docID, doc, update) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } + // check shards + _ = index.CheckShards() + c.JSON(http.StatusOK, gin.H{"id": docID}) } diff --git a/pkg/handlers/document/delete.go b/pkg/handlers/document/delete.go index e4be4ba09..715f6b4ba 100644 --- a/pkg/handlers/document/delete.go +++ b/pkg/handlers/document/delete.go @@ -35,10 +35,17 @@ func Delete(c *gin.Context) { } bdoc := bluge.NewDocument(docID) - err := index.Writer.Delete(bdoc.ID()) + writers, err := index.GetWriters() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } + for _, w := range writers { + err = w.Delete(bdoc.ID()) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + } c.JSON(http.StatusOK, gin.H{"message": "deleted", "index": indexName, "id": docID}) } diff --git a/pkg/handlers/document/update.go b/pkg/handlers/document/update.go new file mode 100644 index 000000000..64eb3ad7c --- /dev/null +++ b/pkg/handlers/document/update.go @@ -0,0 +1,60 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package document + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "github.com/zinclabs/zinc/pkg/core" +) + +func Update(c *gin.Context) { + indexName := c.Param("target") + docID := c.Param("id") // ID for the document to be updated provided in URL path + + var err error + var doc map[string]interface{} + if err = c.BindJSON(&doc); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // If id field is present then use it, else create a new UUID and use it + if id, ok := doc["_id"]; ok { + docID = id.(string) + } + if docID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "_id field is required"}) + return + } + + // If the index does not exist, then create it + index, exists := core.GetIndex(indexName) + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "index does not exists"}) + return + } + + err = index.UpdateDocument(docID, doc) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"id": docID}) +} diff --git a/pkg/handlers/document/update_test.go b/pkg/handlers/document/update_test.go new file mode 100644 index 000000000..f167c52c7 --- /dev/null +++ b/pkg/handlers/document/update_test.go @@ -0,0 +1,121 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package document + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zinclabs/zinc/pkg/core" + "github.com/zinclabs/zinc/test/utils" +) + +func TestUpdate(t *testing.T) { + type args struct { + code int + data map[string]interface{} + rawData string + params map[string]string + result string + } + tests := []struct { + name string + args args + }{ + { + name: "normal", + args: args{ + code: http.StatusOK, + data: map[string]interface{}{ + "_id": "1", + "name": "userUpdate", + "role": "create", + }, + params: map[string]string{ + "target": "TestUpdate.index_1", + }, + result: `{"id":"1"}`, + }, + }, + { + name: "err json", + args: args{ + code: http.StatusBadRequest, + rawData: `{"_id":"1","name":"user","role":"create}`, + params: map[string]string{ + "target": "TestUpdate.index_1", + }, + result: `{"error":`, + }, + }, + { + name: "empty id", + args: args{ + code: http.StatusBadRequest, + rawData: `{"_id":"","name":"user","role":"create"}`, + params: map[string]string{ + "target": "TestUpdate.index_1", + }, + result: `{"error":`, + }, + }, + { + name: "not exists index", + args: args{ + code: http.StatusBadRequest, + rawData: `{"_id":"1","name":"user","role":"create"}`, + params: map[string]string{ + "target": "TestUpdate.index_2", + }, + result: `{"error":`, + }, + }, + } + + t.Run("prepare", func(t *testing.T) { + c, w := utils.NewGinContext() + utils.SetGinRequestData(c, `{"_id":"1","name":"user","role":"create"}`) + utils.SetGinRequestParams(c, map[string]string{"target": "TestUpdate.index_1"}) + CreateUpdate(c) + assert.Equal(t, http.StatusOK, w.Code) + assert.Contains(t, w.Body.String(), "") + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, w := utils.NewGinContext() + if tt.args.data != nil { + utils.SetGinRequestData(c, tt.args.data) + } + if tt.args.rawData != "" { + utils.SetGinRequestData(c, tt.args.rawData) + } + if tt.args.params != nil { + utils.SetGinRequestParams(c, tt.args.params) + } + Update(c) + assert.Equal(t, tt.args.code, w.Code) + assert.Contains(t, w.Body.String(), tt.args.result) + }) + } + + t.Run("cleanup", func(t *testing.T) { + err := core.DeleteIndex("TestUpdate.index_1") + assert.NoError(t, err) + }) +} diff --git a/pkg/handlers/index/analyzer.go b/pkg/handlers/index/analyzer.go index 092cf6d6e..6fe5550b2 100644 --- a/pkg/handlers/index/analyzer.go +++ b/pkg/handlers/index/analyzer.go @@ -42,7 +42,7 @@ func Analyze(c *gin.Context) { // use index analyzer index, exists := core.GetIndex(indexName) if !exists { - c.JSON(http.StatusNotFound, gin.H{"error": "index " + indexName + " does not exists"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "index " + indexName + " does not exists"}) return } if query.Filed != "" && query.Analyzer == "" { diff --git a/pkg/handlers/index/analyzer_test.go b/pkg/handlers/index/analyzer_test.go index 5716f8632..17aa4deda 100644 --- a/pkg/handlers/index/analyzer_test.go +++ b/pkg/handlers/index/analyzer_test.go @@ -116,7 +116,7 @@ func TestAnalyze(t *testing.T) { { name: "with not exist index analyzer", args: args{ - code: http.StatusNotFound, + code: http.StatusBadRequest, data: `{"analyzer":"standard","text":"this is a test"}`, params: map[string]string{"target": "not_exist_index"}, result: "[this is a test]", diff --git a/pkg/handlers/index/mapping.go b/pkg/handlers/index/mapping.go index 171bff78a..29d811829 100644 --- a/pkg/handlers/index/mapping.go +++ b/pkg/handlers/index/mapping.go @@ -29,7 +29,7 @@ func GetMapping(c *gin.Context) { indexName := c.Param("target") index, exists := core.GetIndex(indexName) if !exists { - c.JSON(http.StatusNotFound, gin.H{"error": "index " + indexName + " does not exists"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "index " + indexName + " does not exists"}) return } diff --git a/pkg/handlers/index/mapping_test.go b/pkg/handlers/index/mapping_test.go index 6610b6e35..851465b70 100644 --- a/pkg/handlers/index/mapping_test.go +++ b/pkg/handlers/index/mapping_test.go @@ -188,7 +188,7 @@ func TestMapping(t *testing.T) { { name: "empty", args: args{ - code: http.StatusNotFound, + code: http.StatusBadRequest, target: "", result: `does not exists`, }, diff --git a/pkg/handlers/index/refresh.go b/pkg/handlers/index/refresh.go index a3e5a08f5..6b75fd0d3 100644 --- a/pkg/handlers/index/refresh.go +++ b/pkg/handlers/index/refresh.go @@ -25,8 +25,12 @@ import ( func Refresh(c *gin.Context) { indexName := c.Param("target") - err := core.ReopenIndex(indexName) - if err != nil { + index, exists := core.GetIndex(indexName) + if !exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "index " + indexName + " does not exists"}) + return + } + if err := index.Reopen(); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } diff --git a/pkg/handlers/index/refresh_test.go b/pkg/handlers/index/refresh_test.go new file mode 100644 index 000000000..0eabf00bd --- /dev/null +++ b/pkg/handlers/index/refresh_test.go @@ -0,0 +1,82 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package index + +import ( + "net/http" + "testing" + + "github.com/goccy/go-json" + "github.com/stretchr/testify/assert" + + "github.com/zinclabs/zinc/pkg/core" + "github.com/zinclabs/zinc/test/utils" +) + +func TestRefresh(t *testing.T) { + type args struct { + code int + params map[string]string + result string + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "normal", + args: args{ + code: http.StatusOK, + params: map[string]string{"target": "TestRefresh.index_1"}, + result: "refresh ok", + }, + wantErr: false, + }, + { + name: "empty", + args: args{ + code: http.StatusBadRequest, + params: map[string]string{"target": ""}, + result: "does not exists", + }, + wantErr: false, + }, + } + + t.Run("prepare", func(t *testing.T) { + index, err := core.NewIndex("TestRefresh.index_1", "disk", nil) + assert.NoError(t, err) + assert.NotNil(t, index) + + err = core.StoreIndex(index) + assert.NoError(t, err) + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, w := utils.NewGinContext() + utils.SetGinRequestParams(c, tt.args.params) + Refresh(c) + assert.Equal(t, tt.args.code, w.Code) + assert.Contains(t, w.Body.String(), tt.args.result) + + resp := make(map[string]string) + err := json.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + }) + } +} diff --git a/pkg/handlers/index/settings.go b/pkg/handlers/index/settings.go index a815796d1..f21b707a2 100644 --- a/pkg/handlers/index/settings.go +++ b/pkg/handlers/index/settings.go @@ -30,7 +30,7 @@ func GetSettings(c *gin.Context) { indexName := c.Param("target") index, exists := core.GetIndex(indexName) if !exists { - c.JSON(http.StatusNotFound, gin.H{"error": "index " + indexName + " does not exists"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "index " + indexName + " does not exists"}) return } diff --git a/pkg/handlers/index/settings_test.go b/pkg/handlers/index/settings_test.go index d644e8b92..ab346e1d1 100644 --- a/pkg/handlers/index/settings_test.go +++ b/pkg/handlers/index/settings_test.go @@ -156,7 +156,7 @@ func TestSettings(t *testing.T) { { name: "empty", args: args{ - code: http.StatusNotFound, + code: http.StatusBadRequest, target: "", result: `does not exists`, }, diff --git a/pkg/handlers/index/template.go b/pkg/handlers/index/template.go index 522ea4a99..96890b752 100644 --- a/pkg/handlers/index/template.go +++ b/pkg/handlers/index/template.go @@ -47,7 +47,7 @@ func GetTemplate(c *gin.Context) { return } if !exists { - c.JSON(http.StatusNotFound, gin.H{"error": "template " + name + " does not exists"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "template " + name + " does not exists"}) return } diff --git a/pkg/handlers/index/template_test.go b/pkg/handlers/index/template_test.go index 57c029d10..af91fc128 100644 --- a/pkg/handlers/index/template_test.go +++ b/pkg/handlers/index/template_test.go @@ -179,7 +179,7 @@ func TestTemplate(t *testing.T) { { name: "not exists", args: args{ - code: http.StatusNotFound, + code: http.StatusBadRequest, target: "TestTemplate.index_N", result: `does not exists`, }, diff --git a/pkg/handlers/search/search_test.go b/pkg/handlers/search/search_v1_test.go similarity index 80% rename from pkg/handlers/search/search_test.go rename to pkg/handlers/search/search_v1_test.go index e738e3286..6b445593c 100644 --- a/pkg/handlers/search/search_test.go +++ b/pkg/handlers/search/search_v1_test.go @@ -25,8 +25,8 @@ import ( "github.com/zinclabs/zinc/test/utils" ) -func TestSearchDSL(t *testing.T) { - indexName := "TestSearchDSL.index_1" +func TestSearchV1(t *testing.T) { + indexName := "TestSearchV1.index_1" type args struct { code int data string @@ -41,24 +41,16 @@ func TestSearchDSL(t *testing.T) { name: "normal", args: args{ code: http.StatusOK, - data: `{"query":{"match_all":{}},"size":10}`, + data: `{"query_type":"match_all","max_results":10}`, params: map[string]string{"target": indexName}, - result: "successful", - }, - }, - { - name: "multiple index", - args: args{ - code: http.StatusOK, - data: `{"query":{"match_all":{}},"size":10}`, - result: "successful", + result: "{\"total\":", }, }, { name: "index not found", args: args{ code: http.StatusBadRequest, - data: `{"query":{"match_all":{}},"size":10}`, + data: `{"query_type":"match_all","max_results":10}`, params: map[string]string{"target": "NotExist" + indexName}, result: "does not exists", }, @@ -67,8 +59,8 @@ func TestSearchDSL(t *testing.T) { name: "query jsone error", args: args{ code: http.StatusBadRequest, - data: `{"query":{"match_all":{x}},"size":10}`, - params: map[string]string{"target": "olympics"}, + data: `{"query_type":"match_all","max_results":10,{x}}`, + params: map[string]string{"target": indexName}, result: "invalid character", }, }, @@ -87,7 +79,7 @@ func TestSearchDSL(t *testing.T) { c, w := utils.NewGinContext() utils.SetGinRequestData(c, tt.args.data) utils.SetGinRequestParams(c, tt.args.params) - SearchDSL(c) + SearchV1(c) assert.Equal(t, tt.args.code, w.Code) assert.Contains(t, w.Body.String(), tt.args.result) }) diff --git a/pkg/handlers/search/search_v2.go b/pkg/handlers/search/search_v2.go index 85121e6fd..24014b43a 100644 --- a/pkg/handlers/search/search_v2.go +++ b/pkg/handlers/search/search_v2.go @@ -86,14 +86,14 @@ func MultipleSearch(c *gin.Context) { nextLineIsData = false var query *meta.ZincQuery if err = json.Unmarshal(scanner.Bytes(), &query); err != nil { - log.Error().Msgf("handlers.search..MultipleSearch: json.Unmarshal: err %s", err.Error()) + log.Error().Err(err).Msg("handlers.search..MultipleSearch: json.Unmarshal error") responses = append(responses, &meta.SearchResponse{Error: err.Error()}) continue } // search query resp, err := searchIndex(indexNames, query) if err != nil { - log.Error().Msgf("handlers.search..MultipleSearch: searchIndex: err %s", err.Error()) + log.Error().Err(err).Msg("handlers.search..MultipleSearch: searchIndex: error") responses = append(responses, &meta.SearchResponse{Error: err.Error()}) } else { responses = append(responses, resp) @@ -102,7 +102,7 @@ func MultipleSearch(c *gin.Context) { nextLineIsData = true indexNames = indexNames[:0] if err = json.Unmarshal(scanner.Bytes(), &doc); err != nil { - log.Error().Msgf("handlers.search..MultipleSearch: json.Unmarshal: err %s", err.Error()) + log.Error().Err(err).Msg("handlers.search..MultipleSearch: json.Unmarshal: error") continue } if v, ok := doc["index"]; ok { diff --git a/pkg/handlers/search/search_v2_test.go b/pkg/handlers/search/search_v2_test.go new file mode 100644 index 000000000..cd3b92c80 --- /dev/null +++ b/pkg/handlers/search/search_v2_test.go @@ -0,0 +1,167 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package search + +import ( + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/zinclabs/zinc/pkg/core" + "github.com/zinclabs/zinc/test/utils" +) + +func TestSearchDSL(t *testing.T) { + indexName := "TestSearchDSL.index_1" + type args struct { + code int + data string + params map[string]string + result string + } + tests := []struct { + name string + args args + }{ + { + name: "normal", + args: args{ + code: http.StatusOK, + data: `{"query":{"match_all":{}},"size":10}`, + params: map[string]string{"target": indexName}, + result: "successful", + }, + }, + { + name: "multiple index", + args: args{ + code: http.StatusOK, + data: `{"query":{"match_all":{}},"size":10}`, + result: "successful", + }, + }, + { + name: "index not found", + args: args{ + code: http.StatusBadRequest, + data: `{"query":{"match_all":{}},"size":10}`, + params: map[string]string{"target": "NotExist" + indexName}, + result: "does not exists", + }, + }, + { + name: "query jsone error", + args: args{ + code: http.StatusBadRequest, + data: `{"query":{"match_all":{x}},"size":10}`, + params: map[string]string{"target": "olympics"}, + result: "invalid character", + }, + }, + } + + t.Run("prepare", func(t *testing.T) { + index, err := core.NewIndex(indexName, "disk", nil) + assert.NoError(t, err) + assert.NotNil(t, index) + err = core.StoreIndex(index) + assert.NoError(t, err) + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, w := utils.NewGinContext() + utils.SetGinRequestData(c, tt.args.data) + utils.SetGinRequestParams(c, tt.args.params) + SearchDSL(c) + assert.Equal(t, tt.args.code, w.Code) + assert.Contains(t, w.Body.String(), tt.args.result) + }) + } + + t.Run("cleanup", func(t *testing.T) { + err := core.DeleteIndex(indexName) + assert.NoError(t, err) + }) +} + +func TestMultipleSearch(t *testing.T) { + indexName := "TestMultipleSearch.index_1" + type args struct { + code int + data string + params map[string]string + result string + } + tests := []struct { + name string + args args + }{ + { + name: "normal", + args: args{ + code: http.StatusOK, + data: `{"index":"` + indexName + `"} +{"query":{"match_all":{}},"size":10}`, + params: map[string]string{"target": indexName}, + result: "successful", + }, + }, + { + name: "multiple index", + args: args{ + code: http.StatusOK, + data: `{"index":["` + indexName + `"]} +{"query":{"match_all":{}},"size":10}`, + result: "successful", + }, + }, + { + name: "not exists", + args: args{ + code: http.StatusOK, + data: `{"index":"TestMultipleSearch.notExists"} +{"query":{"match_all":{}},"size":10}`, + result: "does not exists", + }, + }, + } + + t.Run("prepare", func(t *testing.T) { + index, err := core.NewIndex(indexName, "disk", nil) + assert.NoError(t, err) + assert.NotNil(t, index) + err = core.StoreIndex(index) + assert.NoError(t, err) + }) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, w := utils.NewGinContext() + utils.SetGinRequestData(c, tt.args.data) + utils.SetGinRequestParams(c, tt.args.params) + MultipleSearch(c) + assert.Equal(t, tt.args.code, w.Code) + assert.Contains(t, w.Body.String(), tt.args.result) + }) + } + + t.Run("cleanup", func(t *testing.T) { + err := core.DeleteIndex(indexName) + assert.NoError(t, err) + }) +} diff --git a/pkg/meta/index.go b/pkg/meta/index.go index 8fa6d0c32..89a93a9c2 100644 --- a/pkg/meta/index.go +++ b/pkg/meta/index.go @@ -15,19 +15,36 @@ package meta -import "time" +import ( + "time" + + "github.com/blugelabs/bluge" +) type Index struct { Name string `json:"name"` StorageType string `json:"storage_type"` StorageSize uint64 `json:"storage_size"` DocNum uint64 `json:"doc_num"` + DocTimeMin int64 `json:"doc_time_min"` + DocTimeMax int64 `json:"doc_time_max"` + ShardNum int `json:"shard_num"` + Shards []*IndexShard `json:"shards"` Settings *IndexSettings `json:"settings,omitempty"` Mappings *Mappings `json:"mappings,omitempty"` CreateAt time.Time `json:"create_at"` UpdateAt time.Time `json:"update_at"` } +type IndexShard struct { + ID int `json:"id"` + DocTimeMin int64 `json:"doc_time_min"` + DocTimeMax int64 `json:"doc_time_max"` + DocNum uint64 `json:"doc_num"` + StorageSize uint64 `json:"storage_size"` + Writer *bluge.Writer `json:"-"` +} + type IndexSimple struct { Name string `json:"name"` StorageType string `json:"storage_type"` diff --git a/pkg/routes/routes.go b/pkg/routes/routes.go index 4131b3909..d94a69bbb 100644 --- a/pkg/routes/routes.go +++ b/pkg/routes/routes.go @@ -115,10 +115,11 @@ func SetRoutes(r *gin.Engine) { r.POST("/api/_bulk", AuthMiddleware, document.Bulk) r.POST("/api/:target/_bulk", AuthMiddleware, document.Bulk) // Document CRUD APIs. Update is same as create. - r.POST("/api/:target/_doc", AuthMiddleware, document.CreateUpdate) - r.PUT("/api/:target/_doc", AuthMiddleware, document.CreateUpdate) - r.PUT("/api/:target/_doc/:id", AuthMiddleware, document.CreateUpdate) - r.DELETE("/api/:target/_doc/:id", AuthMiddleware, document.Delete) + r.POST("/api/:target/_doc", AuthMiddleware, document.CreateUpdate) // create + r.PUT("/api/:target/_doc", AuthMiddleware, document.CreateUpdate) // create + r.PUT("/api/:target/_doc/:id", AuthMiddleware, document.CreateUpdate) // create or update + r.POST("/api/:target/_update/:id", AuthMiddleware, document.Update) // update + r.DELETE("/api/:target/_doc/:id", AuthMiddleware, document.Delete) // delete /** * elastic compatible APIs @@ -159,12 +160,12 @@ func SetRoutes(r *gin.Engine) { r.POST("/es/_bulk", AuthMiddleware, document.ESBulk) r.POST("/es/:target/_bulk", AuthMiddleware, document.ESBulk) // ES Document - r.POST("/es/:target/_doc", AuthMiddleware, document.CreateUpdate) - r.PUT("/es/:target/_doc/:id", AuthMiddleware, document.CreateUpdate) - r.PUT("/es/:target/_create/:id", AuthMiddleware, document.CreateUpdate) - r.POST("/es/:target/_create/:id", AuthMiddleware, document.CreateUpdate) - r.POST("/es/:target/_update/:id", AuthMiddleware, document.CreateUpdate) - r.DELETE("/es/:target/_doc/:id", AuthMiddleware, document.Delete) + r.POST("/es/:target/_doc", AuthMiddleware, document.CreateUpdate) // create + r.PUT("/es/:target/_doc/:id", AuthMiddleware, document.CreateUpdate) // create or update + r.PUT("/es/:target/_create/:id", AuthMiddleware, document.CreateUpdate) // create + r.POST("/es/:target/_create/:id", AuthMiddleware, document.CreateUpdate) // create + r.POST("/es/:target/_update/:id", AuthMiddleware, document.Update) // update part of document + r.DELETE("/es/:target/_doc/:id", AuthMiddleware, document.Delete) // delete core.Telemetry.Instance() core.Telemetry.Event("server_start", nil) diff --git a/pkg/core/updatedocument.go b/pkg/upgrade/upgrade.go similarity index 58% rename from pkg/core/updatedocument.go rename to pkg/upgrade/upgrade.go index a840f046f..2a4a8262b 100644 --- a/pkg/core/updatedocument.go +++ b/pkg/upgrade/upgrade.go @@ -13,21 +13,20 @@ * limitations under the License. */ -package core +package upgrade -// UpdateDocument inserts or updates a document in the zinc index -func (index *Index) UpdateDocument(docID string, doc map[string]interface{}, mintedID bool) error { - bdoc, err := index.BuildBlugeDocumentFromJSON(docID, doc) - if err != nil { - return err - } +import ( + "fmt" + + "github.com/rs/zerolog/log" +) - // Finally update the document on disk - writer := index.Writer - if !mintedID { - err = writer.Update(bdoc.ID(), bdoc) - } else { - err = writer.Insert(bdoc) +func Do(oldVersion string) error { + log.Info().Msgf("Begin upgrade from version %s", oldVersion) + switch oldVersion { + case "v0.2.4": + return UpgradeFromV024() + default: + return fmt.Errorf("unsupported upgrade from version: %s", oldVersion) } - return err } diff --git a/pkg/upgrade/v024.go b/pkg/upgrade/v024.go new file mode 100644 index 000000000..ebf20b7d7 --- /dev/null +++ b/pkg/upgrade/v024.go @@ -0,0 +1,66 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package upgrade + +import ( + "os" + "path" + + "github.com/rs/zerolog/log" + + "github.com/zinclabs/zinc/pkg/config" +) + +// UpgradeFromV024 upgrades from version <= 0.2.4 +// upgrade steps: +// range ZINC_DATA_PATH/ +// -- mv index index_old +// -- mkdir index +// -- mv index_old index/000000 +func UpgradeFromV024() error { + rootPath := config.Global.DataPath + fs, err := os.ReadDir(rootPath) + if err != nil { + return err + } + for _, f := range fs { + if !f.IsDir() { + continue + } + if f.Name() == "_metadata.db" { + continue + } + log.Info().Msgf("Upgrading index: %s", f.Name()) + if err := UpgradeFromV024Index(f.Name()); err != nil { + return err + } + } + return nil +} + +func UpgradeFromV024Index(indexName string) error { + rootPath := config.Global.DataPath + if err := os.Rename(path.Join(rootPath, indexName), path.Join(rootPath, indexName+"_old")); err != nil { + return err + } + if err := os.Mkdir(path.Join(rootPath, indexName), 0755); err != nil { + return err + } + if err := os.Rename(path.Join(rootPath, indexName+"_old"), path.Join(rootPath, indexName, "000000")); err != nil { + return err + } + return nil +} diff --git a/pkg/uquery/timerange/bool.go b/pkg/uquery/timerange/bool.go new file mode 100644 index 000000000..fef26280a --- /dev/null +++ b/pkg/uquery/timerange/bool.go @@ -0,0 +1,65 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package timerange + +import "strings" + +func BoolQuery(query map[string]interface{}) (int64, int64) { + for k, v := range query { + k := strings.ToLower(k) + switch k { + case "should": + switch v := v.(type) { + case map[string]interface{}: + return Query(v) + case []interface{}: + for _, vv := range v { + min, max := Query(vv.(map[string]interface{})) + if min > 0 || max > 0 { + return min, max + } + } + } + case "must": + switch v := v.(type) { + case map[string]interface{}: + return Query(v) + case []interface{}: + for _, vv := range v { + min, max := Query(vv.(map[string]interface{})) + if min > 0 || max > 0 { + return min, max + } + } + } + case "must_not": + case "filter": + switch v := v.(type) { + case map[string]interface{}: + return Query(v) + case []interface{}: + for _, vv := range v { + min, max := Query(vv.(map[string]interface{})) + if min > 0 || max > 0 { + return min, max + } + } + } + } + } + + return 0, 0 +} diff --git a/pkg/uquery/timerange/query.go b/pkg/uquery/timerange/query.go new file mode 100644 index 000000000..fbfd2b15d --- /dev/null +++ b/pkg/uquery/timerange/query.go @@ -0,0 +1,62 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package timerange + +import ( + "strings" + + "github.com/goccy/go-json" + + "github.com/zinclabs/zinc/pkg/meta" +) + +func Query(query interface{}) (int64, int64) { + if query == nil { + return 0, 0 + } + + if q, ok := query.(*meta.Query); ok { + data, err := json.Marshal(q) + if err != nil { + return 0, 0 + } + var newQuery map[string]interface{} + if err = json.Unmarshal(data, &newQuery); err != nil { + return 0, 0 + } + query = newQuery + } + q, ok := query.(map[string]interface{}) + if !ok { + return 0, 0 + } + + for k, t := range q { + k := strings.ToLower(k) + v, ok := t.(map[string]interface{}) + if !ok { + return 0, 0 + } + switch k { + case "bool": + return BoolQuery(v) + case "range": + return RangeQuery(v) + } + } + + return 0, 0 +} diff --git a/pkg/uquery/timerange/range.go b/pkg/uquery/timerange/range.go new file mode 100644 index 000000000..062014b62 --- /dev/null +++ b/pkg/uquery/timerange/range.go @@ -0,0 +1,120 @@ +/* Copyright 2022 Zinc Labs Inc. and Contributors +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. + */ + +package timerange + +import ( + "strings" + "time" + + "github.com/zinclabs/zinc/pkg/meta" + "github.com/zinclabs/zinc/pkg/zutils" +) + +func RangeQuery(query map[string]interface{}) (int64, int64) { + for field, v := range query { + if field == "@timestamp" { + vv, ok := v.(map[string]interface{}) + if !ok { + return 0, 0 + } + return RangeQueryTime(field, vv) + } + } + return 0, 0 +} + +func RangeQueryTime(field string, query map[string]interface{}) (int64, int64) { + value := new(meta.RangeQuery) + for k, v := range query { + k := strings.ToLower(k) + switch k { + case "gt": + value.GT = v + case "gte": + value.GTE = v + case "lt": + value.LT = v + case "lte": + value.LTE = v + case "format": + value.Format = v.(string) + case "time_zone": + value.TimeZone = v.(string) + case "boost": + value.Boost = v.(float64) + default: + return 0, 0 + } + } + + var err error + format := time.RFC3339 + if value.Format != "" { + format = value.Format + } + timeZone := time.UTC + if value.TimeZone != "" { + timeZone, err = zutils.ParseTimeZone(value.TimeZone) + if err != nil { + return 0, 0 + } + } + + min := time.Time{} + max := time.Time{} + if value.GT != nil { + if format == "epoch_millis" { + min = time.UnixMilli(int64(value.GT.(float64))) + } else { + min, err = time.ParseInLocation(format, value.GT.(string), timeZone) + } + if err != nil { + return 0, 0 + } + } + if value.GTE != nil { + if format == "epoch_millis" { + min = time.UnixMilli(int64(value.GTE.(float64))) + } else { + min, err = time.ParseInLocation(format, value.GTE.(string), timeZone) + } + if err != nil { + return 0, 0 + } + } + if value.LT != nil { + if format == "epoch_millis" { + max = time.UnixMilli(int64(value.LT.(float64))) + } else { + max, err = time.ParseInLocation(format, value.LT.(string), timeZone) + } + if err != nil { + return 0, 0 + } + } + if value.LTE != nil { + if format == "epoch_millis" { + max = time.UnixMilli(int64(value.LTE.(float64))) + } else { + max, err = time.ParseInLocation(format, value.LTE.(string), timeZone) + } + if err != nil { + return 0, 0 + } + } + + return min.UTC().UnixNano(), max.UTC().UnixNano() +} diff --git a/test/benchmark/bulk_test.go b/test/benchmark/bulk_test.go index 7d03e4f51..c07bdcbd4 100644 --- a/test/benchmark/bulk_test.go +++ b/test/benchmark/bulk_test.go @@ -32,7 +32,7 @@ func BenchmarkBulk(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err = document.BulkWorker(target, f) + _, _, err = document.BulkWorker(target, f) if err != nil { b.Error(err) }