Skip to content

Commit

Permalink
chore: synchronize the code to the community (#472)
Browse files Browse the repository at this point in the history
Signed-off-by: shilinlee <836160610@qq.com>
Signed-off-by: openGemini <>
Co-authored-by: openGemini <>
  • Loading branch information
shilinlee committed Feb 2, 2024
1 parent e2b367a commit 77c3c50
Show file tree
Hide file tree
Showing 142 changed files with 8,468 additions and 821 deletions.
1 change: 1 addition & 0 deletions app/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (cmd *Command) InitConfig(conf config.Config, path string) error {
}
crypto.Initialize(common.CryptoConfig)
config.SetProductType(common.ProductType)
config.SetCommon(*common)
}

if err := conf.Validate(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions app/ts-store/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewServer(c config.Config, info app.ServerInfo, logger *Logger.Logger) (app
immutable.InitWriterPool(3 * cpu.GetCpuNum())
immutable.SetIndexCompressMode(conf.Data.TemporaryIndexCompressMode)
immutable.SetChunkMetaCompressMode(conf.Data.ChunkMetaCompressMode)
config.SetStoreConfig(conf.Data)

s.config = conf
Logger.SetLogger(Logger.GetLogger().With(zap.String("hostname", conf.Data.IngesterAddress)))
Expand Down
5 changes: 3 additions & 2 deletions benchmarks/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/openGemini/openGemini/engine/index/tsi"
"github.com/openGemini/openGemini/engine/mutable"
"github.com/openGemini/openGemini/lib/config"
"github.com/openGemini/openGemini/lib/index"
"github.com/openGemini/openGemini/lib/netstorage"
"github.com/openGemini/openGemini/lib/resourceallocator"
"github.com/openGemini/openGemini/lib/util"
Expand Down Expand Up @@ -87,7 +88,7 @@ func createShard(db, rp string, ptId uint32, pathName string, duration ...time.D
opts := new(tsi.Options).
Ident(ident).
Path(indexPath).
IndexType(tsi.MergeSet).
IndexType(index.MergeSet).
EngineType(config.TSSTORE).
StartTime(time.Now()).
EndTime(time.Now().Add(time.Hour)).
Expand All @@ -102,7 +103,7 @@ func createShard(db, rp string, ptId uint32, pathName string, duration ...time.D
}
primaryIndex.SetIndexBuilder(indexBuilder)
indexRelation, _ := tsi.NewIndexRelation(opts, primaryIndex, indexBuilder)
indexBuilder.Relations[uint32(tsi.MergeSet)] = indexRelation
indexBuilder.Relations[uint32(index.MergeSet)] = indexRelation
err = indexBuilder.Open()
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions config/openGemini.conf
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
# product-type can be left unset or set to "logkeeper".
# product-type = ""

## Default value is true
## Set to false, the pre-aggregation information is not recorded in the metadata
# pre-agg-enabled = true

[meta]
bind-address = "{{addr}}:8088"
http-bind-address = "{{addr}}:8091"
Expand Down Expand Up @@ -170,6 +174,7 @@
## Indicates whether to persist the index read cache to disk when index close
# index-read-cache-persistent = false


# [data.ops-monitor]
# store-http-addr = "{{addr}}:8402"
# auth-enabled = false
Expand Down
33 changes: 33 additions & 0 deletions coordinator/record_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,36 @@ func TestCheckAndUpdateRecordSchema(t *testing.T) {
_, _, err = rw.checkAndUpdateRecordSchema("db0", "rp0", "rtt", "mst0", MockRecord4())
assert.Equal(t, errno.Equal(err, errno.ArrowRecordTimeFieldErr), true)
}

func TestCutPreSchema(t *testing.T) {
schema := record.Schemas{
record.Field{Type: influx.Field_Type_Int, Name: "int"},
record.Field{Type: influx.Field_Type_Int, Name: record.SeqIDField},
record.Field{Type: influx.Field_Type_Float, Name: "float"},
record.Field{Type: influx.Field_Type_Boolean, Name: "boolean"},
record.Field{Type: influx.Field_Type_String, Name: "string"},
record.Field{Type: influx.Field_Type_Int, Name: "time"},
}

targetSchema := record.Schemas{
record.Field{Type: influx.Field_Type_Int, Name: "int"},
record.Field{Type: influx.Field_Type_Float, Name: "float"},
record.Field{Type: influx.Field_Type_Boolean, Name: "boolean"},
record.Field{Type: influx.Field_Type_String, Name: "string"},
record.Field{Type: influx.Field_Type_Int, Name: "time"},
}

_ = cutPreSchema(schema)
config.SetProductType("logkeeper")
resSchema := cutPreSchema(schema)
if len(targetSchema) != len(resSchema) {
t.Fatal("cut preSchema failed")
}

for i := range targetSchema {
if targetSchema[i].Name != resSchema[i].Name {
t.Fatal("cut preSchema failed")
}
}
config.SetProductType("csstore")
}
3 changes: 2 additions & 1 deletion coordinator/shard_mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/openGemini/openGemini/engine/hybridqp"
"github.com/openGemini/openGemini/lib/config"
"github.com/openGemini/openGemini/lib/errno"
"github.com/openGemini/openGemini/lib/index"
"github.com/openGemini/openGemini/lib/logger"
"github.com/openGemini/openGemini/lib/metaclient"
"github.com/openGemini/openGemini/lib/obs"
Expand Down Expand Up @@ -983,7 +984,7 @@ func Test_CreateLogicalPlan(t *testing.T) {
require.Equal(t, shardMapping.NodeNumbers(), 1)

// Querying the colstore engine by the unified plan
source.IndexRelation.Oids = append(source.IndexRelation.Oids, 4)
source.IndexRelation.Oids = append(source.IndexRelation.Oids, uint32(index.BloomFilter))
plan, err = shardMapping.CreateLogicalPlan(ctx, []influxql.Source{source}, schema)
if !reflect.DeepEqual(plan.Schema().GetSourcesNames(), []string{"mst1"}) {
t.Fatal()
Expand Down
4 changes: 2 additions & 2 deletions coordinator/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"time"

"github.com/openGemini/openGemini/coordinator"
"github.com/openGemini/openGemini/engine/index/tsi"
"github.com/openGemini/openGemini/lib/errno"
"github.com/openGemini/openGemini/lib/index"
"github.com/openGemini/openGemini/lib/logger"
"github.com/openGemini/openGemini/lib/stringinterner"
strings2 "github.com/openGemini/openGemini/lib/strings"
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestStreamGenerateGroupKey(t *testing.T) {
assert2.Equal(t, value, "value3")

rows[0].IndexOptions = make([]influx.IndexOption, 1)
rows[0].IndexOptions[0].Oid = uint32(tsi.Field)
rows[0].IndexOptions[0].Oid = uint32(index.Field)
rows[0].IndexOptions[0].IndexList = []uint16{5}
keys = []string{"fk3", "tk3"}
value, _ = s.GenerateGroupKey(ctx, keys, &rows[0])
Expand Down
25 changes: 23 additions & 2 deletions coordinator/write_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (wh *recordWriterHelper) checkAndUpdateRecordSchema(db, rp, mst, originName
}

// check the time field
colNum := int(rec.ColNums() - 1)
colNum := rec.ColNums() - 1
if nil == rec.Schema.Field(colNum) || rec.Schema.Field(colNum).Name != record.TimeField {
err = errno.NewError(errno.ArrowRecordTimeFieldErr)
return
Expand All @@ -329,6 +329,12 @@ func (wh *recordWriterHelper) checkAndUpdateRecordSchema(db, rp, mst, originName
wh.preSchema = &schema
}
for i := 0; i < colNum; i++ {
// key field name protection
if rec.Schema.Field(i).Name == record.SeqIDField {
err = errno.NewError(errno.KeyWordConflictErr, mst, rec.Schema.Field(i).Name)
return
}

Check warning on line 336 in coordinator/write_helper.go

View check run for this annotation

Codecov / codecov/patch

coordinator/write_helper.go#L334-L336

Added lines #L334 - L336 were not covered by tests

_, ok := wh.preMst.Schema[rec.Schema.Field(i).Name]
if !ok {
wh.fieldToCreatePool = appendField(wh.fieldToCreatePool, rec.Schema.Field(i).Name, int32(rec.Schema.Field(i).Type))
Expand Down Expand Up @@ -425,7 +431,7 @@ func (wh *recordWriterHelper) checkAndUpdateSchema(db, rp, mst, originName strin
return
}
}
r = record.NewRecord(*wh.preSchema, false)
r = record.NewRecord(cutPreSchema(*wh.preSchema), false)
return
}

Expand Down Expand Up @@ -553,3 +559,18 @@ func createShardGroup(database, retentionPolicy string, client ComMetaClient, pr
*preSg = sg
return sg, false, nil
}

func cutPreSchema(preSchema []record.Field) []record.Field {
if !config.IsLogKeeper() {
return preSchema
}
// cut seqId field
schema := make([]record.Field, 0, len(preSchema)-1)
for i := range preSchema {
if preSchema[i].Name == record.SeqIDField {
continue
}
schema = append(schema, preSchema[i])
}
return schema
}
4 changes: 2 additions & 2 deletions engine/agg_tagset_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,9 @@ func (s *fileLoopCursor) NextAggData() (*record.Record, *comm.FileInfo, error) {
schema = s.ctx.schema
}
if len(s.ctx.decs.GetOps()) > 0 {
s.recPool = record.NewCircularRecordPool(FileCursorPool, fileCursorRecordNum, schema, true)
s.recPool = record.NewCircularRecordPool(FileLoopCursorPool, fileCursorRecordNum, schema, true)
} else {
s.recPool = record.NewCircularRecordPool(FileCursorPool, fileCursorRecordNum, schema, false)
s.recPool = record.NewCircularRecordPool(FileLoopCursorPool, fileCursorRecordNum, schema, false)
}
if len(s.ctx.readers.Orders) == 0 {
s.fileLoopCursorFunctions.readDataFunction = s.ReadAggDataOnlyInMemTable
Expand Down
26 changes: 19 additions & 7 deletions engine/column_store_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ColumnStoreReader struct {
rowBitmap []bool
dimVals []string
outInIdxMap map[int]int
inOutIdxMap map[int]int
closedCh chan struct{}
closedCount int64
}
Expand All @@ -85,6 +86,7 @@ func NewColumnStoreReader(plan hybridqp.QueryNode, frags executor.ShardsFragment
frags: frags,
plan: plan,
outInIdxMap: make(map[int]int),
inOutIdxMap: make(map[int]int),
dimVals: make([]string, len(plan.Schema().Options().GetOptDimension())),
logger: logger.NewLogger(errno.ModuleQueryEngine),
closedCh: make(chan struct{}, 2),
Expand Down Expand Up @@ -283,6 +285,7 @@ func (r *ColumnStoreReader) initSchemaAndPool() (err error) {
inIdx, outIdx := r.inSchema.FieldIndex(in.Val), r.outSchema.FieldIndex(r.ops[i].Ref.Val)
if inIdx >= 0 && outIdx >= 0 {
r.outInIdxMap[outIdx], useIdxMap[inIdx] = inIdx, struct{}{} // fields for select
r.inOutIdxMap[inIdx] = outIdx
}
}
}
Expand All @@ -300,9 +303,9 @@ func (r *ColumnStoreReader) initSchemaAndPool() (err error) {
if !r.schema.Options().IsTimeSorted() {
startTime, endTime := r.schema.Options().GetStartTime(), r.schema.Options().GetEndTime()
timeCond := binaryfilterfunc.GetTimeCondition(util.TimeRange{Min: startTime, Max: endTime}, r.inSchema, len(r.inSchema)-1)
r.queryCtx.filterOption.CondFunctions, err = binaryfilterfunc.NewCondition(timeCond, r.schema.Options().GetCondition(), r.inSchema)
r.queryCtx.filterOption.CondFunctions, err = binaryfilterfunc.NewCondition(timeCond, r.schema.Options().GetCondition(), r.inSchema, &r.opt)
} else {
r.queryCtx.filterOption.CondFunctions, err = binaryfilterfunc.NewCondition(nil, r.schema.Options().GetCondition(), r.inSchema)
r.queryCtx.filterOption.CondFunctions, err = binaryfilterfunc.NewCondition(nil, r.schema.Options().GetCondition(), r.inSchema, &r.opt)

Check warning on line 308 in engine/column_store_reader.go

View check run for this annotation

Codecov / codecov/patch

engine/column_store_reader.go#L308

Added line #L308 was not covered by tests
}
if err != nil {
return err
Expand Down Expand Up @@ -515,13 +518,22 @@ func (r *ColumnStoreReader) tranFieldToDim(rec *record.Record, chunk executor.Ch
return errno.NewError(errno.SchemaNotAligned)
}
colType := record.ToInfluxqlTypes(rec.Schema[idx].Type)
transFun, ok := transColumnFun[colType]
recColumn, dimColumn := rec.Column(idx), chunk.Dim(i)
chunkColIdx, ok := r.inOutIdxMap[idx]
if !ok {
return errno.NewError(errno.NoColValToColumnFunc, colType)
transFun, ok := transColumnFun[colType]
if !ok {
return errno.NewError(errno.NoColValToColumnFunc, colType)
}

Check warning on line 527 in engine/column_store_reader.go

View check run for this annotation

Codecov / codecov/patch

engine/column_store_reader.go#L526-L527

Added lines #L526 - L527 were not covered by tests
transFun(recColumn, dimColumn)
} else {
copyFun, ok := copyColumnFun[colType]
if !ok {
return errno.NewError(errno.NoColValToColumnFunc, colType)
}

Check warning on line 533 in engine/column_store_reader.go

View check run for this annotation

Codecov / codecov/patch

engine/column_store_reader.go#L532-L533

Added lines #L532 - L533 were not covered by tests
srcColumn := chunk.Column(chunkColIdx)
copyFun(srcColumn, dimColumn)
}

recColumn, dimColumn := rec.Column(idx), chunk.Dim(i)
transFun(recColumn, dimColumn)
if recColumn.NilCount == recColumn.Length() {
dimColumn.AppendManyNil(len(times))
} else {
Expand Down
22 changes: 22 additions & 0 deletions engine/column_store_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package engine

import (
"context"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -679,3 +680,24 @@ func TestColumnStoreReader(t *testing.T) {
})
}
}

func TestColumnStoreReaderTranRecToChunk(t *testing.T) {
schema := createQuerySchemaForTranRec()
readerPlan := executor.NewLogicalColumnStoreReader(nil, schema)
reader := NewColumnStoreReader(readerPlan, nil)
if err := reader.initReadCursor(); err != nil {
t.Fatalf("ColumnStoreReader initReadCursor, err: %+v", err)
}
if err := reader.initSchemaAndPool(); err != nil {
t.Fatalf("ColumnStoreReader initSchemaAndPool, err: %+v", err)
}
rec := genRecordFortRranRec()
chk, err := reader.tranRecToChunk(rec)
if err != nil {
t.Fatalf("trans rec to chunk failed, err: %+v", err)
}
// compare column and dim for the filed "string"
if !reflect.DeepEqual(chk.Column(3), chk.Dim(0)) {
t.Fatal("trans rec to dim failed. The column[3] not equal to dim[0]")
}
}
7 changes: 4 additions & 3 deletions engine/detached_check_metaInfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/openGemini/openGemini/engine/mutable"
"github.com/openGemini/openGemini/lib/config"
"github.com/openGemini/openGemini/lib/fileops"
"github.com/openGemini/openGemini/lib/index"
"github.com/openGemini/openGemini/lib/logstore"
"github.com/openGemini/openGemini/lib/numberenc"
"github.com/openGemini/openGemini/lib/util"
Expand Down Expand Up @@ -69,7 +70,7 @@ func TestCheckDetachedFiles(t *testing.T) {
CompactionType: config.BLOCK,
},
IndexRelation: influxql.IndexRelation{IndexNames: []string{"bloomfilter"},
Oids: []uint32{4},
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}

Expand Down Expand Up @@ -129,7 +130,7 @@ func TestTestCheckDetachedFilesV2(t *testing.T) {
PrimaryKey: primaryKey,
},
IndexRelation: influxql.IndexRelation{IndexNames: []string{"bloomfilter"},
Oids: []uint32{4},
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}

Expand Down Expand Up @@ -407,7 +408,7 @@ func TestLoadProcess(t *testing.T) {
PrimaryKey: primaryKey,
},
IndexRelation: influxql.IndexRelation{IndexNames: []string{"bloomfilter"},
Oids: []uint32{4},
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}

Expand Down
46 changes: 44 additions & 2 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/openGemini/openGemini/lib/record"
"github.com/openGemini/openGemini/lib/resourceallocator"
stat "github.com/openGemini/openGemini/lib/statisticsPusher/statistics"
"github.com/openGemini/openGemini/lib/syscontrol"
"github.com/openGemini/openGemini/lib/sysinfo"
"github.com/openGemini/openGemini/lib/util"
"github.com/openGemini/openGemini/lib/util/lifted/influx/influxql"
Expand Down Expand Up @@ -1344,6 +1345,47 @@ func (s *Engine) InitLogStoreCtx(querySchema *executor.QuerySchema) (*idKeyCurso
return ctx, nil
}

func (e *Engine) HierarchicalStorage(shardId uint64, ptID uint32, dbName string, resCh chan int64) bool {
return true
func (e *Engine) HierarchicalStorage(db string, ptId uint32, shardID uint64) error {
e.log.Info("[hierarchical storage]", zap.String("db", db), zap.Uint32("pt", ptId), zap.Uint64("shard", shardID))
e.mu.RLock()
if err := e.checkAndAddRefPTNoLock(db, uint32(ptId)); err != nil {
e.mu.RUnlock()
e.log.Error("[hierarchical storage] add pt ref err", zap.String("db", db), zap.Uint32("pt", ptId), zap.Error(err))
return err
}

dbPTInfo := e.DBPartitions[db][ptId]
e.mu.RUnlock()
defer e.unrefDBPT(db, ptId)

dbPTInfo.mu.RLock()
sh := dbPTInfo.Shard(shardID)
dbPTInfo.mu.RUnlock()
if sh == nil {
return errno.NewError(errno.ShardNotFound, shardID)
}

if err := sh.OpenAndEnable(e.metaClient); err != nil {
e.log.Error("[hierarchical storage] shard open err", zap.String("db", db),
zap.Uint32("pt", ptId), zap.Uint64("shard", shardID), zap.Error(err))
return err
}

Check warning on line 1372 in engine/engine.go

View check run for this annotation

Codecov / codecov/patch

engine/engine.go#L1369-L1372

Added lines #L1369 - L1372 were not covered by tests

if syscontrol.IsWriteColdShardEnabled() {
if err := sh.UpdateShardReadOnly(e.metaClient); err != nil {
e.log.Error("[hierarchical storage] update shard read only fail", zap.String("db", db),
zap.Uint32("pt", ptId), zap.Uint64("shard", shardID), zap.Error(err))
return err
}

Check warning on line 1379 in engine/engine.go

View check run for this annotation

Codecov / codecov/patch

engine/engine.go#L1375-L1379

Added lines #L1375 - L1379 were not covered by tests
}

if sh.CanDoShardMove() {
if err := sh.ExecShardMove(); err != nil {
return err
}
}

e.log.Info("[hierarchical storage] shard move success", zap.String("db", db),
zap.Uint32("pt", ptId), zap.Uint64("shard", shardID))
return nil
}
Loading

0 comments on commit 77c3c50

Please sign in to comment.