Skip to content

Commit

Permalink
feat(storage): adapt full text index for attach flush (#508)
Browse files Browse the repository at this point in the history
Signed-off-by: xiangyu5632 <xiangyu5632@126.com>
Co-authored-by: openGemini <>
  • Loading branch information
xiangyu5632 committed Feb 29, 2024
1 parent a6b7e7f commit 66e46da
Show file tree
Hide file tree
Showing 16 changed files with 693 additions and 296 deletions.
2 changes: 1 addition & 1 deletion engine/immutable/colstore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func AppendSKIndexSuffix(dataPath string, fieldName string, indexName string) st
indexFileSuffix = MinMaxIndexFileSuffix
case index.SetIndex:
indexFileSuffix = SetIndexFileSuffix
case index.BloomFilterIndex:
case index.BloomFilterIndex, index.BloomFilterFullTextIndex:
indexFileSuffix = BloomFilterIndexFileSuffix
default:
panic(fmt.Sprintf("unsupported the skip index: %s", indexName))
Expand Down
62 changes: 22 additions & 40 deletions engine/immutable/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,10 +1621,6 @@ func TestBlockCompactionPrepareForColumnStore(t *testing.T) {
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}
indexRelation := &influxql.IndexRelation{
Oids: []uint32{uint32(index.BloomFilter)},
IndexNames: []string{"bloomfilter"},
}

store.ImmTable.SetMstInfo("mst", &mstinfo)
sortKeyMap := genSortedKeyMap(sort)
Expand All @@ -1643,11 +1639,11 @@ func TestBlockCompactionPrepareForColumnStore(t *testing.T) {
t.Fatal(err)
}
}
if indexRelation != nil && len(indexRelation.IndexNames) != 0 {
dataFilePath := msb.FileName.String()

msb.NewSkipIndex(rec.Schema, *indexRelation)
if len(msb.skipIndex.GetSkipIndexWriters()) > 0 {
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
schemaIdx := logstore.GenSchemaIdxs(rec.Schema, &mstinfo.IndexRelation, false)
if err := msb.writeSkipIndex(rec, schemaIdx, dataFilePath, *msb.lock, fixRowsPerSegment, false); err != nil {
if err := msb.writeSkipIndex(rec, fixRowsPerSegment); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1681,8 +1677,8 @@ func TestBlockCompactionPrepareForColumnStore(t *testing.T) {
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
msb := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.TSSTORE)
msb.NewPKIndexWriter()
msb.NewSkipIndexWriter()
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, indexRelation)
msb.NewSkipIndex(data[ids].Schema, mstinfo.IndexRelation)
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, &mstinfo.IndexRelation)
needMerge = true
if err := writeIntoFile(msb, false); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1779,10 +1775,6 @@ func TestLevelBlockCompactForColumnStoreV1(t *testing.T) {
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}
indexRelation := &influxql.IndexRelation{
Oids: []uint32{uint32(index.BloomFilter)},
IndexNames: []string{"bloomfilter"},
}

store.ImmTable.SetMstInfo("mst", &mstinfo)
sortKeyMap := genSortedKeyMap(sort)
Expand All @@ -1801,11 +1793,10 @@ func TestLevelBlockCompactForColumnStoreV1(t *testing.T) {
t.Fatal(err)
}
}
if indexRelation != nil && len(indexRelation.IndexNames) != 0 {
dataFilePath := msb.FileName.String()
msb.NewSkipIndex(rec.Schema, *indexRelation)
if len(msb.skipIndex.GetSkipIndexWriters()) > 0 {
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
schemaIdx := logstore.GenSchemaIdxs(rec.Schema, &mstinfo.IndexRelation, false)
if err := msb.writeSkipIndex(rec, schemaIdx, dataFilePath, *msb.lock, fixRowsPerSegment, false); err != nil {
if err := msb.writeSkipIndex(rec, fixRowsPerSegment); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -1901,8 +1892,8 @@ func TestLevelBlockCompactForColumnStoreV1(t *testing.T) {
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
msb := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.TSSTORE)
msb.NewPKIndexWriter()
msb.NewSkipIndexWriter()
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, indexRelation)
msb.NewSkipIndex(data[ids].Schema, mstinfo.IndexRelation)
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, &mstinfo.IndexRelation)
needMerge = true
if err := writeIntoFile(msb, false); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2009,10 +2000,6 @@ func TestLevelBlockCompactForColumnStoreV2(t *testing.T) {
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}
indexRelation := &influxql.IndexRelation{
Oids: []uint32{uint32(index.BloomFilter)},
IndexNames: []string{"bloomfilter"},
}

store.ImmTable.SetMstInfo("mst", &mstinfo)
sortKeyMap := genSortedKeyMap(sort)
Expand All @@ -2031,11 +2018,10 @@ func TestLevelBlockCompactForColumnStoreV2(t *testing.T) {
t.Fatal(err)
}
}
if indexRelation != nil && len(indexRelation.IndexNames) != 0 {
dataFilePath := msb.FileName.String()
msb.NewSkipIndex(rec.Schema, *indexRelation)
if len(msb.skipIndex.GetSkipIndexWriters()) > 0 {
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
schemaIdx := logstore.GenSchemaIdxs(rec.Schema, &mstinfo.IndexRelation, false)
if err := msb.writeSkipIndex(rec, schemaIdx, dataFilePath, *msb.lock, fixRowsPerSegment, false); err != nil {
if err := msb.writeSkipIndex(rec, fixRowsPerSegment); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -2131,8 +2117,8 @@ func TestLevelBlockCompactForColumnStoreV2(t *testing.T) {
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
msb := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.TSSTORE)
msb.NewPKIndexWriter()
msb.NewSkipIndexWriter()
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, indexRelation)
msb.NewSkipIndex(data[ids].Schema, mstinfo.IndexRelation)
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, &mstinfo.IndexRelation)
needMerge = true
if err := writeIntoFile(msb, false); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -2241,10 +2227,6 @@ func TestBlockCompactFileNameConflict(t *testing.T) {
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}
indexRelation := &influxql.IndexRelation{
Oids: []uint32{uint32(index.BloomFilter)},
IndexNames: []string{"bloomfilter"},
}

store.ImmTable.SetMstInfo("mst", &mstinfo)
sortKeyMap := genSortedKeyMap(sort)
Expand All @@ -2263,11 +2245,11 @@ func TestBlockCompactFileNameConflict(t *testing.T) {
t.Fatal(err)
}
}
if indexRelation != nil && len(indexRelation.IndexNames) != 0 {
dataFilePath := msb.FileName.String()

msb.NewSkipIndex(rec.Schema, *indexRelation)
if len(msb.skipIndex.GetSkipIndexWriters()) > 0 {
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
schemaIdx := logstore.GenSchemaIdxs(rec.Schema, &mstinfo.IndexRelation, false)
if err := msb.writeSkipIndex(rec, schemaIdx, dataFilePath, *msb.lock, fixRowsPerSegment, false); err != nil {
if err := msb.writeSkipIndex(rec, fixRowsPerSegment); err != nil {
t.Fatal(err)
}
}
Expand Down Expand Up @@ -2363,8 +2345,8 @@ func TestBlockCompactFileNameConflict(t *testing.T) {
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
msb := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.TSSTORE)
msb.NewPKIndexWriter()
msb.NewSkipIndexWriter()
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, indexRelation)
msb.NewSkipIndex(data[ids].Schema, mstinfo.IndexRelation)
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, &mstinfo.IndexRelation)
needMerge = true
if err := writeIntoFile(msb, false); err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 66e46da

Please sign in to comment.