db: use uncompensated scores to smooth compaction picking scores
When smoothing a level's compaction score, use a parallel uncompensated
score as the divisor for the smoothing adjustment. This helps prevent
range deletions in lower levels from starving compactions in higher
levels through excessive prioritization of lower-level disk space
reclamation.
jbowens committed Dec 14, 2023
1 parent db91e5c commit 1becf83
Showing 5 changed files with 660 additions and 24 deletions.
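To make the intent of the change concrete, here is a minimal, self-contained Go sketch (simplified names and invented numbers, not the exact Pebble code) of the smoothing step after this commit: a level's compensated score is divided by the next level's uncompensated score rather than its compensated one.

```go
package main

import "fmt"

// candidate mirrors the fields of Pebble's candidateLevelInfo that matter
// here (names simplified for illustration).
type candidate struct {
	score    float64 // compensated score, adjusted in place below
	rawScore float64 // uncompensated score, used only as the divisor
}

// smooth sketches the adjustment this commit changes: each level's
// compensated score is divided by the *uncompensated* score of the next
// level (with a small floor), so a level that looks huge only because of
// range deletion compensation no longer suppresses the levels above it.
func smooth(scores []candidate) {
	const minScore = 0.01 // floor on the divisor, as in the real picker
	for level := len(scores) - 1; level > 0; level-- {
		divisor := scores[level].rawScore
		if divisor < minScore {
			divisor = minScore
		}
		scores[level-1].score /= divisor
	}
}

func main() {
	// Invented numbers: the last level is heavily compensated (score 5.0)
	// but small on disk (rawScore 0.5); the level above it scores 2.0.
	scores := []candidate{
		{score: 2.0, rawScore: 2.0},
		{score: 5.0, rawScore: 0.5},
	}
	smooth(scores)
	fmt.Printf("%.2f\n", scores[0].score) // 4.00 (vs 0.40 with the old compensated divisor)
}
```

With the old divisor (the compensated score 5.0), the upper level's score would collapse to 0.40 and its compaction could be starved; dividing by the raw score 0.5 keeps it at 4.00.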
1 change: 1 addition & 0 deletions compaction.go
@@ -395,6 +395,7 @@ type compaction struct {
// - if startLevel is 0, the output level equals compactionPicker.baseLevel().
// - in multilevel compaction, the output level is the lowest level involved in
// the compaction
// A compaction's outputLevel is nil for delete-only compactions.
outputLevel *compactionLevel

// extraLevels point to additional levels in between the input and output
83 changes: 60 additions & 23 deletions compaction_picker.go
@@ -563,10 +563,16 @@ func newCompactionPicker(
// Information about a candidate compaction level that has been identified by
// the compaction picker.
type candidateLevelInfo struct {
// The score of the level to be compacted.
score float64
// The score of the level to be compacted, with compensated file sizes and
// adjustments.
score float64
// The original score of the level to be compacted, before adjusting
// according to other levels' sizes.
origScore float64
level int
// The raw score of the level to be compacted, calculated using
// uncompensated file sizes and without any adjustments.
rawScore float64
level int
// The level to compact to.
outputLevel int
// The file in level that will be compacted. Additional files may be
@@ -826,25 +832,50 @@ func (p *compactionPickerByScore) initLevelMaxBytes(inProgressCompactions []comp
}
}

func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]int64 {
// Compute a size adjustment for each level based on the in-progress
// compactions. We subtract the compensated size of start level inputs.
type levelSizeAdjust struct {
incomingActualBytes int64
outgoingActualBytes int64
outgoingCompensatedBytes int64
}

func (a levelSizeAdjust) compensated() int64 {
return a.incomingActualBytes - a.outgoingCompensatedBytes
}

func (a levelSizeAdjust) actual() int64 {
return a.incomingActualBytes - a.outgoingActualBytes
}

func calculateSizeAdjust(inProgressCompactions []compactionInfo) [numLevels]levelSizeAdjust {
// Compute size adjustments for each level based on the in-progress
// compactions. We sum the file sizes of all files leaving and entering each
level in in-progress compactions. For outgoing files, we also keep a
separate sum of 'compensated file sizes', which are inflated according
to deletion estimates.
//
// When we adjust a level's size according to these values during score
// calculation, we subtract the compensated size of start level inputs to
// account for the fact that score calculation uses compensated sizes.
//
// Since compensated file sizes may be compensated because they reclaim
// space from the output level's files, we add the real file size to the
// output level. This is slightly different from RocksDB's behavior, which
// simply elides compacting files from the level size calculation.
var sizeAdjust [numLevels]int64
// space from the output level's files, we only add the real file size to
// the output level.
//
// This is slightly different from RocksDB's behavior, which simply elides
// compacting files from the level size calculation.
var sizeAdjust [numLevels]levelSizeAdjust
for i := range inProgressCompactions {
c := &inProgressCompactions[i]

for _, input := range c.inputs {
real := int64(input.files.SizeSum())
compensated := int64(totalCompensatedSize(input.files.Iter()))
actualSize := int64(input.files.SizeSum())
compensatedSize := int64(totalCompensatedSize(input.files.Iter()))

if input.level != c.outputLevel {
sizeAdjust[input.level] -= compensated
sizeAdjust[input.level].outgoingCompensatedBytes += compensatedSize
sizeAdjust[input.level].outgoingActualBytes += actualSize
if c.outputLevel != -1 {
sizeAdjust[c.outputLevel] += real
sizeAdjust[c.outputLevel].incomingActualBytes += actualSize
}
}
}
@@ -868,9 +899,15 @@ func (p *compactionPickerByScore) calculateScores(

sizeAdjust := calculateSizeAdjust(inProgressCompactions)
for level := 1; level < numLevels; level++ {
levelSize := int64(levelCompensatedSize(p.vers.Levels[level])) + sizeAdjust[level]
scores[level].score = float64(levelSize) / float64(p.levelMaxBytes[level])
compensatedLevelSize := int64(levelCompensatedSize(p.vers.Levels[level])) + sizeAdjust[level].compensated()
scores[level].score = float64(compensatedLevelSize) / float64(p.levelMaxBytes[level])
scores[level].origScore = scores[level].score

// In addition to the compensated score, we calculate a separate score
// that uses actual file sizes, not compensated sizes. This is used
// during score smoothing down below to prevent excessive
// prioritization of reclaiming disk space.
scores[level].rawScore = float64(p.levelSizes[level]+sizeAdjust[level].actual()) / float64(p.levelMaxBytes[level])
}

// Adjust each level's score by the score of the next level. If the next
@@ -900,8 +937,8 @@ func (p *compactionPickerByScore) calculateScores(
// Avoid absurdly large scores by placing a floor on the score that we'll
// adjust a level by. The value of 0.01 was chosen somewhat arbitrarily
const minScore = 0.01
if scores[level].score >= minScore {
scores[prevLevel].score /= scores[level].score
if scores[level].rawScore >= minScore {
scores[prevLevel].score /= scores[level].rawScore
} else {
scores[prevLevel].score /= minScore
}
@@ -1063,7 +1100,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact

scores := p.calculateScores(env.inProgressCompactions)

// TODO(peter): Either remove, or change this into an event sent to the
// TODO(bananabrick): Either remove, or change this into an event sent to the
// EventListener.
logCompaction := func(pc *pickedCompaction) {
var buf bytes.Buffer
@@ -1084,8 +1121,8 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
if pc.startLevel.level == info.level {
marker = "*"
}
fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %8s %8s",
marker, info.level, info.score, info.origScore,
fmt.Fprintf(&buf, " %sL%d: %5.1f %5.1f %5.1f %8s %8s",
marker, info.level, info.score, info.origScore, info.rawScore,
humanize.Int64(int64(totalCompensatedSize(
p.vers.Levels[info.level].Iter(),
))),
@@ -1133,7 +1170,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
pc.score = info.score
// TODO(peter): remove
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
@@ -1153,7 +1190,7 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
// Fail-safe to protect against compacting the same sstable concurrently.
if pc != nil && !inputRangeAlreadyCompacting(env, pc) {
pc.score = info.score
// TODO(peter): remove
// TODO(bananabrick): Create an EventListener for logCompaction.
if false {
logCompaction(pc)
}
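To make the new levelSizeAdjust arithmetic concrete, here is a small self-contained example. The struct and methods are copied from the diff above; the compaction scenario and byte counts are invented for illustration.

```go
package main

import "fmt"

// levelSizeAdjust and its methods are copied from the diff above so this
// example compiles on its own.
type levelSizeAdjust struct {
	incomingActualBytes      int64
	outgoingActualBytes      int64
	outgoingCompensatedBytes int64
}

func (a levelSizeAdjust) compensated() int64 {
	return a.incomingActualBytes - a.outgoingCompensatedBytes
}

func (a levelSizeAdjust) actual() int64 {
	return a.incomingActualBytes - a.outgoingActualBytes
}

func main() {
	const mib = 1 << 20
	// Invented scenario: one in-progress compaction moves L3 inputs that
	// occupy 10 MiB on disk but are compensated to 30 MiB by range
	// deletion estimates, into L4. As in calculateSizeAdjust, only the
	// start-level inputs are accounted for.
	l3 := levelSizeAdjust{outgoingActualBytes: 10 * mib, outgoingCompensatedBytes: 30 * mib}
	l4 := levelSizeAdjust{incomingActualBytes: 10 * mib}

	// L3's compensated size shrinks by the full 30 MiB while its raw size
	// shrinks by only the 10 MiB actually leaving; L4 gains only the real
	// 10 MiB in both views.
	fmt.Println(l3.compensated()/mib, l3.actual()/mib) // -30 -10
	fmt.Println(l4.compensated()/mib, l4.actual()/mib) // 10 10
}
```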
43 changes: 42 additions & 1 deletion compaction_picker_test.go
@@ -39,6 +39,14 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,

var files [numLevels][]*fileMetadata
if len(d.Input) > 0 {
// Parse each line as
//
// <level>: <size> [compensation]
//
// creating sstables within the level whose file sizes total to `size`
// and whose compensated file sizes total to `size`+`compensation`. If
// size is sufficiently large, only a single file is created. See
// the TODO below.
for _, data := range strings.Split(d.Input, "\n") {
parts := strings.Split(data, " ")
parts[0] = strings.TrimSuffix(strings.TrimSpace(parts[0]), ":")
@@ -56,6 +64,15 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,
if err != nil {
return nil, nil, sizes, err.Error()
}
var compensation uint64
if len(parts) == 3 {
compensation, err = strconv.ParseUint(strings.TrimSpace(parts[2]), 10, 64)
if err != nil {
return nil, nil, sizes, err.Error()
}
}

var lastFile *fileMetadata
for i := uint64(1); sizes[level] < int64(size); i++ {
var key InternalKey
if level == 0 {
@@ -65,10 +82,16 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,
key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet)
}
m := (&fileMetadata{
FileNum: base.FileNum(uint64(level)*100_000 + i),
SmallestSeqNum: key.SeqNum(),
LargestSeqNum: key.SeqNum(),
Size: 1,
Stats: manifest.TableStats{
RangeDeletionsBytesEstimate: 0,
},
}).ExtendPointKeyBounds(opts.Comparer.Compare, key, key)
m.StatsMarkValid()
lastFile = m
if size >= 100 {
// If the requested size of the level is very large only add a single
// file in order to avoid massive blow-up in the number of files in
@@ -78,10 +101,18 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,
// TestCompactionPickerLevelMaxBytes and
// TestCompactionPickerTargetLevel. Clean this up somehow.
m.Size = size
if level != 0 {
endKey := base.MakeInternalKey([]byte(fmt.Sprintf("%04d", size)), i, InternalKeyKindSet)
m.ExtendPointKeyBounds(opts.Comparer.Compare, key, endKey)
}
}
files[level] = append(files[level], m)
sizes[level] += int64(m.Size)
}
// Let all the compensation be due to the last file.
if lastFile != nil && compensation > 0 {
lastFile.Stats.RangeDeletionsBytesEstimate = compensation
}
}
}

@@ -160,12 +191,21 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
func(d *datadriven.TestData) string {
switch d.Cmd {
case "init":
// loadVersion expects a single datadriven argument that it
// sets as Options.LBaseMaxBytes. It parses the input as
// newline-separated levels, specifying the level's file size
// and optionally additional compensation to be added during
// compensated file size calculations. Eg:
//
// init <LBaseMaxBytes>
// <level>: <size> [compensation]
// <level>: <size> [compensation]
var errMsg string
vers, opts, sizes, errMsg = loadVersion(d)
if errMsg != "" {
return errMsg
}
return ""
return runVersionFileSizes(vers)
case "init_cp":
resetCompacting()

@@ -212,6 +252,7 @@ func TestCompactionPickerTargetLevel(t *testing.T) {
for _, cl := range pc.inputs {
cl.files.Each(func(f *fileMetadata) {
f.CompactionState = manifest.CompactionStateCompacting
fmt.Fprintf(&b, " %s marked as compacting\n", f)
})
}
}
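As an illustration of the syntax documented in the `init` comment above (a hypothetical snippet, not copied from the repository's testdata), a datadriven block using the new compensation column might look like:

```
init 1000
5: 100 900
6: 2000
```

Here L5 totals 100 bytes of actual file size plus 900 bytes of range-deletion compensation (1000 bytes compensated), while L6 totals 2000 bytes with no compensation.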
20 changes: 20 additions & 0 deletions data_test.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangedel"
"github.com/cockroachdb/pebble/internal/rangekey"
@@ -1007,6 +1008,25 @@ func runTableStatsCmd(td *datadriven.TestData, d *DB) string {
return "(not found)"
}

func runVersionFileSizes(v *version) string {
var buf bytes.Buffer
for l, levelMetadata := range v.Levels {
if levelMetadata.Empty() {
continue
}
fmt.Fprintf(&buf, "L%d:\n", l)
iter := levelMetadata.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
fmt.Fprintf(&buf, " %s: %d bytes (%s)", f, f.Size, humanize.IEC.Uint64(f.Size))
if f.IsCompacting() {
fmt.Fprintf(&buf, " (IsCompacting)")
}
fmt.Fprintln(&buf)
}
}
return buf.String()
}

func runPopulateCmd(t *testing.T, td *datadriven.TestData, b *Batch) {
var timestamps []int
var maxKeyLength int