Skip to content

Commit

Permalink
fix: Fix panic due to failed to seek (milvus-io#34229)
Browse files Browse the repository at this point in the history
Converting the same msgposition's vchannel to a pchannel multiple times
would result in an invalid pchannel, leading to seek failure and panic.
This PR:
1. Make a copy of msgposition in msgdispatcher.
2. Check if channel is already a pchannel, no further channel conversion
is performed.

issue: milvus-io#34221

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper authored and yellow-shine committed Jul 2, 2024
1 parent 8ef3053 commit d6ec669
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
3 changes: 2 additions & 1 deletion pkg/mq/msgdispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func NewDispatcher(ctx context.Context,
return nil, err
}
if position != nil && len(position.MsgID) != 0 {
position = typeutil.Clone(position)
position.ChannelName = funcutil.ToPhysicalChannel(position.ChannelName)
err = stream.AsConsumer(ctx, []string{pchannel}, subName, common.SubscriptionPositionUnknown)
if err != nil {
Expand Down Expand Up @@ -234,7 +235,7 @@ func (d *Dispatcher) work() {
}
}
if err != nil {
t.pos = pack.StartPositions[0]
t.pos = typeutil.Clone(pack.StartPositions[0])
// replace the pChannel with vChannel
t.pos.ChannelName = t.vchannel
d.lagTargets.Insert(t.vchannel, t)
Expand Down
12 changes: 7 additions & 5 deletions pkg/mq/msgdispatcher/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ func TestManager(t *testing.T) {
r := rand.Intn(10) + 1
for j := 0; j < r; j++ {
offset++
t.Logf("dyh add, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
_, err := c.Add(context.Background(), fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset), nil, common.SubscriptionPositionUnknown)
vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
t.Logf("add vchannel, %s", vchannel)
_, err := c.Add(context.Background(), vchannel, nil, common.SubscriptionPositionUnknown)
assert.NoError(t, err)
assert.Equal(t, offset, c.Num())
}
for j := 0; j < rand.Intn(r); j++ {
t.Logf("dyh remove, %s", fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
c.Remove(fmt.Sprintf("mock-pchannel-0_vchannel_%d", offset))
vchannel := fmt.Sprintf("mock-pchannel-dml_0_vchannelv%d", offset)
t.Logf("remove vchannel, %s", vchannel)
c.Remove(vchannel)
offset--
assert.Equal(t, offset, c.Num())
}
Expand Down Expand Up @@ -166,7 +168,7 @@ func (suite *SimulationSuite) SetupSuite() {
}

func (suite *SimulationSuite) SetupTest() {
suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml-%d-%d", rand.Int(), time.Now().UnixNano())
suite.pchannel = fmt.Sprintf("by-dev-rootcoord-dispatcher-simulation-dml_%d", time.Now().UnixNano())
producer, err := newMockProducer(suite.factory, suite.pchannel)
assert.NoError(suite.T(), err)
suite.producer = producer
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/funcutil/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,16 @@ func GetAvailablePort() int {
return listener.Addr().(*net.TCPAddr).Port
}

// IsPhysicalChannel checks if the channel is a physical channel
func IsPhysicalChannel(channel string) bool {
return strings.Count(channel, "_") == 1
}

// ToPhysicalChannel get physical channel name from virtual channel name
func ToPhysicalChannel(vchannel string) string {
if IsPhysicalChannel(vchannel) {
return vchannel
}
index := strings.LastIndex(vchannel, "_")
if index < 0 {
return vchannel
Expand Down
12 changes: 9 additions & 3 deletions pkg/util/funcutil/func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,17 @@ func TestCheckPortAvailable(t *testing.T) {
}

func Test_ToPhysicalChannel(t *testing.T) {
assert.Equal(t, "abc", ToPhysicalChannel("abc_"))
assert.Equal(t, "abc", ToPhysicalChannel("abc_123"))
assert.Equal(t, "abc", ToPhysicalChannel("abc_defgsg"))
assert.Equal(t, "abc_", ToPhysicalChannel("abc_"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123"))
assert.Equal(t, "abc_defgsg", ToPhysicalChannel("abc_defgsg"))
assert.Equal(t, "abc_123", ToPhysicalChannel("abc_123_456"))
assert.Equal(t, "abc__", ToPhysicalChannel("abc___defgsg"))
assert.Equal(t, "abcdef", ToPhysicalChannel("abcdef"))
channel := "by-dev-rootcoord-dml_3_449883080965365748v0"
for i := 0; i < 10; i++ {
channel = ToPhysicalChannel(channel)
assert.Equal(t, "by-dev-rootcoord-dml_3", channel)
}
}

func Test_ConvertChannelName(t *testing.T) {
Expand Down

0 comments on commit d6ec669

Please sign in to comment.