From e76031383af3ee844a07cab3b2b63bcb70d31989 Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Tue, 11 Jun 2024 02:21:56 -0400 Subject: [PATCH 1/9] fix: sync partition stats blocking balance task(#33741) (#33742) related: #33741 Signed-off-by: MrPresent-Han Co-authored-by: wayblink Signed-off-by: wayblink --- internal/datacoord/handler.go | 1 + internal/querycoordv2/checkers/balance_checker.go | 2 +- internal/querycoordv2/checkers/leader_checker_test.go | 6 +----- internal/querycoordv2/task/action.go | 4 ++++ tests/integration/balance/balance_test.go | 3 --- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index ac4fd0dff358..199e6ebd68eb 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -134,6 +134,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . zap.String("channel", channel.GetName()), zap.Int("numOfSegments", len(segments)), zap.Int("indexed segment", len(indexedSegments)), + zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion), ) unIndexedIDs := make(typeutil.UniqueSet) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index d300bda51f2b..86cfb064534c 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -129,7 +129,7 @@ func (b *BalanceChecker) replicasToBalance() []int64 { hasUnbalancedCollection := false for _, cid := range loadedCollections { if b.normalBalanceCollectionsCurrentRound.Contain(cid) { - log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round", + log.Debug("ScoreBasedBalancer is balancing this collection, skip balancing in this round", zap.Int64("collectionID", cid)) continue } diff --git a/internal/querycoordv2/checkers/leader_checker_test.go 
b/internal/querycoordv2/checkers/leader_checker_test.go index f644cda5bc74..0d2249b14ad1 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -75,7 +75,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() targetManager := meta.NewTargetManager(suite.broker, suite.meta) - suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr, false) + suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr) } func (suite *LeaderCheckerTestSuite) TearDownTest() { @@ -476,10 +476,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { func (suite *LeaderCheckerTestSuite) TestUpdatePartitionStats() { testChannel := "test-insert-channel" - suite.checker.enableSyncPartitionStats = true - defer func() { - suite.checker.enableSyncPartitionStats = false - }() leaderID := int64(2) observer := suite.checker observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 3948b2e98eba..54823a988fa4 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,6 +17,8 @@ package task import ( + "reflect" + "github.com/samber/lo" "go.uber.org/atomic" @@ -234,6 +236,8 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { return action.rpcReturned.Load() && dist != nil && dist.NodeID == action.Node() case ActionTypeReduce: return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node()) + case ActionTypeUpdate: + return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions)) } return false } diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 9f0321d6f638..b0df436e6843 100644 --- 
a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -53,15 +53,12 @@ func (s *BalanceTestSuit) SetupSuite() { // disable compaction paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") - // todo @wayblink repair this test - // paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key, "false") s.Require().NoError(s.SetupEmbedEtcd()) } func (s *BalanceTestSuit) TearDownSuite() { defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) - // defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key) s.MiniClusterSuite.TearDownSuite() } From ba273271fd874c04928ac72b7f4edbb9f4c1f0ef Mon Sep 17 00:00:00 2001 From: wayblink Date: Thu, 13 Jun 2024 19:27:57 +0800 Subject: [PATCH 2/9] fix: Fix meta prefix overlap bug (#33830) #30633 Signed-off-by: wayblink --- internal/metastore/kv/datacoord/constant.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/metastore/kv/datacoord/constant.go b/internal/metastore/kv/datacoord/constant.go index 88038120622f..56fc47071580 100644 --- a/internal/metastore/kv/datacoord/constant.go +++ b/internal/metastore/kv/datacoord/constant.go @@ -30,7 +30,7 @@ const ( CompactionTaskPrefix = MetaPrefix + "/compaction-task" AnalyzeTaskPrefix = MetaPrefix + "/analyze-task" PartitionStatsInfoPrefix = MetaPrefix + "/partition-stats" - PartitionStatsCurrentVersionPrefix = MetaPrefix + "/partition-stats-current-version" + PartitionStatsCurrentVersionPrefix = MetaPrefix + "/current-partition-stats-version" NonRemoveFlagTomestone = "non-removed" RemoveFlagTomestone = "removed" From e0b24a0c0f791e4f6a20f1a1165ada78f7589670 Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 18 Jun 2024 16:53:58 +0800 Subject: [PATCH 3/9] fix: Small fixes of major compaction (#33929) #30633 Signed-off-by: wayblink --- internal/datacoord/compaction_task_clustering.go | 2 ++ 
internal/datanode/compaction/clustering_compactor.go | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index eaf7117fd03a..063c4ead831a 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -177,6 +177,8 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP func (t *clusteringCompactionTask) processPipelining() error { log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID())) + ts := time.Now().UnixMilli() + t.updateAndSaveTaskMeta(setStartTime(ts)) var operators []UpdateOperator for _, segID := range t.InputSegments { operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2)) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 536994781643..c7fd895e7642 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -23,6 +23,7 @@ import ( "path" "sort" "strconv" + "strings" "sync" "time" @@ -782,7 +783,8 @@ func (t *clusteringCompactionTask) uploadPartitionStats(ctx context.Context, col if err != nil { return err } - newStatsPath := path.Join(common.PartitionStatsPath, metautil.JoinIDPath(collectionID, partitionID), t.plan.GetChannel(), strconv.FormatInt(version, 10)) + rootPath := strings.Split(t.plan.AnalyzeResultPath, common.AnalyzeStatsPath)[0] + newStatsPath := path.Join(rootPath, common.PartitionStatsPath, metautil.JoinIDPath(collectionID, partitionID), t.plan.GetChannel(), strconv.FormatInt(version, 10)) kv := map[string][]byte{ newStatsPath: partitionStatsBytes, } From 2fa124bb622d3fb3b64c284d287f683237833291 Mon Sep 17 00:00:00 2001 From: wayblink Date: Fri, 21 Jun 2024 17:30:01 +0800 Subject: 
[PATCH 4/9] fix: Fix memory buffer error & some renaming (#33850) --------- Signed-off-by: wayblink --- .../compaction/clustering_compactor.go | 162 ++++++++++-------- internal/datanode/compaction/mix_compactor.go | 4 +- .../datanode/compaction/segment_writer.go | 8 + internal/storage/serde.go | 4 + 4 files changed, 106 insertions(+), 72 deletions(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index c7fd895e7642..713af63997ac 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -61,19 +61,21 @@ type clusteringCompactionTask struct { binlogIO io.BinlogIO allocator allocator.Allocator - ctx context.Context - cancel context.CancelFunc - done chan struct{} - tr *timerecord.TimeRecorder + ctx context.Context + cancel context.CancelFunc + done chan struct{} + tr *timerecord.TimeRecorder + mappingPool *conc.Pool[any] + flushPool *conc.Pool[any] plan *datapb.CompactionPlan - // schedule - spillChan chan SpillSignal - pool *conc.Pool[any] + // flush + flushMutex sync.Mutex + flushCount *atomic.Int64 + flushChan chan SpillSignal // metrics - spillCount *atomic.Int64 writtenRowNum *atomic.Int64 // inner field @@ -84,7 +86,6 @@ type clusteringCompactionTask struct { clusteringKeyField *schemapb.FieldSchema primaryKeyField *schemapb.FieldSchema - spillMutex sync.Mutex memoryBufferSize int64 clusterBuffers []*ClusterBuffer clusterBufferLocks *lock.KeyLock[int] @@ -101,7 +102,7 @@ type ClusterBuffer struct { writer *SegmentWriter bufferRowNum atomic.Int64 - flushedRowNum int64 + flushedRowNum atomic.Int64 flushedBinlogs map[typeutil.UniqueID]*datapb.FieldBinlog uploadedSegments []*datapb.CompactionSegment @@ -129,10 +130,10 @@ func NewClusteringCompactionTask( plan: plan, tr: timerecord.NewTimeRecorder("clustering_compaction"), done: make(chan struct{}, 1), - spillChan: make(chan SpillSignal, 100), + flushChan: make(chan 
SpillSignal, 100), clusterBuffers: make([]*ClusterBuffer, 0), clusterBufferLocks: lock.NewKeyLock[int](), - spillCount: atomic.NewInt64(0), + flushCount: atomic.NewInt64(0), writtenRowNum: atomic.NewInt64(0), } } @@ -179,7 +180,8 @@ func (t *clusteringCompactionTask) init() error { t.currentTs = tsoutil.GetCurrentTime() t.memoryBufferSize = t.getMemoryBufferSize() workerPoolSize := t.getWorkerPoolSize() - t.pool = conc.NewPool[any](workerPoolSize) + t.mappingPool = conc.NewPool[any](workerPoolSize) + t.flushPool = conc.NewPool[any](workerPoolSize) log.Info("clustering compaction task initialed", zap.Int64("memory_buffer_size", t.memoryBufferSize), zap.Int("worker_pool_size", workerPoolSize)) return nil } @@ -253,7 +255,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro metrics.DataNodeCompactionLatency. WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). Observe(float64(t.tr.ElapseSpan().Milliseconds())) - log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan())) + log.Info("Clustering compaction finished", zap.Duration("elapse", t.tr.ElapseSpan()), zap.Int64("flushTimes", t.flushCount.Load())) return planResult, nil } @@ -345,8 +347,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, inputSegments := t.plan.GetSegmentBinlogs() mapStart := time.Now() - // start spill goroutine - go t.backgroundSpill(ctx) + // start flush goroutine + go t.backgroundFlush(ctx) futures := make([]*conc.Future[any], 0, len(inputSegments)) for _, segment := range inputSegments { @@ -355,7 +357,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, // only FieldBinlogs needed FieldBinlogs: segment.FieldBinlogs, } - future := t.pool.Submit(func() (any, error) { + future := t.mappingPool.Submit(func() (any, error) { err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts) return struct{}{}, err }) @@ -365,8 +367,8 @@ func (t *clusteringCompactionTask) mapping(ctx 
context.Context, return nil, nil, err } - // force spill all buffers - err := t.spillAll(ctx) + // force flush all buffers + err := t.flushAll(ctx) if err != nil { return nil, nil, err } @@ -405,7 +407,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context, return resultSegments, resultPartitionStats, nil } -func (t *clusteringCompactionTask) getWrittenMemoryBufferSize() int64 { +func (t *clusteringCompactionTask) getUsedMemoryBufferSize() int64 { var totalBufferSize int64 = 0 for _, buffer := range t.clusterBuffers { totalBufferSize = totalBufferSize + int64(buffer.writer.WrittenMemorySize()) @@ -539,22 +541,21 @@ func (t *clusteringCompactionTask) mappingSegment( } remained++ - // currentSize := t.totalBufferSize.Load() - if (remained+1)%20 == 0 { - currentBufferSize := t.getWrittenMemoryBufferSize() - // trigger spill - if clusterBuffer.bufferRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { + if (remained+1)%100 == 0 { + currentBufferSize := t.getUsedMemoryBufferSize() + // trigger flushBinlog + if clusterBuffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() || clusterBuffer.writer.IsFull() { // reach segment/binlog max size - t.spillChan <- SpillSignal{ + t.flushChan <- SpillSignal{ buffer: clusterBuffer, } - } else if currentBufferSize >= t.getMemoryBufferMiddleWatermark() { - // reach spill trigger threshold - t.spillChan <- SpillSignal{} + } else if currentBufferSize >= t.getMemoryBufferHighWatermark() { + // reach flushBinlog trigger threshold + t.flushChan <- SpillSignal{} } - // if the total buffer size is too large, block here, wait for memory release by spill - if currentBufferSize > t.getMemoryBufferHighWatermark() { + // if the total buffer size is too large, block here, wait for memory release by flushBinlog + if currentBufferSize > t.getMemoryBufferBlockSpillThreshold() { loop: for { select { @@ -565,8 +566,8 @@ func (t *clusteringCompactionTask) mappingSegment( log.Warn("stop waiting for memory buffer 
release as task chan done") return nil default: - currentSize := t.getWrittenMemoryBufferSize() - if currentSize < t.getMemoryBufferMiddleWatermark() { + currentSize := t.getUsedMemoryBufferSize() + if currentSize < t.getMemoryBufferLowWatermark() { break loop } time.Sleep(time.Millisecond * 200) @@ -614,15 +615,19 @@ func (t *clusteringCompactionTask) getMemoryBufferSize() int64 { return int64(float64(hardware.GetMemoryCount()) * paramtable.Get().DataNodeCfg.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) } -func (t *clusteringCompactionTask) getMemoryBufferMiddleWatermark() int64 { - return int64(float64(t.memoryBufferSize) * 0.5) +func (t *clusteringCompactionTask) getMemoryBufferLowWatermark() int64 { + return int64(float64(t.memoryBufferSize) * 0.3) } func (t *clusteringCompactionTask) getMemoryBufferHighWatermark() int64 { return int64(float64(t.memoryBufferSize) * 0.9) } -func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) { +func (t *clusteringCompactionTask) getMemoryBufferBlockSpillThreshold() int64 { + return t.memoryBufferSize +} + +func (t *clusteringCompactionTask) backgroundFlush(ctx context.Context) { for { select { case <-ctx.Done(): @@ -631,63 +636,68 @@ func (t *clusteringCompactionTask) backgroundSpill(ctx context.Context) { case <-t.done: log.Info("clustering compaction task done") return - case signal := <-t.spillChan: + case signal := <-t.flushChan: var err error if signal.buffer == nil { - err = t.spillLargestBuffers(ctx) + err = t.flushLargestBuffers(ctx) } else { err = func() error { t.clusterBufferLocks.Lock(signal.buffer.id) defer t.clusterBufferLocks.Unlock(signal.buffer.id) - return t.spill(ctx, signal.buffer) + return t.flushBinlog(ctx, signal.buffer) }() } if err != nil { - log.Warn("fail to spill data", zap.Error(err)) + log.Warn("fail to flushBinlog data", zap.Error(err)) // todo handle error } } } } -func (t *clusteringCompactionTask) spillLargestBuffers(ctx context.Context) error { - // only one 
spillLargestBuffers or spillAll should do at the same time - t.spillMutex.Lock() - defer t.spillMutex.Unlock() +func (t *clusteringCompactionTask) flushLargestBuffers(ctx context.Context) error { + // only one flushLargestBuffers or flushAll should do at the same time + getLock := t.flushMutex.TryLock() + if !getLock { + return nil + } + defer t.flushMutex.Unlock() bufferIDs := make([]int, 0) for _, buffer := range t.clusterBuffers { bufferIDs = append(bufferIDs, buffer.id) } sort.Slice(bufferIDs, func(i, j int) bool { - return t.clusterBuffers[i].writer.GetRowNum() > t.clusterBuffers[j].writer.GetRowNum() + return t.clusterBuffers[i].bufferRowNum.Load() > t.clusterBuffers[j].bufferRowNum.Load() }) - for index, bufferId := range bufferIDs { + log.Info("start flushLargestBuffers", zap.Ints("bufferIDs", bufferIDs)) + for _, bufferId := range bufferIDs { err := func() error { t.clusterBufferLocks.Lock(bufferId) defer t.clusterBufferLocks.Unlock(bufferId) - return t.spill(ctx, t.clusterBuffers[bufferId]) + return t.flushBinlog(ctx, t.clusterBuffers[bufferId]) }() if err != nil { return err } - if index >= len(bufferIDs) { + if t.getUsedMemoryBufferSize() <= t.getMemoryBufferLowWatermark() { + log.Info("reach memory low water mark", zap.Int64("memoryBufferSize", t.getUsedMemoryBufferSize())) break } } return nil } -func (t *clusteringCompactionTask) spillAll(ctx context.Context) error { - // only one spillLargestBuffers or spillAll should do at the same time - t.spillMutex.Lock() - defer t.spillMutex.Unlock() +func (t *clusteringCompactionTask) flushAll(ctx context.Context) error { + // only one flushLargestBuffers or flushAll should do at the same time + t.flushMutex.Lock() + defer t.flushMutex.Unlock() for _, buffer := range t.clusterBuffers { err := func() error { t.clusterBufferLocks.Lock(buffer.id) defer t.clusterBufferLocks.Unlock(buffer.id) - err := t.spill(ctx, buffer) + err := t.flushBinlog(ctx, buffer) if err != nil { - log.Error("spill fail") + 
log.Error("flushBinlog fail") return err } err = t.packBufferToSegment(ctx, buffer) @@ -708,16 +718,16 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff for _, fieldBinlog := range buffer.flushedBinlogs { insertLogs = append(insertLogs, fieldBinlog) } - statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum) + statPaths, err := statSerializeWrite(ctx, t.binlogIO, t.allocator, buffer.writer, buffer.flushedRowNum.Load()) if err != nil { return err } - // pack current spill data into a segment + // pack current flushBinlog data into a segment seg := &datapb.CompactionSegment{ PlanID: t.plan.GetPlanID(), SegmentID: buffer.writer.GetSegmentID(), - NumOfRows: buffer.flushedRowNum, + NumOfRows: buffer.flushedRowNum.Load(), InsertLogs: insertLogs, Field2StatslogPaths: []*datapb.FieldBinlog{statPaths}, Channel: t.plan.GetChannel(), @@ -725,27 +735,39 @@ func (t *clusteringCompactionTask) packBufferToSegment(ctx context.Context, buff buffer.uploadedSegments = append(buffer.uploadedSegments, seg) segmentStats := storage.SegmentStats{ FieldStats: []storage.FieldStats{buffer.clusteringKeyFieldStats.Clone()}, - NumRows: int(buffer.flushedRowNum), + NumRows: int(buffer.flushedRowNum.Load()), } buffer.uploadedSegmentStats[buffer.writer.GetSegmentID()] = segmentStats // refresh t.refreshBufferWriter(buffer) - buffer.flushedRowNum = 0 + buffer.flushedRowNum.Store(0) buffer.flushedBinlogs = make(map[typeutil.UniqueID]*datapb.FieldBinlog, 0) - log.Info("finish pack segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("seg", seg.String()), zap.Any("segStats", segmentStats)) + for _, binlog := range seg.InsertLogs { + log.Debug("pack binlog in segment", zap.Int64("partitionID", t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.String("binlog", binlog.String())) + } + log.Debug("finish pack segment", zap.Int64("partitionID", 
t.partitionID), zap.Int64("segID", buffer.writer.GetSegmentID()), zap.Any("segStats", segmentStats)) return nil } -func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuffer) error { - log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load())) +func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *ClusterBuffer) error { + log := log.With(zap.Int("bufferID", buffer.id), zap.Int64("bufferSize", buffer.bufferRowNum.Load()), zap.Int64("segmentID", buffer.writer.GetSegmentID())) if buffer.writer.IsEmpty() { return nil } - kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) - if err != nil { - log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) + + future := t.flushPool.Submit(func() (any, error) { + kvs, partialBinlogs, err := serializeWrite(ctx, t.allocator, buffer.writer) + if err != nil { + log.Warn("compact wrong, failed to serialize writer", zap.Error(err)) + return typeutil.NewPair(kvs, partialBinlogs), err + } + return typeutil.NewPair(kvs, partialBinlogs), nil + }) + if err := conc.AwaitAll(future); err != nil { return err } + kvs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).A + partialBinlogs := future.Value().(typeutil.Pair[map[string][]byte, map[int64]*datapb.FieldBinlog]).B if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) @@ -760,14 +782,14 @@ func (t *clusteringCompactionTask) spill(ctx context.Context, buffer *ClusterBuf } buffer.flushedBinlogs[fID] = tmpBinlog } - buffer.flushedRowNum = buffer.flushedRowNum + buffer.bufferRowNum.Load() + buffer.flushedRowNum.Add(buffer.bufferRowNum.Load()) // clean buffer buffer.bufferRowNum.Store(0) - t.spillCount.Inc() - log.Info("finish spill binlogs", zap.Int64("spillCount", t.spillCount.Load())) - if buffer.flushedRowNum > t.plan.GetMaxSegmentRows() { + t.flushCount.Inc() + 
log.Info("finish flush binlogs", zap.Int64("flushCount", t.flushCount.Load())) + if buffer.flushedRowNum.Load() > t.plan.GetMaxSegmentRows() { if err := t.packBufferToSegment(ctx, buffer); err != nil { return err } @@ -817,7 +839,7 @@ func (t *clusteringCompactionTask) scalarAnalyze(ctx context.Context) (map[inter CollectionID: segment.CollectionID, PartitionID: segment.PartitionID, } - future := t.pool.Submit(func() (any, error) { + future := t.mappingPool.Submit(func() (any, error) { analyzeResult, err := t.scalarAnalyzeSegment(ctx, segmentClone) mutex.Lock() defer mutex.Unlock() diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index b647584f82b3..f46ef87724da 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -193,7 +193,7 @@ func (t *mixCompactionTask) merge( unflushedRowCount++ remainingRowCount++ - if (unflushedRowCount+1)%100 == 0 && writer.IsFull() { + if (unflushedRowCount+1)%100 == 0 && writer.FlushAndIsFull() { serWriteStart := time.Now() kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) if err != nil { @@ -214,7 +214,7 @@ func (t *mixCompactionTask) merge( } } - if !writer.IsEmpty() { + if !writer.FlushAndIsEmpty() { serWriteStart := time.Now() kvs, partialBinlogs, err := serializeWrite(ctx, t.Allocator, writer) if err != nil { diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index be4265440343..c4fb51c8c80d 100644 --- a/internal/datanode/compaction/segment_writer.go +++ b/internal/datanode/compaction/segment_writer.go @@ -150,11 +150,19 @@ func (w *SegmentWriter) Finish(actualRowCount int64) (*storage.Blob, error) { } func (w *SegmentWriter) IsFull() bool { + return w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() +} + +func (w *SegmentWriter) FlushAndIsFull() bool { w.writer.Flush() return 
w.writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() } func (w *SegmentWriter) IsEmpty() bool { + return w.writer.WrittenMemorySize() == 0 +} + +func (w *SegmentWriter) FlushAndIsEmpty() bool { w.writer.Flush() return w.writer.WrittenMemorySize() == 0 } diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 1474fca5a579..6360b4a821de 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -24,6 +24,7 @@ import ( "math" "sort" "strconv" + "sync" "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" @@ -789,6 +790,7 @@ type SerializeWriter[T any] struct { rw RecordWriter serializer Serializer[T] batchSize int + mu sync.Mutex buffer []T pos int @@ -796,6 +798,8 @@ type SerializeWriter[T any] struct { } func (sw *SerializeWriter[T]) Flush() error { + sw.mu.Lock() + defer sw.mu.Unlock() if sw.pos == 0 { return nil } From 7dff037a4a1a69fa8ccc21767727ac440bc0b9f8 Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Sun, 23 Jun 2024 22:16:02 -0400 Subject: [PATCH 5/9] fix: sync part stats task cannot be finished(#30376) (#34027) related: #30376 also: refine log output for query_coord task by rephrasing action string Signed-off-by: MrPresent-Han Co-authored-by: MrPresent-Han Co-authored-by: wayblink Signed-off-by: wayblink --- internal/querycoordv2/task/action.go | 27 +++++++++++++++++-- internal/querycoordv2/task/task.go | 11 ++------ internal/querynodev2/delegator/delegator.go | 3 ++- .../querynodev2/delegator/segment_pruner.go | 7 ++--- internal/storage/partition_stats.go | 3 +++ pkg/common/map.go | 13 +++++++++ pkg/common/map_test.go | 22 +++++++++++++++ 7 files changed, 71 insertions(+), 15 deletions(-) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 54823a988fa4..c0229c4065b1 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,13 
+17,14 @@ package task import ( - "reflect" + "fmt" "github.com/samber/lo" "go.uber.org/atomic" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -50,6 +51,7 @@ type Action interface { Node() int64 Type() ActionType IsFinished(distMgr *meta.DistributionManager) bool + String() string } type BaseAction struct { @@ -78,6 +80,10 @@ func (action *BaseAction) Shard() string { return action.shard } +func (action *BaseAction) String() string { + return fmt.Sprintf(`{[type=%v][node=%d][shard=%v]}`, action.Type(), action.Node(), action.Shard()) +} + type SegmentAction struct { *BaseAction @@ -153,6 +159,10 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool return true } +func (action *SegmentAction) String() string { + return action.BaseAction.String() + fmt.Sprintf(`{[segmentID=%d][scope=%d]}`, action.SegmentID(), action.Scope()) +} + type ChannelAction struct { *BaseAction } @@ -218,6 +228,19 @@ func (action *LeaderAction) Version() typeutil.UniqueID { return action.version } +func (action *LeaderAction) PartStats() map[int64]int64 { + return action.partStatsVersions +} + +func (action *LeaderAction) String() string { + partStatsStr := "" + if action.PartStats() != nil { + partStatsStr = fmt.Sprintf("%v", action.PartStats()) + } + return action.BaseAction.String() + fmt.Sprintf(`{[leaderID=%v][segmentID=%d][version=%d][partStats=%s]}`, + action.GetLeaderID(), action.SegmentID(), action.Version(), partStatsStr) +} + func (action *LeaderAction) GetLeaderID() typeutil.UniqueID { return action.leaderID } @@ -237,7 +260,7 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { case ActionTypeReduce: return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node()) case ActionTypeUpdate: - return 
action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions)) + return action.rpcReturned.Load() && common.MapEquals(action.partStatsVersions, view.PartitionStatsVersions) } return false } diff --git a/internal/querycoordv2/task/task.go b/internal/querycoordv2/task/task.go index d3f6d205aa99..ed7431a539b7 100644 --- a/internal/querycoordv2/task/task.go +++ b/internal/querycoordv2/task/task.go @@ -280,15 +280,8 @@ func (task *baseTask) SetReason(reason string) { func (task *baseTask) String() string { var actionsStr string - for i, action := range task.actions { - if realAction, ok := action.(*SegmentAction); ok { - actionsStr += fmt.Sprintf(`{[type=%v][node=%d][streaming=%v]}`, action.Type(), action.Node(), realAction.Scope() == querypb.DataScope_Streaming) - } else { - actionsStr += fmt.Sprintf(`{[type=%v][node=%d]}`, action.Type(), action.Node()) - } - if i != len(task.actions)-1 { - actionsStr += ", " - } + for _, action := range task.actions { + actionsStr += action.String() + "," } return fmt.Sprintf( "[id=%d] [type=%s] [source=%s] [reason=%s] [collectionID=%d] [replicaID=%d] [resourceGroup=%s] [priority=%s] [actionsCount=%d] [actions=%s]", diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 64cb5edf3568..8db4345f7b59 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -825,7 +825,8 @@ func (sd *shardDelegator) loadPartitionStats(ctx context.Context, partStatsVersi defer sd.partitionStatsMut.Unlock() sd.partitionStats[partID] = partStats }() - log.Info("Updated partitionStats for partition", zap.Int64("partitionID", partID)) + log.Info("Updated partitionStats for partition", zap.Int64("collectionID", sd.collectionID), zap.Int64("partitionID", partID), + zap.Int64("newVersion", newVersion), zap.Int64("oldVersion", curStats.GetVersion())) } } diff --git 
a/internal/querynodev2/delegator/segment_pruner.go b/internal/querynodev2/delegator/segment_pruner.go index a8155ce7c7c8..a1ea6129cb3c 100644 --- a/internal/querynodev2/delegator/segment_pruner.go +++ b/internal/querynodev2/delegator/segment_pruner.go @@ -85,12 +85,12 @@ func PruneSegments(ctx context.Context, plan := planpb.PlanNode{} err := proto.Unmarshal(expr, &plan) if err != nil { - log.Error("failed to unmarshall serialized expr from bytes, failed the operation") + log.Ctx(ctx).Error("failed to unmarshall serialized expr from bytes, failed the operation") return } expr, err := exprutil.ParseExprFromPlan(&plan) if err != nil { - log.Error("failed to parse expr from plan, failed the operation") + log.Ctx(ctx).Error("failed to parse expr from plan, failed the operation") return } targetRanges, matchALL := exprutil.ParseRanges(expr, exprutil.ClusteringKey) @@ -123,7 +123,8 @@ func PruneSegments(ctx context.Context, metrics.QueryNodeSegmentPruneRatio. WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(typeutil.IsVectorType(clusteringKeyField.GetDataType()))). 
Observe(float64(realFilteredSegments / totalSegNum)) - log.Debug("Pruned segment for search/query", + log.Ctx(ctx).Debug("Pruned segment for search/query", + zap.Int("filtered_segment_num[stats]", len(filteredSegments)), zap.Int("filtered_segment_num[excluded]", realFilteredSegments), zap.Int("total_segment_num", totalSegNum), zap.Float32("filtered_ratio", float32(realFilteredSegments)/float32(totalSegNum)), diff --git a/internal/storage/partition_stats.go b/internal/storage/partition_stats.go index 7a49953eb918..d7e3893e8097 100644 --- a/internal/storage/partition_stats.go +++ b/internal/storage/partition_stats.go @@ -46,6 +46,9 @@ func NewPartitionStatsSnapshot() *PartitionStatsSnapshot { } func (ps *PartitionStatsSnapshot) GetVersion() int64 { + if ps == nil { + return 0 + } return ps.Version } diff --git a/pkg/common/map.go b/pkg/common/map.go index e5c9d2162133..4c9def2aa4ff 100644 --- a/pkg/common/map.go +++ b/pkg/common/map.go @@ -22,3 +22,16 @@ func (m Str2Str) Equal(other Str2Str) bool { func CloneStr2Str(m Str2Str) Str2Str { return m.Clone() } + +func MapEquals(m1, m2 map[int64]int64) bool { + if len(m1) != len(m2) { + return false + } + for k1, v1 := range m1 { + v2, exist := m2[k1] + if !exist || v1 != v2 { + return false + } + } + return true +} diff --git a/pkg/common/map_test.go b/pkg/common/map_test.go index 2703609f653a..e84065d4741f 100644 --- a/pkg/common/map_test.go +++ b/pkg/common/map_test.go @@ -35,3 +35,25 @@ func TestCloneStr2Str(t *testing.T) { }) } } + +func TestMapEqual(t *testing.T) { + { + m1 := map[int64]int64{1: 11, 2: 22, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22, 3: 33} + assert.True(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22, 3: 33} + assert.False(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + m2 := map[int64]int64{1: 11, 2: 22} + assert.False(t, MapEquals(m1, m2)) + } + { + m1 := map[int64]int64{1: 11, 2: 23, 3: 33} + 
assert.False(t, MapEquals(m1, nil)) + } +} From b1dc35e23d942b84a4384f74fbd0ac9a3e7587dc Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 25 Jun 2024 18:52:04 +0800 Subject: [PATCH 6/9] Add an option to enable/disable vector field clustering key (#34097) #30633 Signed-off-by: wayblink --- configs/milvus.yaml | 2 + internal/proxy/task.go | 4 ++ internal/proxy/task_test.go | 35 +++++++++++++ .../delegator/segment_pruner_test.go | 1 + internal/util/clustering/clustering.go | 9 +++- pkg/util/merr/errors.go | 15 +++--- pkg/util/merr/errors_test.go | 1 + pkg/util/merr/utils.go | 10 ++++ pkg/util/paramtable/component_param.go | 49 +++++++++++-------- pkg/util/paramtable/component_param_test.go | 2 + 10 files changed, 100 insertions(+), 28 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 6c63cfb5ff29..991880ff6654 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -649,8 +649,10 @@ common: traceLogMode: 0 # trace request info bloomFilterSize: 100000 # bloom filter initial size maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter + # clustering key/compaction related usePartitionKeyAsClusteringKey: false useVectorAsClusteringKey: false + enableVectorClusteringKey: false # QuotaConfig, configurations of Milvus quota and limits. 
# By default, we enable: diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 87fe4bd195a6..5824bce1037b 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -248,6 +248,10 @@ func (t *createCollectionTask) validateClusteringKey() error { idx := -1 for i, field := range t.schema.Fields { if field.GetIsClusteringKey() { + if typeutil.IsVectorType(field.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return merr.WrapErrCollectionVectorClusteringKeyNotAllowed(t.CollectionName) + } if idx != -1 { return merr.WrapErrCollectionIllegalSchema(t.CollectionName, fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name)) diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 8d80338417a1..17644e99d98d 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3568,6 +3568,41 @@ func TestClusteringKey(t *testing.T) { err = createCollectionTask.PreExecute(ctx) assert.Error(t, err) }) + + t.Run("create collection with vector clustering key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + clusterKeyField := &schemapb.FieldSchema{ + Name: "vec_field", + DataType: schemapb.DataType_FloatVector, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + 
CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) } func TestAlterCollectionCheckLoaded(t *testing.T) { diff --git a/internal/querynodev2/delegator/segment_pruner_test.go b/internal/querynodev2/delegator/segment_pruner_test.go index 555d80eefc6a..a6bb8d934d8d 100644 --- a/internal/querynodev2/delegator/segment_pruner_test.go +++ b/internal/querynodev2/delegator/segment_pruner_test.go @@ -380,6 +380,7 @@ func vector2Placeholder(vectors [][]float32) *commonpb.PlaceholderValue { func (sps *SegmentPrunerSuite) TestPruneSegmentsByVectorField() { paramtable.Init() + paramtable.Get().Save(paramtable.Get().CommonCfg.EnableVectorClusteringKey.Key, "true") sps.SetupForClustering("vec", schemapb.DataType_FloatVector) vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499} vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759} diff --git a/internal/util/clustering/clustering.go b/internal/util/clustering/clustering.go index b9859922332d..20b6636bca6a 100644 --- a/internal/util/clustering/clustering.go +++ b/internal/util/clustering/clustering.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, right []float32, metric string) ([]float32, error) { @@ -70,10 +71,16 @@ func GetClusteringKeyField(collectionSchema *schemapb.CollectionSchema) *schemap // in some server mode, we regard partition key field or vector field 
as clustering key by default. // here is the priority: clusteringKey > partitionKey > vector field(only single vector) if clusteringKeyField != nil { + if typeutil.IsVectorType(clusteringKeyField.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return nil + } return clusteringKeyField } else if paramtable.Get().CommonCfg.UsePartitionKeyAsClusteringKey.GetAsBool() && partitionKeyField != nil { return partitionKeyField - } else if paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && len(vectorFields) == 1 { + } else if paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() && + paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && + len(vectorFields) == 1 { return vectorFields[0] } return nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index a40d041ea7e3..78034ca1ab3d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -46,13 +46,14 @@ var ( ErrServiceResourceInsufficient = newMilvusError("service resource insufficient", 12, true) // Collection related - ErrCollectionNotFound = newMilvusError("collection not found", 100, false) - ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) - ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) - ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) - ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) - ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) - ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionNotFound = newMilvusError("collection not found", 100, false) + ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) + ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) + ErrCollectionNotFullyLoaded = 
newMilvusError("collection not fully loaded", 103, true) + ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) + ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) + ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 67782d1c507e..125a2e72f91a 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -87,6 +87,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded) s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded) s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering) + s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed) // Partition related s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 1b2b21fdefbf..c61a4cc92f23 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -489,6 +489,16 @@ func WrapErrCollectionOnRecovering(collection any, msgAndArgs ...any) error { return err } +// WrapErrCollectionVectorClusteringKeyNotAllowed wraps ErrCollectionVectorClusteringKeyNotAllowed with collection +func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs ...any) error { + err := wrapFields(ErrCollectionVectorClusteringKeyNotAllowed, value("collection", collection)) + if len(msgAndArgs) > 0 { + msg := msgAndArgs[0].(string) + err = 
errors.Wrapf(err, msg, msgAndArgs[1:]...) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 739753901d22..2ccc38cd4253 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -249,6 +249,7 @@ type commonConfig struct { UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` + EnableVectorClusteringKey ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -757,7 +758,7 @@ like the old password verification when updating the credential`, p.UsePartitionKeyAsClusteringKey = ParamItem{ Key: "common.usePartitionKeyAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on partition key field", DefaultValue: "false", } @@ -765,11 +766,19 @@ like the old password verification when updating the credential`, p.UseVectorAsClusteringKey = ParamItem{ Key: "common.useVectorAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on vector field", DefaultValue: "false", } p.UseVectorAsClusteringKey.Init(base.mgr) + + p.EnableVectorClusteringKey = ParamItem{ + Key: "common.enableVectorClusteringKey", + Version: "2.4.6", + Doc: "if true, enable vector clustering key and vector clustering compaction", + DefaultValue: "false", + } + p.EnableVectorClusteringKey.Init(base.mgr) } type gpuConfig struct { @@ -3260,7 +3269,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionEnable = ParamItem{ Key: "dataCoord.compaction.clustering.enable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable clustering compaction", Export: true, @@ -3269,7 +3278,7 @@ During compaction, 
the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionAutoEnable = ParamItem{ Key: "dataCoord.compaction.clustering.autoEnable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable auto clustering compaction", Export: true, @@ -3278,28 +3287,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTriggerInterval = ParamItem{ Key: "dataCoord.compaction.clustering.triggerInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionTriggerInterval.Init(base.mgr) p.ClusteringCompactionStateCheckInterval = ParamItem{ Key: "dataCoord.compaction.clustering.stateCheckInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", } p.ClusteringCompactionStateCheckInterval.Init(base.mgr) p.ClusteringCompactionGCInterval = ParamItem{ Key: "dataCoord.compaction.clustering.gcInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionGCInterval.Init(base.mgr) p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction", DefaultValue: "3600", } @@ -3307,7 +3316,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxInterval = ParamItem{ Key: "dataCoord.compaction.clustering.maxInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact", DefaultValue: "86400", } @@ -3315,7 +3324,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionNewDataSizeThreshold = ParamItem{ Key: "dataCoord.compaction.clustering.newDataSizeThreshold", - Version: "2.4.2", + Version: "2.4.6", Doc: "If new data size is large than 
newDataSizeThreshold, execute clustering compaction", DefaultValue: "512m", } @@ -3323,14 +3332,14 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTimeoutInSeconds = ParamItem{ Key: "dataCoord.compaction.clustering.timeout", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "3600", } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) p.ClusteringCompactionDropTolerance = ParamItem{ Key: "dataCoord.compaction.clustering.dropTolerance", - Version: "2.4.2", + Version: "2.4.6", Doc: "If clustering compaction job is finished for a long time, gc it", DefaultValue: "259200", } @@ -3338,7 +3347,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionPreferSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.preferSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "512m", PanicIfEmpty: false, Export: true, @@ -3347,7 +3356,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "1024m", PanicIfEmpty: false, Export: true, @@ -3356,7 +3365,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxTrainSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.8", Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit", Export: true, @@ -3365,7 +3374,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.maxCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10240", Doc: "maximum centroids number in Kmeans train", Export: true, @@ -3374,7 
+3383,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.minCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "16", Doc: "minimum centroids number in Kmeans train", Export: true, @@ -3383,7 +3392,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.minClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.01", Doc: "minimum cluster size / avg size in Kmeans train", Export: true, @@ -3392,7 +3401,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", Doc: "maximum cluster size / avg size in Kmeans train", Export: true, @@ -3401,7 +3410,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "5g", Doc: "maximum cluster size in Kmeans train", Export: true, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 21f2e0218989..8968d3ab3b52 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -542,6 +542,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.UsePartitionKeyAsClusteringKey.GetAsBool()) params.Save("common.useVectorAsClusteringKey", "true") assert.Equal(t, true, Params.UseVectorAsClusteringKey.GetAsBool()) + params.Save("common.enableVectorClusteringKey", "true") + assert.Equal(t, true, Params.EnableVectorClusteringKey.GetAsBool()) }) } From 
b77d01d3674ea0eccf3001e60ab476eb10b8765e Mon Sep 17 00:00:00 2001 From: wayblink Date: Wed, 26 Jun 2024 10:24:03 +0800 Subject: [PATCH 7/9] fix: fix error ignore in compactor (#34169) #34170 Signed-off-by: wayblink --- internal/datanode/compaction/clustering_compactor.go | 1 + internal/datanode/compaction/mix_compactor.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 713af63997ac..1fb6b8c3acb7 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -771,6 +771,7 @@ func (t *clusteringCompactionTask) flushBinlog(ctx context.Context, buffer *Clus if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return err } for fID, path := range partialBinlogs { diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index f46ef87724da..8a4ff77f4047 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -151,6 +151,7 @@ func (t *mixCompactionTask) merge( allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) + return nil, err } downloadTimeCost += time.Since(downloadStart) @@ -205,6 +206,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart) mergeFieldBinlogs(allBinlogs, partialBinlogs) @@ -226,6 +228,7 @@ func (t *mixCompactionTask) merge( uploadStart := time.Now() if err := t.binlogIO.Upload(ctx, kvs); err != nil { log.Warn("compact wrong, failed to upload kvs", zap.Error(err)) + return nil, err } uploadTimeCost += time.Since(uploadStart) From 
fb544b79e981ea85eafe3666f07f2918021c31e1 Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Fri, 28 Jun 2024 02:04:05 -0400 Subject: [PATCH 8/9] fix:load major compaction partial result(#34051) (#34052) related: #34051 Signed-off-by: MrPresent-Han Co-authored-by: MrPresent-Han Co-authored-by: wayblink Signed-off-by: wayblink --- internal/datacoord/handler.go | 14 ++++++++++++-- internal/proxy/impl.go | 4 +++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 199e6ebd68eb..696fbf5cad64 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -146,13 +146,23 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // Skip bulk insert segments. continue } - if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion > currentPartitionStatsVersion { - // skip major compaction not fully completed. + if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion != currentPartitionStatsVersion { + // in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan + // is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be + // seen atomically, otherwise users will see intermediate result continue } segmentInfos[s.GetID()] = s switch { case s.GetState() == commonpb.SegmentState_Dropped: + if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion { + // if segment.partStatsVersion is equal to currentPartitionStatsVersion, + // it must have been indexed, this is guaranteed by clustering compaction process + // this is to ensure that the current valid L2 compaction produce is available to search/query + // to avoid insufficient data + indexedIDs.Insert(s.GetID()) + continue + } droppedIDs.Insert(s.GetID()) case !isFlushState(s.GetState()): 
growingIDs.Insert(s.GetID()) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index afcb7153d13d..f7098cff3386 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1857,7 +1857,9 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get log.Debug( rpcDone(method), - zap.Any("request", request)) + zap.Any("request", request), + zap.Int64("loadProgress", loadProgress), + zap.Int64("refreshProgress", refreshProgress)) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.GetLoadingProgressResponse{ From a72b9bb7a7b426b796277452a76664fb654a1c2a Mon Sep 17 00:00:00 2001 From: wayblink Date: Sun, 30 Jun 2024 20:26:07 +0800 Subject: [PATCH 9/9] Use new stream segment reader in clustering compaction (#34232) #32939 Signed-off-by: wayblink --- .../compaction/clustering_compactor.go | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 1fb6b8c3acb7..70fdaa5ee365 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -19,6 +19,7 @@ package compaction import ( "context" "fmt" + sio "io" "math" "path" "sort" @@ -481,33 +482,33 @@ func (t *clusteringCompactionTask) mappingSegment( fieldBinlogPaths = append(fieldBinlogPaths, ps) } - for _, path := range fieldBinlogPaths { - bytesArr, err := t.binlogIO.Download(ctx, path) - blobs := make([]*storage.Blob, len(bytesArr)) - var segmentSize int64 - for i := range bytesArr { - blobs[i] = &storage.Blob{Value: bytesArr[i]} - segmentSize = segmentSize + int64(len(bytesArr[i])) - } + for _, 
paths := range fieldBinlogPaths { + allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { - log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err)) + log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) return err } - - pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType()) + blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob { + return &storage.Blob{Key: paths[i], Value: v} + }) + pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID()) if err != nil { - log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err)) + log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err)) return err } var offset int64 = -1 - for pkIter.HasNext() { - vInter, _ := pkIter.Next() - v, ok := vInter.(*storage.Value) - if !ok { - log.Warn("transfer interface to Value wrong", zap.Strings("path", path)) - return errors.New("unexpected error") + for { + err := pkIter.Next() + if err != nil { + if err == sio.EOF { + break + } else { + log.Warn("compact wrong, failed to iter through data", zap.Error(err)) + return err + } } + v := pkIter.Value() offset++ // Filtering deleted entity @@ -524,7 +525,7 @@ func (t *clusteringCompactionTask) mappingSegment( row, ok := v.Value.(map[typeutil.UniqueID]interface{}) if !ok { - log.Warn("transfer interface to map wrong", zap.Strings("path", path)) + log.Warn("transfer interface to map wrong", zap.Strings("paths", paths)) return errors.New("unexpected error") }