Skip to content

Commit

Permalink
[dbnode] Safe handling of concurrent bloom filters (#1837)
Browse files Browse the repository at this point in the history
  • Loading branch information
obitech authored and robskillington committed Jul 28, 2019
1 parent 04d9c06 commit d4e206b
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 32 deletions.
11 changes: 11 additions & 0 deletions DEVELOPER.md
Expand Up @@ -54,6 +54,17 @@ M3 has an extensive, and ever increasing, set of tests to ensure we are able to

Once you have identified a change you want to make, and gathered consensus by talking to some devs, go ahead and make a branch with the changes. To test your changes:

(0) If you have updated an interface that has been mocked, you need to update the generated `gomock` files.

```shell
# Generate mocks for all top level packages
make mock-gen

# If you just want to generate it for a single package,
# replace xyz with the package you want to generate files for, e.g. dbnode
make mock-gen-xyz
```

(1) Run unit tests locally
```
go test ./... -v
Expand Down
30 changes: 15 additions & 15 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/dbnode/persist/fs/retriever.go
Expand Up @@ -415,14 +415,14 @@ func (r *blockRetriever) Stream(
}
r.RUnlock()

bloomFilter, err := r.seekerMgr.ConcurrentIDBloomFilter(shard, startTime)
idExists, err := r.seekerMgr.Test(id, shard, startTime)
if err != nil {
return xio.EmptyBlockReader, err
}

// If the ID is not in the seeker's bloom filter, then it's definitely not on
// disk and we can return immediately.
if !bloomFilter.Test(id.Bytes()) {
if !idExists {
// No need to call req.onRetrieve.OnRetrieveBlock if there is no data.
req.onRetrieved(ts.Segment{}, namespace.Context{})
return req.toBlock(), nil
Expand Down
8 changes: 1 addition & 7 deletions src/dbnode/persist/fs/retriever_test.go
Expand Up @@ -34,7 +34,6 @@ import (
"testing"
"time"

"github.com/m3db/bloom"
"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/index/convert"
Expand Down Expand Up @@ -651,16 +650,11 @@ func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller,
nsCtx = namespace.NewContextFrom(testNs1Metadata(t))
shard = uint32(0)
blockStart = time.Now().Truncate(rOpts.BlockSize())

// Always true because all the bits in 255 are set.
bloomBytes = []byte{255, 255, 255, 255, 255, 255, 255, 255}
alwaysTrueBloomFilter = bloom.NewConcurrentReadOnlyBloomFilter(1, 1, bloomBytes)
managedBloomFilter = newManagedConcurrentBloomFilter(alwaysTrueBloomFilter, bloomBytes)
)

mockSeekerManager := NewMockDataFileSetSeekerManager(ctrl)
mockSeekerManager.EXPECT().Open(gomock.Any()).Return(nil)
mockSeekerManager.EXPECT().ConcurrentIDBloomFilter(gomock.Any(), gomock.Any()).Return(managedBloomFilter, nil)
mockSeekerManager.EXPECT().Test(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil)
mockSeekerManager.EXPECT().Borrow(gomock.Any(), gomock.Any()).Return(mockSeeker, nil)
mockSeekerManager.EXPECT().Return(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
mockSeekerManager.EXPECT().Close().Return(nil)
Expand Down
19 changes: 14 additions & 5 deletions src/dbnode/persist/fs/seek_manager.go
Expand Up @@ -209,23 +209,32 @@ func (m *seekerManager) CacheShardIndices(shards []uint32) error {
return multiErr.FinalError()
}

func (m *seekerManager) ConcurrentIDBloomFilter(shard uint32, start time.Time) (*ManagedConcurrentBloomFilter, error) {
func (m *seekerManager) Test(id ident.ID, shard uint32, start time.Time) (bool, error) {
byTime := m.seekersByTime(shard)

// Try fast RLock() first.
byTime.RLock()
startNano := xtime.ToUnixNano(start)
seekers, ok := byTime.seekers[startNano]
byTime.RUnlock()

// Seekers are open: good to test but still hold RLock while doing so
if ok && seekers.active.wg == nil {
return seekers.active.bloomFilter, nil
idExists := seekers.active.bloomFilter.Test(id.Bytes())
byTime.RUnlock()
return idExists, nil
} else {
byTime.RUnlock()
}

byTime.Lock()
defer byTime.Unlock()

seekersAndBloom, err := m.getOrOpenSeekersWithLock(startNano, byTime)
byTime.Unlock()
return seekersAndBloom.bloomFilter, err
if err != nil {
return false, err
}

return seekersAndBloom.bloomFilter.Test(id.Bytes()), nil
}

func (m *seekerManager) Borrow(shard uint32, start time.Time) (ConcurrentDataFileSetSeeker, error) {
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/persist/fs/types.go
Expand Up @@ -244,9 +244,9 @@ type DataFileSetSeekerManager interface {
// Return returns an open seeker for a given shard, block start time, and volume.
Return(shard uint32, start time.Time, seeker ConcurrentDataFileSetSeeker) error

// ConcurrentIDBloomFilter returns a concurrent ID bloom filter for a given
// shard, block start time, and volume.
ConcurrentIDBloomFilter(shard uint32, start time.Time) (*ManagedConcurrentBloomFilter, error)
// Test checks if an ID exists in a concurrent ID bloom filter for a
// given shard, block start time, and volume.
Test(id ident.ID, shard uint32, start time.Time) (bool, error)
}

// DataBlockRetriever provides a block retriever for TSDB file sets
Expand Down

0 comments on commit d4e206b

Please sign in to comment.