Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: enable compaction dynamicly #31068

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 63 additions & 41 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan)
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
Expand Down Expand Up @@ -140,41 +138,6 @@
}
}

func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return
}
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
Expand All @@ -191,7 +154,9 @@
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
if compactionEnabled() {
c.CheckResult()
}
}
}
}()
Expand All @@ -210,7 +175,9 @@
return

case <-scheduleTicker.C:
c.schedule()
if compactionEnabled() {
c.Schedule()
}
}
}
}()
Expand All @@ -225,12 +192,49 @@
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
if compactionEnabled() {
c.Clean()

Check warning on line 196 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L195-L196

Added lines #L195 - L196 were not covered by tests
}
}
}
}()
}

func (c *compactionPlanHandler) CheckResult() {
// deal results
ts, err := c.getCurrentTs()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
err = c.updateCompaction(ts)
if err != nil {
log.Warn("fail to update compaction", zap.Error(err))
return

Check warning on line 213 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L212-L213

Added lines #L212 - L213 were not covered by tests
}
}

func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) Schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
Expand All @@ -255,6 +259,10 @@
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
if !compactionEnabled() {
return
}

c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
Expand Down Expand Up @@ -405,7 +413,9 @@

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
c.enqueuePlan(signal, plan)
if compactionEnabled() {
c.enqueuePlan(signal, plan)
}
}

func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
Expand Down Expand Up @@ -506,12 +516,17 @@

// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

return c.plans[planID]
}

// updateCompaction is the inner func to check compaction task states from datanode
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -641,6 +656,9 @@

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
if !compactionEnabled() {
return true

Check warning on line 660 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L660

Added line #L660 was not covered by tests
}
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

Expand All @@ -658,6 +676,10 @@

// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
33 changes: 31 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -74,6 +75,13 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
handler.mu.Lock()
s.Equal(0, len(handler.plans))
handler.mu.Unlock()

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

handler.removeTasksByChannel(ch)
})
}

func (s *CompactionPlanHandlerSuite) TestCheckResult() {
Expand All @@ -88,13 +96,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}

{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}
}

Expand Down Expand Up @@ -440,6 +448,16 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
plan := &datapb.CompactionPlan{
PlanID: int64(1),
}

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

s.NotPanics(func() {
handler.execCompactionPlan(&compactionSignal{id: 333}, &datapb.CompactionPlan{PlanID: 333, Channel: "ch-2"})
})
})

plan.Channel = "ch-1"

handler.execCompactionPlan(sig, plan)
Expand Down Expand Up @@ -658,6 +676,17 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {

task = handler.getCompaction(19530)
s.Nil(task)

s.Run("disable compaction", func() {
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)

got := handler.getCompactionTasksBySignalID(1)
s.Nil(got)

task := handler.getCompaction(1)
s.Nil(task)
})
}

func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
Expand Down
Loading
Loading