From d4e206b2d53e00ebaba46df0543ddc8ebbc137eb Mon Sep 17 00:00:00 2001 From: Alexander Knipping Date: Sun, 28 Jul 2019 21:43:39 +0200 Subject: [PATCH] [dbnode] Safe handling of concurrent bloom filters (#1837) --- DEVELOPER.md | 11 +++++++++ src/dbnode/persist/fs/fs_mock.go | 30 ++++++++++++------------- src/dbnode/persist/fs/retriever.go | 4 ++-- src/dbnode/persist/fs/retriever_test.go | 8 +------ src/dbnode/persist/fs/seek_manager.go | 19 +++++++++++----- src/dbnode/persist/fs/types.go | 6 ++--- 6 files changed, 46 insertions(+), 32 deletions(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index 989ed89562..ff85784bba 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -54,6 +54,17 @@ M3 has an extensive, and ever increasing, set of tests to ensure we are able to Once you have identified a change you want to make, and gathered consensus by talking to some devs, go ahead and make a branch with the changes. To test your changes: +(0) If you have updated an interface that has been mocked, you need to update the generated `gomock` files. + +```shell +# Generate mocks for all top level packages +make mock-gen + +# If you just want to generate it for a single package, +# replace xyz with the package you want to generate files for, e.g. dbnode +make mock-gen-xyz +``` + (1) Run unit tests locally ``` go test ./... 
-v diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 84fd159ba5..35262a2d68 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1028,21 +1028,6 @@ func (mr *MockDataFileSetSeekerManagerMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDataFileSetSeekerManager)(nil).Close)) } -// ConcurrentIDBloomFilter mocks base method -func (m *MockDataFileSetSeekerManager) ConcurrentIDBloomFilter(arg0 uint32, arg1 time.Time) (*ManagedConcurrentBloomFilter, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConcurrentIDBloomFilter", arg0, arg1) - ret0, _ := ret[0].(*ManagedConcurrentBloomFilter) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ConcurrentIDBloomFilter indicates an expected call of ConcurrentIDBloomFilter -func (mr *MockDataFileSetSeekerManagerMockRecorder) ConcurrentIDBloomFilter(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConcurrentIDBloomFilter", reflect.TypeOf((*MockDataFileSetSeekerManager)(nil).ConcurrentIDBloomFilter), arg0, arg1) -} - // Open mocks base method func (m *MockDataFileSetSeekerManager) Open(arg0 namespace.Metadata) error { m.ctrl.T.Helper() @@ -1071,6 +1056,21 @@ func (mr *MockDataFileSetSeekerManagerMockRecorder) Return(arg0, arg1, arg2 inte return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Return", reflect.TypeOf((*MockDataFileSetSeekerManager)(nil).Return), arg0, arg1, arg2) } +// Test mocks base method +func (m *MockDataFileSetSeekerManager) Test(arg0 ident.ID, arg1 uint32, arg2 time.Time) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Test", arg0, arg1, arg2) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Test indicates an expected call of Test +func (mr *MockDataFileSetSeekerManagerMockRecorder) Test(arg0, arg1, arg2 interface{}) *gomock.Call { + 
mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Test", reflect.TypeOf((*MockDataFileSetSeekerManager)(nil).Test), arg0, arg1, arg2) +} + // MockConcurrentDataFileSetSeeker is a mock of ConcurrentDataFileSetSeeker interface type MockConcurrentDataFileSetSeeker struct { ctrl *gomock.Controller diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 6a3aac66d0..b4d1c7cbe9 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -415,14 +415,14 @@ func (r *blockRetriever) Stream( } r.RUnlock() - bloomFilter, err := r.seekerMgr.ConcurrentIDBloomFilter(shard, startTime) + idExists, err := r.seekerMgr.Test(id, shard, startTime) if err != nil { return xio.EmptyBlockReader, err } // If the ID is not in the seeker's bloom filter, then it's definitely not on // disk and we can return immediately. - if !bloomFilter.Test(id.Bytes()) { + if !idExists { // No need to call req.onRetrieve.OnRetrieveBlock if there is no data. req.onRetrieved(ts.Segment{}, namespace.Context{}) return req.toBlock(), nil diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 622f7dd616..4e00e340f4 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -34,7 +34,6 @@ import ( "testing" "time" - "github.com/m3db/bloom" "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/index/convert" @@ -651,16 +650,11 @@ func testBlockRetrieverHandlesSeekErrors(t *testing.T, ctrl *gomock.Controller, nsCtx = namespace.NewContextFrom(testNs1Metadata(t)) shard = uint32(0) blockStart = time.Now().Truncate(rOpts.BlockSize()) - - // Always true because all the bits in 255 are set. 
- bloomBytes = []byte{255, 255, 255, 255, 255, 255, 255, 255} - alwaysTrueBloomFilter = bloom.NewConcurrentReadOnlyBloomFilter(1, 1, bloomBytes) - managedBloomFilter = newManagedConcurrentBloomFilter(alwaysTrueBloomFilter, bloomBytes) ) mockSeekerManager := NewMockDataFileSetSeekerManager(ctrl) mockSeekerManager.EXPECT().Open(gomock.Any()).Return(nil) - mockSeekerManager.EXPECT().ConcurrentIDBloomFilter(gomock.Any(), gomock.Any()).Return(managedBloomFilter, nil) + mockSeekerManager.EXPECT().Test(gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil) mockSeekerManager.EXPECT().Borrow(gomock.Any(), gomock.Any()).Return(mockSeeker, nil) mockSeekerManager.EXPECT().Return(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockSeekerManager.EXPECT().Close().Return(nil) diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go index 9cd791d12d..f224bd7880 100644 --- a/src/dbnode/persist/fs/seek_manager.go +++ b/src/dbnode/persist/fs/seek_manager.go @@ -209,23 +209,32 @@ func (m *seekerManager) CacheShardIndices(shards []uint32) error { return multiErr.FinalError() } -func (m *seekerManager) ConcurrentIDBloomFilter(shard uint32, start time.Time) (*ManagedConcurrentBloomFilter, error) { +func (m *seekerManager) Test(id ident.ID, shard uint32, start time.Time) (bool, error) { byTime := m.seekersByTime(shard) // Try fast RLock() first. 
byTime.RLock() startNano := xtime.ToUnixNano(start) seekers, ok := byTime.seekers[startNano] - byTime.RUnlock() + // Seekers are open: good to test but still hold RLock while doing so if ok && seekers.active.wg == nil { - return seekers.active.bloomFilter, nil + idExists := seekers.active.bloomFilter.Test(id.Bytes()) + byTime.RUnlock() + return idExists, nil + } else { + byTime.RUnlock() } byTime.Lock() + defer byTime.Unlock() + seekersAndBloom, err := m.getOrOpenSeekersWithLock(startNano, byTime) - byTime.Unlock() - return seekersAndBloom.bloomFilter, err + if err != nil { + return false, err + } + + return seekersAndBloom.bloomFilter.Test(id.Bytes()), nil } func (m *seekerManager) Borrow(shard uint32, start time.Time) (ConcurrentDataFileSetSeeker, error) { diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index f0050cdea0..492b62aa2a 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -244,9 +244,9 @@ type DataFileSetSeekerManager interface { // Return returns an open seeker for a given shard, block start time, and volume. Return(shard uint32, start time.Time, seeker ConcurrentDataFileSetSeeker) error - // ConcurrentIDBloomFilter returns a concurrent ID bloom filter for a given - // shard, block start time, and volume. - ConcurrentIDBloomFilter(shard uint32, start time.Time) (*ManagedConcurrentBloomFilter, error) + // Test checks if an ID exists in a concurrent ID bloom filter for a + // given shard, block start time, and volume. + Test(id ident.ID, shard uint32, start time.Time) (bool, error) } // DataBlockRetriever provides a block retriever for TSDB file sets