enhance: Execute bloom filter apply in parallel to speed up segment predict (milvus-io#33792)

issue: milvus-io#33610

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
weiliu1031 authored and yellow-shine committed Jul 2, 2024
1 parent dbf7c7c commit c999959
Showing 15 changed files with 462 additions and 99 deletions.
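
At a high level, the change replaces a serial scan of delete-record primary keys with a batched, pooled one: the keys are cut into fixed-size batches, each batch is checked against every segment's bloom filter on a shared worker pool, and the per-batch hit masks are merged back afterwards. A minimal, self-contained sketch of that pattern (toy map-based filters and plain goroutines stand in for Milvus's BloomFilterSet and conc.Pool; none of these names are from the commit):

package main

import (
	"fmt"
	"sync"
)

// toyFilter stands in for a per-segment bloom filter; membership test only.
type toyFilter map[string]bool

// batchApply checks each batch of keys against every segment filter in parallel
// and returns, per batch start offset, a map of segmentID -> hit mask.
func batchApply(keys []string, filters map[int64]toyFilter, batchSize int) map[int]map[int64][]bool {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out = make(map[int]map[int64][]bool)
	)
	for start := 0; start < len(keys); start += batchSize {
		end := start + batchSize
		if end > len(keys) {
			end = len(keys)
		}
		start := start // per-iteration copy for the goroutine below
		wg.Add(1)
		go func() {
			defer wg.Done()
			hits := make(map[int64][]bool, len(filters))
			for segID, f := range filters {
				mask := make([]bool, end-start)
				for i, k := range keys[start:end] {
					mask[i] = f[k]
				}
				hits[segID] = mask
			}
			mu.Lock()
			out[start] = hits
			mu.Unlock()
		}()
	}
	wg.Wait()
	return out
}

func main() {
	filters := map[int64]toyFilter{1: {"a": true}, 2: {"c": true}}
	fmt.Println(batchApply([]string{"a", "b", "c"}, filters, 2))
}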
52 changes: 26 additions & 26 deletions internal/datacoord/mock_segment_manager.go

Some generated files are not rendered by default.

71 changes: 58 additions & 13 deletions internal/datanode/compaction/l0_compactor.go
@@ -242,11 +242,18 @@ func (t *LevelZeroCompactionTask) splitDelta(
 	})
 
 	// spilt all delete data to segments
+
+	retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segmentBfs)
+
 	targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
-	split := func(pks []storage.PrimaryKey, pkTss []uint64) {
-		lc := storage.NewBatchLocationsCache(pks)
-		for segmentID, bf := range segmentBfs {
-			hits := bf.BatchPkExist(lc)
+	retMap.Range(func(key int, value *BatchApplyRet) bool {
+		startIdx := value.StartIdx
+		pk2SegmentIDs := value.Segment2Hits
+
+		pks := allDelta[value.DeleteDataIdx].Pks
+		tss := allDelta[value.DeleteDataIdx].Tss
+
+		for segmentID, hits := range pk2SegmentIDs {
 			for i, hit := range hits {
 				if hit {
 					writer, ok := targetSegBuffer[segmentID]
@@ -255,27 +262,65 @@ func (t *LevelZeroCompactionTask) splitDelta(
 						writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
 						targetSegBuffer[segmentID] = writer
 					}
-					writer.Write(pks[i], pkTss[i])
+					writer.Write(pks[startIdx+i], tss[startIdx+i])
 				}
 			}
 		}
-	}
+		return true
+	})
 
+	return targetSegBuffer
+}
+
+type BatchApplyRet = struct {
+	DeleteDataIdx int
+	StartIdx      int
+	Segment2Hits  map[int64][]bool
+}
+
+func (t *LevelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
-	// spilt all delete data to segments
-	for _, deleteBuffer := range allDelta {
-		pks := deleteBuffer.Pks
-		pkTss := deleteBuffer.Tss
+
+	batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
+		segment2Hits := make(map[int64][]bool, 0)
+		lc := storage.NewBatchLocationsCache(pks)
+		for segmentID, bf := range segmentBfs {
+			hits := bf.BatchPkExist(lc)
+			segment2Hits[segmentID] = hits
+		}
+		return segment2Hits
+	}
+
+	retIdx := 0
+	retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
+	var futures []*conc.Future[any]
+	for didx, data := range deleteDatas {
+		pks := data.Pks
 		for idx := 0; idx < len(pks); idx += batchSize {
-			endIdx := idx + batchSize
+			startIdx := idx
+			endIdx := startIdx + batchSize
 			if endIdx > len(pks) {
 				endIdx = len(pks)
 			}
-			split(pks[idx:endIdx], pkTss[idx:endIdx])
+
+			retIdx += 1
+			tmpRetIndex := retIdx
+			deleteDataId := didx
+			future := pool.Submit(func() (any, error) {
+				ret := batchPredict(pks[startIdx:endIdx])
+				retMap.Insert(tmpRetIndex, &BatchApplyRet{
+					DeleteDataIdx: deleteDataId,
+					StartIdx:      startIdx,
+					Segment2Hits:  ret,
+				})
+				return nil, nil
+			})
+			futures = append(futures, future)
 		}
 	}
+	conc.AwaitAll(futures...)
 
-	return targetSegBuffer
+	return retMap
 }
 
 func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
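
One detail in applyBFInParallel above: startIdx, tmpRetIndex, and deleteDataId are copied into fresh locals before pool.Submit, which looks like the usual guard against Go's loop-variable capture (before Go 1.22, closures launched from a loop all share the same loop variable). A minimal, runnable illustration of the pitfall those copies avoid (not code from the commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for idx := 0; idx < 3; idx++ {
		captured := idx // drop this copy on Go <1.22 and every goroutine may observe the final idx
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("batch start:", captured)
		}()
	}
	wg.Wait()
}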
58 changes: 58 additions & 0 deletions internal/datanode/io/io_pool.go
@@ -1,8 +1,14 @@
 package io
 
 import (
+	"context"
 	"sync"
+	"sync/atomic"
+
+	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -18,6 +24,11 @@ var (
 	statsPoolInitOnce sync.Once
 )
 
+var (
+	bfApplyPool         atomic.Pointer[conc.Pool[any]]
+	bfApplyPoolInitOnce sync.Once
+)
+
 func initIOPool() {
 	capacity := paramtable.Get().DataNodeCfg.IOConcurrency.GetAsInt()
 	if capacity > 32 {
@@ -58,3 +69,50 @@ func getMultiReadPool() *conc.Pool[any] {
 	ioPoolInitOnce.Do(initMultiReadPool)
 	return ioPool
 }
+
+func resizePool(pool *conc.Pool[any], newSize int, tag string) {
+	log := log.Ctx(context.Background()).
+		With(
+			zap.String("poolTag", tag),
+			zap.Int("newSize", newSize),
+		)
+
+	if newSize <= 0 {
+		log.Warn("cannot set pool size to non-positive value")
+		return
+	}
+
+	err := pool.Resize(newSize)
+	if err != nil {
+		log.Warn("failed to resize pool", zap.Error(err))
+		return
+	}
+	log.Info("pool resize successfully")
+}
+
+func ResizeBFApplyPool(evt *config.Event) {
+	if evt.HasUpdated {
+		pt := paramtable.Get()
+		newSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
+		resizePool(GetBFApplyPool(), newSize, "BFApplyPool")
+	}
+}
+
+func initBFApplyPool() {
+	bfApplyPoolInitOnce.Do(func() {
+		pt := paramtable.Get()
+		poolSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
+		log.Info("init BFApplyPool", zap.Int("poolSize", poolSize))
+		pool := conc.NewPool[any](
+			poolSize,
+		)
+
+		bfApplyPool.Store(pool)
+		pt.Watch(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key, config.NewHandler("dn.bfapply.parallel", ResizeBFApplyPool))
+	})
+}
+
+func GetBFApplyPool() *conc.Pool[any] {
+	initBFApplyPool()
+	return bfApplyPool.Load()
+}
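
For context, a hedged usage sketch of the new pool: callers obtain the shared pool with GetBFApplyPool, submit closures, and wait on the returned futures, while capacity tracks GetCPUNum() times the BloomFilterApplyParallelFactor key through the watcher registered in initBFApplyPool. Import paths are assumed from the repository layout; workItems and handle are placeholders, not part of the commit:

package example

import (
	"github.com/milvus-io/milvus/internal/datanode/io"
	"github.com/milvus-io/milvus/pkg/util/conc"
)

// applyInParallel fans work out over the shared bloom-filter-apply pool and
// blocks until every submitted task has finished.
func applyInParallel(workItems []int, handle func(int)) {
	pool := io.GetBFApplyPool()

	var futures []*conc.Future[any]
	for _, item := range workItems {
		item := item // keep a per-iteration copy for the closure
		futures = append(futures, pool.Submit(func() (any, error) {
			handle(item)
			return nil, nil
		}))
	}
	conc.AwaitAll(futures...)
}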
33 changes: 33 additions & 0 deletions internal/datanode/io/io_pool_test.go
@@ -1,12 +1,15 @@
 package io
 
 import (
+	"strconv"
 	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 
+	"github.com/milvus-io/milvus/pkg/config"
 	"github.com/milvus-io/milvus/pkg/util/conc"
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
@@ -36,3 +39,33 @@ func TestGetOrCreateIOPool(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestResizePools(t *testing.T) {
+	paramtable.Init()
+	pt := paramtable.Get()
+
+	defer func() {
+		pt.Reset(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key)
+	}()
+
+	t.Run("BfApplyPool", func(t *testing.T) {
+		expectedCap := hardware.GetCPUNum() * pt.DataNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
+
+		ResizeBFApplyPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
+
+		pt.Save(pt.DataNodeCfg.BloomFilterApplyParallelFactor.Key, strconv.FormatFloat(pt.DataNodeCfg.BloomFilterApplyParallelFactor.GetAsFloat()*2, 'f', 10, 64))
+		ResizeBFApplyPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
+
+		pt.Save(pt.DataNodeCfg.BloomFilterApplyParallelFactor.Key, "0")
+		ResizeBFApplyPool(&config.Event{
+			HasUpdated: true,
+		})
+		assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
+	})
+}
15 changes: 15 additions & 0 deletions internal/datanode/metacache/bloom_filter_set.go
@@ -85,6 +85,21 @@ func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool
 	return hits
 }
 
+func (bfs *BloomFilterSet) BatchPkExistWithHits(lc *storage.BatchLocationsCache, hits []bool) []bool {
+	bfs.mut.RLock()
+	defer bfs.mut.RUnlock()
+
+	if bfs.current != nil {
+		bfs.current.BatchPkExist(lc, hits)
+	}
+
+	for _, bf := range bfs.history {
+		bf.BatchPkExist(lc, hits)
+	}
+
+	return hits
+}
+
 func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
 	bfs.mut.Lock()
 	defer bfs.mut.Unlock()
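
BatchPkExistWithHits mirrors BatchPkExist but takes the hits slice from the caller. Judging from how both the current and the history filters write into the same slice, each call only adds positives, so one buffer can be reused to accumulate hits across several filter sets or repeated calls instead of allocating per call. A hedged calling-pattern sketch (import paths assumed from the repository layout; mayExistInAnySegment is illustrative, not part of the commit):

package example

import (
	"github.com/milvus-io/milvus/internal/datanode/metacache"
	"github.com/milvus-io/milvus/internal/storage"
)

// mayExistInAnySegment reports, per primary key, whether the key may exist in
// at least one of the given segments' bloom filter sets.
func mayExistInAnySegment(pks []storage.PrimaryKey, sets []*metacache.BloomFilterSet) []bool {
	lc := storage.NewBatchLocationsCache(pks)
	hits := make([]bool, lc.Size())
	for _, bfs := range sets {
		bfs.BatchPkExistWithHits(lc, hits) // accumulates positives into the shared buffer
	}
	return hits
}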
6 changes: 6 additions & 0 deletions internal/datanode/metacache/bloom_filter_set_test.go
@@ -99,6 +99,12 @@ func (s *BloomFilterSetSuite) TestBatchPkExist() {
 		for _, hit := range hits {
 			s.True(hit, "pk shall return exist after batch update")
 		}
+
+		hits = make([]bool, lc.Size())
+		bfs.BatchPkExistWithHits(lc, hits)
+		for _, hit := range hits {
+			s.True(hit, "pk shall return exist after batch update")
+		}
 	}
 }
(The remaining changed files are not shown here.)
