Skip to content

Commit

Permalink
fix: DC painc at dropvchannel when disable compaction
Browse files Browse the repository at this point in the history
Make EnableCompaction able to change dynamicly

See also: #31059

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Mar 6, 2024
1 parent a88c896 commit 9d3e365
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 86 deletions.
95 changes: 59 additions & 36 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ type compactionPlanContext interface {
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
updateCompaction(ts Timestamp) error
// isFull return true if the task pool is full
isFull() bool
// get compaction tasks by signal id
Expand Down Expand Up @@ -137,37 +135,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C
}
}

func (c *compactionPlanHandler) checkResult() {
// deal results
ts, err := c.GetCurrentTS()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
_ = c.updateCompaction(ts)
}

func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) start() {
interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second)
c.stopCh = make(chan struct{})
Expand All @@ -184,7 +151,9 @@ func (c *compactionPlanHandler) start() {
log.Info("compaction handler check result loop quit")
return
case <-checkResultTicker.C:
c.checkResult()
if compactionEnabled() {
c.CheckResult()
}
}
}
}()
Expand All @@ -203,7 +172,9 @@ func (c *compactionPlanHandler) start() {
return

case <-scheduleTicker.C:
c.schedule()
if compactionEnabled() {
c.Schedule()
}
}
}
}()
Expand All @@ -218,12 +189,45 @@ func (c *compactionPlanHandler) start() {
log.Info("Compaction handler quit clean")
return
case <-cleanTicker.C:
c.Clean()
if compactionEnabled() {
c.Clean()
}
}
}
}()
}

func (c *compactionPlanHandler) CheckResult() {
// deal results
ts, err := c.getCurrentTs()
if err != nil {
log.Warn("fail to check result", zap.Error(err))
return
}
_ = c.updateCompaction(ts)
}

func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) {
interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(context.Background(), interval)
defer cancel()
ts, err := c.allocator.allocTimestamp(ctx)
if err != nil {
log.Warn("unable to alloc timestamp", zap.Error(err))
return 0, err
}
return ts, nil
}

func (c *compactionPlanHandler) Schedule() {
// schedule queuing tasks
tasks := c.scheduler.Schedule()
if len(tasks) > 0 {
c.notifyTasks(tasks)
c.scheduler.LogStatus()
}
}

func (c *compactionPlanHandler) Clean() {
current := tsoutil.GetCurrentTime()
c.mu.Lock()
Expand All @@ -248,6 +252,10 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
if !compactionEnabled() {
return
}

c.mu.Lock()
defer c.mu.Unlock()
for id, task := range c.plans {
Expand Down Expand Up @@ -388,6 +396,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
if !compactionEnabled() {
return nil
}
return c.enqueuePlan(signal, plan)
}

Expand Down Expand Up @@ -497,12 +508,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact

// getCompaction return compaction task. If planId does not exist, return nil.
func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

return c.plans[planID]
}

// updateCompaction is the inner func to check compaction task states from datanode
func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Get executing executingTasks before GetCompactionState from DataNode to prevent false failure,
// for DC might add new task while GetCompactionState.
Expand Down Expand Up @@ -603,6 +619,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
if !compactionEnabled() {
return false
}
return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

Expand All @@ -620,6 +639,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co

// get compaction tasks by signal id; if signalID == 0 return all tasks
func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask {
if !compactionEnabled() {
return nil
}

c.mu.RLock()
defer c.mu.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}

{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc)
handler.checkResult()
handler.CheckResult()
}
}

Expand Down
38 changes: 24 additions & 14 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ type compactTime struct {
type trigger interface {
start()
stop()
// triggerCompaction triggers a compaction if any compaction condition satisfy.
triggerCompaction() error

// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
// used by savebinlogpath
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error
// forceTriggerCompaction force to start a compaction

// forceTriggerCompaction force to start a compaction, used by manualCompaction, highest priority
forceTriggerCompaction(collectionID int64) (UniqueID, error)
}

Expand Down Expand Up @@ -148,21 +149,18 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
defer logutil.LogPanic()
defer t.wg.Done()

// If AutoCompaction disabled, global loop will not start
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
return
}

