db: do not cache compaction block reads
During compactions, avoid populating the block cache with input files' blocks.
These files will soon be removed from the LSM, making it less likely any
iterator will need to read these blocks. While Pebble uses a scan-resistant
block cache algorithm (ClockPRO), the act of inserting the blocks into the
cache increases contention on the block cache mutexes (cockroachdb#1997). This contention
has been observed to significantly contribute to tail latencies, both for reads
and for writes during memtable reservation. Additionally, although these blocks
may soon be replaced with more useful blocks due to ClockPRO's scan resistance,
they may be freed by a different thread, inducing excessive TLB shootdowns
(cockroachdb#2693).

A compaction only requires a relatively small working set of buffers during its
scan across input sstables. In this commit, we introduce a per-compaction
BufferPool that is used to allocate buffers during cache misses. Buffers are
reused throughout the compaction and only freed to the memory allocator when
they're too small or the compaction is finished. This reduces pressure on the
memory allocator and the block cache.
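
The intended lifecycle is roughly the following; this is a minimal sketch based on
the BufferPool API added in sstable/buffer_pool.go and the wiring in runCompaction
below, with blockLen and the pool size purely illustrative:

    var pool sstable.BufferPool
    pool.Init(d.opts.Cache, 5) // room for ~5 concurrently in-use block buffers
    defer pool.Release()       // frees all retained buffers when the compaction ends

    // On a block cache miss during the compaction, the sstable reader allocates
    // the block's buffer from the pool instead of inserting it into the cache.
    buf := pool.Alloc(blockLen)
    // ... read, decompress, and iterate the block using the pooled buffer ...
    buf.Release() // recycles the buffer for a later block read in this compaction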
jbowens committed Jul 13, 2023
1 parent fdd8ac6 commit 6c22525
Showing 17 changed files with 529 additions and 122 deletions.
16 changes: 14 additions & 2 deletions compaction.go
@@ -566,6 +566,7 @@ type compaction struct {
// resulting version has been installed (if successful), but the compaction
// goroutine is still cleaning up (eg, deleting obsolete files).
versionEditApplied bool
bufferPool sstable.BufferPool

score float64

@@ -1342,7 +1343,10 @@ func (c *compaction) newInputIter(
f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64,
) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
&IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated})
&IterOptions{level: l}, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
})
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
@@ -1427,7 +1431,10 @@ func (c *compaction) newInputIter(
// to configure the levelIter at these levels to hide the obsolete points.
addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters,
level.files.Iter(), l, &c.bytesIterated))
level.files.Iter(), l, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
}))
// TODO(jackson): Use keyspan.LevelIter to avoid loading all the range
// deletions into memory upfront. (See #2015, which reverted this.)
// There will be no user keys that are split between sstables
@@ -2745,6 +2752,11 @@ func (d *DB) runCompaction(
d.mu.Unlock()
defer d.mu.Lock()

// Compactions use a pool of buffers to read blocks, avoiding polluting the
// block cache with blocks that will not be read again.
c.bufferPool.Init(d.opts.Cache, 5)
defer c.bufferPool.Release()

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, stats, err
2 changes: 1 addition & 1 deletion flushable.go
@@ -172,7 +172,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
// aren't truly levels in the lsm. Right now, the encoding only supports
// L0 sublevels, and the rest of the levels in the lsm.
return newLevelIter(
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil,
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), internalIterOpts{},
)
}

4 changes: 2 additions & 2 deletions ingest.go
@@ -702,7 +702,7 @@ func ingestTargetLevel(
// Check for overlap over the keys of L0 by iterating over the sublevels.
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
iter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), nil)
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})

var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
@@ -733,7 +733,7 @@
level := baseLevel
for ; level < numLevels; level++ {
levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.Levels[level].Iter(), manifest.Level(level), nil)
v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
// sets it up for the target file.
5 changes: 3 additions & 2 deletions level_iter.go
@@ -51,6 +51,7 @@ func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.T

type internalIterOpts struct {
bytesIterated *uint64
bufferPool *sstable.BufferPool
stats *base.InternalIteratorStats
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
}
@@ -246,11 +247,11 @@ func newLevelIter(
newIters tableNewIters,
files manifest.LevelIterator,
level manifest.Level,
bytesIterated *uint64,
internalOpts internalIterOpts,
) *levelIter {
l := &levelIter{}
l.init(context.Background(), opts, cmp, split, newIters, files, level,
internalIterOpts{bytesIterated: bytesIterated})
internalOpts)
return l
}

16 changes: 8 additions & 8 deletions level_iter_test.go
@@ -88,7 +88,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level),
nil)
internalIterOpts{})
defer iter.Close()
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
@@ -131,7 +131,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters2, files.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
iter.SeekGE([]byte(key), base.SeekGEFlagsNone)
lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound()
return fmt.Sprintf("[%s,%s]\n", lower, upper)
@@ -326,7 +326,7 @@ func TestLevelIterBoundaries(t *testing.T) {
slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas)
iter = newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
}
@@ -536,7 +536,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
@@ -578,7 +578,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) {
opts.LowerBound, opts.UpperBound)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
@@ -627,7 +627,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) {
func(b *testing.B) {
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, metas.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
@@ -672,7 +672,7 @@ func BenchmarkLevelIterNext(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -706,7 +706,7 @@ func BenchmarkLevelIterPrev(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
2 changes: 1 addition & 1 deletion merging_iter_test.go
@@ -629,7 +629,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
l.initRangeDel(&mils[level].rangeDelIter)
l.initBoundaryContext(&mils[level].levelIterBoundaryContext)
mils[level].iter = l
12 changes: 6 additions & 6 deletions sstable/block.go
@@ -403,7 +403,7 @@ type blockIter struct {
cached []blockEntry
cachedBuf []byte
cacheHandle cache.Handle
handle bufferHandle
// The first user key in the block. This is used by the caller to set bounds
// for block iteration for already loaded blocks.
firstUserKey []byte
lazyValueHandling struct {
@@ -458,10 +458,10 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block cache.Handle, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool,
) error {
i.cacheHandle.Release()
i.cacheHandle = block
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints)
}

@@ -1515,8 +1515,8 @@ func (i *blockIter) Error() error {
// Close implements internalIterator.Close, as documented in the pebble
// package.
func (i *blockIter) Close() error {
i.cacheHandle.Release()
i.cacheHandle = cache.Handle{}
i.handle.Release()
i.handle = bufferHandle{}
i.val = nil
i.lazyValue = base.LazyValue{}
i.lazyValueHandling.vbr = nil
2 changes: 1 addition & 1 deletion sstable/block_property_test.go
@@ -1316,7 +1316,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
// block that bhp points to, along with its block properties.
if twoLevelIndex {
subiter := &blockIter{}
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil)
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil, nil)
if err != nil {
return err.Error()
}
140 changes: 140 additions & 0 deletions sstable/buffer_pool.go
@@ -0,0 +1,140 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/cache"
)

// A bufferHandle is a handle to manually-managed memory. The handle may point
// to a block in the block cache (h.Get() != nil), or to a buffer allocated from
// a BufferPool that lives outside the block cache (b.Valid()).
type bufferHandle struct {
h cache.Handle
b Buf
}

// Get retrieves the underlying buffer referenced by the handle.
func (bh bufferHandle) Get() []byte {
if v := bh.h.Get(); v != nil {
return v
} else if bh.b.p != nil {
return bh.b.p.pool[bh.b.i].b
}
return nil
}

// Release releases the buffer, either back to the block cache or BufferPool.
func (bh bufferHandle) Release() {
bh.h.Release()
bh.b.Release()
}

// A BufferPool holds a pool of buffers for holding sstable blocks. An initial
// size of the pool is provided on Init, but a BufferPool will grow to meet the
// largest working set size. It'll never shrink. When a buffer is released, the
// BufferPool recycles the buffer for future allocations.
//
// A BufferPool should only be used for short-lived allocations with
// well-understood working set sizes to avoid excessive memory consumption.
//
// BufferPool is not thread-safe.
type BufferPool struct {
cache *cache.Cache
// pool contains all the buffers held by the pool, including buffers that
// are in-use.
pool []allocedBuffer
}

type allocedBuffer struct {
v *cache.Value
// b holds the current byte slice. It's backed by v, but may be a subslice
// of v's memory while the buffer is in-use [ len(b) ≤ len(v.Buf()) ].
//
// If the buffer is not currently in-use, b is nil. When being recycled, the
// BufferPool.Alloc will reset b to be a subslice of v.Buf().
b []byte
}

// Init initializes the pool to use the provided cache for allocations, and
// with an initial working set buffer size of `initialSize`.
func (p *BufferPool) Init(cache *cache.Cache, initialSize int) {
*p = BufferPool{
cache: cache,
pool: make([]allocedBuffer, 0, initialSize),
}
}

// Release releases all buffers held by the pool and resets the pool to an
// uninitialized state.
func (p *BufferPool) Release() {
for i := range p.pool {
if p.pool[i].b != nil {
panic(errors.AssertionFailedf("Release called on a BufferPool with in-use buffers"))
}
p.cache.Free(p.pool[i].v)
}
*p = BufferPool{}
}

// Alloc allocates a new buffer of size n. If the pool already holds a buffer at
// least as large as n, the pooled buffer is used instead.
//
// Alloc is O(MAX(N,M)) where N is the largest number of concurrently in-use
// buffers allocated and M is the initialSize passed to Init.
func (p *BufferPool) Alloc(n int) Buf {
unusableBufferIdx := -1
for i := 0; i < len(p.pool); i++ {
if p.pool[i].b == nil {
if len(p.pool[i].v.Buf()) >= n {
p.pool[i].b = p.pool[i].v.Buf()[:n]
return Buf{p: p, i: i}
}
unusableBufferIdx = i
}
}

// If we would need to grow the size of the pool to allocate another buffer,
// but there was a slot available occupied by a buffer that's just too
// small, replace the too-small buffer.
if len(p.pool) == cap(p.pool) && unusableBufferIdx >= 0 {
i := unusableBufferIdx
p.cache.Free(p.pool[i].v)
p.pool[i].v = p.cache.Alloc(n)
p.pool[i].b = p.pool[i].v.Buf()
return Buf{p: p, i: i}
}

// Allocate a new buffer.
v := p.cache.Alloc(n)
p.pool = append(p.pool, allocedBuffer{v: v, b: v.Buf()[:n]})
return Buf{p: p, i: len(p.pool) - 1}
}

// A Buf holds a reference to a manually-managed, pooled byte buffer.
type Buf struct {
p *BufferPool
// i holds the index into p.pool where the buffer may be found. This scheme
// avoids needing to allocate the handle to the buffer on the heap at the
// cost of copying two words instead of one.
i int
}

// Valid returns true if the buf holds a valid buffer.
func (b Buf) Valid() bool {
return b.p != nil
}

// Release releases the buffer back to the pool.
func (b Buf) Release() {
if b.p == nil {
return
}
// Clear the allocedBuffer's byte slice. This signals the allocated buffer
// is no longer in use and a future call to BufferPool.Alloc may reuse this
// buffer.
b.p.pool[b.i].b = nil
}
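
For illustration, a hypothetical snippet (e.g. from a test inside the sstable
package) showing the reuse behavior described above; cache.New and Cache.Unref are
assumed from pebble's internal/cache package:

    c := cache.New(8 << 20) // used here only as the underlying allocator
    defer c.Unref()

    var pool BufferPool
    pool.Init(c, 2)           // initial capacity for two concurrently in-use buffers
    b1 := pool.Alloc(4 << 10) // fresh 4 KiB allocation via the cache allocator
    b1.Release()              // slot freed; the 4 KiB buffer is retained by the pool
    b2 := pool.Alloc(2 << 10) // reuses b1's buffer, resliced to 2 KiB
    b2.Release()
    pool.Release() // frees every retained buffer back to the allocator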