Skip to content

Commit

Permalink
fix: DC painc at dropvchannel when disable compaction
Browse files Browse the repository at this point in the history
Make EnableCompaction able to change dynamicly

See also: #31059

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Mar 14, 2024
1 parent 6939ad1 commit e09caab
Show file tree
Hide file tree
Showing 11 changed files with 601 additions and 475 deletions.
95 changes: 59 additions & 36 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ type compactionPlanContext interface {
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
Expand Down Expand Up @@ -138,37 +136,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C
}
}

func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
_ = c.updateCompaction(ts)
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
Expand All @@ -185,7 +152,9 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
if compactionEnabled() {
c.CheckResult()
}
}
}
}()
Expand All @@ -204,7 +173,9 @@ func (c *compactionPlanHandler) start() {
return

case <-scheduleTicker.C:
c.schedule()
if compactionEnabled() {
c.Schedule()
}
}
}
}()
Expand All @@ -219,12 +190,45 @@ func (c *compactionPlanHandler) start() {
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
if compactionEnabled() {
c.Clean()
}

Check warning on line 195 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L193-L195

Added lines #L193 - L195 were not covered by tests
}
}
}()
}

func (c *compactionPlanHandler) CheckResult() {
// deal results
ts, err := c.getCurrentTs()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
_ = c.updateCompaction(ts)
}

func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) Schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}

Check warning on line 229 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L227-L229

Added lines #L227 - L229 were not covered by tests
}

func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
Expand All @@ -249,6 +253,10 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
if !compactionEnabled() {
return
}

Check warning on line 258 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L257-L258

Added lines #L257 - L258 were not covered by tests

c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
Expand Down Expand Up @@ -402,6 +410,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if !compactionEnabled() {
return nil
}

Check warning on line 415 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L414-L415

Added lines #L414 - L415 were not covered by tests
return c.enqueuePlan(signal, plan)
}

Expand Down Expand Up @@ -511,12 +522,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact

// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
if !compactionEnabled() {
return nil
}

Check warning on line 527 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L526-L527

Added lines #L526 - L527 were not covered by tests

c.mu.RLock()
defer c.mu.RUnlock()

return c.plans[planID]
}

// updateCompaction is the inner func to check compaction task states from datanode
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -617,6 +633,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
if !compactionEnabled() {
return false
}

Check warning on line 638 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L637-L638

Added lines #L637 - L638 were not covered by tests
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

Expand All @@ -634,6 +653,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co

// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if !compactionEnabled() {
return nil
}

Check warning on line 658 in internal/datacoord/compaction.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/compaction.go#L657-L658

Added lines #L657 - L658 were not covered by tests

c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}

{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}
}

Expand Down
Loading

0 comments on commit e09caab

Please sign in to comment.