Skip to content

Commit

Permalink
lightning: refactor to reuse in load data, part1 (#42469)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter committed Mar 24, 2023
1 parent cb12d33 commit d636a81
Show file tree
Hide file tree
Showing 38 changed files with 654 additions and 474 deletions.
4 changes: 4 additions & 0 deletions Makefile
Expand Up @@ -332,6 +332,10 @@ br_compatibility_test:
mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,Rows,Row > br/pkg/mock/encode.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
# cgo enabled on regular builds is performance.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/BUILD.bazel
Expand Up @@ -6,7 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
Expand All @@ -31,7 +31,7 @@ go_test(
shard_count = 10,
deps = [
":backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/encode",
"//br/pkg/mock",
"//parser/mysql",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
30 changes: 11 additions & 19 deletions br/pkg/lightning/backend/backend.go
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
Expand Down Expand Up @@ -145,19 +145,11 @@ type TargetInfoGetter interface {
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// EncodingBuilder consists of operations to handle encoding backend row data formats from source.
type EncodingBuilder interface {
// NewEncoder creates an encoder of a TiDB table.
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() kv.Rows
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
EncodingBuilder
encode.EncodingBuilder
TargetInfoGetter
// Close the connection to the backend.
Close()
Expand Down Expand Up @@ -206,11 +198,11 @@ type AbstractBackend interface {

// CollectLocalDuplicateRows collects duplicate keys from the local db. We will store the duplicate keys which
// may be repeated with other keys in the local data source.
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage. These keys may duplicate
// the data imported by other lightning instances.
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
Expand Down Expand Up @@ -269,12 +261,12 @@ func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() kv.Rows {
func (be Backend) MakeEmptyRows() encode.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(ctx, tbl, options)
func (be Backend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
return be.abstract.NewEncoder(ctx, config)
}

func (be Backend) ShouldPostProcess() bool {
Expand Down Expand Up @@ -388,11 +380,11 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
}

func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
}

Expand Down Expand Up @@ -429,7 +421,7 @@ func (engine *OpenedEngine) TotalMemoryConsume() int64 {
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows encode.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

Expand Down Expand Up @@ -519,7 +511,7 @@ type EngineWriter interface {
ctx context.Context,
tableName string,
columnNames []string,
rows kv.Rows,
rows encode.Rows,
) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/backend_test.go
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -325,10 +325,12 @@ func TestNewEncoder(t *testing.T) {
defer s.tearDownTest()

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890}
s.mockBackend.EXPECT().NewEncoder(nil, nil, options).Return(encoder, nil)
options := &encode.EncodingConfig{
SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890},
}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, nil, options)
realEncoder, err := s.mockBackend.NewEncoder(nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/lightning/backend/encode/BUILD.bazel
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "encode",
srcs = ["encode.go"],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/encode",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//parser/mysql",
"//table",
"//types",
],
)
89 changes: 89 additions & 0 deletions br/pkg/lightning/backend/encode/encode.go
@@ -0,0 +1,89 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encode

import (
"context"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
)

// EncodingConfig is the configuration passed to EncodingBuilder.NewEncoder
// when creating an Encoder.
type EncodingConfig struct {
// SessionOptions is embedded, so its fields (SQLMode, Timestamp, SysVars,
// AutoRandomSeed, IndexID) are promoted onto EncodingConfig.
SessionOptions
Path string // path of data file
// Table is presumably the TiDB table the encoder is created for
// (NewEncoder "creates an encoder of a TiDB table") — confirm against the
// NewEncoder implementations.
Table table.Table
// Logger is presumably the logger the created encoder writes to — confirm
// against the NewEncoder implementations.
Logger log.Logger
}

// EncodingBuilder consists of operations to handle encoding backend row data formats from source.
// It is the factory side of this package: it hands out Encoder and Rows values.
type EncodingBuilder interface {
// NewEncoder creates an encoder of a TiDB table, configured by config.
// It returns a non-nil error if the encoder cannot be created.
NewEncoder(ctx context.Context, config *EncodingConfig) (Encoder, error)
// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() Rows
}

// Encoder encodes a row of SQL values into some opaque type which can be
// consumed by OpenEngine.WriteEncoded.
//
// NOTE(review): WriteEncoded may be a stale name. The engine writers in
// package backend accept encoded rows through WriteRows/AppendRows — confirm
// and update this reference.
type Encoder interface {
// Close the encoder.
Close()

// Encode encodes a row of SQL values into a backend-friendly format.
//
// row holds the SQL values of one source row and rowID is the ID given to
// that row. columnPermutation presumably maps table columns to positions
// in row, and offset presumably locates the row within its data file —
// TODO confirm both against the implementations.
Encode(row []types.Datum, rowID int64, columnPermutation []int, offset int64) (Row, error)
}

// SessionOptions is the initial configuration of the session.
type SessionOptions struct {
// SQLMode is the SQL mode of the session.
SQLMode mysql.SQLMode
// Timestamp is the timestamp of the session.
// NOTE(review): the unit (presumably Unix seconds) is not visible here — confirm.
Timestamp int64
// SysVars holds system variables, presumably keyed by variable name and
// applied to the session on creation — confirm.
SysVars map[string]string
// AutoRandomSeed is a seed used for tableKvEncoder's auto random bits value.
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
IndexID int64
}

// Rows represents a collection of encoded rows.
// An empty collection is obtained from EncodingBuilder.MakeEmptyRows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
// Because capacity may be shared, the receiver should not be used again
// after Clear; use the returned value instead.
Clear() Rows
}

// Row represents a single encoded row.
// Values of this type are produced by Encoder.Encode.
type Row interface {
// ClassifyAndAppend separates the data-like and index-like parts of the
// encoded row, and appends these parts into the existing buffers and
// checksums.
//
// The data-like part goes to data and dataChecksum, the index-like part to
// indices and indexChecksum. data and indices are pointers to Rows,
// presumably so that an implementation can replace the collection it
// appends to — TODO confirm against the implementations.
ClassifyAndAppend(
data *Rows,
dataChecksum *verification.KVChecksum,
indices *Rows,
indexChecksum *verification.KVChecksum,
)

// Size represents the total kv size of this Row.
Size() uint64
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/kv",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/manual",
Expand Down Expand Up @@ -54,6 +55,7 @@ go_test(
flaky = True,
race = "on",
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"fmt"

"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
func NewTableKVDecoder(
tbl table.Table,
tableName string,
options *SessionOptions,
options *encode.SessionOptions,
logger log.Logger,
) (*TableKVDecoder, error) {
se := newSession(options, logger)
Expand Down
17 changes: 3 additions & 14 deletions br/pkg/lightning/backend/kv/session.go
Expand Up @@ -24,13 +24,13 @@ import (
"sync"

"github.com/docker/go-units"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/manual"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -256,23 +256,12 @@ type session struct {
values map[fmt.Stringer]interface{}
}

// SessionOptions is the initial configuration of the session.
type SessionOptions struct {
SQLMode mysql.SQLMode
Timestamp int64
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
IndexID int64
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
func NewSession(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
return newSession(options, logger)
}

func newSession(options *SessionOptions, logger log.Logger) *session {
func newSession(options *encode.SessionOptions, logger log.Logger) *session {
s := &session{
values: make(map[fmt.Stringer]interface{}, 1),
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/session_test.go
Expand Up @@ -17,14 +17,15 @@ package kv_test
import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
session := kv.NewSession(&encode.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}

0 comments on commit d636a81

Please sign in to comment.