Resolve an issue with bulk load where segments in flow graph replica are not updated (#16609)

issue: #15604

/kind enhancement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
soothing-rain committed Apr 25, 2022
1 parent 65a9e01 commit e66ac6a
Showing 14 changed files with 445 additions and 246 deletions.
7 changes: 5 additions & 2 deletions internal/datacoord/cluster.go
@@ -79,7 +79,7 @@ func (c *Cluster) Watch(ch string, collectionID UniqueID) error {
return c.channelManager.Watch(&channel{Name: ch, CollectionID: collectionID})
}

// Flush sends flush requests to corresponding datanodes according to channels that segments belong to
// Flush sends flush requests to corresponding dataNodes according to channels where segments are assigned to.
func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, markSegments []*datapb.SegmentInfo) {
channels := c.channelManager.GetChannels()
nodeSegments := make(map[int64][]int64)
@@ -131,7 +131,10 @@ func (c *Cluster) Flush(ctx context.Context, segments []*datapb.SegmentInfo, mar
SegmentIDs: segments,
MarkSegmentIDs: marks,
}
log.Info("Plan to flush", zap.Int64("node_id", nodeID), zap.Int64s("segments", segments), zap.Int64s("marks", marks))
log.Info("calling dataNode to flush",
zap.Int64("dataNode ID", nodeID),
zap.Int64s("segments", segments),
zap.Int64s("marks", marks))
c.sessionManager.Flush(ctx, nodeID, req)
}
}
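The updated comment in the hunk above describes what Cluster.Flush does: segments are grouped by the dataNode that watches their insert channel, and one flush request is sent per node. A minimal, self-contained sketch of that grouping idea follows; the segmentRef type and the channel-to-node map are illustrative stand-ins, not the actual channelManager API.

package main

import "fmt"

// segmentRef is an illustrative stand-in for the fields of datapb.SegmentInfo used here.
type segmentRef struct {
	ID      int64
	Channel string
}

func main() {
	// Hypothetical channel -> dataNode assignment; in DataCoord this comes from the channel manager.
	channelOwner := map[string]int64{"dml-ch-0": 1, "dml-ch-1": 2}
	segments := []segmentRef{{100, "dml-ch-0"}, {101, "dml-ch-0"}, {102, "dml-ch-1"}}

	// Group segment IDs by the owning dataNode, mirroring the nodeSegments map in Cluster.Flush.
	nodeSegments := make(map[int64][]int64)
	for _, seg := range segments {
		nodeID, ok := channelOwner[seg.Channel]
		if !ok {
			continue // channel not watched by any dataNode yet; skip it
		}
		nodeSegments[nodeID] = append(nodeSegments[nodeID], seg.ID)
	}
	for nodeID, ids := range nodeSegments {
		fmt.Printf("would send FlushSegments to dataNode %d for segments %v\n", nodeID, ids)
	}
}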
17 changes: 11 additions & 6 deletions internal/datacoord/segment_manager.go
@@ -72,8 +72,9 @@ type Manager interface {
AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error)
// DropSegment drops the segment from manager.
DropSegment(ctx context.Context, segmentID UniqueID)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error)
// SealAllSegments seals all segments of collection with collectionID and return sealed segments.
// If segIDs is not empty, also seals segments in segIDs.
SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error)
// GetFlushableSegments returns flushable segment ids
GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
// ExpireAllocations notifies segment status to expire old allocations
@@ -360,17 +361,21 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
}

// SealAllSegments seals all segmetns of collection with collectionID and return sealed segments
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
sp, _ := trace.StartSpanFromContext(ctx)
defer sp.Finish()
s.mu.Lock()
defer s.mu.Unlock()
var ret []UniqueID
for _, id := range s.segments {
segCandidates := s.segments
if len(segIDs) != 0 {
segCandidates = segIDs
}
for _, id := range segCandidates {
info := s.meta.GetSegment(id)
if info == nil {
log.Warn("Failed to get seg info from meta", zap.Int64("id", id))
log.Warn("failed to get seg info from meta", zap.Int64("segment ID", id))
continue
}
if info.CollectionID != collectionID {
26 changes: 24 additions & 2 deletions internal/datacoord/segment_manager_test.go
@@ -186,7 +186,29 @@ func TestSaveSegmentsToMeta(t *testing.T) {
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID)
_, err = segmentManager.SealAllSegments(context.Background(), collID, nil)
assert.Nil(t, err)
segment := meta.GetSegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
assert.EqualValues(t, segment.LastExpireTime, allocations[0].ExpireTime)
assert.EqualValues(t, commonpb.SegmentState_Sealed, segment.State)
}

func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)

schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator)
allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
_, err = segmentManager.SealAllSegments(context.Background(), collID, []int64{allocations[0].SegmentID})
assert.Nil(t, err)
segment := meta.GetSegment(allocations[0].SegmentID)
assert.NotNil(t, segment)
@@ -297,7 +319,7 @@ func TestGetFlushableSegments(t *testing.T) {
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))

ids, err := segmentManager.SealAllSegments(context.TODO(), collID)
ids, err := segmentManager.SealAllSegments(context.TODO(), collID, nil)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(ids))
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
4 changes: 3 additions & 1 deletion internal/datacoord/server.go
@@ -539,7 +539,9 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
return nil
}

log.Info("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))
log.Info("start flushing segments",
zap.Int64s("segment IDs", flushableIDs),
zap.Int("# of stale/mark segments", len(staleSegments)))

s.setLastFlushTime(flushableSegments)
s.setLastFlushTime(staleSegments)
2 changes: 1 addition & 1 deletion internal/datacoord/server_test.go
@@ -749,7 +749,7 @@ func (s *spySegmentManager) DropSegment(ctx context.Context, segmentID UniqueID)
}

// SealAllSegments seals all segments of collection with collectionID and return sealed segments
func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
func (s *spySegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID, segIDs []UniqueID) ([]UniqueID, error) {
panic("not implemented") // TODO: Implement
}

12 changes: 8 additions & 4 deletions internal/datacoord/services.go
@@ -84,7 +84,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
resp.Status.Reason = serverNotServingErrMsg
return resp, nil
}
sealedSegments, err := s.segmentManager.SealAllSegments(ctx, req.CollectionID)
sealedSegments, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs())
if err != nil {
resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err)
return resp, nil
@@ -343,7 +343,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.segmentManager.DropSegment(ctx, segment.GetID())
}

// set segment to SegmentState_Flushing and save binlogs and checkpoints
// Set segment to SegmentState_Flushing. Also save binlogs and checkpoints.
err := s.meta.UpdateFlushSegmentsInfo(
req.GetSegmentID(),
req.GetFlushed(),
@@ -362,7 +362,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
return resp, nil
}

log.Info("flush segment with meta", zap.Int64("id", req.SegmentID),
log.Info("flush segment with meta", zap.Int64("segment id", req.SegmentID),
zap.Any("meta", req.GetField2BinlogPaths()))

if req.GetFlushed() {
@@ -378,8 +378,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(),
segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), tt)
if err != nil {
log.Warn("failed to trigger single compaction", zap.Int64("segmentID", segmentID))
log.Warn("failed to trigger single compaction", zap.Int64("segment ID", segmentID))
} else {
log.Info("compaction triggered for segment", zap.Int64("segment ID", segmentID))
}
} else {
log.Warn("failed to get time travel reverse time")
}
}
}
62 changes: 50 additions & 12 deletions internal/datanode/data_node.go
@@ -556,27 +556,29 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
}

log.Info("Receive FlushSegments req",
zap.Int64("collectionID", req.GetCollectionID()), zap.Int("num", len(req.SegmentIDs)),
zap.Int64s("segments", req.SegmentIDs),
zap.Int64("collectionID", req.GetCollectionID()),
zap.Int64s("segments", req.GetSegmentIDs()),
zap.Int64s("stale segments", req.GetMarkSegmentIDs()),
)

processSegments := func(segmentIDs []UniqueID, flushed bool) bool {
noErr := true
for _, id := range segmentIDs {
if node.segmentCache.checkIfCached(id) {
// Segment in flushing, ignore
log.Info("Segment flushing, ignore the flush request until flush is done.",
log.Info("segment flushing, ignore the flush request until flush is done.",
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("segmentID", id))

status.Reason = "segment is flushing, nothing is done"
noErr = false
continue
}

node.segmentCache.Cache(id)

flushCh, err := node.flowgraphManager.getFlushCh(id)
if err != nil {
status.Reason = "DataNode abnormal, restarting"
log.Error("DataNode abnormal, no flushCh for a vchannel", zap.Error(err))
status.Reason = "no flush channel found for v-channel"
log.Error("no flush channel found for v-channel", zap.Error(err))
noErr = false
continue
}
@@ -589,9 +591,11 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
flushed: flushed,
}
}
log.Info("Flowgraph flushSegment tasks triggered", zap.Bool("flushed", flushed),
zap.Int64("collectionID", req.GetCollectionID()), zap.Int64s("segments", segmentIDs))

log.Info("flow graph flushSegment tasks triggered",
zap.Bool("flushed", flushed),
zap.Int64("collection ID", req.GetCollectionID()),
zap.Int64s("segments", segmentIDs),
zap.Int64s("mark segments", req.GetMarkSegmentIDs()))
return noErr
}

@@ -868,13 +872,15 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
return func(fields map[storage.FieldID]storage.FieldData, shardNum int) error {
if shardNum >= len(req.GetImportTask().GetChannelNames()) {
log.Error("import task returns invalid shard number",
zap.Int("# of shards", shardNum),
zap.Int("shard num", shardNum),
zap.Int("# of channels", len(req.GetImportTask().GetChannelNames())),
zap.Any("channel names", req.GetImportTask().GetChannelNames()),
)
return fmt.Errorf("syncSegmentID Failed: invalid shard number %d", shardNum)
}
log.Info("import task flush segment", zap.Any("ChannelNames", req.ImportTask.ChannelNames), zap.Int("shardNum", shardNum))
log.Info("import task flush segment",
zap.Any("channel names", req.ImportTask.ChannelNames),
zap.Int("shard num", shardNum))
segReqs := []*datapb.SegmentIDRequest{
{
ChannelName: req.ImportTask.ChannelNames[shardNum],
@@ -899,6 +905,7 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
}
segmentID := resp.SegIDAssignments[0].SegID

// TODO: this code block is long and tedious, maybe split it into separate functions.
var rowNum int
for _, field := range fields {
rowNum = field.RowNum()
@@ -1018,6 +1025,38 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
fieldStats = append(fieldStats, &datapb.FieldBinlog{FieldID: k, Binlogs: []*datapb.Binlog{v}})
}

ds, ok := node.flowgraphManager.getFlowgraphService(segReqs[0].GetChannelName())
if !ok {
log.Warn("channel not found in current dataNode",
zap.String("channel name", segReqs[0].GetChannelName()),
zap.Int64("node ID", node.NodeID))
return errors.New("channel " + segReqs[0].GetChannelName() + " not found in current dataNode")
}

// Update flow graph replica segment info.
// TODO: Duplicate code. Add wrapper function.
if !ds.replica.hasSegment(segmentID, true) {
err = ds.replica.addNewSegment(segmentID,
req.GetImportTask().GetCollectionId(),
req.GetImportTask().GetPartitionId(),
segReqs[0].GetChannelName(),
&internalpb.MsgPosition{
ChannelName: segReqs[0].GetChannelName(),
},
&internalpb.MsgPosition{
ChannelName: segReqs[0].GetChannelName(),
})
if err != nil {
log.Error("failed to add segment",
zap.Int64("segment ID", segmentID),
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.String("channel mame", segReqs[0].GetChannelName()),
zap.Error(err))
}
}
ds.replica.updateStatistics(segmentID, int64(rowNum))

req := &datapb.SaveBinlogPathsRequest{
Base: &commonpb.MsgBase{
MsgType: 0, //TODO msg type
Expand All @@ -1030,7 +1069,6 @@ func importFlushReqFunc(node *DataNode, req *datapb.ImportTaskRequest, res *root
Field2BinlogPaths: fieldInsert,
Field2StatslogPaths: fieldStats,
Importing: true,
Flushed: true,
}

err = retry.Do(context.Background(), func() error {
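The last data_node.go hunk above is the heart of the fix: before reporting binlog paths for a bulk-load segment, the dataNode now looks up the flow graph serving the target v-channel and registers the new segment in that flow graph's replica (addNewSegment plus updateStatistics), so later flushes and queries see it. The TODO in that hunk notes the block duplicates existing code; one possible wrapper is sketched below. The name ensureSegmentInReplica is hypothetical, only the three replica methods already used in the hunk appear here, and the updateStatistics result is ignored exactly as in the hunk.

// ensureSegmentInReplica is a hypothetical helper extracting the duplicated block above:
// register the segment in the flow graph replica if it is not known yet, then bump its row count.
func ensureSegmentInReplica(replica Replica, segID, collID, partID UniqueID, chanName string, rowNum int64) error {
	if !replica.hasSegment(segID, true) {
		startPos := &internalpb.MsgPosition{ChannelName: chanName}
		endPos := &internalpb.MsgPosition{ChannelName: chanName}
		if err := replica.addNewSegment(segID, collID, partID, chanName, startPos, endPos); err != nil {
			return err
		}
	}
	replica.updateStatistics(segID, rowNum)
	return nil
}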
58 changes: 57 additions & 1 deletion internal/datanode/data_node_test.go
@@ -236,7 +236,7 @@ func TestDataNode(t *testing.T) {
// dup call
status, err := node1.FlushSegments(node1.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)

// failure call
req = &datapb.FlushSegmentsRequest{
@@ -356,6 +356,62 @@ func TestDataNode(t *testing.T) {
assert.Equal(t, "", stat.GetReason())
})

t.Run("Test Import w/ bad flow graph", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
collectionID: 100,
pkType: schemapb.DataType_Int64,
}

chName1 := "fake-by-dev-rootcoord-dml-testimport-1"
chName2 := "fake-by-dev-rootcoord-dml-testimport-2"
err := node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{
CollectionID: 100,
ChannelName: chName1,
UnflushedSegments: []*datapb.SegmentInfo{},
FlushedSegments: []*datapb.SegmentInfo{},
})
require.Nil(t, err)
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{
CollectionID: 999, // wrong collection ID.
ChannelName: chName2,
UnflushedSegments: []*datapb.SegmentInfo{},
FlushedSegments: []*datapb.SegmentInfo{},
})
require.Nil(t, err)

_, ok := node.flowgraphManager.getFlowgraphService(chName1)
assert.True(t, ok)
_, ok = node.flowgraphManager.getFlowgraphService(chName2)
assert.True(t, ok)

content := []byte(`{
"rows":[
{"bool_field": true, "int8_field": 10, "int16_field": 101, "int32_field": 1001, "int64_field": 10001, "float32_field": 3.14, "float64_field": 1.56, "varChar_field": "hello world", "binary_vector_field": [254, 0, 254, 0], "float_vector_field": [1.1, 1.2]},
{"bool_field": false, "int8_field": 11, "int16_field": 102, "int32_field": 1002, "int64_field": 10002, "float32_field": 3.15, "float64_field": 2.56, "varChar_field": "hello world", "binary_vector_field": [253, 0, 253, 0], "float_vector_field": [2.1, 2.2]},
{"bool_field": true, "int8_field": 12, "int16_field": 103, "int32_field": 1003, "int64_field": 10003, "float32_field": 3.16, "float64_field": 3.56, "varChar_field": "hello world", "binary_vector_field": [252, 0, 252, 0], "float_vector_field": [3.1, 3.2]},
{"bool_field": false, "int8_field": 13, "int16_field": 104, "int32_field": 1004, "int64_field": 10004, "float32_field": 3.17, "float64_field": 4.56, "varChar_field": "hello world", "binary_vector_field": [251, 0, 251, 0], "float_vector_field": [4.1, 4.2]},
{"bool_field": true, "int8_field": 14, "int16_field": 105, "int32_field": 1005, "int64_field": 10005, "float32_field": 3.18, "float64_field": 5.56, "varChar_field": "hello world", "binary_vector_field": [250, 0, 250, 0], "float_vector_field": [5.1, 5.2]}
]
}`)

filePath := "import/rows_1.json"
err = node.chunkManager.Write(filePath, content)
assert.NoError(t, err)
req := &datapb.ImportTaskRequest{
ImportTask: &datapb.ImportTask{
CollectionId: 100,
PartitionId: 100,
ChannelNames: []string{chName1, chName2},
Files: []string{filePath},
RowBased: true,
},
}
stat, err := node.Import(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, stat.GetErrorCode())
assert.Equal(t, "", stat.GetReason())
})

t.Run("Test Import report import error", func(t *testing.T) {
node.rootCoord = &RootCoordFactory{
collectionID: 100,
4 changes: 2 additions & 2 deletions internal/datanode/flow_graph_insert_buffer_node.go
@@ -306,10 +306,10 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
select {
case fmsg := <-ibNode.flushChan:

log.Info(". Receiving flush message",
log.Info("receiving flush message",
zap.Int64("segmentID", fmsg.segmentID),
zap.Int64("collectionID", fmsg.collectionID),
zap.String("vchannel name", ibNode.channelName),
zap.String("v-channel name", ibNode.channelName),
)
// merging auto&manual flush segment same segment id
dup := false
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
@@ -69,6 +69,7 @@ service DataNode {
message FlushRequest {
common.MsgBase base = 1;
int64 dbID = 2;
repeated int64 segmentIDs = 3;
int64 collectionID = 4;
}

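With the new repeated segmentIDs field, a Flush call can name the exact segments to seal; DataCoord forwards req.GetSegmentIDs() into SealAllSegments (see the services.go hunk above), and an empty list keeps the old seal-everything behaviour. Below is a hedged sketch of populating the request, assuming the usual protoc-generated Go field names; dataCoordClient, collID, segID1, and segID2 are placeholders.

// Ask DataCoord to seal and flush only two specific segments of a collection.
req := &datapb.FlushRequest{
	Base:         &commonpb.MsgBase{MsgType: commonpb.MsgType_Flush},
	DbID:         0,
	CollectionID: collID,
	// New field in this change: restrict sealing/flushing to these segments.
	// Leaving it empty seals every segment of the collection, as before.
	SegmentIDs: []int64{segID1, segID2},
}
resp, err := dataCoordClient.Flush(ctx, req)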
