Skip to content

Commit

Permalink
importinto: cleanup sorted files, writer memory quota, and test (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored and yibin87 committed Oct 31, 2023
1 parent 18fdf74 commit 900b27b
Show file tree
Hide file tree
Showing 18 changed files with 762 additions and 40 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 34,
shard_count = 36,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/storage",
"//kv",
Expand Down
74 changes: 74 additions & 0 deletions br/pkg/lightning/backend/external/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,77 @@ func TestGetMaxOverlapping(t *testing.T) {
}
require.EqualValues(t, 3, GetMaxOverlapping(points))
}

func TestSortedKVMeta(t *testing.T) {
summary := []*WriterSummary{
{
Min: []byte("a"),
Max: []byte("b"),
TotalSize: 123,
MultipleFilesStats: []MultipleFilesStat{
{
Filenames: [][2]string{
{"f1", "stat1"},
{"f2", "stat2"},
},
},
},
},
{
Min: []byte("x"),
Max: []byte("y"),
TotalSize: 177,
MultipleFilesStats: []MultipleFilesStat{
{
Filenames: [][2]string{
{"f3", "stat3"},
{"f4", "stat4"},
},
},
},
},
}
meta0 := NewSortedKVMeta(summary[0])
require.Equal(t, []byte("a"), meta0.MinKey)
require.Equal(t, []byte("b"), meta0.MaxKey)
require.Equal(t, uint64(123), meta0.TotalKVSize)
require.Equal(t, []string{"f1", "f2"}, meta0.DataFiles)
require.Equal(t, []string{"stat1", "stat2"}, meta0.StatFiles)
meta1 := NewSortedKVMeta(summary[1])
require.Equal(t, []byte("x"), meta1.MinKey)
require.Equal(t, []byte("y"), meta1.MaxKey)
require.Equal(t, uint64(177), meta1.TotalKVSize)
require.Equal(t, []string{"f3", "f4"}, meta1.DataFiles)
require.Equal(t, []string{"stat3", "stat4"}, meta1.StatFiles)

meta0.MergeSummary(summary[1])
require.Equal(t, []byte("a"), meta0.MinKey)
require.Equal(t, []byte("y"), meta0.MaxKey)
require.Equal(t, uint64(300), meta0.TotalKVSize)
require.Equal(t, []string{"f1", "f2", "f3", "f4"}, meta0.DataFiles)
require.Equal(t, []string{"stat1", "stat2", "stat3", "stat4"}, meta0.StatFiles)

meta00 := NewSortedKVMeta(summary[0])
meta00.Merge(meta1)
require.Equal(t, meta0, meta00)
}

