Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LRU deadlock + regression test and improve comments / logging #862

Merged
merged 17 commits into from
Aug 29, 2018
3 changes: 2 additions & 1 deletion src/cmd/services/m3dbnode/config/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ type SeriesCacheConfiguration struct {
// LRUSeriesCachePolicyConfiguration contains configuration for the LRU
// series caching policy.
type LRUSeriesCachePolicyConfiguration struct {
MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"`
MaxBlocks uint `yaml:"maxBlocks" validate:"nonzero"`
EventsChannelSize uint `yaml:"eventsChannelSize" validate:"nonzero"`
}
12 changes: 7 additions & 5 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,13 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup

// Set up wired list if required
if storageOpts.SeriesCachePolicy() == series.CacheLRU {
wiredList := block.NewWiredList(
runtimeOptsMgr,
storageOpts.InstrumentOptions(),
storageOpts.ClockOptions(),
)
wiredList := block.NewWiredList(block.WiredListOptions{
RuntimeOptionsManager: runtimeOptsMgr,
InstrumentOptions: storageOpts.InstrumentOptions(),
ClockOptions: storageOpts.ClockOptions(),
// Use a small event channel size to stress-test the implementation
EventsChannelSize: 1,
})
blockOpts := storageOpts.DatabaseBlockOptions().SetWiredList(wiredList)
blockPool := block.NewDatabaseBlockPool(nil)
// Have to manually set the blockpool because the default one uses a constructor
Expand Down
19 changes: 16 additions & 3 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func Run(runOpts RunOptions) {
opts = opts.SetSeriesCachePolicy(seriesCachePolicy)

// Apply pooling options
opts = withEncodingAndPoolingOptions(logger, opts, cfg.PoolingPolicy)
opts = withEncodingAndPoolingOptions(cfg, logger, opts, cfg.PoolingPolicy)

// Setup the block retriever
switch seriesCachePolicy {
Expand Down Expand Up @@ -882,6 +882,7 @@ func kvWatchBootstrappers(
}

func withEncodingAndPoolingOptions(
cfg config.DBConfiguration,
logger xlog.Logger,
opts storage.Options,
policy config.PoolingPolicy,
Expand Down Expand Up @@ -1003,8 +1004,20 @@ func withEncodingAndPoolingOptions(
SetBytesPool(bytesPool)

if opts.SeriesCachePolicy() == series.CacheLRU {
runtimeOpts := opts.RuntimeOptionsManager()
wiredList := block.NewWiredList(runtimeOpts, iopts, opts.ClockOptions())
var (
runtimeOpts = opts.RuntimeOptionsManager()
wiredListOpts = block.WiredListOptions{
RuntimeOptionsManager: runtimeOpts,
InstrumentOptions: iopts,
ClockOptions: opts.ClockOptions(),
}
lruCfg = cfg.Cache.SeriesConfiguration().LRU
)

if lruCfg != nil && lruCfg.EventsChannelSize > 0 {
wiredListOpts.EventsChannelSize = int(lruCfg.EventsChannelSize)
}
wiredList := block.NewWiredList(wiredListOpts)
blockOpts = blockOpts.SetWiredList(wiredList)
}
blockPool := block.NewDatabaseBlockPool(poolOptions(policy.BlockPool,
Expand Down
42 changes: 30 additions & 12 deletions src/dbnode/storage/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ type dbBlock struct {
}

type listState struct {
next DatabaseBlock
prev DatabaseBlock
nextPrevUpdatedAtUnixNano int64
next DatabaseBlock
prev DatabaseBlock
enteredListAtUnixNano int64
}

// NewDatabaseBlock creates a new DatabaseBlock instance.
Expand Down Expand Up @@ -414,19 +414,37 @@ func (b *dbBlock) resetRetrievableWithLock(
}

func (b *dbBlock) Discard() ts.Segment {
return b.closeAndDiscard()
seg, _ := b.closeAndDiscardConditionally(nil)
return seg
}

func (b *dbBlock) Close() {
segment := b.closeAndDiscard()
segment, _ := b.closeAndDiscardConditionally(nil)
segment.Finalize()
}

func (b *dbBlock) closeAndDiscard() ts.Segment {
func (b *dbBlock) CloseIfFromDisk() bool {
segment, ok := b.closeAndDiscardConditionally(func(b *dbBlock) bool {
return b.wasRetrievedFromDisk
})
if !ok {
return false
}
segment.Finalize()
return true
}

func (b *dbBlock) closeAndDiscardConditionally(condition func(b *dbBlock) bool) (ts.Segment, bool) {
b.Lock()

if condition != nil && !condition(b) {
b.Unlock()
return ts.Segment{}, false
}

if b.closed {
b.Unlock()
return ts.Segment{}
return ts.Segment{}, true
}

segment := b.segment
Expand All @@ -439,7 +457,7 @@ func (b *dbBlock) closeAndDiscard() ts.Segment {
pool.Put(b)
}

return segment
return segment, true
}

func (b *dbBlock) resetMergeTargetWithLock() {
Expand Down Expand Up @@ -470,13 +488,13 @@ func (b *dbBlock) setPrev(value DatabaseBlock) {
}

// Should only be used by the WiredList.
func (b *dbBlock) nextPrevUpdatedAtUnixNano() int64 {
return b.listState.nextPrevUpdatedAtUnixNano
func (b *dbBlock) enteredListAtUnixNano() int64 {
return b.listState.enteredListAtUnixNano
}

// Should only be used by the WiredList.
func (b *dbBlock) setNextPrevUpdatedAtUnixNano(value int64) {
b.listState.nextPrevUpdatedAtUnixNano = value
func (b *dbBlock) setEnteredListAtUnixNano(value int64) {
b.listState.enteredListAtUnixNano = value
}

// wiredListEntry is a snapshot of a subset of the block's state that the WiredList
Expand Down
60 changes: 36 additions & 24 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions src/dbnode/storage/block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,18 @@ func TestDatabaseBlockStreamMergePerformsCopy(t *testing.T) {
require.NoError(t, iter.Err())
}

func TestDatabaseBlockCloseIfFromDisk(t *testing.T) {
var (
blockOpts = NewOptions()
blockNotFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock)
blockFromDisk = NewDatabaseBlock(time.Time{}, time.Hour, ts.Segment{}, blockOpts).(*dbBlock)
)
blockFromDisk.wasRetrievedFromDisk = true

require.False(t, blockNotFromDisk.CloseIfFromDisk())
require.True(t, blockFromDisk.CloseIfFromDisk())
}

func TestDatabaseSeriesBlocksAddBlock(t *testing.T) {
now := time.Now()
blockTimes := []time.Time{now, now.Add(time.Second), now.Add(time.Minute), now.Add(-time.Second), now.Add(-time.Hour)}
Expand Down
9 changes: 7 additions & 2 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ type DatabaseBlock interface {
// Close closes the block.
Close()

// CloseIfFromDisk atomically checks if the disk was retrieved from disk, and
// if so, closes it. It is meant as a layered protection for the WiredList
// which should only close blocks that were retrieved from disk.
CloseIfFromDisk() bool

// SetOnEvictedFromWiredList sets the owner of the block
SetOnEvictedFromWiredList(OnEvictedFromWiredList)

Expand All @@ -213,8 +218,8 @@ type databaseBlock interface {
setNext(block DatabaseBlock)
prev() DatabaseBlock
setPrev(block DatabaseBlock)
nextPrevUpdatedAtUnixNano() int64
setNextPrevUpdatedAtUnixNano(value int64)
enteredListAtUnixNano() int64
setEnteredListAtUnixNano(value int64)
wiredListEntry() wiredListEntry
}

Expand Down
Loading