fix: file name conflict while compaction write back to local (#487)
Signed-off-by: ZhangJian He <shoothzj@gmail.com>
shoothzj committed Feb 27, 2024
1 parent f91833a commit 8d2cb3a
Showing 2 changed files with 257 additions and 1 deletion.
17 changes: 16 additions & 1 deletion engine/immutable/colstore_compact.go
@@ -776,7 +776,7 @@ func (f *FragmentIterators) updateIterators(m *MmsTables, group FilesInfo, sortK
f.breakPoint = &breakPoint{}

_, seq := group.oldFiles[0].LevelAndSequence()
- ext := group.oldFiles[0].FileNameExtend()
+ ext := getMaxFileExtend(group.oldFiles, seq)
fileName := NewTSSPFileName(seq, group.toLevel, 0, 0, true, m.lock)
var err error
//gen msBuilder
@@ -818,6 +818,21 @@ func (f *FragmentIterators) updateIterators(m *MmsTables, group FilesInfo, sortK
return nil
}

// to avoid file name conflict while writing back to local
func getMaxFileExtend(files []TSSPFile, seq uint64) uint16 {
var ext uint16
var tmpSeq uint64
for i := 1; i < len(files); i++ {
_, tmpSeq = files[i].LevelAndSequence()
//The seq of all files cannot be the same at the same time
if tmpSeq != seq {
ext = files[i-1].FileNameExtend()
break
}
}
return ext
}

func (f *FragmentIterators) writeBloomFilters(toLocal bool) error {
if f.bfIterator == nil {
return nil
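For readers skimming the diff: before this change, the new compacted file reused the file name extend of oldFiles[0] alone, which could reproduce a name already taken by another old file with the same sequence. The new getMaxFileExtend helper instead returns the extend of the last old file that still shares the first file's sequence number. Below is a minimal, self-contained sketch of that selection logic; the fileMeta type and its methods are hypothetical stand-ins for the repo's TSSPFile interface, and only the iteration pattern mirrors the committed code.

// Hypothetical stand-in for the repo's TSSPFile interface; only the
// selection pattern below mirrors the committed getMaxFileExtend.
package main

import "fmt"

type fileMeta struct {
	level  uint16
	seq    uint64
	extend uint16
}

func (f fileMeta) LevelAndSequence() (uint16, uint64) { return f.level, f.seq }
func (f fileMeta) FileNameExtend() uint16             { return f.extend }

// maxFileExtend returns the extend of the last file that shares the first
// file's sequence number, assuming the group is ordered by sequence and,
// within one sequence, by ascending extend.
func maxFileExtend(files []fileMeta, seq uint64) uint16 {
	var ext uint16
	for i := 1; i < len(files); i++ {
		if _, tmpSeq := files[i].LevelAndSequence(); tmpSeq != seq {
			ext = files[i-1].FileNameExtend()
			break
		}
	}
	return ext
}

func main() {
	group := []fileMeta{
		{level: 0, seq: 7, extend: 0},
		{level: 0, seq: 7, extend: 1}, // sequence 7 was already split once
		{level: 0, seq: 9, extend: 0}, // first file with a different sequence
	}
	_, seq := group[0].LevelAndSequence()
	// Prints 1: the largest extend already used for sequence 7.
	fmt.Println(maxFileExtend(group, seq))
}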
241 changes: 241 additions & 0 deletions engine/immutable/compact_test.go
@@ -1177,6 +1177,9 @@ func TestCompactSwitchFilesForColumnStore(t *testing.T) {
conf.maxRowsPerSegment = 8192
conf.FragmentsNumPerFlush = 1
conf.fileSizeLimit = 1 * 1024
defer func() {
conf.fileSizeLimit = defaultFileSizeLimit
}()
tier := uint64(util.Hot)
recRows := 8192
lockPath := ""
@@ -1579,6 +1582,9 @@ func TestBlockCompactionPrepareForColumnStore(t *testing.T) {
conf := NewColumnStoreConfig()
conf.maxRowsPerSegment = 8192
conf.FragmentsNumPerFlush = 1
defer func() {
conf.maxRowsPerSegment = DefaultMaxRowsPerSegment4ColStore
}()
tier := uint64(util.Hot)
recRows := 10000
lockPath := ""
@@ -1964,6 +1970,9 @@ func TestLevelBlockCompactForColumnStoreV2(t *testing.T) {
conf := NewColumnStoreConfig()
conf.maxRowsPerSegment = 20
conf.FragmentsNumPerFlush = 1
defer func() {
conf.maxRowsPerSegment = DefaultMaxRowsPerSegment4ColStore
}()
tier := uint64(util.Hot)
recRows := 1000
lockPath := ""
@@ -2174,6 +2183,238 @@ func TestLevelBlockCompactForColumnStoreV2(t *testing.T) {
}
}

func TestBlockCompactFileNameConflict(t *testing.T) {
testCompDir := t.TempDir()
_ = fileops.RemoveAll(testCompDir)
sig := interruptsignal.NewInterruptSignal()
defer func() {
sig.Close()
_ = fileops.RemoveAll(testCompDir)
}()

var idMinMax, tmMinMax MinMax
var startValue = 999999.0

conf := NewColumnStoreConfig()
conf.maxRowsPerSegment = 20
conf.FragmentsNumPerFlush = 1
conf.fileSizeLimit = 1 * 1024
defer func() {
conf.maxRowsPerSegment = DefaultMaxRowsPerSegment4ColStore
conf.fileSizeLimit = defaultFileSizeLimit
}()

tier := uint64(util.Hot)
recRows := 1000
lockPath := ""

store := NewTableStore(testCompDir, &lockPath, &tier, true, conf)
defer store.Close()
store.SetImmTableType(config.COLUMNSTORE)
store.CompactionEnable()

primaryKey := []string{"time"}
sortKey := []string{"time"}
sort := []string{"time"}
schema := make(map[string]int32)
for i := range primaryKey {
for j := range schemaForColumnStore {
if primaryKey[i] == schemaForColumnStore[j].Name {
schema[primaryKey[i]] = int32(schemaForColumnStore[j].Type)
}
}
}
list := make([]*influxql.IndexList, 1)
bfColumn := []string{"primaryKey_string1", "primaryKey_string2"}
iList := influxql.IndexList{IList: bfColumn}
list[0] = &iList
mstinfo := meta.MeasurementInfo{
Name: "mst",
EngineType: config.COLUMNSTORE,
ColStoreInfo: &meta.ColStoreInfo{
PrimaryKey: primaryKey,
SortKey: sortKey,
CompactionType: config.BLOCK,
},
Schema: schema,
IndexRelation: influxql.IndexRelation{IndexNames: []string{"bloomfilter"},
Oids: []uint32{uint32(index.BloomFilter)},
IndexList: list},
}
indexRelation := &influxql.IndexRelation{
Oids: []uint32{uint32(index.BloomFilter)},
IndexNames: []string{"bloomfilter"},
}

store.ImmTable.SetMstInfo("mst", &mstinfo)
sortKeyMap := genSortedKeyMap(sort)
write := func(ids uint64, data map[uint64]*record.Record, msb *MsBuilder, merge *record.Record,
sortKeyMap map[string]int, primaryKey, sortKey []string, needMerge bool, pkSchema record.Schemas, indexRelation *influxql.IndexRelation) *record.Record {
rec := data[ids]
err := msb.WriteData(ids, rec)
if err != nil {
t.Fatal(err)
}
if len(pkSchema) != 0 {
dataFilePath := msb.FileName.String()
indexFilePath := path.Join(msb.Path, msb.msName, colstore.AppendPKIndexSuffix(dataFilePath))
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
if err = msb.writePrimaryIndex(rec, pkSchema, indexFilePath, *msb.lock, colstore.DefaultTCLocation, fixRowsPerSegment, DefaultMaxRowsPerSegment4ColStore); err != nil {
t.Fatal(err)
}
}
if indexRelation != nil && len(indexRelation.IndexNames) != 0 {
dataFilePath := msb.FileName.String()
fixRowsPerSegment := GenFixRowsPerSegment(rec, conf.maxRowsPerSegment)
schemaIdx := logstore.GenSchemaIdxs(rec.Schema, &mstinfo.IndexRelation, false)
if err := msb.writeSkipIndex(rec, schemaIdx, dataFilePath, *msb.lock, fixRowsPerSegment, false); err != nil {
t.Fatal(err)
}
}
if needMerge {
merge = mergeForColumnStore(sort, sortKeyMap, rec, merge)
return merge
}
return rec
}

check := func(name string, fn string, orig *record.Record) {
f := store.File(name, fn, true)
contains, err := f.Contains(idMinMax.min)
if err != nil || !contains {
t.Fatalf("show contain series id:%v, but not find, error:%v", idMinMax.min, err)
}

midx, _ := f.MetaIndexAt(0)
if midx == nil {
t.Fatalf("meta index not find")
}

cm, err := f.ChunkMeta(midx.id, midx.offset, midx.size, midx.count, 0, nil, fileops.IO_PRIORITY_LOW_READ)
if err != nil {
t.Fatal(err)
}

decs := NewReadContext(true)
readRec := record.NewRecordBuilder(schemaForColumnStore)
readRec.ReserveColumnRows(recRows * 4)
rec := record.NewRecordBuilder(schemaForColumnStore)
rec.ReserveColumnRows(conf.maxRowsPerSegment)
for i := range cm.timeMeta().entries {
rec, err = f.ReadAt(cm, i, rec, decs, fileops.IO_PRIORITY_LOW_READ)
if err != nil {
t.Fatal(err)
}
readRec.Merge(rec)
}

oldV0 := orig.Column(0).FloatValues()
oldV1 := orig.Column(1).IntegerValues()
oldV2 := orig.Column(2).BooleanValues()
oldV3 := orig.Column(3).StringValues(nil)
oldTimes := orig.Times()
v0 := readRec.Column(0).FloatValues()
v1 := readRec.Column(1).IntegerValues()
v2 := readRec.Column(2).BooleanValues()
v3 := readRec.Column(3).StringValues(nil)
times := readRec.Times()

if !reflect.DeepEqual(oldTimes, times) {
t.Fatalf("time not eq, \nexp:%v \nget:%v", oldTimes, times)
}

if !reflect.DeepEqual(oldV0, v0) {
t.Fatalf("flaot value not eq, \nexp:%v \nget:%v", oldV0, v0)
}

if !reflect.DeepEqual(oldV1, v1) {
t.Fatalf("int value not eq, \nexp:%v \nget:%v", oldV1, v1)
}

if !reflect.DeepEqual(oldV2, v2) {
t.Fatalf("bool value not eq, \nexp:%v \nget:%v", oldV2, v2)
}

if !reflect.DeepEqual(oldV3, v3) {
t.Fatalf("string value not eq, \nexp:%v \nget:%v", oldV3, v3)
}
}

tm := testTimeStart
tmMinMax.min = uint64(tm.UnixNano())
idMinMax.min = 0
compactionTimes := 2
filesN := LeveLMinGroupFiles[0] * compactionTimes
oldRec := record.NewRecordBuilder(schemaForColumnStore)
oldRec.ReserveColumnRows(recRows * filesN)

recs := make([]*record.Record, 0, filesN)
pk := store.ImmTable.(*csImmTableImpl).mstsInfo["mst"].ColStoreInfo.PrimaryKey
pkSchema := make([]record.Field, len(pk))
for i := range pk {
pkSchema[i] = record.Field{
Type: int(store.ImmTable.(*csImmTableImpl).mstsInfo["mst"].Schema[pk[i]]),
Name: pk[i],
}
}
needMerge := false
for i := 0; i < filesN; i++ {
ids, data := genTestDataForColumnStore(idMinMax.min, 1, recRows, &startValue, &tm)
fileName := NewTSSPFileName(store.NextSequence(), 0, 0, 0, true, &lockPath)
msb := NewMsBuilder(store.path, "mst", &lockPath, conf, 1, fileName, store.Tier(), nil, 2, config.TSSTORE)
msb.NewPKIndexWriter()
msb.NewSkipIndexWriter()
oldRec = write(ids, data, msb, oldRec, sortKeyMap, primaryKey, sortKey, needMerge, pkSchema, indexRelation)
needMerge = true
if err := writeIntoFile(msb, false); err != nil {
t.Fatal(err)
}
fn := msb.Files[len(msb.Files)-1].Path()
if err := RenameIndexFiles(fn, bfColumn); err != nil {
t.Fatal(err)
}
store.AddTSSPFiles(msb.Name(), false, msb.Files...)
for _, v := range data {
recs = append(recs, v)
}
if msb.GetPKInfoNum() != 0 {
for i, file := range msb.Files {
dataFilePath := file.Path()
indexFilePath := colstore.AppendPKIndexSuffix(RemoveTsspSuffix(dataFilePath))
store.AddPKFile(msb.Name(), indexFilePath, msb.GetPKRecord(i), msb.GetPKMark(i), colstore.DefaultTCLocation)
}
}
}

tmMinMax.max = uint64(tm.UnixNano() - timeInterval.Nanoseconds())
files := store.CSFiles
fids, ok := files["mst"]
if !ok || fids.Len() != filesN {
t.Fatalf("mst not find")
}

for i, f := range fids.files {
check("mst", f.Path(), recs[i])

fr := f.(*tsspFile).reader.(*tsspFileReader)
if fr.ref != 0 {
t.Fatal("ref error")
}
}

for i := 0; i < compactionTimes; i++ {
if err := store.LevelCompact(0, 1); err != nil {
t.Fatal(err)
}
store.Wait()
}
files = store.CSFiles
fids, ok = files["mst"]
if !ok {
t.Fatalf("mst not find")
}
}

func TestNextSingleFragmentError(t *testing.T) {
testCompDir := t.TempDir()
_ = fileops.RemoveAll(testCompDir)