Skip to content

Commit

Permalink
[dbnode] Add claims for index segments volume index (#2846)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Nov 11, 2020
1 parent 5b5c050 commit 251dc3d
Show file tree
Hide file tree
Showing 29 changed files with 523 additions and 49 deletions.
33 changes: 32 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,34 @@ type BootstrapConfiguration struct {
// IndexSegmentConcurrency determines the concurrency for building index
// segments.
IndexSegmentConcurrency *int `yaml:"indexSegmentConcurrency"`

// Verify specifies verification checks.
Verify *BootstrapVerifyConfiguration `yaml:"verify"`
}

// VerifyOrDefault returns verify configuration or default.
func (bsc BootstrapConfiguration) VerifyOrDefault() BootstrapVerifyConfiguration {
if bsc.Verify == nil {
return BootstrapVerifyConfiguration{}
}

return *bsc.Verify
}

// BootstrapVerifyConfiguration outlines verification checks to enable
// during a bootstrap.
type BootstrapVerifyConfiguration struct {
VerifyIndexSegments *bool `yaml:"verifyIndexSegments"`
}

// VerifyIndexSegmentsOrDefault returns whether to verify index segments
// or use default value.
func (c BootstrapVerifyConfiguration) VerifyIndexSegmentsOrDefault() bool {
if c.VerifyIndexSegments == nil {
return false
}

return *c.VerifyIndexSegments
}

// BootstrapFilesystemConfiguration specifies config for the fs bootstrapper.
Expand Down Expand Up @@ -288,11 +316,13 @@ func (bsc BootstrapConfiguration) New(
SetFilesystemOptions(fsOpts).
SetIndexOptions(opts.IndexOptions()).
SetPersistManager(opts.PersistManager()).
SetIndexClaimsManager(opts.IndexClaimsManager()).
SetCompactor(compactor).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetIdentifierPool(opts.IdentifierPool()).
SetMigrationOptions(fsCfg.migration().NewOptions()).
SetStorageOptions(opts)
SetStorageOptions(opts).
SetIndexSegmentsVerify(bsc.VerifyOrDefault().VerifyIndexSegmentsOrDefault())
if v := bsc.IndexSegmentConcurrency; v != nil {
fsbOpts = fsbOpts.SetIndexSegmentConcurrency(*v)
}
Expand Down Expand Up @@ -329,6 +359,7 @@ func (bsc BootstrapConfiguration) New(
SetIndexOptions(opts.IndexOptions()).
SetAdminClient(adminClient).
SetPersistManager(opts.PersistManager()).
SetIndexClaimsManager(opts.IndexClaimsManager()).
SetCompactor(compactor).
SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
SetContextPool(opts.ContextPool()).
Expand Down
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func TestConfiguration(t *testing.T) {
peers: null
cacheSeriesMetadata: null
indexSegmentConcurrency: null
verify: null
blockRetrieve: null
cache:
series: null
Expand Down
8 changes: 5 additions & 3 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,10 @@ func newDefaultBootstrappableTestSetups(
SetAdminClient(adminClient).
SetIndexOptions(storageIdxOpts).
SetFilesystemOptions(fsOpts).
// DatabaseBlockRetrieverManager and PersistManager need to be set or we will never execute
// PersistManager need to be set or we will never execute
// the persist bootstrapping path
SetPersistManager(setup.StorageOpts().PersistManager()).
SetIndexClaimsManager(setup.StorageOpts().IndexClaimsManager()).
SetCompactor(newCompactor(t, storageIdxOpts)).
SetRuntimeOptionsManager(runtimeOptsMgr).
SetContextPool(setup.StorageOpts().ContextPool())
Expand All @@ -285,13 +286,14 @@ func newDefaultBootstrappableTestSetups(

persistMgr, err := persistfs.NewPersistManager(fsOpts)
require.NoError(t, err)

icm := persistfs.NewIndexClaimsManager(fsOpts)
bfsOpts := bfs.NewOptions().
SetResultOptions(bsOpts).
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetCompactor(newCompactor(t, storageIdxOpts)).
SetPersistManager(persistMgr)
SetPersistManager(persistMgr).
SetIndexClaimsManager(icm)

fsBootstrapper, err := bfs.NewFileSystemBootstrapperProvider(bfsOpts, finalBootstrapper)
require.NoError(t, err)
Expand Down
9 changes: 8 additions & 1 deletion src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ func NewTestSetup(

if fsOpts == nil {
fsOpts = fs.NewOptions().
SetFilePathPrefix(filePathPrefix)
SetFilePathPrefix(filePathPrefix).
SetClockOptions(storageOpts.ClockOptions())
}

storageOpts = storageOpts.SetCommitLogOptions(
Expand All @@ -394,6 +395,10 @@ func NewTestSetup(
}
storageOpts = storageOpts.SetPersistManager(pm)

// Set up index claims manager
icm := fs.NewIndexClaimsManager(fsOpts)
storageOpts = storageOpts.SetIndexClaimsManager(icm)

// Set up repair options
storageOpts = storageOpts.
SetRepairOptions(storageOpts.RepairOptions().
Expand Down Expand Up @@ -931,6 +936,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions
if err != nil {
return err
}
icm := fs.NewIndexClaimsManager(fsOpts)
storageIdxOpts := storageOpts.IndexOptions()
compactor, err := newCompactorWithErr(storageIdxOpts)
if err != nil {
Expand All @@ -941,6 +947,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetPersistManager(persistMgr).
SetIndexClaimsManager(icm).
SetCompactor(compactor)
bs, err = bfs.NewFileSystemBootstrapperProvider(bfsOpts, bs)
if err != nil {
Expand Down
128 changes: 128 additions & 0 deletions src/dbnode/persist/fs/index_claims_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fs

import (
"errors"
"sync"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

var errOutOfRetentionClaim = errors.New("out of retention index volume claim")

type indexClaimsManager struct {
sync.Mutex

filePathPrefix string
nowFn clock.NowFn
nextIndexFileSetVolumeIndexFn nextIndexFileSetVolumeIndexFn

// Map of ns ID string -> blockStart -> volumeIndexClaim.
volumeIndexClaims map[string]map[xtime.UnixNano]volumeIndexClaim
}

type volumeIndexClaim struct {
volumeIndex int
}

// NewIndexClaimsManager returns an instance of the index claim manager. This manages
// concurrent claims for volume indices per ns and block start.
// NB(bodu): There should be only a single shared index claim manager among all threads
// writing index data filesets.
func NewIndexClaimsManager(opts Options) IndexClaimsManager {
return &indexClaimsManager{
filePathPrefix: opts.FilePathPrefix(),
nowFn: opts.ClockOptions().NowFn(),
volumeIndexClaims: make(map[string]map[xtime.UnixNano]volumeIndexClaim),
nextIndexFileSetVolumeIndexFn: NextIndexFileSetVolumeIndex,
}
}

func (i *indexClaimsManager) ClaimNextIndexFileSetVolumeIndex(
md namespace.Metadata,
blockStart time.Time,
) (int, error) {
i.Lock()
earliestBlockStart := retention.FlushTimeStartForRetentionPeriod(
md.Options().RetentionOptions().RetentionPeriod(),
md.Options().IndexOptions().BlockSize(),
i.nowFn(),
)
defer func() {
i.deleteOutOfRetentionEntriesWithLock(md.ID(), earliestBlockStart)
i.Unlock()
}()

// Reject out of retention claims.
if blockStart.Before(earliestBlockStart) {
return 0, errOutOfRetentionClaim
}

volumeIndexClaimsByBlockStart, ok := i.volumeIndexClaims[md.ID().String()]
if !ok {
volumeIndexClaimsByBlockStart = make(map[xtime.UnixNano]volumeIndexClaim)
i.volumeIndexClaims[md.ID().String()] = volumeIndexClaimsByBlockStart
}

blockStartUnixNanos := xtime.ToUnixNano(blockStart)
if curr, ok := volumeIndexClaimsByBlockStart[blockStartUnixNanos]; ok {
// Already had a previous claim, return the next claim.
next := curr
next.volumeIndex++
volumeIndexClaimsByBlockStart[blockStartUnixNanos] = next
return next.volumeIndex, nil
}

volumeIndex, err := i.nextIndexFileSetVolumeIndexFn(i.filePathPrefix, md.ID(),
blockStart)
if err != nil {
return 0, err
}
volumeIndexClaimsByBlockStart[blockStartUnixNanos] = volumeIndexClaim{
volumeIndex: volumeIndex,
}
return volumeIndex, nil
}

func (i *indexClaimsManager) deleteOutOfRetentionEntriesWithLock(
nsID ident.ID,
earliestBlockStart time.Time,
) {
earliestBlockStartUnixNanos := xtime.ToUnixNano(earliestBlockStart)
// ns ID already exists at this point since the delete call is deferred.
for blockStart := range i.volumeIndexClaims[nsID.String()] {
if blockStart.Before(earliestBlockStartUnixNanos) {
delete(i.volumeIndexClaims[nsID.String()], blockStart)
}
}
}

type nextIndexFileSetVolumeIndexFn func(
filePathPrefix string,
namespace ident.ID,
blockStart time.Time,
) (int, error)
115 changes: 115 additions & 0 deletions src/dbnode/persist/fs/index_claims_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fs

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

func TestIndexClaimsManagerConcurrentClaims(t *testing.T) {
mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager)
require.True(t, ok)

// Always return 0 for starting volume index for testing purposes.
mgr.nextIndexFileSetVolumeIndexFn = func(
filePathPrefix string,
namespace ident.ID,
blockStart time.Time,
) (int, error) {
return 0, nil
}

md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions())
require.NoError(t, err)

var (
m sync.Map
wg sync.WaitGroup
blockSize = md.Options().IndexOptions().BlockSize()
blockStart = time.Now().Truncate(blockSize)
)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
volumeIndex, err := mgr.ClaimNextIndexFileSetVolumeIndex(
md,
blockStart,
)
require.NoError(t, err)
_, loaded := m.LoadOrStore(volumeIndex, true)
// volume index should not have been previously stored or
// there are conflicting volume indices.
require.False(t, loaded)
}
}()
}
wg.Wait()
}

// TestIndexClaimsManagerOutOfRetention ensure that we both reject and delete out of
// retention index claims.
func TestIndexClaimsManagerOutOfRetention(t *testing.T) {
mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager)
require.True(t, ok)

// Always return 0 for starting volume index for testing purposes.
mgr.nextIndexFileSetVolumeIndexFn = func(
filePathPrefix string,
namespace ident.ID,
blockStart time.Time,
) (int, error) {
return 0, nil
}

md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions())
blockSize := md.Options().IndexOptions().BlockSize()
blockStart := time.Now().Truncate(blockSize)
require.NoError(t, err)

_, err = mgr.ClaimNextIndexFileSetVolumeIndex(
md,
blockStart,
)
require.NoError(t, err)

now := mgr.nowFn().Add(md.Options().RetentionOptions().RetentionPeriod()).
Add(blockSize)
mgr.nowFn = func() time.Time { return now }
_, err = mgr.ClaimNextIndexFileSetVolumeIndex(
md,
blockStart,
)
require.Equal(t, errOutOfRetentionClaim, err)

// Verify that the out of retention entry has been deleted as well.
_, ok = mgr.volumeIndexClaims[md.ID().String()][xtime.ToUnixNano(blockStart)]
require.False(t, ok)
}

0 comments on commit 251dc3d

Please sign in to comment.