-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
cluster.go
215 lines (184 loc) · 6.65 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"sync"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// Cluster provides interfaces to interact with datanode cluster
//
//go:generate mockery --name=Cluster --structname=MockCluster --output=./ --filename=mock_cluster.go --with-expecter --inpackage
type Cluster interface {
	// Startup registers the given nodes' sessions and starts the channel manager with them.
	Startup(ctx context.Context, nodes []*NodeInfo) error
	// Register adds a new node's session and hands the node to the channel manager.
	Register(node *NodeInfo) error
	// UnRegister removes a node's session and detaches it from the channel manager.
	UnRegister(node *NodeInfo) error
	// Watch asks the channel manager to assign the channel to some datanode.
	Watch(ctx context.Context, ch RWChannel) error
	// Flush sends an async FlushSegments request for segments on the given channel;
	// fails if the channel is not watched on the given node.
	Flush(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error
	// FlushChannels sends a FlushChannels request covering all given channels;
	// every channel must be watched on the given node.
	FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error
	// PreImport forwards a pre-import request to the target datanode.
	PreImport(nodeID int64, in *datapb.PreImportRequest) error
	// ImportV2 forwards an import request to the target datanode.
	ImportV2(nodeID int64, in *datapb.ImportRequest) error
	// QueryPreImport queries pre-import progress on the target datanode.
	QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
	// QueryImport queries import progress on the target datanode.
	QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
	// DropImport asks the target datanode to drop an import task.
	DropImport(nodeID int64, in *datapb.DropImportRequest) error
	// QuerySlots collects the available slot count from every known datanode concurrently.
	QuerySlots() map[int64]int64
	// GetSessions returns all datanode sessions currently tracked.
	GetSessions() []*Session
	// Close releases the session and channel managers.
	Close()
}
// Compile-time assertion that ClusterImpl implements Cluster.
var _ Cluster = (*ClusterImpl)(nil)

// ClusterImpl is the default Cluster implementation. It composes a
// SessionManager (RPC sessions to datanodes) with a ChannelManager
// (channel-to-node assignment) and delegates all operations to them.
type ClusterImpl struct {
	sessionManager SessionManager
	channelManager ChannelManager
}
// NewClusterImpl creates a new cluster backed by the given session and
// channel managers.
func NewClusterImpl(sessionManager SessionManager, channelManager ChannelManager) *ClusterImpl {
	return &ClusterImpl{
		sessionManager: sessionManager,
		channelManager: channelManager,
	}
}
// Startup inits the cluster with the given data nodes: every node gets a
// session, and the channel manager is started with the full node list plus
// the subset flagged as legacy.
func (c *ClusterImpl) Startup(ctx context.Context, nodes []*NodeInfo) error {
	var (
		legacyNodes []int64
		allNodes    []int64
	)
	// One pass: register the session and collect node IDs at the same time.
	lo.ForEach(nodes, func(info *NodeInfo, _ int) {
		c.sessionManager.AddSession(info)
		if info.IsLegacy {
			legacyNodes = append(legacyNodes, info.NodeID)
		}
		allNodes = append(allNodes, info.NodeID)
	})
	return c.channelManager.Startup(ctx, legacyNodes, allNodes)
}
// Register registers a new node in cluster: its session is added first so
// the node is reachable before the channel manager starts using it.
func (c *ClusterImpl) Register(node *NodeInfo) error {
	c.sessionManager.AddSession(node)
	// Hand the node to the channel manager; its error is the caller's signal.
	return c.channelManager.AddNode(node.NodeID)
}
// UnRegister removes a node from cluster: its session is dropped first,
// then the channel manager releases the node.
func (c *ClusterImpl) UnRegister(node *NodeInfo) error {
	c.sessionManager.DeleteSession(node)
	// Detach the node from channel assignment; propagate any failure.
	return c.channelManager.DeleteNode(node.NodeID)
}
// Watch tries to add a channel in datanode cluster.
// Assignment is fully delegated to the channel manager.
func (c *ClusterImpl) Watch(ctx context.Context, ch RWChannel) error {
	return c.channelManager.Watch(ctx, ch)
}
// Flush sends async FlushSegments requests to dataNodes
// which also according to channels where segments are assigned to.
// It refuses to flush when the channel is not watched on the given node.
func (c *ClusterImpl) Flush(ctx context.Context, nodeID int64, channel string, segments []*datapb.SegmentInfo) error {
	ch, found := c.channelManager.GetChannel(nodeID, channel)
	if !found {
		log.Warn("node is not matched with channel",
			zap.String("channel", channel),
			zap.Int64("nodeID", nodeID),
		)
		return fmt.Errorf("channel %s is not watched on node %d", channel, nodeID)
	}

	// Collect the IDs of all segments to flush.
	segmentIDs := make([]int64, 0, len(segments))
	for _, segment := range segments {
		segmentIDs = append(segmentIDs, segment.GetID())
	}

	req := &datapb.FlushSegmentsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_Flush),
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
			commonpbutil.WithTargetID(nodeID),
		),
		CollectionID: ch.GetCollectionID(),
		SegmentIDs:   segmentIDs,
		ChannelName:  channel,
	}

	// Fire-and-forget: the session manager sends the request asynchronously.
	c.sessionManager.Flush(ctx, nodeID, req)
	return nil
}
// FlushChannels sends a FlushChannels request carrying flushTs to the given
// node. Every channel must be watched on that node, otherwise an error is
// returned and nothing is sent. A nil/empty channel list is a no-op.
func (c *ClusterImpl) FlushChannels(ctx context.Context, nodeID int64, flushTs Timestamp, channels []string) error {
	if len(channels) == 0 {
		return nil
	}

	// Validate up front: all channels must be watched on this node.
	for _, channelName := range channels {
		if c.channelManager.Match(nodeID, channelName) {
			continue
		}
		return fmt.Errorf("channel %s is not watched on node %d", channelName, nodeID)
	}

	req := &datapb.FlushChannelsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
			commonpbutil.WithTargetID(nodeID),
		),
		FlushTs:  flushTs,
		Channels: channels,
	}
	return c.sessionManager.FlushChannels(ctx, nodeID, req)
}
// PreImport forwards the pre-import request to the target datanode's session.
func (c *ClusterImpl) PreImport(nodeID int64, in *datapb.PreImportRequest) error {
	return c.sessionManager.PreImport(nodeID, in)
}
// ImportV2 forwards the import request to the target datanode's session.
func (c *ClusterImpl) ImportV2(nodeID int64, in *datapb.ImportRequest) error {
	return c.sessionManager.ImportV2(nodeID, in)
}
// QueryPreImport queries pre-import state from the target datanode's session.
func (c *ClusterImpl) QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) {
	return c.sessionManager.QueryPreImport(nodeID, in)
}
// QueryImport queries import state from the target datanode's session.
func (c *ClusterImpl) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) {
	return c.sessionManager.QueryImport(nodeID, in)
}
// DropImport forwards the drop-import request to the target datanode's session.
func (c *ClusterImpl) DropImport(nodeID int64, in *datapb.DropImportRequest) error {
	return c.sessionManager.DropImport(nodeID, in)
}
// QuerySlots fans out a QuerySlot call to every known datanode session in
// parallel and returns a nodeID -> slot-count map. Nodes whose query fails
// are logged and simply omitted from the result.
func (c *ClusterImpl) QuerySlots() map[int64]int64 {
	var (
		mu        sync.Mutex
		wg        sync.WaitGroup
		nodeSlots = make(map[int64]int64)
	)
	for _, nodeID := range c.sessionManager.GetSessionIDs() {
		wg.Add(1)
		// Pass the ID explicitly so each goroutine captures its own value.
		go func(id int64) {
			defer wg.Done()
			resp, err := c.sessionManager.QuerySlot(id)
			if err != nil {
				log.Warn("query slot failed", zap.Int64("nodeID", id), zap.Error(err))
				return
			}
			// nodeSlots is shared across goroutines; guard every write.
			mu.Lock()
			nodeSlots[id] = resp.GetNumSlots()
			mu.Unlock()
		}(nodeID)
	}
	wg.Wait()
	log.Debug("query slot done", zap.Any("nodeSlots", nodeSlots))
	return nodeSlots
}
// GetSessions returns all sessions currently tracked by the session manager.
func (c *ClusterImpl) GetSessions() []*Session {
	return c.sessionManager.GetSessions()
}
// Close releases resources opened in Cluster.
// Sessions are closed before the channel manager shuts down.
func (c *ClusterImpl) Close() {
	c.sessionManager.Close()
	c.channelManager.Close()
}