enhance: Use Blocked Bloom Filter instead of basic bloom filter impl. (#33405)

issue: #32995
To speed up the construction and querying of Bloom filters, we chose a
blocked Bloom filter instead of a basic Bloom filter implementation.
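For context: a blocked Bloom filter splits the bit array into cache-line-sized blocks and confines all k probes for a key to the single block selected by the key's hash, so an insert or lookup touches one cache line instead of k scattered ones. Below is a minimal, self-contained sketch of that layout; it is an illustration only, not the blobloom implementation this PR adopts, and the probe-derivation scheme is deliberately simplistic.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// One block = 512 bits = 64 bytes, i.e. a single CPU cache line.
const blockBits = 512

// blockedBloom confines all probes for a key to one block, so Add/Test
// touch one cache line instead of k scattered ones.
type blockedBloom struct {
	blocks [][blockBits / 64]uint64 // each block is 8 contiguous uint64 words
	k      int                      // probes per key
}

func newBlockedBloom(numBlocks, k int) *blockedBloom {
	return &blockedBloom{blocks: make([][blockBits / 64]uint64, numBlocks), k: k}
}

func hash64(key string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}

func (b *blockedBloom) Add(key string) {
	h := hash64(key)
	blk := &b.blocks[h%uint64(len(b.blocks))] // block chosen once per key
	for i := 0; i < b.k; i++ {
		h = h*0x9E3779B97F4A7C15 + 1 // cheap derived probe, illustration only
		bit := h % blockBits
		blk[bit/64] |= 1 << (bit % 64)
	}
}

func (b *blockedBloom) Test(key string) bool {
	h := hash64(key)
	blk := &b.blocks[h%uint64(len(b.blocks))]
	for i := 0; i < b.k; i++ {
		h = h*0x9E3779B97F4A7C15 + 1
		bit := h % blockBits
		if blk[bit/64]&(1<<(bit%64)) == 0 {
			return false
		}
	}
	return true
}

func main() {
	bf := newBlockedBloom(1024, 7)
	bf.Add("pk-42")
	fmt.Println(bf.Test("pk-42"), bf.Test("pk-43")) // true, (almost certainly) false
}
```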

WARN: This PR remains compatible with Bloom filters written by the old implementation, but after falling back to an older Milvus version, deserializing Bloom filters written by this version may fail.

In single-Bloom-filter test cases with a capacity of 1,000,000 and a
false positive rate (FPR) of 0.001, the blocked Bloom filter is roughly
4 times faster than the basic Bloom filter in both construction and
querying, at the cost of about a 26% increase in memory usage
(numbers below, followed by a benchmark sketch).

- Block BF construct time  {"time": "54.128131ms"}
- Block BF size            {"size": 3021578}
- Block BF Test cost       {"time": "55.407352ms"}
- Basic BF construct time  {"time": "210.262183ms"}
- Basic BF size            {"size": 2396308}
- Basic BF Test cost       {"time": "192.596229ms"}
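The comparison can be reproduced with a microbenchmark along these lines: a sketch, assuming the public APIs of the two libraries referenced in go.mod. bits-and-blooms/bloom/v3 hashes keys internally, while greatroar/blobloom takes caller-supplied 64-bit hashes (here xxh3, which this PR also promotes to a direct dependency). Absolute timings will of course vary by machine.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"time"

	"github.com/bits-and-blooms/bloom/v3"
	"github.com/greatroar/blobloom"
	"github.com/zeebo/xxh3"
)

func main() {
	const capacity, fpr = 1_000_000, 0.001

	keys := make([][]byte, capacity)
	for i := range keys {
		k := make([]byte, 8)
		binary.BigEndian.PutUint64(k, uint64(i))
		keys[i] = k
	}

	// Blocked Bloom filter: the caller supplies a 64-bit hash per key.
	start := time.Now()
	blocked := blobloom.NewOptimized(blobloom.Config{Capacity: capacity, FPRate: fpr})
	for _, k := range keys {
		blocked.Add(xxh3.Hash(k))
	}
	fmt.Println("Block BF construct time:", time.Since(start))

	start = time.Now()
	for _, k := range keys {
		blocked.Has(xxh3.Hash(k))
	}
	fmt.Println("Block BF Test cost:", time.Since(start))

	// Basic Bloom filter: hashes internally, k probes spread over the whole array.
	start = time.Now()
	basic := bloom.NewWithEstimates(capacity, fpr)
	for _, k := range keys {
		basic.Add(k)
	}
	fmt.Println("Basic BF construct time:", time.Since(start))

	start = time.Now()
	for _, k := range keys {
		basic.Test(k)
	}
	fmt.Println("Basic BF Test cost:", time.Since(start))
}
```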

In multi-Bloom-filter test cases with a capacity of 100,000, an FPR of
0.001, and 100 Bloom filters, we reuse the primary key's hash locations
across all Bloom filters to avoid repeated hash computations. As a
result, the blocked Bloom filter also queries roughly 6 times faster
than the basic Bloom filter (a sketch of the location-reuse pattern
follows the numbers below).

- Block BF TestLocation cost  {"time": "529.97183ms"}
- Basic BF TestLocation cost  {"time": "3.197430181s"}
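The reuse works because every filter in the batch is built with the same capacity and FPR, hence the same number of hash functions k: the primary key can be hashed once into its k probe locations, and those locations tested against each filter. A sketch of the pattern using the bits-and-blooms API (`Locations`/`TestLocations`) that the pre-PR code path relied on:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

func main() {
	const perFilter, fpr, numFilters = 100_000, 0.001, 100

	// 100 filters built with identical parameters -> identical k.
	filters := make([]*bloom.BloomFilter, numFilters)
	for i := range filters {
		filters[i] = bloom.NewWithEstimates(perFilter, fpr)
	}

	pk := make([]byte, 8)
	binary.BigEndian.PutUint64(pk, 42)
	filters[7].Add(pk)

	// Hash the key once into its k probe locations...
	k := filters[0].K()
	locs := bloom.Locations(pk, k)

	// ...and reuse those locations for every filter: no re-hash per filter.
	hits := 0
	for _, f := range filters {
		if f.TestLocations(locs) {
			hits++
		}
	}
	fmt.Println("filters reporting maybe-present:", hits) // >= 1 (filter 7 for sure)
}
```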

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
weiliu1031 committed May 31, 2024
1 parent 322a4c5 commit c6a1c49
Showing 30 changed files with 974 additions and 539 deletions.
4 changes: 3 additions & 1 deletion go.mod
@@ -65,9 +65,11 @@ require (
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70

require (
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/zeebo/xxh3 v1.0.2
gopkg.in/yaml.v3 v3.0.1
)

@@ -209,7 +211,6 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
@@ -250,6 +251,7 @@ replace (
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/weiliu1031/blobloom v0.0.0-20240530105622-1e0e104a7160
// github.com/milvus-io/milvus-storage/go => ../milvus-storage/go
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
25 changes: 23 additions & 2 deletions go.sum

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions internal/datanode/metacache/bloom_filter_set.go
@@ -19,10 +19,10 @@ package metacache
import (
"sync"

"github.com/bits-and-blooms/bloom/v3"
"github.com/samber/lo"

"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

@@ -76,8 +76,9 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {

if bfs.current == nil {
bfs.current = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(bfs.batchSize,
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
PkFilter: bloomfilter.NewBloomFilterWithType(bfs.batchSize,
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue()),
}
}

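The call sites above use `bloomfilter.NewBloomFilterWithType(capacity, fpr, type)`, but the new `internal/util/bloomfilter` package itself is not part of this excerpt. A hypothetical reconstruction of its shape, inferred only from those call sites; the interface name, type constants, and the `"BlockedBloomFilter"` config value are assumptions:

```go
// Hypothetical sketch of internal/util/bloomfilter (not shown in this diff).
package bloomfilter

import (
	"github.com/bits-and-blooms/bloom/v3"
	"github.com/greatroar/blobloom"
	"github.com/zeebo/xxh3"
)

type BFType int

const (
	BasicBF BFType = iota + 1
	BlockedBF
)

// BloomFilterInterface is the common surface both implementations satisfy;
// PkStatistics.PkFilter would hold a value of this type.
type BloomFilterInterface interface {
	Type() BFType
	Add(data []byte)
	Test(data []byte) bool
}

type basicBF struct{ f *bloom.BloomFilter }

func (b *basicBF) Type() BFType          { return BasicBF }
func (b *basicBF) Add(data []byte)       { b.f.Add(data) }
func (b *basicBF) Test(data []byte) bool { return b.f.Test(data) }

type blockedBF struct{ f *blobloom.Filter }

func (b *blockedBF) Type() BFType          { return BlockedBF }
func (b *blockedBF) Add(data []byte)       { b.f.Add(xxh3.Hash(data)) }
func (b *blockedBF) Test(data []byte) bool { return b.f.Has(xxh3.Hash(data)) }

// NewBloomFilterWithType selects the implementation from configuration, so the
// blocked filter can roll out behind a config switch while the basic one stays
// available for compatibility with filters written by older versions.
func NewBloomFilterWithType(capacity uint, fp float64, typeName string) BloomFilterInterface {
	if typeName == "BlockedBloomFilter" {
		return &blockedBF{f: blobloom.NewOptimized(blobloom.Config{
			Capacity: uint64(capacity),
			FPRate:   fp,
		})}
	}
	return &basicBF{f: bloom.NewWithEstimates(capacity, fp)}
}
```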
1 change: 1 addition & 0 deletions internal/datanode/syncmgr/storage_serializer.go
@@ -205,6 +205,7 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B
FieldID: s.pkField.GetFieldID(),
MaxPk: pks.MaxPK,
MinPk: pks.MinPK,
BFType: pks.PkFilter.Type(),
BF: pks.PkFilter,
PkType: int64(s.pkField.GetDataType()),
}
7 changes: 5 additions & 2 deletions internal/datanode/writebuffer/write_buffer.go
@@ -5,7 +5,6 @@ import (
"fmt"
"sync"

"github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/atomic"
@@ -20,6 +19,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -383,7 +383,10 @@ type inData struct {

func (id *inData) generatePkStats() {
id.batchBF = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(uint(id.rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
PkFilter: bloomfilter.NewBloomFilterWithType(
uint(id.rowNum),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue()),
}

for _, ids := range id.pkField {
9 changes: 6 additions & 3 deletions internal/querynodev2/delegator/delegator_data.go
@@ -526,7 +526,8 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
for i, pk := range segmentPks {
if candidate.MayPkExist(pk) {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
pks = append(pks, pk)
tss = append(tss, segmentTss[i])
}
@@ -637,7 +638,8 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
continue
}
for i, pk := range record.DeleteData.Pks {
if candidate.MayPkExist(pk) {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
deleteData.Append(pk, record.DeleteData.Tss[i])
}
}
@@ -733,7 +735,8 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
}

for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) {
if candidate.MayPkExist(pk) {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
result.Pks = append(result.Pks, pk)
result.Tss = append(result.Tss, dmsg.Timestamps[idx])
}
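Throughout this file, passing a raw primary key to `MayPkExist` is replaced by wrapping it in `storage.NewLocationsCache(pk)`. The idea: one cache object per key memoizes the hashed probe locations, so testing the same delete key against many segments' filters hashes it at most once per distinct hash-function count. The real `storage.LocationsCache` is not in this excerpt; a minimal sketch of the idea, with field and method names assumed (the real type wraps `storage.PrimaryKey` rather than raw bytes):

```go
// Minimal sketch of the LocationsCache idea (assumed names, simplified types).
package storage

import "github.com/bits-and-blooms/bloom/v3"

type LocationsCache struct {
	pk   []byte            // serialized primary key
	locs map[uint][]uint64 // hash-function count k -> memoized probe locations
}

func NewLocationsCache(pk []byte) *LocationsCache {
	return &LocationsCache{pk: pk, locs: make(map[uint][]uint64)}
}

// GetPk mirrors the lc.GetPk() accessor seen in the tests below.
func (lc *LocationsCache) GetPk() []byte { return lc.pk }

// Locations hashes the key at most once per distinct k, however many
// segments' filters the same key is tested against.
func (lc *LocationsCache) Locations(k uint) []uint64 {
	if cached, ok := lc.locs[k]; ok {
		return cached
	}
	computed := bloom.Locations(lc.pk, k)
	lc.locs[k] = computed
	return computed
}
```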
37 changes: 18 additions & 19 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -24,7 +24,6 @@ import (
"testing"
"time"

bloom "github.com/bits-and-blooms/bloom/v3"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
@@ -41,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -258,12 +258,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
ms.EXPECT().MayPkExist(mock.Anything).RunAndReturn(func(lc *storage.LocationsCache) bool {
return lc.GetPk().EQ(storage.NewInt64PrimaryKey(10))
})
return ms
})
@@ -272,8 +268,9 @@
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
bf := bloomfilter.NewBloomFilterWithType(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@@ -528,8 +525,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@@ -686,8 +685,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@@ -880,19 +881,17 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
ms.EXPECT().MayPkExist(mock.Anything).Call.Return(func(pk storage.PrimaryKey) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
return ms
})
}, nil)
s.loader.EXPECT().LoadBloomFilterSet(mock.Anything, s.collectionID, mock.AnythingOfType("int64"), mock.Anything).
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
pks := &storage.PkStatistics{
PkFilter: bf,
}
4 changes: 0 additions & 4 deletions internal/querynodev2/delegator/delegator_test.go
@@ -99,10 +99,6 @@ func (s *DelegatorSuite) SetupTest() {
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
ms.EXPECT().GetHashFuncNum().Return(1)
ms.EXPECT().TestLocations(mock.Anything, mock.Anything).RunAndReturn(func(pk storage.PrimaryKey, locs []uint64) bool {
return pk.EQ(storage.NewInt64PrimaryKey(10))
})
return ms
})
}, nil)
66 changes: 9 additions & 57 deletions internal/querynodev2/pkoracle/bloom_filter_set.go
@@ -17,15 +17,14 @@
package pkoracle

import (
"context"
"sync"

bloom "github.com/bits-and-blooms/bloom/v3"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -41,68 +40,25 @@ type BloomFilterSet struct {
segType commonpb.SegmentState
currentStat *storage.PkStatistics
historyStats []*storage.PkStatistics

kHashFunc uint
}

// MayPkExist returns whether any bloom filters returns positive.
func (s *BloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool {
func (s *BloomFilterSet) MayPkExist(lc *storage.LocationsCache) bool {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
if s.currentStat != nil && s.currentStat.PkExist(pk) {
if s.currentStat != nil && s.currentStat.TestLocationCache(lc) {
return true
}

// for sealed, if one of the stats shows it exist, then we have to check it
for _, historyStat := range s.historyStats {
if historyStat.PkExist(pk) {
if historyStat.TestLocationCache(lc) {
return true
}
}
return false
}

func (s *BloomFilterSet) TestLocations(pk storage.PrimaryKey, locs []uint64) bool {
log := log.Ctx(context.TODO()).WithRateGroup("BloomFilterSet.TestLocations", 1, 60)
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()

if s.currentStat != nil {
k := s.currentStat.PkFilter.K()
if k > uint(len(locs)) {
log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
zap.Int("locationNum", len(locs)),
zap.Uint("hashFuncNum", k),
zap.Int64("segmentID", s.segmentID))
return true
}

if s.currentStat.TestLocations(pk, locs[:k]) {
return true
}
}

// for sealed, if one of the stats shows it exist, then we have to check it
for _, historyStat := range s.historyStats {
k := historyStat.PkFilter.K()
if k > uint(len(locs)) {
log.RatedWarn(30, "locations num is less than hash func num, return false positive result",
zap.Int("locationNum", len(locs)),
zap.Uint("hashFuncNum", k),
zap.Int64("segmentID", s.segmentID))
return true
}
if historyStat.TestLocations(pk, locs[:k]) {
return true
}
}
return false
}

func (s *BloomFilterSet) GetHashFuncNum() uint {
return s.kHashFunc
}

// ID implement candidate.
func (s *BloomFilterSet) ID() int64 {
return s.segmentID
@@ -124,13 +80,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
defer s.statsMutex.Unlock()

if s.currentStat == nil {
m, k := bloom.EstimateParameters(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
if k > s.kHashFunc {
s.kHashFunc = k
}
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.New(m, k),
PkFilter: bf,
}
}

@@ -157,9 +112,6 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
s.statsMutex.Lock()
defer s.statsMutex.Unlock()

if stats.PkFilter.K() > s.kHashFunc {
s.kHashFunc = stats.PkFilter.K()
}
s.historyStats = append(s.historyStats, stats)
}

