Skip to content

Commit

Permalink
implement multiple backend indexes (#202)
Browse files Browse the repository at this point in the history
  • Loading branch information
hengfeiyang committed Jun 11, 2022
1 parent b44c07b commit 8425d85
Show file tree
Hide file tree
Showing 48 changed files with 1,734 additions and 397 deletions.
1 change: 0 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,3 @@ docker buildx build --platform linux/amd64 --tag zinc:latest-linux-amd64 . -f Do
1. Make the changes to code.
1. Push the code to your repo.
1. Create a PR

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.4
go.opencensus.io v0.23.0 // indirect
golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.7
golang.org/x/xerrors v0.0.0-20220411194840-2f41105eb62f // indirect
google.golang.org/genproto v0.0.0-20220518221133-4f43b3371335 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
36 changes: 36 additions & 0 deletions pkg/bluge/directory/disk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/* Copyright 2022 Zinc Labs Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package directory

import (
"path"

"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/index"
)

// GetDiskConfig returns a bluge config that will store index data in local disk
// rootPath: the root path of data
// indexName: the name of the index to use.
func GetDiskConfig(rootPath string, indexName string, timeRange ...int64) bluge.Config {
config := index.DefaultConfig(path.Join(rootPath, indexName))
if len(timeRange) == 2 {
if timeRange[0] <= timeRange[1] {
config = config.WithTimeRange(timeRange[0], timeRange[1])
}
}
return bluge.DefaultConfigWithIndexConfig(config)
}
10 changes: 8 additions & 2 deletions pkg/bluge/directory/minio.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ import (
// GetMinIOConfig returns a bluge config that will store index data in MinIO
// bucket: the MinIO bucket to use
// indexName: the name of the index to use. It will be an MinIO prefix (folder)
func GetMinIOConfig(bucket string, indexName string) bluge.Config {
return bluge.DefaultConfigWithDirectory(func() index.Directory {
func GetMinIOConfig(bucket string, indexName string, timeRange ...int64) bluge.Config {
config := index.DefaultConfigWithDirectory(func() index.Directory {
return NewMinIODirectory(bucket, indexName)
})
if len(timeRange) == 2 {
if timeRange[0] <= timeRange[1] {
config = config.WithTimeRange(timeRange[0], timeRange[1])
}
}
return bluge.DefaultConfigWithIndexConfig(config)
}

type MinIODirectory struct {
Expand Down
10 changes: 8 additions & 2 deletions pkg/bluge/directory/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,16 @@ import (
// GetS3Config returns a bluge config that will store index data in S3
// bucket: the S3 bucket to use
// indexName: the name of the index to use. It will be an s3 prefix (folder)
func GetS3Config(bucket string, indexName string) bluge.Config {
return bluge.DefaultConfigWithDirectory(func() index.Directory {
func GetS3Config(bucket string, indexName string, timeRange ...int64) bluge.Config {
config := index.DefaultConfigWithDirectory(func() index.Directory {
return NewS3Directory(bucket, indexName)
})
if len(timeRange) == 2 {
if timeRange[0] <= timeRange[1] {
config = config.WithTimeRange(timeRange[0], timeRange[1])
}
}
return bluge.DefaultConfigWithIndexConfig(config)
}

type S3Directory struct {
Expand Down
30 changes: 28 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strconv"
"strings"

"github.com/gin-gonic/gin"
"github.com/joho/godotenv"
"github.com/rs/zerolog/log"

Expand All @@ -46,12 +47,26 @@ type config struct {
BatchSize int `env:"ZINC_BATCH_SIZE,default=1024"`
MaxResults int `env:"ZINC_MAX_RESULTS,default=10000"`
AggregationTermsSize int `env:"ZINC_AGGREGATION_TERMS_SIZE,default=1000"`
Shard shard
Etcd etcd
S3 s3
MinIO minIO
Plugin plugin
}

// 1073741824 1g
// 536870912 512m
// 268435456 256m
// 134217728 128m
// 67108864 64m
// 33554432 32m
// 16777216 16m

type shard struct {
// MaxShards is the maximum number of shards to create.
MaxSize uint64 `env:"ZINC_SHARD_MAX_SIZE,default=1073741824"`
}

type etcd struct {
Endpoints []string `env:"ZINC_ETCD_ENDPOINTS"`
Username string `env:"ZINC_ETCD_USERNAME"`
Expand Down Expand Up @@ -91,6 +106,11 @@ func init() {
rv := reflect.ValueOf(Global).Elem()
loadConfig(rv)

// configure gin
if Global.GinMode == "release" {
gin.SetMode(gin.ReleaseMode)
}

// check data path
testPath := path.Join(Global.DataPath, "_test_")
if err := os.MkdirAll(testPath, 0755); err != nil {
Expand Down Expand Up @@ -145,12 +165,18 @@ func setField(field reflect.Value, tag string) {
return
}
switch field.Kind() {
case reflect.Int:
vi, err := strconv.Atoi(v)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
vi, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Fatal().Err(err).Msgf("env %s is not int", tag)
}
field.SetInt(int64(vi))
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
vi, err := strconv.ParseUint(v, 10, 64)
if err != nil {
log.Fatal().Err(err).Msgf("env %s is not uint", tag)
}
field.SetUint(uint64(vi))
case reflect.Bool:
vi, err := strconv.ParseBool(v)
if err != nil {
Expand Down
9 changes: 3 additions & 6 deletions pkg/core/deleteindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,17 @@ func DeleteIndex(name string) error {
dataPath := config.Global.DataPath
err := os.RemoveAll(dataPath + "/" + index.Name)
if err != nil {
log.Error().Msgf("failed to delete index: %s", err.Error())
return err
log.Error().Err(err).Msg("failed to delete index")
}
} else if index.StorageType == "s3" {
err := deleteFilesForIndexFromS3(index.Name)
if err != nil {
log.Error().Msgf("failed to delete index from S3: %s", err.Error())
return err
log.Error().Err(err).Msg("failed to delete index from S3")
}
} else if index.StorageType == "minio" {
err := deleteFilesForIndexFromMinIO(index.Name)
if err != nil {
log.Error().Msgf("failed to delete index from minIO: %s", err.Error())
return err
log.Error().Err(err).Msg("failed to delete index from minIO")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/core/deleteindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ func TestDeleteIndex(t *testing.T) {
args: args{
name: indexNameS3,
},
wantErr: true,
wantErr: false,
},
{
name: "minio",
args: args{
name: indexNameMinIO,
},
wantErr: true,
wantErr: false,
},
}

Expand Down

0 comments on commit 8425d85

Please sign in to comment.