Skip to content

Commit

Permalink
fix: DC painc at dropvchannel when disable compaction
Browse files Browse the repository at this point in the history
Make EnableCompaction able to change dynamicly

See also: #31059

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed May 11, 2024
1 parent 36f1ea9 commit 99bb5ad
Show file tree
Hide file tree
Showing 14 changed files with 691 additions and 496 deletions.
103 changes: 63 additions & 40 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type compactionPlanContext interface {
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
Expand Down Expand Up @@ -138,41 +136,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C
}
}

func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return
}
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
Expand All @@ -189,7 +152,9 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
if compactionEnabled() {
c.CheckResult()
}
}
}
}()
Expand All @@ -208,7 +173,9 @@ func (c *compactionPlanHandler) start() {
return

case <-scheduleTicker.C:
c.schedule()
if compactionEnabled() {
c.Schedule()
}
}
}
}()
Expand All @@ -223,12 +190,49 @@ func (c *compactionPlanHandler) start() {
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
if compactionEnabled() {
c.Clean()
}
}
}
}()
}

func (c *compactionPlanHandler) CheckResult() {
// deal results
ts, err := c.getCurrentTs()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return
}
}

func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) Schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
Expand All @@ -253,6 +257,10 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
if !compactionEnabled() {
return
}

c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
Expand Down Expand Up @@ -408,6 +416,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if !compactionEnabled() {
return nil
}
return c.enqueuePlan(signal, plan)
}

Expand Down Expand Up @@ -517,12 +528,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact

// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

return c.plans[planID]
}

// updateCompaction is the inner func to check compaction task states from datanode
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -653,6 +669,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
if !compactionEnabled() {
return true
}
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

Expand All @@ -670,6 +689,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co

// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
31 changes: 29 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -74,6 +75,13 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
handler.mu.Lock()
s.Equal(0, len(handler.plans))
handler.mu.Unlock()

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

handler.removeTasksByChannel(ch)
})
}

func (s *CompactionPlanHandlerSuite) TestCheckResult() {
Expand All @@ -88,13 +96,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}

{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}
}

Expand Down Expand Up @@ -468,6 +476,14 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
}
})
}

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

err := handler.execCompactionPlan(&compactionSignal{id: 333}, &datapb.CompactionPlan{PlanID: 333, Channel: "ch-2"})
s.NoError(err)
})
}

func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
Expand Down Expand Up @@ -679,6 +695,17 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {

task = handler.getCompaction(19530)
s.Nil(task)

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

got := handler.getCompactionTasksBySignalID(1)
s.Nil(got)

task := handler.getCompaction(1)
s.Nil(task)
})
}

func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
Expand Down
Loading

0 comments on commit 99bb5ad

Please sign in to comment.