Skip to content

Commit

Permalink
Squash (milvus-io#25)
Browse files Browse the repository at this point in the history
* Refine add segment logic for bulk load

Also restructured `addNewSegment`, `addNormalSegment` and `addFlushedSegment` into a single `addSegment` to avoid duplicate code block and to see more clearly how adding these segments are different.

/kind improvement

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Add garbage collection function for segments produced by partially failed imports (#6)

Signed-off-by: xingzhao <xing.zhao@zilliz.com>

Co-authored-by: xingzhao <xing.zhao@zilliz.com>

* Enhance error handle in bulk load (#5)

Signed-off-by: wayblink <anyang.wang@zilliz.com>

* Refine expiration/retention policy for import tasks (milvus-io#9)

* Persist import task info to meta before update in memory (milvus-io#12)

Signed-off-by: wayblink <anyang.wang@zilliz.com>

* Add start position for import segment (milvus-io#7)

Signed-off-by: wayblink <anyang.wang@zilliz.com>

* Refine bulkload proto (milvus-io#11)

Signed-off-by: groot <yihua.mo@zilliz.com>

* Remove queryable/indexed logic and some other refinement (milvus-io#14)

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Resolve conflicts

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Some minor tweaks

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Implement CompleteBulkLoad feature (milvus-io#19)

* Update interface and fix bug

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Fix dml start position mistake in AddSegmentReq (milvus-io#21)

Signed-off-by: wayblink <anyang.wang@zilliz.com>

Signed-off-by: wayblink <anyang.wang@zilliz.com>

* Fix getDmlChannelPositionByBroadcast (milvus-io#23)

* Fix null start position issue

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Fix merge conflicts

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Fix conflicts

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

* Bulkload enhancement (milvus-io#24)

Signed-off-by: yhmo <yihua.mo@zilliz.com>

* Add some unit tests

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Signed-off-by: wayblink <anyang.wang@zilliz.com>
Signed-off-by: groot <yihua.mo@zilliz.com>
Signed-off-by: yhmo <yihua.mo@zilliz.com>
Co-authored-by: xing.zhao <34002927+EricStarer@users.noreply.github.com>
Co-authored-by: xingzhao <xing.zhao@zilliz.com>
Co-authored-by: wayblink <anyang.wang@zilliz.com>
Co-authored-by: groot <yihua.mo@zilliz.com>
  • Loading branch information
5 people committed Sep 20, 2022
1 parent 50ea4ee commit 6745eb4
Show file tree
Hide file tree
Showing 67 changed files with 2,971 additions and 1,254 deletions.
8 changes: 0 additions & 8 deletions configs/embedded-milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,6 @@ rootCoord:
# seconds (24 hours).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importTaskRetention: 86400
# (in seconds) During index building phase of an import task, Milvus will check the building status of a task's
# segments' indices every `importIndexCheckInterval` seconds. Default 300 seconds (5 minutes).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importIndexCheckInterval: 300
# (in seconds) Maximum time to wait before pushing flushed segments online (make them searchable) during importing.
# Default 1200 seconds (20 minutes).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importIndexWaitLimit: 1200

# Related configuration of proxy, used to validate client requests and reduce the returned results.
proxy:
Expand Down
8 changes: 0 additions & 8 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,6 @@ rootCoord:
# seconds (24 hours).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importTaskRetention: 86400
# (in seconds) Check an import task's segment loading state in queryNodes every `importSegmentStateCheckInterval`
# seconds. Default 10 seconds.
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importSegmentStateCheckInterval: 10
# (in seconds) Maximum time to wait for segments in a single import task to be loaded in queryNodes.
# Default 60 seconds (1 minute).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importSegmentStateWaitLimit: 60
# (in seconds) Check the building status of a task's segments' indices every `importIndexCheckInterval` seconds.
# Default 10 seconds.
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
Expand Down
5 changes: 0 additions & 5 deletions internal/datacoord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ func (c *Cluster) ReCollectSegmentStats(ctx context.Context, nodeID int64) {
c.sessionManager.ReCollectSegmentStats(ctx, nodeID)
}

// AddSegment triggers a AddSegment call from session manager.
func (c *Cluster) AddSegment(ctx context.Context, nodeID int64, req *datapb.AddSegmentRequest) {
c.sessionManager.AddSegment(ctx, nodeID, req)
}

// GetSessions returns all sessions
func (c *Cluster) GetSessions() []*Session {
return c.sessionManager.GetSessions()
Expand Down
71 changes: 0 additions & 71 deletions internal/datacoord/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,74 +641,3 @@ func TestCluster_ReCollectSegmentStats(t *testing.T) {
time.Sleep(500 * time.Millisecond)
})
}

func TestCluster_AddSegment(t *testing.T) {
kv := getMetaKv(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()

t.Run("add segment succeed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var mockSessionCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(1, nil)
}
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)

err = cluster.Watch("chan-1", 1)
assert.NoError(t, err)

assert.NotPanics(t, func() {
cluster.AddSegment(ctx, 1, &datapb.AddSegmentRequest{
Base: &commonpb.MsgBase{
SourceID: 0,
},
})
})
time.Sleep(500 * time.Millisecond)
})

t.Run("add segment failed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.Nil(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.Nil(t, err)

err = cluster.Watch("chan-1", 1)
assert.NoError(t, err)

assert.NotPanics(t, func() {
cluster.AddSegment(ctx, 1, &datapb.AddSegmentRequest{
Base: &commonpb.MsgBase{
SourceID: 0,
},
})
})
time.Sleep(500 * time.Millisecond)
})
}
5 changes: 3 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
isSegmentHealthy(segment) &&
isFlush(segment) &&
IsParentDropped(t.meta, segment) &&
!segment.isCompacting // not compacting now
!segment.isCompacting && // not compacting now
!segment.isImporting // not importing now
}) // m is list of chanPartSegments, which is channel-partition organized segments
for _, group := range m {
if !signal.isForce && t.compactionHandler.isFull() {
Expand Down Expand Up @@ -469,7 +470,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
var res []*SegmentInfo
for _, s := range segments {
if !isSegmentHealthy(s) || !isFlush(s) || s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID || !IsParentDropped(t.meta, s) || s.isCompacting {
s.GetPartitionID() != partitionID || !IsParentDropped(t.meta, s) || s.isCompacting || s.isImporting {
continue
}
res = append(res, s)
Expand Down
21 changes: 12 additions & 9 deletions internal/datacoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package datacoord

import (
"context"
"path"
"sync"
"time"
Expand All @@ -29,7 +30,9 @@ import (
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/minio/minio-go/v7"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -59,25 +62,25 @@ type garbageCollector struct {
segRefer *SegmentReferenceManager
indexCoord types.IndexCoord

rcc types.RootCoord

startOnce sync.Once
stopOnce sync.Once
wg sync.WaitGroup
closeCh chan struct{}
}

// newGarbageCollector create garbage collector with meta and option
func newGarbageCollector(meta *meta,
segRefer *SegmentReferenceManager,
indexCoord types.IndexCoord,
opt GcOption) *garbageCollector {
func newGarbageCollector(meta *meta, segRefer *SegmentReferenceManager,
indexCoord types.IndexCoord, opt GcOption) *garbageCollector {
log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval),
zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance))
return &garbageCollector{
meta: meta,
segRefer: segRefer,
meta: meta,
segRefer: segRefer,
indexCoord: indexCoord,
option: opt,
closeCh: make(chan struct{}),
option: opt,
closeCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -221,7 +224,7 @@ func (gc *garbageCollector) clearEtcd() {

func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
droptime := time.Unix(0, int64(dropts))
return time.Since(droptime) > gc.option.dropTolerance
return time.Since(droptime) >= gc.option.dropTolerance
}

func getLogs(sinfo *SegmentInfo) []*datapb.Binlog {
Expand Down
61 changes: 60 additions & 1 deletion internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func Test_garbageCollector_scan(t *testing.T) {
bucketName := `datacoord-ut` + strings.ToLower(funcutil.RandomString(8))
rootPath := `gc` + funcutil.RandomString(8)
//TODO change to Params
cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 4)
cli, inserts, stats, delta, others, err := initUtOSSEnv(bucketName, rootPath, 5)
require.NoError(t, err)

mockAllocator := newMockAllocator()
Expand All @@ -120,6 +120,7 @@ func Test_garbageCollector_scan(t *testing.T) {
segRefer, err := NewSegmentReferenceManager(etcdKV, nil)
assert.NoError(t, err)
assert.NotNil(t, segRefer)
mockRootCoord := newMockRootCoordService()

indexCoord := mocks.NewMockIndexCoord(t)

Expand Down Expand Up @@ -232,6 +233,42 @@ func Test_garbageCollector_scan(t *testing.T) {

gc.close()
})
t.Run("clear import failed segments", func(t *testing.T) {
segment := buildSegment(1, 10, ImportFailedSegmentID, "ch")
segment.State = commonpb.SegmentState_Importing
segment.Binlogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, inserts[0])}
segment.Statslogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, stats[0])}
segment.Deltalogs = []*datapb.FieldBinlog{getFieldBinlogPaths(0, delta[0])}
err = meta.AddSegment(segment)
require.NoError(t, err)

gc := newGarbageCollector(meta, segRefer, mockRootCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: 0,
rootPath: rootPath,
})
gc.clearEtcd()
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)

gc.close()

gc2 := newGarbageCollector(meta, segRefer, nil, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
missingTolerance: time.Hour * 24,
dropTolerance: 0,
rootPath: rootPath,
})
gc2.clearEtcd()
gc2.close()
})
t.Run("missing gc all", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
cli: cli,
Expand All @@ -244,6 +281,28 @@ func Test_garbageCollector_scan(t *testing.T) {
gc.start()
gc.scan()
gc.clearEtcd()

// bad path shall remains since datacoord cannot determine file is garbage or not if path is not valid
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, deltaLogPrefix), delta[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, `indexes`), others)

gc.close()
})

t.Run("list object with error", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, mockRootCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
missingTolerance: 0,
dropTolerance: 0,
rootPath: rootPath,
})
gc.start()
gc.scan()

// bad path shall remains since datacoord cannot determine file is garbage or not if path is not valid
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, insertLogPrefix), inserts[1:2])
validateMinioPrefixElements(t, cli.Client, bucketName, path.Join(rootPath, statsLogPrefix), stats[1:2])
Expand Down
17 changes: 14 additions & 3 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,15 @@ func (m *meta) UpdateFlushSegmentsInfo(
m.Lock()
defer m.Unlock()

log.Info("update flush segments info", zap.Int64("segmentId", segmentID),
log.Info("update flush segments info",
zap.Int64("segmentId", segmentID),
zap.Int("binlog", len(binlogs)),
zap.Int("statslog", len(statslogs)),
zap.Int("deltalogs", len(deltalogs)),
zap.Int("stats log", len(statslogs)),
zap.Int("delta logs", len(deltalogs)),
zap.Bool("flushed", flushed),
zap.Bool("dropped", dropped),
zap.Any("check points", checkpoints),
zap.Any("start position", startPositions),
zap.Bool("importing", importing))
segment := m.segments.GetSegment(segmentID)
if importing {
Expand Down Expand Up @@ -747,6 +750,14 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.segments.SetIsCompacting(segmentID, compacting)
}

// SetSegmentIsImporting sets the importing state for a segment.
func (m *meta) SetSegmentIsImporting(segmentID UniqueID, importing bool) {
m.Lock()
defer m.Unlock()

m.segments.SetIsImporting(segmentID, importing)
}

func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) error {
m.Lock()
defer m.Unlock()
Expand Down
49 changes: 49 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,55 @@ func Test_meta_SetSegmentCompacting(t *testing.T) {
}
}

func Test_meta_SetSegmentIsImporting(t *testing.T) {
type fields struct {
client kv.TxnKV
segments *SegmentsInfo
}
type args struct {
segmentID UniqueID
isImporting bool
}
tests := []struct {
name string
fields fields
args args
}{
{
"test set segment importing",
fields{
memkv.NewMemoryKV(),
&SegmentsInfo{
map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
State: commonpb.SegmentState_Flushed,
},
isImporting: false,
},
},
},
},
args{
segmentID: 1,
isImporting: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &meta{
client: tt.fields.client,
segments: tt.fields.segments,
}
m.SetSegmentIsImporting(tt.args.segmentID, tt.args.isImporting)
segment := m.GetSegment(tt.args.segmentID)
assert.Equal(t, tt.args.isImporting, segment.isImporting)
})
}
}

func Test_meta_GetSegmentsOfCollection(t *testing.T) {
type fields struct {
segments *SegmentsInfo
Expand Down
Loading

0 comments on commit 6745eb4

Please sign in to comment.