Skip to content

Commit

Permalink
Add ut for compaction configs
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Mar 13, 2024
1 parent dd7cfca commit 24485a5
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 23 deletions.
5 changes: 3 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
defer logutil.LogPanic()
defer t.wg.Done()

log.Info("start global compaction loop")
for {
select {
case <-t.quit:
Expand Down Expand Up @@ -1083,9 +1084,9 @@ func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 {
}

func compactionEnabled() bool {
return Params.DataCoordCfg.EnableCompaction.GetAsBool()
return paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool()
}

func autoCompactionEnabled() bool {
return compactionEnabled() && Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
return compactionEnabled() && paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool()
}
173 changes: 152 additions & 21 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
Expand Down Expand Up @@ -2148,6 +2150,7 @@ type CompactionTriggerSuite struct {
collectionID int64
partitionID int64
channel string
schema *schemapb.CollectionSchema

indexID int64
vecFieldID int64
Expand All @@ -2161,7 +2164,6 @@ type CompactionTriggerSuite struct {
}

func (s *CompactionTriggerSuite) SetupSuite() {
paramtable.Init()
}

func (s *CompactionTriggerSuite) genSeg(segID, numRows int64) *datapb.SegmentInfo {
Expand Down Expand Up @@ -2206,6 +2208,14 @@ func (s *CompactionTriggerSuite) SetupTest() {
s.indexID = 300
s.vecFieldID = 400
s.channel = "dml_0_100v0"
s.schema = &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
}
}

func (s *CompactionTriggerSuite) SetupSubTest() {
Expand Down Expand Up @@ -2276,15 +2286,8 @@ func (s *CompactionTriggerSuite) SetupSubTest() {
},
collections: map[int64]*collectionInfo{
s.collectionID: {
ID: s.collectionID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
},
ID: s.collectionID,
Schema: s.schema,
},
},
}
Expand All @@ -2308,24 +2311,152 @@ func (s *CompactionTriggerSuite) SetupSubTest() {
}

func (s *CompactionTriggerSuite) TestEnableCompactionConfigDynamicly() {
tests := []struct {
description string
enableCompaction bool
enableAutoCompaction bool

triggered bool
}{
{"both enabled triggered signal", true, true, true},
{"auto compaction disabled, no signal", true, false, false},
{"compaction disabled no signal", false, true, false},
{"all disabled no signal", false, false, false},
}
enableCompKey := paramtable.Get().DataCoordCfg.EnableCompaction.Key
enableAutoCompKey := paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key

for _, test := range tests {
s.Run(test.description, func() {
enableCompV, enableAutoCompV := "false", "false"
if test.enableCompaction {
enableCompV = "true"
}
if test.enableAutoCompaction {
enableAutoCompV = "true"
}

paramtable.Get().Save(enableCompKey, enableCompV)
paramtable.Get().Save(enableAutoCompKey, enableAutoCompV)

// test if trigger compaction by globalcompaction loop
triggerInterval := 1 * time.Millisecond
s.tr.globalTrigger = time.NewTicker(triggerInterval)
s.tr.quit = make(chan struct{})
s.tr.wg.Add(1)
go s.tr.startGlobalCompactionLoop()

if test.enableCompaction {
s.allocator.EXPECT().allocID(mock.Anything).Return(19530, nil)
}

if test.triggered {
s.Require().True(autoCompactionEnabled())
sig := <-s.tr.signals
s.True(sig.isGlobal)
s.False(sig.isForce)
} else {
s.Require().False(autoCompactionEnabled())
time.Sleep(2 * triggerInterval)
s.Equal(0, len(s.tr.signals))

}
s.tr.stop()

// test if trigger compaction by triggerSingleCompaction and forceTriggerCompaction
if test.triggered {
s.Require().True(autoCompactionEnabled())
err := s.tr.triggerSingleCompaction(s.collectionID, s.partitionID, 1, s.channel, false)
s.NoError(err)

sig := <-s.tr.signals
s.EqualValues(s.collectionID, sig.collectionID)
s.EqualValues(s.partitionID, sig.partitionID)
s.EqualValues(1, sig.segmentID)
s.EqualValues(s.channel, sig.channel)
s.EqualValues(19530, sig.id)
s.False(sig.isForce)
s.False(sig.isGlobal)
} else {
s.Require().False(autoCompactionEnabled())
err := s.tr.triggerSingleCompaction(s.collectionID, s.partitionID, 1, s.channel, false)
s.NoError(err)
s.Equal(0, len(s.tr.signals))
}

if test.enableCompaction {
s.Require().True(compactionEnabled())

s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{
ID: s.collectionID,
Schema: s.schema,
}, nil).Times(2)
s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil).Times(4)

compactionID, err := s.tr.forceTriggerCompaction(s.collectionID)
s.NoError(err)
s.True(compactionID > 0)
s.EqualValues(19530, compactionID)
} else {
s.Require().False(compactionEnabled())

compactionID, err := s.tr.forceTriggerCompaction(s.collectionID)
s.Error(err)
s.ErrorIs(merr.ErrServiceUnavailable, err)
s.EqualValues(-1, compactionID)

s.Require().False(autoCompactionEnabled())
err = s.tr.triggerSingleCompaction(s.collectionID, s.partitionID, 1, s.channel, false)
s.NoError(err)
s.Equal(0, len(s.tr.signals))
}
})
}
s.Run("enable then disable compaction", func() {
enableCompKey := paramtable.Get().DataCoordCfg.EnableCompaction.Key
enableAutoCompKey := paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key
paramtable.Get().Save(enableCompKey, "true")
paramtable.Get().Save(enableAutoCompKey, "true")

triggerInterval := 1 * time.Millisecond
s.tr.globalTrigger = time.NewTicker(triggerInterval)
s.tr.quit = make(chan struct{})
s.tr.wg.Add(1)
go s.tr.startGlobalCompactionLoop()

s.allocator.EXPECT().allocID(mock.Anything).Return(10000, nil)

sig := <-s.tr.signals
s.True(sig.isGlobal)
s.False(sig.isForce)

paramtable.Get().Save(enableCompKey, "false")
if len(s.tr.signals) > 0 {
// drain all signals from channel
log.Info("drain all signals from channel")
for sig := range s.tr.signals {
s.True(sig.isGlobal)
s.False(sig.isForce)
}
}

// wait for 2 times of the globalTrigger interval
<-time.After(2 * triggerInterval)
s.Equal(0, len(s.tr.signals))

s.tr.stop()
})
}

func (s *CompactionTriggerSuite) TestCollectionAutoCompaction() {
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: s.vecFieldID,
DataType: schemapb.DataType_FloatVector,
},
},
}
s.Run("collection autoCompaction config error", func() {
s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{
ID: s.collectionID,
Properties: map[string]string{
common.CollectionAutoCompactionKey: "bad_value",
},
Schema: schema,
Schema: s.schema,
}, nil).Times(2)

s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Times(1)
Expand All @@ -2350,7 +2481,7 @@ func (s *CompactionTriggerSuite) TestCollectionAutoCompaction() {
s.Run("collection autoCompaction disabled", func() {
s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{
ID: s.collectionID,
Schema: schema,
Schema: s.schema,
Properties: map[string]string{common.CollectionAutoCompactionKey: "false"},
}, nil).Times(2)

Expand All @@ -2377,7 +2508,7 @@ func (s *CompactionTriggerSuite) TestCollectionAutoCompaction() {
s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{
ID: s.collectionID,
Properties: map[string]string{common.CollectionAutoCompactionKey: "false"},
Schema: schema,
Schema: s.schema,
}, nil).Times(3)

// handleSignal is an non force signal handler, will ignore forced signal
Expand Down

0 comments on commit 24485a5

Please sign in to comment.