cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099) #11132

Closed
6 changes: 6 additions & 0 deletions cdc/processor/sourcemanager/engine/pebble/db.go
@@ -113,8 +113,14 @@ func buildPebbleOption(cfg *config.DBConfig) (opts *pebble.Options) {
opts.ErrorIfExists = true
opts.DisableWAL = false // Delete range requires WAL.
opts.MaxOpenFiles = cfg.MaxOpenFiles / cfg.Count
<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/db.go
opts.MaxConcurrentCompactions = 6
opts.L0CompactionThreshold = cfg.CompactionL0Trigger
=======
opts.MaxConcurrentCompactions = func() int { return 6 }
opts.L0CompactionThreshold = 4 // Default for PebbleDB.
opts.L0CompactionFileThreshold = cfg.CompactionL0Trigger
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099)):cdc/processor/sourcemanager/sorter/pebble/db.go
opts.L0StopWritesThreshold = cfg.WriteL0PauseTrigger
opts.LBaseMaxBytes = 64 << 20 // 64 MB
opts.MemTableSize = cfg.WriterBufferSize
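
Note: the hunk above still contains unresolved conflict markers. Below is a minimal sketch of how it might read if resolved toward the incoming (cherry-picked) side, assuming a Pebble version whose Options.MaxConcurrentCompactions field takes a func() int and which exposes L0CompactionFileThreshold (a file-count trigger) separately from the score-based L0CompactionThreshold; this is not the committed resolution.

```go
// Sketch only; conflict resolved toward the cherry-picked side.
opts.MaxConcurrentCompactions = func() int { return 6 }   // newer Pebble takes a callback here
opts.L0CompactionThreshold = 4                            // Pebble's default compaction score for L0
opts.L0CompactionFileThreshold = cfg.CompactionL0Trigger  // file-count trigger, now driven by config
opts.L0StopWritesThreshold = cfg.WriteL0PauseTrigger
```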
30 changes: 30 additions & 0 deletions pkg/cmd/server/server_test.go
@@ -180,6 +180,7 @@ func TestParseCfg(t *testing.T) {
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -192,6 +193,15 @@ func TestParseCfg(t *testing.T) {
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
@@ -494,6 +504,7 @@ cert-allowed-cn = ["dd","ee"]
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -506,6 +517,15 @@ cert-allowed-cn = ["dd","ee"]
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
@@ -568,6 +588,7 @@ unknown3 = 3
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -580,6 +601,15 @@ unknown3 = 3
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
4 changes: 4 additions & 0 deletions pkg/config/config_test_data.go
@@ -135,11 +135,15 @@ const (
"writer-buffer-size": 8388608,
"compression": "snappy",
"write-l0-pause-trigger": 2147483647,
<<<<<<< HEAD
"compaction-l0-trigger": 160,
"compaction-deletion-threshold": 10485760,
"compaction-period": 1800,
"iterator-max-alive-duration": 10000,
"iterator-slow-read-duration": 256
=======
"compaction-l0-trigger": 16
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
"enable-new-scheduler": true,
"messages": {
2 changes: 1 addition & 1 deletion pkg/config/db.go
@@ -51,7 +51,7 @@ type DBConfig struct {
// CompactionL0Trigger defines number of db sst file at level-0 that will
// trigger compaction.
//
// The default value is 160.
// The default value is 16, which is based on a performance test on 4K tables.
CompactionL0Trigger int `toml:"compaction-l0-trigger" json:"compaction-l0-trigger"`
// CompactionDeletionThreshold defines the threshold of the number of deletion that
// trigger compaction.
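
The lower trigger matters because every live L0 file can overlap the key being sought, so read amplification on Seek grows roughly with the L0 file count. A small sketch of reading the documented default from code follows; the config.GetDefaultServerConfig().Debug.DB path is an assumption based on where the default DBConfig literal lives in pkg/config/server_config.go below.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/pkg/config"
)

func main() {
	// Sketch: prints the documented default, expected to be 16 after
	// this change (it was 160 before).
	db := config.GetDefaultServerConfig().Debug.DB
	fmt.Println("compaction-l0-trigger:", db.CompactionL0Trigger)
}
```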
9 changes: 9 additions & 0 deletions pkg/config/server_config.go
@@ -141,6 +141,7 @@
Count: 8,
// Following configs are optimized for write/read throughput.
// Users should not change them.
<<<<<<< HEAD

Check failure on line 144 in pkg/config/server_config.go (GitHub Actions / Mac OS Build): syntax error: unexpected <<, expecting expression
Check failure on line 144 in pkg/config/server_config.go (GitHub Actions / Arm Build (ARM64)): syntax error: unexpected <<, expecting expression
Concurrency: 128,
MaxOpenFiles: 10000,
BlockSize: 65536,
@@ -152,6 +153,14 @@
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16, // Based on a performance test on 4K tables.
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))

Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Mac OS Build): invalid digit '8' in octal literal
Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Mac OS Build): invalid character U+0023 '#'
Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Mac OS Build): syntax error: unexpected ) in composite literal; possibly missing comma or }
Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Arm Build (ARM64)): invalid digit '8' in octal literal
Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Arm Build (ARM64)): invalid character U+0023 '#'
Check failure on line 163 in pkg/config/server_config.go (GitHub Actions / Arm Build (ARM64)): syntax error: unexpected ) in composite literal; possibly missing comma or }
},
Messages: defaultMessageConfig.Clone(),

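For reference, a minimal sketch of how the default block above might look once the conflict is resolved toward the incoming side. The field set is taken from the hunk; whether Concurrency, CompactionDeletionThreshold, and the iterator options are dropped or kept depends on the actual resolution, and math.MaxInt32 requires the math import.

```go
// Sketch only; not the committed resolution.
DB: &DBConfig{
	Count: 8,
	// Following configs are optimized for write/read throughput.
	// Users should not change them.
	MaxOpenFiles:        10000,
	BlockSize:           65536,
	WriterBufferSize:    8388608,
	Compression:         "snappy",
	WriteL0PauseTrigger: math.MaxInt32,
	CompactionL0Trigger: 16, // based on a performance test on 4K tables
},
```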
87 changes: 87 additions & 0 deletions pkg/spanz/hash_map.go
@@ -0,0 +1,87 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package spanz

import (
	"encoding/binary"

	"blainsmith.com/go/seahash"
	"github.com/pingcap/tiflow/cdc/processor/tablepb"
)

// HashMap is a specialized hash map that maps a Span to a value.
type HashMap[T any] struct {
	hashMap map[hashableSpan]T
}

// NewHashMap returns a new HashMap.
func NewHashMap[T any]() *HashMap[T] {
	return &HashMap[T]{
		hashMap: make(map[hashableSpan]T),
	}
}

// Len returns the number of items currently in the map.
func (m *HashMap[T]) Len() int {
	return len(m.hashMap)
}

// Has returns true if the given key is in the map.
func (m *HashMap[T]) Has(span tablepb.Span) bool {
	_, ok := m.hashMap[toHashableSpan(span)]
	return ok
}

// Get looks for the key item in the map, returning it.
// It returns (zeroValue, false) if unable to find that item.
func (m *HashMap[T]) Get(span tablepb.Span) (T, bool) {
	item, ok := m.hashMap[toHashableSpan(span)]
	return item, ok
}

// GetV looks for the key item in the map, returning it.
// It returns zeroValue if unable to find that item.
func (m *HashMap[T]) GetV(span tablepb.Span) T {
	item := m.hashMap[toHashableSpan(span)]
	return item
}

// Delete removes an item whose key equals to the span.
func (m *HashMap[T]) Delete(span tablepb.Span) {
	delete(m.hashMap, toHashableSpan(span))
}

// ReplaceOrInsert adds the given item to the map.
func (m *HashMap[T]) ReplaceOrInsert(span tablepb.Span, value T) {
	m.hashMap[toHashableSpan(span)] = value
}

// Range calls the iterator for every value in the map until the iterator
// returns false.
func (m *HashMap[T]) Range(iterator ItemIterator[T]) {
	for k, v := range m.hashMap {
		ok := iterator(k.toSpan(), v)
		if !ok {
			break
		}
	}
}

// HashTableSpan hashes the given span to a slot offset.
func HashTableSpan(span tablepb.Span, slots int) int {
	b := make([]byte, 8+len(span.StartKey))
	binary.LittleEndian.PutUint64(b[0:8], uint64(span.TableID))
	copy(b[8:], span.StartKey)
	return int(seahash.Sum64(b) % uint64(slots))
}
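
A short usage sketch of the new helpers. hashableSpan, toHashableSpan, and ItemIterator are defined elsewhere in the spanz package and are not shown in this diff; the iterator signature func(tablepb.Span, T) bool is assumed from how Range calls it, and the Span literal is illustrative.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/processor/tablepb"
	"github.com/pingcap/tiflow/pkg/spanz"
)

func main() {
	m := spanz.NewHashMap[string]()
	span := tablepb.Span{TableID: 1, StartKey: []byte("t_1"), EndKey: []byte("t_2")}

	m.ReplaceOrInsert(span, "checkpoint")
	if v, ok := m.Get(span); ok {
		fmt.Println("found:", v) // found: checkpoint
	}

	// Iterate every entry; returning false from the iterator stops the walk.
	m.Range(func(s tablepb.Span, v string) bool {
		fmt.Println(s.TableID, v)
		return true
	})

	// Hash the span to one of 8 slots, e.g. to pick a sorter worker.
	fmt.Println("slot:", spanz.HashTableSpan(span, 8))
}
```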