Apply for msgstream from pool when creating collection (#7738)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
cydrain committed Sep 18, 2021
1 parent 8d5dc5b commit d10c6ab
Showing 17 changed files with 263 additions and 131 deletions.
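The heart of this change is the new pool-based dmlChannels in internal/rootcoord/dml_channels.go: a fixed number of producer msgstreams is created at startup, channel names are handed out round-robin, and a per-channel reference count tracks which channels are currently assigned to collections. The standalone Go sketch below illustrates that design with msgstream replaced by a stub interface; the names and the atomic round-robin cursor are simplifications for illustration, not the Milvus implementation.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// producer is a stand-in for msgstream.MsgStream.
type producer interface {
	Broadcast(msg string) error
}

type stubProducer struct{ name string }

func (p *stubProducer) Broadcast(msg string) error {
	fmt.Printf("broadcast %q on %s\n", msg, p.name)
	return nil
}

type channelPool struct {
	prefix   string
	capacity int64
	idx      int64    // round-robin cursor
	pool     sync.Map // channel name -> producer, created once at startup
	refcnt   sync.Map // channel name -> int64, only assigned channels appear here
}

func newChannelPool(prefix string, n int64) *channelPool {
	p := &channelPool{prefix: prefix, capacity: n}
	for i := int64(0); i < n; i++ {
		name := fmt.Sprintf("%s_%d", prefix, i)
		p.pool.Store(name, &stubProducer{name: name})
	}
	return p
}

// nextName hands out pooled channel names round-robin.
func (p *channelPool) nextName() string {
	i := atomic.AddInt64(&p.idx, 1) - 1
	return fmt.Sprintf("%s_%d", p.prefix, i%p.capacity)
}

// assign marks a pooled channel as in use (refcount +1); the stream itself already exists.
func (p *channelPool) assign(name string) error {
	if _, ok := p.pool.Load(name); !ok {
		return fmt.Errorf("invalid channel name: %s", name)
	}
	cnt := int64(1)
	if v, ok := p.refcnt.Load(name); ok {
		cnt = v.(int64) + 1
	}
	p.refcnt.Store(name, cnt)
	return nil
}

// release drops one reference; the underlying stream stays in the pool for reuse.
func (p *channelPool) release(name string) {
	if v, ok := p.refcnt.Load(name); ok {
		if cnt := v.(int64); cnt > 1 {
			p.refcnt.Store(name, cnt-1)
		} else {
			p.refcnt.Delete(name)
		}
	}
}

// broadcast only succeeds on channels that are currently assigned to some collection.
func (p *channelPool) broadcast(name, msg string) error {
	if _, ok := p.refcnt.Load(name); !ok {
		return fmt.Errorf("channel %s not exist", name)
	}
	v, _ := p.pool.Load(name)
	return v.(producer).Broadcast(msg)
}

func main() {
	pool := newChannelPool("rootcoord-dml", 2)
	name := pool.nextName() // "rootcoord-dml_0"
	_ = pool.assign(name)
	_ = pool.broadcast(name, "create collection")
	pool.release(name)
}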
1 change: 1 addition & 0 deletions configs/advanced/channel.yaml
@@ -14,6 +14,7 @@ msgChannel:
chanNamePrefix:
rootCoordTimeTick: "rootcoord-timetick"
rootCoordStatistics: "rootcoord-statistics"
rootCoordDml: "rootcoord-dml"
search: "search"
searchResult: "searchResult"
proxyTimeTick: "proxyTimeTick"
5 changes: 2 additions & 3 deletions internal/datanode/data_sync_service.go
@@ -20,7 +20,6 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/flowgraph"

@@ -160,11 +159,11 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro

dsService.saveBinlog = saveBinlog

pchan := rootcoord.ToPhysicalChannel(vchanInfo.GetChannelName())
var dmStreamNode Node = newDmInputNode(
dsService.ctx,
dsService.msFactory,
pchan,
vchanInfo.CollectionID,
vchanInfo.GetChannelName(),
vchanInfo.GetSeekPosition(),
)
var ddNode Node = newDDNode(dsService.clearSignal, dsService.collectionID, vchanInfo)
2 changes: 1 addition & 1 deletion internal/datanode/flow_graph_dd_node.go
@@ -92,7 +92,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Uint64("Message endts", msg.EndTs()),
zap.Uint64("FilterThreshold", FilterThreshold),
)
if ddn.filterFlushedSegmentInsertMessages(msg.(*msgstream.InsertMsg)) {
if ddn.filterFlushedSegmentInsertMessages(imsg) {
continue
}
}
9 changes: 7 additions & 2 deletions internal/datanode/flow_graph_dmstream_input_node.go
@@ -13,19 +13,24 @@ package datanode

import (
"context"
"strconv"

"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph"
)

func newDmInputNode(ctx context.Context, factory msgstream.Factory, pchannelName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
func newDmInputNode(ctx context.Context, factory msgstream.Factory, collID UniqueID, chanName string, seekPos *internalpb.MsgPosition) *flowgraph.InputNode {
maxQueueLength := Params.FlowGraphMaxQueueLength
maxParallelism := Params.FlowGraphMaxParallelism
consumeSubName := Params.MsgChannelSubName

// subName should be unique, since pchannelName is shared among several collections
consumeSubName := Params.MsgChannelSubName + "-" + strconv.FormatInt(collID, 10)
insertStream, _ := factory.NewTtMsgStream(ctx)

pchannelName := rootcoord.ToPhysicalChannel(chanName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
log.Debug("datanode AsConsumer physical channel: " + pchannelName + " : " + consumeSubName)

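Note the reason for the new collID parameter above: several collections can share one physical channel, so the consumer subscription name must be unique per collection. A tiny illustration of how that name is composed, assuming a hypothetical Params.MsgChannelSubName value of "datanode":

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// Hypothetical values for illustration only.
	subNamePrefix := "datanode" // stands in for Params.MsgChannelSubName
	collID := int64(451)
	subName := subNamePrefix + "-" + strconv.FormatInt(collID, 10)
	fmt.Println(subName) // prints "datanode-451"
}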
2 changes: 1 addition & 1 deletion internal/datanode/flow_graph_dmstream_input_node_test.go
@@ -84,5 +84,5 @@ func (mtm *mockTtMsgStream) Seek(offset []*internalpb.MsgPosition) error {

func TestNewDmInputNode(t *testing.T) {
ctx := context.Background()
newDmInputNode(ctx, &mockMsgStreamFactory{}, "abc_adc", new(internalpb.MsgPosition))
newDmInputNode(ctx, &mockMsgStreamFactory{}, 0, "abc_adc", new(internalpb.MsgPosition))
}
7 changes: 6 additions & 1 deletion internal/datanode/flow_graph_insert_buffer_node.go
@@ -355,7 +355,12 @@ func (ibNode *insertBufferNode) updateSegStatesInReplica(insertMsgs []*msgstream
err = ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
startPos, endPos)
if err != nil {
log.Warn("add segment wrong", zap.Int64("Seg ID", currentSegID), zap.Error(err))
log.Error("add segment wrong",
zap.Int64("segID", currentSegID),
zap.Int64("collID", collID),
zap.Int64("partID", partitionID),
zap.String("chanName", msg.GetChannelID()),
zap.Error(err))
return
}
}
6 changes: 3 additions & 3 deletions internal/querynode/flow_graph_filter_dm_node.go
@@ -110,9 +110,9 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg

// check if the collection from message is target collection
if msg.CollectionID != fdmNode.collectionID {
log.Debug("filter invalid insert message, collection is not the target collection",
zap.Any("collectionID", msg.CollectionID),
zap.Any("partitionID", msg.PartitionID))
//log.Debug("filter invalid insert message, collection is not the target collection",
// zap.Any("collectionID", msg.CollectionID),
// zap.Any("partitionID", msg.PartitionID))
return nil
}

1 change: 1 addition & 0 deletions internal/querynode/flow_graph_query_node.go
@@ -125,6 +125,7 @@ func (q *queryNodeFlowGraph) consumerFlowGraph(channel Channel, subName ConsumeS
log.Debug("query node flow graph consumes from pChannel",
zap.Any("collectionID", q.collectionID),
zap.Any("channel", channel),
zap.Any("subName", subName),
)
return nil
}
155 changes: 75 additions & 80 deletions internal/rootcoord/dml_channels.go
@@ -15,121 +15,116 @@ import (
"fmt"
"sync"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"go.uber.org/zap"
)

type dmlChannels struct {
core *Core
lock sync.RWMutex
dml map[string]msgstream.MsgStream
core *Core
namePrefix string
capacity int64
refcnt sync.Map
idx *atomic.Int64
pool sync.Map
}

func newDMLChannels(c *Core) *dmlChannels {
return &dmlChannels{
core: c,
lock: sync.RWMutex{},
dml: make(map[string]msgstream.MsgStream),
func newDmlChannels(c *Core, chanNamePrefix string, chanNum int64) *dmlChannels {
d := &dmlChannels{
core: c,
namePrefix: chanNamePrefix,
capacity: chanNum,
refcnt: sync.Map{},
idx: atomic.NewInt64(0),
pool: sync.Map{},
}

var i int64
for i = 0; i < chanNum; i++ {
name := fmt.Sprintf("%s_%d", d.namePrefix, i)
ms, err := c.msFactory.NewMsgStream(c.ctx)
if err != nil {
log.Error("add msgstream failed", zap.String("name", name), zap.Error(err))
panic("add msgstream failed")
}
ms.AsProducer([]string{name})
d.pool.Store(name, &ms)
}
log.Debug("init dml channels", zap.Int64("num", chanNum))
return d
}

// GetNumChannels get current dml channel count
func (d *dmlChannels) GetNumChannels() int {
d.lock.RLock()
defer d.lock.RUnlock()
return len(d.dml)
func (d *dmlChannels) GetDmlMsgStreamName() string {
cnt := d.idx.Load()
name := fmt.Sprintf("%s_%d", d.namePrefix, cnt)
d.idx.Store((cnt + 1) % d.capacity)
return name
}

// ListChannels lists all dml channel names
func (d *dmlChannels) ListChannels() []string {
d.lock.RLock()
defer d.lock.RUnlock()

ret := make([]string, 0, len(d.dml))
for n := range d.dml {
ret = append(ret, n)
}
return ret

chanNames := make([]string, 0)
d.refcnt.Range(
func(k, v interface{}) bool {
chanNames = append(chanNames, k.(string))
return true
})
return chanNames
}

// Produce produces msg pack into specified channel
func (d *dmlChannels) Produce(name string, pack *msgstream.MsgPack) error {
d.lock.Lock()
defer d.lock.Unlock()

ds, ok := d.dml[name]
if !ok {
return fmt.Errorf("channel %s not exist", name)
}
if err := ds.Produce(pack); err != nil {
return err
}
return nil
// GetNumChannels get current dml channel count
func (d *dmlChannels) GetNumChannels() int {
return len(d.ListChannels())
}

// Broadcast broadcasts msg pack into specified channel
func (d *dmlChannels) Broadcast(name string, pack *msgstream.MsgPack) error {
d.lock.Lock()
defer d.lock.Unlock()

ds, ok := d.dml[name]
if !ok {
return fmt.Errorf("channel %s not exist", name)
}
if err := ds.Broadcast(pack); err != nil {
return err
}
return nil
}

// BroadcastAll invoke broadcast with provided msg pack in all channels specified
func (d *dmlChannels) BroadcastAll(channels []string, pack *msgstream.MsgPack) error {
d.lock.Lock()
defer d.lock.Unlock()

for _, ch := range channels {
ds, ok := d.dml[ch]
if !ok {
return fmt.Errorf("channel %s not exist", ch)
}
if err := ds.Broadcast(pack); err != nil {
return err
func (d *dmlChannels) Broadcast(chanNames []string, pack *msgstream.MsgPack) error {
for _, chanName := range chanNames {
// only in-use chanName exist in refcnt
if _, ok := d.refcnt.Load(chanName); ok {
v, _ := d.pool.Load(chanName)
if err := (*(v.(*msgstream.MsgStream))).Broadcast(pack); err != nil {
return err
}
} else {
return fmt.Errorf("channel %s not exist", chanName)
}
}
return nil
}

// AddProducerChannels add named channels as producer
func (d *dmlChannels) AddProducerChannels(names ...string) {
d.lock.Lock()
defer d.lock.Unlock()

for _, name := range names {
log.Debug("add dml channel", zap.String("channel name", name))
_, ok := d.dml[name]
if !ok {
ms, err := d.core.msFactory.NewMsgStream(d.core.ctx)
if err != nil {
log.Debug("add msgstream failed", zap.String("name", name), zap.Error(err))
continue
if _, ok := d.pool.Load(name); ok {
var cnt int64
if _, ok := d.refcnt.Load(name); !ok {
cnt = 1
} else {
v, _ := d.refcnt.Load(name)
cnt = v.(int64) + 1
}
ms.AsProducer([]string{name})
d.dml[name] = ms
d.refcnt.Store(name, cnt)
log.Debug("assign dml channel", zap.String("chanName", name), zap.Int64("refcnt", cnt))
} else {
log.Error("invalid channel name", zap.String("chanName", name))
panic("invalid channel name: " + name)
}
}
}

// RemoveProducerChannels removes specified channels
func (d *dmlChannels) RemoveProducerChannels(names ...string) {
d.lock.Lock()
defer d.lock.Unlock()

for _, name := range names {
if ds, ok := d.dml[name]; ok {
ds.Close()
delete(d.dml, name)
if v, ok := d.refcnt.Load(name); ok {
cnt := v.(int64)
if cnt > 1 {
d.refcnt.Store(name, cnt-1)
} else {
d.refcnt.Delete(name)
}
}
}
}
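Per the commit title, collection creation now obtains its dml channel from this pool instead of creating a new msgstream. An illustrative fragment of that wiring, using the API introduced above (the actual create-collection changes live in files not shown here; error handling omitted):

// Illustrative fragment, not the actual rootcoord create-collection code.
chanName := dml.GetDmlMsgStreamName() // round-robin pick from the fixed pool
dml.AddProducerChannels(chanName)     // refcount +1; the msgstream already exists
// chanName is then recorded in the collection metadata so that later
// dml.Broadcast([]string{chanName}, pack) targets the right stream, and
// dml.RemoveProducerChannels(chanName) drops the reference when the collection is dropped.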
78 changes: 78 additions & 0 deletions internal/rootcoord/dml_channels_test.go
@@ -0,0 +1,78 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package rootcoord

import (
"context"
"fmt"
"testing"

"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
)

func TestDmlChannels(t *testing.T) {
const (
dmlChanPrefix = "rootcoord-dml"
totalDmlChannelNum = 2
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

factory := msgstream.NewPmsFactory()
Params.Init()

m := map[string]interface{}{
"pulsarAddress": Params.PulsarAddress,
"receiveBufSize": 1024,
"pulsarBufSize": 1024}
err := factory.SetParams(m)
assert.Nil(t, err)

core, err := NewCore(ctx, factory)
assert.Nil(t, err)

dml := newDmlChannels(core, dmlChanPrefix, totalDmlChannelNum)
chanNames := dml.ListChannels()
assert.Equal(t, 0, len(chanNames))

randStr := funcutil.RandomString(8)
assert.Panics(t, func() { dml.AddProducerChannels(randStr) })

err = dml.Broadcast([]string{randStr}, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, fmt.Sprintf("channel %s not exist", randStr))

// dml_xxx_0 => {chanName0, chanName2}
// dml_xxx_1 => {chanName1}
chanName0 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName0)
assert.Equal(t, 1, dml.GetNumChannels())

chanName1 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName1)
assert.Equal(t, 2, dml.GetNumChannels())

chanName2 := dml.GetDmlMsgStreamName()
dml.AddProducerChannels(chanName2)
assert.Equal(t, 2, dml.GetNumChannels())

dml.RemoveProducerChannels(chanName0)
assert.Equal(t, 2, dml.GetNumChannels())

dml.RemoveProducerChannels(chanName1)
assert.Equal(t, 1, dml.GetNumChannels())

dml.RemoveProducerChannels(chanName0)
assert.Equal(t, 0, dml.GetNumChannels())
}
1 change: 1 addition & 0 deletions internal/rootcoord/meta_table.go
@@ -174,6 +174,7 @@ func (mt *metaTable) reloadFromKV() error {
mt.indexID2Meta[meta.IndexID] = meta
}

log.Debug("reload meta table from KV successfully")
return nil
}