for {
select {
case <-t.quit:
t.globalTrigger.Stop()
log.Info("global compaction loop exit")
return
case <-t.globalTrigger.C:
err := t.triggerCompaction()
if err != nil {
log.Warn("unable to triggerCompaction", zap.Error(err))
if autoCompactionEnabled() {
err := t.triggerCompaction()
if err != nil {
log.Warn("unable to trigger global compaction", zap.Error(err))
}
}
}
}
Expand Down Expand Up @@ -236,7 +234,7 @@ func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) (
return &compactTime{0, 0}, nil
}

// triggerCompaction trigger a compaction if any compaction condition satisfy.
// triggerCompaction is an internal global compaction trigger
func (t *compactionTrigger) triggerCompaction() error {
id, err := t.allocSignalID()
if err != nil {
Expand All @@ -253,8 +251,8 @@ func (t *compactionTrigger) triggerCompaction() error {

// triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
// If AutoCompaction disabled, flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
// If AutoCompaction disabled, no task would be generated
if !autoCompactionEnabled() {
return nil
}

Expand Down Expand Up @@ -287,6 +285,10 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s
// forceTriggerCompaction force to start a compaction
// invoked by user `ManualCompaction` operation
func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID, error) {
if !compactionEnabled() {
return -1, merr.WrapErrServiceUnavailable("compaction disabled")
}

id, err := t.allocSignalID()
if err != nil {
return -1, err
Expand Down Expand Up @@ -1008,3 +1010,11 @@ func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 {
}
return segIDs
}

func compactionEnabled() bool {
return Params.DataCoordCfg.EnableCompaction.GetAsBool()
}

func autoCompactionEnabled() bool {
return compactionEnabled() && Params.DataCoordCfg.EnableAutoCompaction.GetAsBool()
}
46 changes: 23 additions & 23 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,8 @@ func (s *Server) initDataCoord() error {
}
log.Info("init service discovery done")

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.createCompactionHandler()
s.createCompactionTrigger()
log.Info("init compaction scheduler done")
}
s.initCompaction()
log.Info("init compaction done")

if err = s.initSegmentManager(); err != nil {
return err
Expand Down Expand Up @@ -405,11 +402,6 @@ func (s *Server) Start() error {
}

func (s *Server) startDataCoord() {
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionViewManager.Start()
}
s.startServerLoop()

// http.Register(&http.Handler{
Expand Down Expand Up @@ -497,23 +489,32 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
s.indexNodeCreator = f
}

func (s *Server) createCompactionHandler() {
func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)

triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler)
s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator)
}

func (s *Server) stopCompactionHandler() {
s.compactionHandler.stop()
s.compactionViewManager.Close()
func (s *Server) startCompaction() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionViewManager.Start()
}

func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}
func (s *Server) stopCompaction() {
if s.compactionHandler != nil {
s.compactionHandler.stop()
}

if s.compactionViewManager != nil {
s.compactionViewManager.Close()
}

func (s *Server) stopCompactionTrigger() {
s.compactionTrigger.stop()
if s.compactionTrigger != nil {
s.compactionTrigger.stop()
}
}

func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
Expand Down Expand Up @@ -656,6 +657,8 @@ func (s *Server) initIndexNodeManager() {
}

func (s *Server) startServerLoop() {
s.startCompaction()

s.serverLoopWg.Add(2)
if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
s.serverLoopWg.Add(1)
Expand Down Expand Up @@ -1114,10 +1117,7 @@ func (s *Server) Stop() error {
s.importScheduler.Close()
s.importChecker.Close()

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
s.stopCompaction()
logutil.Logger(s.ctx).Info("datacoord compaction stopped")

s.indexBuilder.Stop()
Expand Down
Loading

0 comments on commit 9d3e365

Please sign in to comment.