func TestKeyMinMax(t *testing.T) {
require.Equal(t, []byte(nil), NotNilMin(nil, nil))
require.Equal(t, []byte{}, NotNilMin(nil, []byte{}))
require.Equal(t, []byte(nil), NotNilMin([]byte{}, nil))
require.Equal(t, []byte("a"), NotNilMin([]byte("a"), nil))
require.Equal(t, []byte("a"), NotNilMin([]byte("a"), []byte{}))
require.Equal(t, []byte("a"), NotNilMin(nil, []byte("a")))
require.Equal(t, []byte("a"), NotNilMin([]byte("a"), []byte("b")))
require.Equal(t, []byte("a"), NotNilMin([]byte("b"), []byte("a")))

require.Equal(t, []byte(nil), NotNilMax(nil, nil))
require.Equal(t, []byte{}, NotNilMax(nil, []byte{}))
require.Equal(t, []byte(nil), NotNilMax([]byte{}, nil))
require.Equal(t, []byte("a"), NotNilMax([]byte("a"), nil))
require.Equal(t, []byte("a"), NotNilMax([]byte("a"), []byte{}))
require.Equal(t, []byte("a"), NotNilMax(nil, []byte("a")))
require.Equal(t, []byte("b"), NotNilMax([]byte("a"), []byte("b")))
require.Equal(t, []byte("b"), NotNilMax([]byte("b"), []byte("a")))
}
14 changes: 11 additions & 3 deletions br/pkg/lightning/backend/external/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ import (

var multiFileStatNum = 500

const (
// DefaultMemSizeLimit is the default memory size limit for writer.
DefaultMemSizeLimit = 256 * size.MB
)

// rangePropertiesCollector collects range properties for each range. The zero
// value of rangePropertiesCollector is not ready to use, should call reset()
// first.
Expand All @@ -61,8 +66,11 @@ func (rc *rangePropertiesCollector) encode() []byte {

// WriterSummary is the summary of a writer.
type WriterSummary struct {
WriterID string
Seq int
WriterID string
Seq int
// Min and Max are the min and max key written by this writer, both are
// inclusive, i.e. [Min, Max].
// will be empty if no key is written.
Min tidbkv.Key
Max tidbkv.Key
TotalSize uint64
Expand Down Expand Up @@ -90,7 +98,7 @@ type WriterBuilder struct {
// NewWriterBuilder creates a WriterBuilder.
func NewWriterBuilder() *WriterBuilder {
return &WriterBuilder{
memSizeLimit: 256 * size.MB,
memSizeLimit: DefaultMemSizeLimit,
writeBatchCount: 8 * 1024,
propSizeDist: 1 * size.MB,
propKeysDist: 8 * 1024,
Expand Down
12 changes: 6 additions & 6 deletions br/pkg/lightning/backend/external/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/cockroachdb/pebble"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/storage"
dbkv "github.com/pingcap/tidb/kv"
Expand All @@ -41,11 +42,13 @@ func TestWriter(t *testing.T) {
ctx := context.Background()
memStore := storage.NewMemStorage()

writer := NewWriterBuilder().
w := NewWriterBuilder().
SetPropSizeDistance(100).
SetPropKeysDistance(2).
Build(memStore, "/test", "0")

writer := NewEngineWriter(w)

kvCnt := rand.Intn(10) + 10
kvs := make([]common.KvPair, kvCnt)
for i := 0; i < kvCnt; i++ {
Expand All @@ -58,12 +61,9 @@ func TestWriter(t *testing.T) {
_, err = rand.Read(kvs[i].Val)
require.NoError(t, err)
}
for _, pair := range kvs {
err := writer.WriteRow(ctx, pair.Key, pair.Val, nil)
require.NoError(t, err)
}

err := writer.Close(ctx)
require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
_, err := writer.Close(ctx)
require.NoError(t, err)

slices.SortFunc(kvs, func(i, j common.KvPair) int {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"//util/compress",
"//util/engine",
"//util/hack",
"//util/intest",
"//util/mathutil",
"//util/ranger",
"@com_github_cockroachdb_pebble//:pebble",
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/engine"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/mathutil"
"github.com/tikv/client-go/v2/oracle"
tikvclient "github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -939,7 +940,11 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
if err != nil {
return err
}
store, err := storage.New(ctx, storeBackend, nil)
opt := &storage.ExternalStorageOptions{}
if intest.InTest {
opt.NoCredentials = true
}
store, err := storage.New(ctx, storeBackend, opt)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,3 +2255,13 @@ func TestExternalEngine(t *testing.T) {
}
require.Equal(t, 100, kvIdx)
}

func TestGetExternalEngineKVStatistics(t *testing.T) {
b := Backend{
externalEngine: map[uuid.UUID]common.Engine{},
}
// non existent uuid
size, count := b.GetExternalEngineKVStatistics(uuid.New())
require.Zero(t, size)
require.Zero(t, count)
}
11 changes: 10 additions & 1 deletion disttask/importinto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//meta/autoid",
"//metrics",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//resourcemanager/pool/workerpool",
"//resourcemanager/util",
Expand All @@ -59,7 +60,9 @@ go_library(
"//util/logutil",
"//util/mathutil",
"//util/promutil",
"//util/size",
"//util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
Expand All @@ -78,6 +81,7 @@ go_test(
"dispatcher_test.go",
"dispatcher_testkit_test.go",
"encode_and_sort_operator_test.go",
"job_testkit_test.go",
"metrics_test.go",
"planner_test.go",
"subtask_executor_test.go",
Expand All @@ -86,12 +90,14 @@ go_test(
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 11,
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/external",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//ddl",
"//disttask/framework/dispatcher",
"//disttask/framework/planner",
"//disttask/framework/proto",
Expand All @@ -101,9 +107,12 @@ go_test(
"//domain/infosync",
"//executor/importer",
"//meta/autoid",
"//parser",
"//parser/ast",
"//parser/model",
"//testkit",
"//util/logutil",
"//util/mock",
"//util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
Loading

0 comments on commit 900b27b

Please sign in to comment.