Skip to content

Commit

Permalink
enhance: Remove deprecated timeout param in segment/channel task
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jan 5, 2024
1 parent 3626f49 commit 7af3c65
Show file tree
Hide file tree
Showing 9 changed files with 14 additions and 65 deletions.
8 changes: 3 additions & 5 deletions internal/querycoordv2/balance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package balance
import (
"context"
"fmt"
"time"

"go.uber.org/zap"

Expand All @@ -34,7 +33,7 @@ const (
DistInfoPrefix = "Balance-Dists:"
)

func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeout time.Duration, plans []SegmentAssignPlan) []task.Task {
func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, plans []SegmentAssignPlan) []task.Task {
ret := make([]task.Task, 0)
for _, p := range plans {
actions := make([]task.Action, 0)
Expand All @@ -48,7 +47,6 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
}
t, err := task.NewSegmentTask(
ctx,
timeout,
source,
p.Segment.GetCollectionID(),
p.ReplicaID,
Expand Down Expand Up @@ -86,7 +84,7 @@ func CreateSegmentTasksFromPlans(ctx context.Context, source task.Source, timeou
return ret
}

func CreateChannelTasksFromPlans(ctx context.Context, source task.Source, timeout time.Duration, plans []ChannelAssignPlan) []task.Task {
func CreateChannelTasksFromPlans(ctx context.Context, source task.Source, plans []ChannelAssignPlan) []task.Task {
ret := make([]task.Task, 0, len(plans))
for _, p := range plans {
actions := make([]task.Action, 0)
Expand All @@ -98,7 +96,7 @@ func CreateChannelTasksFromPlans(ctx context.Context, source task.Source, timeou
action := task.NewChannelAction(p.From, task.ActionTypeReduce, p.Channel.GetChannelName())
actions = append(actions, action)
}
t, err := task.NewChannelTask(ctx, timeout, source, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
t, err := task.NewChannelTask(ctx, source, p.Channel.GetCollectionID(), p.ReplicaID, actions...)
if err != nil {
log.Warn("create channel task failed",
zap.Int64("collection", p.Channel.GetCollectionID()),
Expand Down
5 changes: 2 additions & 3 deletions internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package checkers
import (
"context"
"sort"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
Expand Down Expand Up @@ -154,12 +153,12 @@ func (b *BalanceChecker) Check(ctx context.Context) []task.Task {
replicasToBalance := b.replicasToBalance()
segmentPlans, channelPlans := b.balanceReplicas(replicasToBalance)

tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), segmentPlans)
tasks := balance.CreateSegmentTasksFromPlans(ctx, b.ID(), segmentPlans)
task.SetPriority(task.TaskPriorityLow, tasks...)
task.SetReason("segment unbalanced", tasks...)
ret = append(ret, tasks...)

tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), channelPlans)
tasks = balance.CreateChannelTasksFromPlans(ctx, b.ID(), channelPlans)
task.SetReason("channel unbalanced", tasks...)
ret = append(ret, tasks...)
return ret
Expand Down
6 changes: 2 additions & 4 deletions internal/querycoordv2/checkers/channel_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package checkers

import (
"context"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -200,14 +198,14 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
plans[i].ReplicaID = replica.GetID()
}

return balance.CreateChannelTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), plans)
return balance.CreateChannelTasksFromPlans(ctx, c.ID(), plans)
}

func (c *ChannelChecker) createChannelReduceTasks(ctx context.Context, channels []*meta.DmChannel, replicaID int64) []task.Task {
ret := make([]task.Task, 0, len(channels))
for _, ch := range channels {
action := task.NewChannelAction(ch.Node, task.ActionTypeReduce, ch.GetChannelName())
task, err := task.NewChannelTask(ctx, Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond), c.ID(), ch.GetCollectionID(), replicaID, action)
task, err := task.NewChannelTask(ctx, c.ID(), ch.GetCollectionID(), replicaID, action)
if err != nil {
log.Warn("create channel reduce task failed",
zap.Int64("collection", ch.GetCollectionID()),
Expand Down
3 changes: 0 additions & 3 deletions internal/querycoordv2/checkers/index_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package checkers

import (
"context"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -161,7 +159,6 @@ func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *met
action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical)
t, err := task.NewSegmentTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
c.ID(),
segment.GetCollectionID(),
replica.GetID(),
Expand Down
5 changes: 1 addition & 4 deletions internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package checkers
import (
"context"
"sort"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
Expand All @@ -28,7 +27,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/balance"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
Expand Down Expand Up @@ -381,7 +379,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
plans = append(plans, shardPlans...)
}

return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), plans)
return balance.CreateSegmentTasksFromPlans(ctx, c.ID(), plans)
}

func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments []*meta.Segment, replicaID int64, scope querypb.DataScope) []task.Task {
Expand All @@ -390,7 +388,6 @@ func (c *SegmentChecker) createSegmentReduceTasks(ctx context.Context, segments
action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeReduce, s.GetInsertChannel(), s.GetID(), scope)
task, err := task.NewSegmentTask(
ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
c.ID(),
s.GetCollectionID(),
replicaID,
Expand Down
1 change: 0 additions & 1 deletion internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (s *Server) balanceSegments(ctx context.Context, req *querypb.LoadBalanceRe
zap.Int64("segmentID", plan.Segment.GetID()),
)
task, err := task.NewSegmentTask(ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
task.WrapIDSource(req.GetBase().GetMsgID()),
req.GetCollectionID(),
replica.GetID(),
Expand Down
3 changes: 0 additions & 3 deletions internal/querycoordv2/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package task
import (
"context"
"fmt"
"time"

"github.com/cockroachdb/errors"
"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -285,7 +284,6 @@ type SegmentTask struct {
// all actions must process the same segment,
// empty actions is not allowed
func NewSegmentTask(ctx context.Context,
timeout time.Duration,
source Source,
collectionID,
replicaID UniqueID,
Expand Down Expand Up @@ -342,7 +340,6 @@ type ChannelTask struct {
// all actions must process the same channel, and the same type of channel
// empty actions is not allowed
func NewChannelTask(ctx context.Context,
timeout time.Duration,
source Source,
collectionID,
replicaID UniqueID,
Expand Down
Loading

0 comments on commit 7af3c65

Please sign in to comment.