Implement CompleteBulkLoad feature (milvus-io#19)
soothing-rain committed Aug 24, 2022
1 parent 18e5aef commit 5343bef
Showing 46 changed files with 1,365 additions and 704 deletions.
8 changes: 8 additions & 0 deletions configs/milvus.yaml
@@ -111,6 +111,14 @@ rootCoord:
   # seconds (24 hours).
   # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
   importTaskRetention: 86400
+  # (in seconds) Check the building status of a task's segments' indices every `importIndexCheckInterval` seconds.
+  # Default 10 seconds.
+  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
+  importIndexCheckInterval: 10
+  # (in seconds) Maximum time to wait for indices to be built on a single import task's segments.
+  # Default 600 seconds (10 minutes).
+  # Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
+  importIndexWaitLimit: 600
 
 # Related configuration of proxy, used to validate client requests and reduce the returned results.
 proxy:
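
Taken together, these two settings describe a poll-until-ready loop: check index-building status every `importIndexCheckInterval` seconds and abort after `importIndexWaitLimit`. A minimal Go sketch of that loop, assuming a hypothetical `waitForIndices` helper and `ready` callback (neither is the actual Milvus code):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitForIndices polls ready() every checkInterval until it reports true,
// giving up after waitLimit. In the scheme described above, checkInterval
// and waitLimit would be fed from importIndexCheckInterval and
// importIndexWaitLimit respectively.
func waitForIndices(ctx context.Context, checkInterval, waitLimit time.Duration, ready func() (bool, error)) error {
	ctx, cancel := context.WithTimeout(ctx, waitLimit)
	defer cancel()
	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return errors.New("timed out waiting for import segment indices")
		case <-ticker.C:
			done, err := ready()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Scaled-down demo: indices "finish" after 300ms, polled every 100ms.
	err := waitForIndices(context.Background(), 100*time.Millisecond, time.Second,
		func() (bool, error) { return time.Since(start) > 300*time.Millisecond, nil })
	fmt.Println("wait result:", err) // wait result: <nil>
}
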
32 changes: 16 additions & 16 deletions internal/core/src/pb/common.pb.cc

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions internal/core/src/pb/common.pb.h

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion internal/datacoord/compaction_trigger.go
@@ -229,6 +229,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
 			isSegmentHealthy(segment) &&
 			isFlush(segment) &&
 			!segment.isCompacting && // not compacting now
+			!segment.isImporting && // not importing now
 			!t.segRefer.HasSegmentLock(segment.ID) // not reference
 	}) // m is list of chanPartSegments, which is channel-partition organized segments
 	for _, group := range m {
@@ -456,7 +457,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
 	var res []*SegmentInfo
 	for _, s := range segments {
 		if !isSegmentHealthy(s) || !isFlush(s) || s.GetInsertChannel() != channel ||
-			s.GetPartitionID() != partitionID || s.isCompacting || t.segRefer.HasSegmentLock(s.ID) {
+			s.GetPartitionID() != partitionID || s.isCompacting || s.isImporting || t.segRefer.HasSegmentLock(s.ID) {
 			continue
 		}
 		res = append(res, s)
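
Distilled into a standalone predicate, the filter now excludes importing segments the same way it excludes compacting ones. The `segment` struct below is a simplified stand-in for illustration, not the real `SegmentInfo`:

package main

import "fmt"

// segment is a simplified stand-in for the real SegmentInfo.
type segment struct {
	healthy, flushed, isCompacting, isImporting, refLocked bool
}

// isCompactionCandidate mirrors the filter in the diff above: importing
// segments are now skipped by compaction just like actively compacting ones.
func isCompactionCandidate(s segment) bool {
	return s.healthy &&
		s.flushed &&
		!s.isCompacting && // not compacting now
		!s.isImporting && // not importing now (added by this commit)
		!s.refLocked // not reference-locked
}

func main() {
	fmt.Println(isCompactionCandidate(segment{healthy: true, flushed: true, isImporting: true})) // false
}
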
10 changes: 4 additions & 6 deletions internal/datacoord/garbage_collector_test.go
@@ -245,14 +245,13 @@ func Test_garbageCollector_scan(t *testing.T) {
 			checkInterval:    time.Minute * 30,
 			missingTolerance: time.Hour * 24,
 			dropTolerance:    0,
-			bucketName:       bucketName,
 			rootPath:         rootPath,
 		})
 		gc.clearEtcd()
-		validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
-		validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:])
-		validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:])
-		validateMinioPrefixElements(t, cli, bucketName, path.Join(rootPath, `indexes`), others)
+		validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
+		validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:])
+		validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:])
+		validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)
 
 		gc.close()
 
@@ -262,7 +261,6 @@
 			checkInterval:    time.Minute * 30,
 			missingTolerance: time.Hour * 24,
 			dropTolerance:    0,
-			bucketName:       bucketName,
 			rootPath:         rootPath,
 		})
 		gc2.clearEtcd()
8 changes: 8 additions & 0 deletions internal/datacoord/meta.go
@@ -778,6 +778,14 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
 	m.segments.SetIsCompacting(segmentID, compacting)
 }
 
+// SetSegmentIsImporting sets the importing state for a segment.
+func (m *meta) SetSegmentIsImporting(segmentID UniqueID, importing bool) {
+	m.Lock()
+	defer m.Unlock()
+
+	m.segments.SetIsImporting(segmentID, importing)
+}
+
 func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult,
 	canCompaction func(segment *datapb.CompactionSegmentBinlogs) bool) error {
 	m.Lock()
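
The new `SetSegmentIsImporting` is a lock-then-delegate wrapper around the segments collection. A self-contained sketch of that pattern, with a plain map standing in for the real collection (the types here are simplified, not the actual datacoord structs):

package main

import (
	"fmt"
	"sync"
)

// UniqueID matches the alias used throughout datacoord.
type UniqueID = int64

// meta is a stand-in for the datacoord meta struct; a plain map replaces
// the real segments collection.
type meta struct {
	sync.Mutex
	importing map[UniqueID]bool
}

// SetSegmentIsImporting mirrors the new method: take the meta lock, then
// delegate the flag update to the underlying collection.
func (m *meta) SetSegmentIsImporting(segmentID UniqueID, importing bool) {
	m.Lock()
	defer m.Unlock()
	m.importing[segmentID] = importing
}

func main() {
	m := &meta{importing: map[UniqueID]bool{}}
	m.SetSegmentIsImporting(42, true)  // mark while the bulk load is in flight
	m.SetSegmentIsImporting(42, false) // clear once indices are ready
	fmt.Println(m.importing[42]) // false
}
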
4 changes: 4 additions & 0 deletions internal/datacoord/mock_test.go
@@ -261,6 +261,10 @@ func (m *mockRootCoordService) GetImportFailedSegmentIDs(ctx context.Context, re
 	}, nil
 }
 
+func (m *mockRootCoordService) CheckSegmentIndexReady(context.Context, *internalpb.CheckSegmentIndexReadyRequest) (*commonpb.Status, error) {
+	panic("implement me")
+}
+
 func (m *mockRootCoordService) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
 	panic("implement me")
 }
30 changes: 29 additions & 1 deletion internal/datacoord/segment_info.go
@@ -37,6 +37,7 @@ type SegmentInfo struct {
 	allocations   []*Allocation
 	lastFlushTime time.Time
 	isCompacting  bool
+	isImporting   bool
 	// a cache to avoid calculate twice
 	size int64
 }
@@ -54,6 +55,17 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
 	}
 }
 
+// NewImportSegmentInfo works the same as NewSegmentInfo except that isImport is explicitly set to true.
+func NewImportSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
+	return &SegmentInfo{
+		SegmentInfo:   info,
+		currRows:      0,
+		allocations:   make([]*Allocation, 0, 16),
+		lastFlushTime: time.Now().Add(-1 * flushInterval),
+		isImporting:   true,
+	}
+}
+
 // NewSegmentsInfo creates a `SegmentsInfo` instance, which makes sure internal map is initialized
 // note that no mutex is wrapped so external concurrent control is needed
 func NewSegmentsInfo() *SegmentsInfo {
@@ -184,13 +196,20 @@ func (s *SegmentsInfo) AddSegmentBinlogs(segmentID UniqueID, field2Binlogs map[U
 	}
 }
 
-// SetIsCompacting sets compactino status for segment
+// SetIsCompacting sets compaction status for segment
 func (s *SegmentsInfo) SetIsCompacting(segmentID UniqueID, isCompacting bool) {
 	if segment, ok := s.segments[segmentID]; ok {
 		s.segments[segmentID] = segment.ShadowClone(SetIsCompacting(isCompacting))
 	}
 }
 
+// SetIsImporting sets the import status for a segment.
+func (s *SegmentsInfo) SetIsImporting(segmentID UniqueID, isImporting bool) {
+	if segment, ok := s.segments[segmentID]; ok {
+		s.segments[segmentID] = segment.ShadowClone(SetIsImporting(isImporting))
+	}
+}
+
 // Clone deep clone the segment info and return a new instance
 func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
 	info := proto.Clone(s.SegmentInfo).(*datapb.SegmentInfo)
@@ -200,6 +219,7 @@ func (s *SegmentInfo) Clone(opts ...SegmentInfoOption) *SegmentInfo {
 		allocations:   s.allocations,
 		lastFlushTime: s.lastFlushTime,
 		isCompacting:  s.isCompacting,
+		isImporting:   s.isImporting,
 		//cannot copy size, since binlog may be changed
 	}
 	for _, opt := range opts {
@@ -216,6 +236,7 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
 		allocations:   s.allocations,
 		lastFlushTime: s.lastFlushTime,
 		isCompacting:  s.isCompacting,
+		isImporting:   s.isImporting,
 		size:          s.size,
 	}
 
@@ -313,6 +334,13 @@ func SetIsCompacting(isCompacting bool) SegmentInfoOption {
 	}
 }
 
+// SetIsImporting is the option to set import state for segment info.
+func SetIsImporting(isImporting bool) SegmentInfoOption {
+	return func(segment *SegmentInfo) {
+		segment.isImporting = isImporting
+	}
+}
+
 func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoOption {
 	return func(segment *SegmentInfo) {
 		for fieldID, binlogPaths := range field2Binlogs {
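
`SetIsImporting` slots into this file's existing functional-option pattern: `ShadowClone` copies the top-level struct and then applies each option to the copy. A condensed, runnable sketch of that pattern (field set reduced; the real struct carries more state):

package main

import "fmt"

// SegmentInfo is trimmed to the fields needed to show the pattern.
type SegmentInfo struct {
	ID           int64
	isCompacting bool
	isImporting  bool
}

type SegmentInfoOption func(*SegmentInfo)

// SetIsImporting is the option to set import state, as added in this commit.
func SetIsImporting(isImporting bool) SegmentInfoOption {
	return func(segment *SegmentInfo) {
		segment.isImporting = isImporting
	}
}

// ShadowClone copies the top-level struct and applies the options; nested
// pointers (omitted here) would still be shared with the original.
func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
	cloned := &SegmentInfo{
		ID:           s.ID,
		isCompacting: s.isCompacting,
		isImporting:  s.isImporting,
	}
	for _, opt := range opts {
		opt(cloned)
	}
	return cloned
}

func main() {
	s := &SegmentInfo{ID: 7}
	c := s.ShadowClone(SetIsImporting(true))
	fmt.Println(s.isImporting, c.isImporting) // false true
}
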
7 changes: 6 additions & 1 deletion internal/datacoord/segment_manager.go
@@ -340,7 +340,12 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
 		MaxRowNum:      int64(maxNumOfRows),
 		LastExpireTime: 0,
 	}
-	segment := NewSegmentInfo(segmentInfo)
+	var segment *SegmentInfo
+	if segmentState == commonpb.SegmentState_Importing {
+		segment = NewImportSegmentInfo(segmentInfo)
+	} else {
+		segment = NewSegmentInfo(segmentInfo)
+	}
 	if err := s.meta.AddSegment(segment); err != nil {
 		return nil, err
 	}
23 changes: 3 additions & 20 deletions internal/datacoord/server.go
@@ -132,7 +132,6 @@ type Server struct {
 	dnEventCh <-chan *sessionutil.SessionEvent
 	//icEventCh <-chan *sessionutil.SessionEvent
 	qcEventCh <-chan *sessionutil.SessionEvent
-	rcEventCh <-chan *sessionutil.SessionEvent
 
 	dataNodeCreator        dataNodeCreatorFunc
 	rootCoordClientCreator rootCoordCreatorFunc
@@ -384,9 +383,9 @@ func (s *Server) initGarbageCollection() error {
 	}
 
 	s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, s.rootCoordClient, GcOption{
-		cli:      cli,
-		enabled:  Params.DataCoordCfg.EnableGarbageCollection,
-		rootPath: Params.MinioCfg.RootPath,
+		cli:              cli,
+		enabled:          Params.DataCoordCfg.EnableGarbageCollection,
+		rootPath:         Params.MinioCfg.RootPath,
 
 		checkInterval:    Params.DataCoordCfg.GCInterval,
 		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
@@ -456,16 +455,6 @@ func (s *Server) initServiceDiscovery() error {
 	}
 	s.qcEventCh = s.session.WatchServices(typeutil.QueryCoordRole, qcRevision+1, nil)
 
-	rcSessions, rcRevision, err := s.session.GetSessions(typeutil.RootCoordRole)
-	if err != nil {
-		log.Error("DataCoord get RootCoord session failed", zap.Error(err))
-		return err
-	}
-	for _, session := range rcSessions {
-		serverIDs = append(serverIDs, session.ServerID)
-	}
-	s.rcEventCh = s.session.WatchServices(typeutil.RootCoordRole, rcRevision+1, nil)
-
 	s.segReferManager, err = NewSegmentReferenceManager(s.kvClient, serverIDs)
 	return err
 }
@@ -749,12 +738,6 @@ func (s *Server) watchService(ctx context.Context) {
 				return
 			}
 			s.processSessionEvent(ctx, "QueryCoord", event)
-		case event, ok := <-s.rcEventCh:
-			if !ok {
-				s.stopServiceWatch()
-				return
-			}
-			s.processSessionEvent(ctx, "RootCoord", event)
 		}
 	}
 }
