
Commit

This is an automated cherry-pick of #11099
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
hicqu authored and ti-chi-bot committed May 17, 2024
1 parent c56e6ba commit ea4a2d9
Showing 6 changed files with 137 additions and 1 deletion.
6 changes: 6 additions & 0 deletions cdc/processor/sourcemanager/engine/pebble/db.go
@@ -113,8 +113,14 @@ func buildPebbleOption(cfg *config.DBConfig) (opts *pebble.Options) {
opts.ErrorIfExists = true
opts.DisableWAL = false // Delete range requires WAL.
opts.MaxOpenFiles = cfg.MaxOpenFiles / cfg.Count
<<<<<<< HEAD:cdc/processor/sourcemanager/engine/pebble/db.go
opts.MaxConcurrentCompactions = 6
opts.L0CompactionThreshold = cfg.CompactionL0Trigger
=======
opts.MaxConcurrentCompactions = func() int { return 6 }
opts.L0CompactionThreshold = 4 // Default for PebbleDB.
opts.L0CompactionFileThreshold = cfg.CompactionL0Trigger
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099)):cdc/processor/sourcemanager/sorter/pebble/db.go
opts.L0StopWritesThreshold = cfg.WriteL0PauseTrigger
opts.LBaseMaxBytes = 64 << 20 // 64 MB
opts.MemTableSize = cfg.WriterBufferSize
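For reference, a minimal sketch of how this hunk might read once the leftover conflict markers are resolved in favor of the incoming #11099 side. Every field and value below is copied from the hunk above; it assumes a pebble version in which MaxConcurrentCompactions is a func() int, as the incoming lines indicate.

```go
// Sketch of one possible resolution inside buildPebbleOption (not the committed code).
opts.ErrorIfExists = true
opts.DisableWAL = false // Delete range requires WAL.
opts.MaxOpenFiles = cfg.MaxOpenFiles / cfg.Count
opts.MaxConcurrentCompactions = func() int { return 6 }
opts.L0CompactionThreshold = 4 // Default for PebbleDB.
opts.L0CompactionFileThreshold = cfg.CompactionL0Trigger
opts.L0StopWritesThreshold = cfg.WriteL0PauseTrigger
opts.LBaseMaxBytes = 64 << 20 // 64 MB
opts.MemTableSize = cfg.WriterBufferSize
```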
30 changes: 30 additions & 0 deletions pkg/cmd/server/server_test.go
@@ -180,6 +180,7 @@ func TestParseCfg(t *testing.T) {
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -192,6 +193,15 @@ func TestParseCfg(t *testing.T) {
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
@@ -494,6 +504,7 @@ cert-allowed-cn = ["dd","ee"]
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -506,6 +517,15 @@ cert-allowed-cn = ["dd","ee"]
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
@@ -568,6 +588,7 @@ unknown3 = 3
EnableNewScheduler: true,
EnablePullBasedSink: true,
DB: &config.DBConfig{
<<<<<<< HEAD
Count: 8,
Concurrency: 128,
MaxOpenFiles: 10000,
@@ -580,6 +601,15 @@ unknown3 = 3
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
Count: 8,
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16,
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
// We expect the default configuration here.
Messages: &config.MessagesConfig{
4 changes: 4 additions & 0 deletions pkg/config/config_test_data.go
@@ -135,11 +135,15 @@ const (
"writer-buffer-size": 8388608,
"compression": "snappy",
"write-l0-pause-trigger": 2147483647,
<<<<<<< HEAD
"compaction-l0-trigger": 160,
"compaction-deletion-threshold": 10485760,
"compaction-period": 1800,
"iterator-max-alive-duration": 10000,
"iterator-slow-read-duration": 256
=======
"compaction-l0-trigger": 16
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))
},
"enable-new-scheduler": true,
"messages": {
2 changes: 1 addition & 1 deletion pkg/config/db.go
@@ -51,7 +51,7 @@ type DBConfig struct {
// CompactionL0Trigger defines number of db sst file at level-0 that will
// trigger compaction.
//
// The default value is 160.
// The default value is 16, which is based on a performance test on 4K tables.
CompactionL0Trigger int `toml:"compaction-l0-trigger" json:"compaction-l0-trigger"`
// CompactionDeletionThreshold defines the threshold of the number of deletion that
// trigger compaction.
9 changes: 9 additions & 0 deletions pkg/config/server_config.go
@@ -141,6 +141,7 @@ var defaultServerConfig = &ServerConfig{
Count: 8,
// Following configs are optimized for write/read throughput.
// Users should not change them.
<<<<<<< HEAD

Check failure on line 144 in pkg/config/server_config.go (GitHub Actions / Mac OS Build and Arm Build (ARM64)): syntax error: unexpected <<, expecting expression
Concurrency: 128,
MaxOpenFiles: 10000,
BlockSize: 65536,
@@ -152,6 +153,14 @@ var defaultServerConfig = &ServerConfig{
CompactionPeriod: 1800,
IteratorMaxAliveDuration: 10000,
IteratorSlowReadDuration: 256,
=======
MaxOpenFiles: 10000,
BlockSize: 65536,
WriterBufferSize: 8388608,
Compression: "snappy",
WriteL0PauseTrigger: math.MaxInt32,
CompactionL0Trigger: 16, // Based on a performance test on 4K tables.
>>>>>>> 08aec53320 (cdc: adjust sorter options to avoid Seek CPU usage exploding (#11099))

Check failures on line 163 in pkg/config/server_config.go (GitHub Actions / Mac OS Build and Arm Build (ARM64)):
invalid digit '8' in octal literal
invalid character U+0023 '#'
syntax error: unexpected ) in composite literal; possibly missing comma or }
},
Messages: defaultMessageConfig.Clone(),

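The check failures above come from the conflict markers left in the defaults block. A minimal sketch of those sorter DB defaults with the conflict resolved toward the #11099 side (all field names and values copied from the incoming lines; the wrapper function is hypothetical and for illustration only):

```go
package main // illustration only, not the committed code

import (
	"math"

	"github.com/pingcap/tiflow/pkg/config"
)

// sketchDefaultDBConfig is a hypothetical helper showing the sorter DB
// defaults once the conflicted hunk is resolved toward the #11099 side.
func sketchDefaultDBConfig() *config.DBConfig {
	return &config.DBConfig{
		Count: 8,
		// Following configs are optimized for write/read throughput.
		// Users should not change them.
		MaxOpenFiles:        10000,
		BlockSize:           65536,
		WriterBufferSize:    8388608,
		Compression:         "snappy",
		WriteL0PauseTrigger: math.MaxInt32,
		CompactionL0Trigger: 16, // Based on a performance test on 4K tables.
	}
}
```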
87 changes: 87 additions & 0 deletions pkg/spanz/hash_map.go
@@ -0,0 +1,87 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package spanz

import (
	"encoding/binary"

	"blainsmith.com/go/seahash"
	"github.com/pingcap/tiflow/cdc/processor/tablepb"
)

// HashMap is a specialized hash map that maps a Span to a value.
type HashMap[T any] struct {
	hashMap map[hashableSpan]T
}

// NewHashMap returns a new HashMap.
func NewHashMap[T any]() *HashMap[T] {
	return &HashMap[T]{
		hashMap: make(map[hashableSpan]T),
	}
}

// Len returns the number of items currently in the map.
func (m *HashMap[T]) Len() int {
	return len(m.hashMap)
}

// Has returns true if the given key is in the map.
func (m *HashMap[T]) Has(span tablepb.Span) bool {
	_, ok := m.hashMap[toHashableSpan(span)]
	return ok
}

// Get looks for the key item in the map, returning it.
// It returns (zeroValue, false) if unable to find that item.
func (m *HashMap[T]) Get(span tablepb.Span) (T, bool) {
	item, ok := m.hashMap[toHashableSpan(span)]
	return item, ok
}

// GetV looks for the key item in the map, returning it.
// It returns zeroValue if unable to find that item.
func (m *HashMap[T]) GetV(span tablepb.Span) T {
	item := m.hashMap[toHashableSpan(span)]
	return item
}

// Delete removes the item whose key is equal to the span.
func (m *HashMap[T]) Delete(span tablepb.Span) {
	delete(m.hashMap, toHashableSpan(span))
}

// ReplaceOrInsert adds the given item to the map.
func (m *HashMap[T]) ReplaceOrInsert(span tablepb.Span, value T) {
	m.hashMap[toHashableSpan(span)] = value
}

// Range calls the iterator for every value in the map until the iterator
// returns false.
func (m *HashMap[T]) Range(iterator ItemIterator[T]) {
	for k, v := range m.hashMap {
		ok := iterator(k.toSpan(), v)
		if !ok {
			break
		}
	}
}

// HashTableSpan hashes the given span to a slot offset.
func HashTableSpan(span tablepb.Span, slots int) int {
	b := make([]byte, 8+len(span.StartKey))
	binary.LittleEndian.PutUint64(b[0:8], uint64(span.TableID))
	copy(b[8:], span.StartKey)
	return int(seahash.Sum64(b) % uint64(slots))
}
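A short usage sketch of the new helpers. Only the tablepb.Span fields already referenced in this file (TableID and StartKey) are set; the example function is illustrative, not part of the commit.

```go
package spanz_test

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/processor/tablepb"
	"github.com/pingcap/tiflow/pkg/spanz"
)

// ExampleHashMap sketches how a component could track a per-span value and
// pick a worker slot for the same span.
func ExampleHashMap() {
	m := spanz.NewHashMap[string]()
	span := tablepb.Span{TableID: 1, StartKey: []byte("a")}

	m.ReplaceOrInsert(span, "worker-1")
	if owner, ok := m.Get(span); ok {
		fmt.Println(owner, m.Len()) // worker-1 1
	}

	// HashTableSpan deterministically maps the span into one of `slots` buckets;
	// the exact slot depends on the seahash value.
	fmt.Println(spanz.HashTableSpan(span, 8) < 8) // true
}
```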
