Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: refactor to reuse in load data, part1 #42469

Merged
merged 7 commits into from Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Expand Up @@ -332,6 +332,10 @@ br_compatibility_test:
mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,Rows,Row > br/pkg/mock/encode.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
# cgo enabled on regular builds is performance.
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/BUILD.bazel
Expand Up @@ -6,7 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
Expand All @@ -30,7 +30,7 @@ go_test(
flaky = True,
deps = [
":backend",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/backend/encode",
"//br/pkg/mock",
"//parser/mysql",
"@com_github_go_sql_driver_mysql//:mysql",
Expand Down
30 changes: 11 additions & 19 deletions br/pkg/lightning/backend/backend.go
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
Expand Down Expand Up @@ -145,19 +145,11 @@ type TargetInfoGetter interface {
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// EncodingBuilder consists of operations to handle encoding backend row data formats from source.
type EncodingBuilder interface {
// NewEncoder creates an encoder of a TiDB table.
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() kv.Rows
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
EncodingBuilder
encode.EncodingBuilder
TargetInfoGetter
// Close the connection to the backend.
Close()
Expand Down Expand Up @@ -206,11 +198,11 @@ type AbstractBackend interface {

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
// may be repeated with other keys in local data source.
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// CollectRemoteDuplicateRows collect duplicate keys from remote TiKV storage. This keys may be duplicate with
// the data import by other lightning.
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (hasDupe bool, err error)
CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)

// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
// according to the required algorithm.
Expand Down Expand Up @@ -269,12 +261,12 @@ func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() kv.Rows {
func (be Backend) MakeEmptyRows() encode.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(ctx, tbl, options)
func (be Backend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
return be.abstract.NewEncoder(ctx, config)
}

func (be Backend) ShouldPostProcess() bool {
Expand Down Expand Up @@ -388,11 +380,11 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
}

func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
}

Expand Down Expand Up @@ -429,7 +421,7 @@ func (engine *OpenedEngine) TotalMemoryConsume() int64 {
}

// WriteRows writes a collection of encoded rows into the engine.
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows encode.Rows) error {
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

Expand Down Expand Up @@ -519,7 +511,7 @@ type EngineWriter interface {
ctx context.Context,
tableName string,
columnNames []string,
rows kv.Rows,
rows encode.Rows,
) error
IsSynced() bool
Close(ctx context.Context) (ChunkFlushStatus, error)
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/backend_test.go
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -325,10 +325,12 @@ func TestNewEncoder(t *testing.T) {
defer s.tearDownTest()

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890}
s.mockBackend.EXPECT().NewEncoder(nil, nil, options).Return(encoder, nil)
options := &encode.EncodingConfig{
SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890},
}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, nil, options)
realEncoder, err := s.mockBackend.NewEncoder(nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/lightning/backend/encode/BUILD.bazel
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "encode",
srcs = ["encode.go"],
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/encode",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
"//parser/mysql",
"//table",
"//types",
],
)
89 changes: 89 additions & 0 deletions br/pkg/lightning/backend/encode/encode.go
@@ -0,0 +1,89 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package encode

import (
"context"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
)

type EncodingConfig struct {
SessionOptions
Path string // path of data file
Table table.Table
Logger log.Logger
}

// EncodingBuilder consists of operations to handle encoding backend row data formats from source.
type EncodingBuilder interface {
// NewEncoder creates an encoder of a TiDB table.
NewEncoder(ctx context.Context, config *EncodingConfig) (Encoder, error)
// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() Rows
}

// Encoder encodes a row of SQL values into some opaque type which can be
// consumed by OpenEngine.WriteEncoded.
type Encoder interface {
// Close the encoder.
Close()

// Encode encodes a row of SQL values into a backend-friendly format.
Encode(row []types.Datum, rowID int64, columnPermutation []int, offset int64) (Row, error)
}

// SessionOptions is the initial configuration of the session.
type SessionOptions struct {
SQLMode mysql.SQLMode
Timestamp int64
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
IndexID int64
}

// Rows represents a collection of encoded rows.
type Rows interface {
// SplitIntoChunks splits the rows into multiple consecutive parts, each
// part having total byte size less than `splitSize`. The meaning of "byte
// size" should be consistent with the value used in `Row.ClassifyAndAppend`.
SplitIntoChunks(splitSize int) []Rows

// Clear returns a new collection with empty content. It may share the
// capacity with the current instance. The typical usage is `x = x.Clear()`.
Clear() Rows
}

// Row represents a single encoded row.
type Row interface {
// ClassifyAndAppend separates the data-like and index-like parts of the
// encoded row, and appends these parts into the existing buffers and
// checksums.
ClassifyAndAppend(
data *Rows,
dataChecksum *verification.KVChecksum,
indices *Rows,
indexChecksum *verification.KVChecksum,
)

// Size represents the total kv size of this Row.
Size() uint64
}
2 changes: 2 additions & 0 deletions br/pkg/lightning/backend/kv/BUILD.bazel
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/kv",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/manual",
Expand Down Expand Up @@ -54,6 +55,7 @@ go_test(
flaky = True,
race = "on",
deps = [
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/kv2sql.go
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"fmt"

"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -111,7 +112,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
func NewTableKVDecoder(
tbl table.Table,
tableName string,
options *SessionOptions,
options *encode.SessionOptions,
logger log.Logger,
) (*TableKVDecoder, error) {
se := newSession(options, logger)
Expand Down
17 changes: 3 additions & 14 deletions br/pkg/lightning/backend/kv/session.go
Expand Up @@ -24,13 +24,13 @@ import (
"sync"

"github.com/docker/go-units"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/manual"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -256,23 +256,12 @@ type session struct {
values map[fmt.Stringer]interface{}
}

// SessionOptions is the initial configuration of the session.
type SessionOptions struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved to avoid import cycle

SQLMode mysql.SQLMode
Timestamp int64
SysVars map[string]string
// a seed used for tableKvEncoder's auto random bits value
AutoRandomSeed int64
// IndexID is used by the DuplicateManager. Only the key range with the specified index ID is scanned.
IndexID int64
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
func NewSession(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
return newSession(options, logger)
}

func newSession(options *SessionOptions, logger log.Logger) *session {
func newSession(options *encode.SessionOptions, logger log.Logger) *session {
s := &session{
values: make(map[fmt.Stringer]interface{}, 1),
}
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/session_test.go
Expand Up @@ -17,14 +17,15 @@ package kv_test
import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
session := kv.NewSession(&encode.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}