*: fix grpc client leak bug for AUTO_ID_CACHE=1 tables (pingcap#48870)
ti-chi-bot committed Dec 7, 2023
1 parent 159547b commit f249304
Showing 36 changed files with 389 additions and 149 deletions.
3 changes: 1 addition & 2 deletions br/pkg/lightning/common/BUILD.bazel
@@ -23,7 +23,6 @@ go_library(
"//br/pkg/lightning/log",
"//br/pkg/utils",
"//errno",
"//kv",
"//meta/autoid",
"//parser/model",
"//sessionctx/variable",
@@ -102,7 +101,7 @@ go_test(
],
embed = [":common"],
flaky = True,
shard_count = 20,
shard_count = 21,
deps = [
"//br/pkg/errors",
"//br/pkg/lightning/log",
43 changes: 33 additions & 10 deletions br/pkg/lightning/common/common.go
@@ -18,7 +18,6 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
)
@@ -50,9 +49,9 @@ var DefaultImportVariablesTiDB = map[string]string{
}

// AllocGlobalAutoID allocs N consecutive autoIDs from TiDB.
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64,
func AllocGlobalAutoID(ctx context.Context, n int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
@@ -70,9 +69,9 @@ func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int6
}

// RebaseGlobalAutoID rebase the autoID base to newBase.
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64,
func RebaseGlobalAutoID(ctx context.Context, newBase int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(store, dbID, tblInfo)
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
@@ -85,10 +84,34 @@ func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, db
return nil
}

// RebaseTableAllocators rebases the allocators of a table.
// It only rebases an allocator when its new base is given in the `bases`
// param; otherwise that allocator is skipped.
// A base is the max ID already used by the table, so the next usable ID is
// base + 1; see Allocator.Alloc.
func RebaseTableAllocators(ctx context.Context, bases map[autoid.AllocatorType]int64, r autoid.Requirement, dbID int64,
tblInfo *model.TableInfo) error {
allocators, err := GetGlobalAutoIDAlloc(r, dbID, tblInfo)
if err != nil {
return err
}
for _, alloc := range allocators {
base, ok := bases[alloc.GetType()]
if !ok {
continue
}
err = alloc.Rebase(ctx, base, false)
if err != nil {
return err
}
}
return nil
}

// GetGlobalAutoIDAlloc returns the autoID allocators for a table.
// export it for testing.
func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if store == nil {
func GetGlobalAutoIDAlloc(r autoid.Requirement, dbID int64, tblInfo *model.TableInfo) ([]autoid.Allocator, error) {
if r == nil || r.Store() == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
@@ -120,15 +143,15 @@ func GetGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo
case hasRowID || hasAutoIncID:
allocators := make([]autoid.Allocator, 0, 2)
if tblInfo.SepAutoInc() && hasAutoIncID {
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.AutoIncrementType, noCache, tblVer))
}
// this allocator is NOT used when SepAutoInc=true and auto increment column is clustered.
allocators = append(allocators, autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
allocators = append(allocators, autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(),
autoid.RowIDAllocType, noCache, tblVer))
return allocators, nil
case hasAutoRandID:
return []autoid.Allocator{autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
return []autoid.Allocator{autoid.NewAllocator(r, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(),
autoid.AutoRandomType, noCache, tblVer)}, nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
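The API change above is the backbone of the fix: AllocGlobalAutoID, RebaseGlobalAutoID, and GetGlobalAutoIDAlloc now take an autoid.Requirement (a kv store plus an autoid ClientDiscover) instead of a bare kv.Storage. Below is a minimal caller-side sketch, assuming only what this diff shows (the Requirement methods mirrored by mockRequirement in the test file just below, and autoid.NewClientDiscover from table_import.go); wrapRequirement and rebaseToNewBase are illustrative names, not part of the commit.

package autoidsketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/parser/model"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// wrapRequirement is a hypothetical adapter satisfying autoid.Requirement: it
// pairs the kv store with an etcd-backed ClientDiscover so allocators reach the
// centralized autoid service instead of each dialing their own connection.
type wrapRequirement struct {
	store kv.Storage
	cli   *autoid.ClientDiscover
}

func (w wrapRequirement) Store() kv.Storage                    { return w.store }
func (w wrapRequirement) AutoIDClient() *autoid.ClientDiscover { return w.cli }

// rebaseToNewBase shows the new call shape of the common helpers.
func rebaseToNewBase(ctx context.Context, store kv.Storage, etcdCli *clientv3.Client,
	dbID int64, tblInfo *model.TableInfo, newBase int64) error {
	r := wrapRequirement{store: store, cli: autoid.NewClientDiscover(etcdCli)}
	return common.RebaseGlobalAutoID(ctx, newBase, r, dbID, tblInfo)
}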
73 changes: 70 additions & 3 deletions br/pkg/lightning/common/common_test.go
@@ -134,11 +134,11 @@ func TestAllocGlobalAutoID(t *testing.T) {
ctx := context.Background()
for _, c := range cases {
ti := newTableInfo(t, 1, c.tableID, c.createTableSQL, kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(kvStore, 1, ti)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
if c.expectErrStr == "" {
require.NoError(t, err, c.tableID)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, kvStore, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, kvStore, 1, ti)
require.NoError(t, common.RebaseGlobalAutoID(ctx, 123, mockRequirement{kvStore}, 1, ti))
base, idMax, err := common.AllocGlobalAutoID(ctx, 100, mockRequirement{kvStore}, 1, ti)
require.NoError(t, err, c.tableID)
require.Equal(t, int64(123), base, c.tableID)
require.Equal(t, int64(223), idMax, c.tableID)
@@ -159,3 +159,70 @@ func TestAllocGlobalAutoID(t *testing.T) {
require.Equal(t, c.expectAllocatorTypes, allocatorTypes, c.tableID)
}
}

type mockRequirement struct {
kv.Storage
}

func (r mockRequirement) Store() kv.Storage {
return r.Storage
}

func (r mockRequirement) AutoIDClient() *autoid.ClientDiscover {
return nil
}

func TestRebaseTableAllocators(t *testing.T) {
storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, kvStore.Close())
})
ti := newTableInfo(t, 1, 42,
"create table t42 (a int primary key nonclustered auto_increment) AUTO_ID_CACHE 1", kvStore)
allocators, err := common.GetGlobalAutoIDAlloc(mockRequirement{kvStore}, 1, ti)
require.NoError(t, err)
require.Len(t, allocators, 2)
for _, alloc := range allocators {
id, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(1), id)
}
ctx := context.Background()
allocatorTypes := make([]autoid.AllocatorType, 0, len(allocators))
// rebase to 123
for _, alloc := range allocators {
require.NoError(t, alloc.Rebase(ctx, 123, false))
allocatorTypes = append(allocatorTypes, alloc.GetType())
}
require.Equal(t, []autoid.AllocatorType{autoid.AutoIncrementType, autoid.RowIDAllocType}, allocatorTypes)
// this call does nothing
require.NoError(t, common.RebaseTableAllocators(ctx, nil, mockRequirement{kvStore}, 1, ti))
for _, alloc := range allocators {
nextID, err := alloc.NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), nextID)
}
// this call rebases the AutoIncrementType allocator to 223
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 223,
}, mockRequirement{kvStore}, 1, ti))
next, err := allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(224), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(124), next)
// this call rebases the AutoIncrementType allocator to 323 and the RowIDAllocType allocator to 423
require.NoError(t, common.RebaseTableAllocators(ctx, map[autoid.AllocatorType]int64{
autoid.AutoIncrementType: 323,
autoid.RowIDAllocType: 423,
}, mockRequirement{kvStore}, 1, ti))
next, err = allocators[0].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(324), next)
next, err = allocators[1].NextGlobalAutoID()
require.NoError(t, err)
require.Equal(t, int64(424), next)
}
1 change: 1 addition & 0 deletions br/pkg/lightning/importer/BUILD.bazel
@@ -64,6 +64,7 @@ go_library(
"//util/collate",
"//util/dbterror",
"//util/engine",
"//util/etcd",
"//util/mathutil",
"//util/mock",
"//util/regexpr-router",
21 changes: 20 additions & 1 deletion br/pkg/lightning/importer/import.go
@@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/br/pkg/version/build"
tidbconfig "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/keyspace"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
@@ -62,12 +63,14 @@ import (
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/mathutil"
regexprrouter "github.com/pingcap/tidb/util/regexpr-router"
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
@@ -1517,6 +1520,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
cleanup := false
postProgress := func() error { return nil }
var kvStore tidbkv.Storage
var etcdCli *clientv3.Client

if isLocalBackend(rc.cfg) {
var (
@@ -1570,6 +1574,16 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
etcdCli, err := clientv3.New(clientv3.Config{
Endpoints: []string{rc.cfg.TiDB.PdAddr},
AutoSyncInterval: 30 * time.Second,
TLS: rc.tls.TLSConfig(),
})
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))

manager, err := NewChecksumManager(ctx, rc, kvStore)
if err != nil {
return errors.Trace(err)
@@ -1637,6 +1651,11 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
logTask.Warn("failed to close kv store", zap.Error(err))
}
}
if etcdCli != nil {
if err := etcdCli.Close(); err != nil {
logTask.Warn("failed to close etcd client", zap.Error(err))
}
}
}()

taskCh := make(chan task, rc.cfg.App.IndexConcurrency)
@@ -1686,7 +1705,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
if err != nil {
return errors.Trace(err)
}
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, log.FromContext(ctx))
tr, err := NewTableImporter(tableName, tableMeta, dbInfo, tableInfo, cp, igCols.ColumnsMap(), kvStore, etcdCli, log.FromContext(ctx))
if err != nil {
return errors.Trace(err)
}
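The leak fix in importTables is a create-once, close-once lifecycle for the etcd client: build a single clientv3 client against the PD address, scope it to the kv store's keyspace, hand it to every TableImporter, and close it in the same defer that closes the kv store. Below is a condensed sketch of that lifecycle under the assumption that the PD address, TLS config, and store are already at hand (they come from rc.cfg.TiDB.PdAddr, rc.tls, and kvStore in the real function); openSharedEtcdClient is an illustrative helper, not part of the commit.

package etcdsketch

import (
	"crypto/tls"
	"time"

	"github.com/pingcap/tidb/keyspace"
	tidbkv "github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/util/etcd"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// openSharedEtcdClient builds the single etcd client the whole import shares.
// The caller must close it exactly once; importTables does so in the same defer
// that closes the kv store, which is what plugs the gRPC connection leak.
func openSharedEtcdClient(pdAddr string, tlsCfg *tls.Config, kvStore tidbkv.Storage) (*clientv3.Client, error) {
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:        []string{pdAddr},
		AutoSyncInterval: 30 * time.Second,
		TLS:              tlsCfg,
	})
	if err != nil {
		return nil, err
	}
	// Confine the client to the store's keyspace so autoid lookups use the right etcd prefix.
	etcd.SetEtcdCliByNamespace(etcdCli, keyspace.MakeKeyspaceEtcdNamespace(kvStore.GetCodec()))
	return etcdCli, nil
}

On shutdown the real function mirrors the defer shown in the diff above: if etcdCli is non-nil it is closed once, and a close failure is only logged, never returned.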
4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/import_test.go
@@ -72,7 +72,7 @@ func TestNewTableRestore(t *testing.T) {
for _, tc := range testCases {
tableInfo := dbInfo.Tables[tc.name]
tableName := common.UniqueTable("mockdb", tableInfo.Name)
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
tr, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.NotNil(t, tr)
require.NoError(t, err)
}
@@ -89,7 +89,7 @@ func TestNewTableRestoreFailure(t *testing.T) {
}}
tableName := common.UniqueTable("mockdb", "failure")

_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, log.L())
_, err := NewTableImporter(tableName, nil, dbInfo, tableInfo, &checkpoints.TableCheckpoint{}, nil, nil, nil, log.L())
require.Regexp(t, `failed to tables\.TableFromMeta.*`, err.Error())
}

4 changes: 2 additions & 2 deletions br/pkg/lightning/importer/meta_manager.go
@@ -252,10 +252,10 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if curStatus == metaStatusInitial {
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
if err := common.RebaseGlobalAutoID(ctx, maxRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
newRowIDBase, newRowIDMax, err = common.AllocGlobalAutoID(ctx, rawRowIDMax, m.tr, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/meta_manager_test.go
@@ -325,7 +325,7 @@ func (s *metaMgrSuite) prepareMockInner(rowsVal [][]driver.Value, nextRowID *int
WillReturnRows(rows)

if nextRowID != nil {
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}
22 changes: 21 additions & 1 deletion br/pkg/lightning/importer/table_import.go
@@ -51,6 +51,7 @@ import (
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mathutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@@ -69,6 +70,8 @@ type TableImporter struct {
alloc autoid.Allocators
logger log.Logger
kvStore tidbkv.Storage
etcdCli *clientv3.Client
autoidCli *autoid.ClientDiscover

ignoreColumns map[string]struct{}
}
@@ -82,13 +85,15 @@ func NewTableImporter(
cp *checkpoints.TableCheckpoint,
ignoreColumns map[string]struct{},
kvStore tidbkv.Storage,
etcdCli *clientv3.Client,
logger log.Logger,
) (*TableImporter, error) {
idAlloc := kv.NewPanickingAllocators(cp.AllocBase)
tbl, err := tables.TableFromMeta(idAlloc, tableInfo.Core)
if err != nil {
return nil, errors.Annotatef(err, "failed to tables.TableFromMeta %s", tableName)
}
autoidCli := autoid.NewClientDiscover(etcdCli)

return &TableImporter{
tableName: tableName,
Expand All @@ -98,6 +103,8 @@ func NewTableImporter(
encTable: tbl,
alloc: idAlloc,
kvStore: kvStore,
etcdCli: etcdCli,
autoidCli: autoidCli,
logger: logger.With(zap.String("table", tableName)),
ignoreColumns: ignoreColumns,
}, nil
@@ -268,6 +275,19 @@ func (tr *TableImporter) populateChunks(ctx context.Context, rc *Controller, cp
return err
}

// AutoIDRequirement implements autoid.Requirement.
var _ autoid.Requirement = &TableImporter{}

// Store implements the autoid.Requirement interface.
func (tr *TableImporter) Store() tidbkv.Storage {
return tr.kvStore
}

// AutoIDClient implements the autoid.Requirement interface.
func (tr *TableImporter) AutoIDClient() *autoid.ClientDiscover {
return tr.autoidCli
}

// RebaseChunkRowIDs rebase the row id of the chunks.
func (*TableImporter) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
if rowIDBase == 0 {
@@ -898,7 +918,7 @@ func (tr *TableImporter) postProcess(
// And in this case, ALTER TABLE xxx AUTO_INCREMENT = xxx only works on the allocator of auto_increment column,
// not for allocator of _tidb_rowid.
// So we need to rebase IDs for those 2 allocators explicitly.
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr.kvStore, tr.dbInfo.ID, tr.tableInfo.Core)
err = common.RebaseGlobalAutoID(ctx, adjustIDBase(newBase), tr, tr.dbInfo.ID, tr.tableInfo.Core)
}
}
rc.alterTableLock.Unlock()
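With the new etcdCli and autoidCli fields plus the Store and AutoIDClient methods, a *TableImporter is itself a valid autoid.Requirement, which is why meta_manager.go and postProcess above can pass tr (or m.tr) straight to the common helpers. A small sketch of that call shape, using only the exported pieces visible in this diff; rebaseAfterImport is an illustrative wrapper, not part of the commit.

package rebasesketch

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/importer"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/parser/model"
)

// rebaseAfterImport rebases a table's auto-ID allocators through the importer,
// which carries the shared etcd client, so no extra gRPC connection is dialed.
func rebaseAfterImport(ctx context.Context, tr *importer.TableImporter,
	dbID int64, tblInfo *model.TableInfo, newBase int64) error {
	var r autoid.Requirement = tr // satisfied by the Store/AutoIDClient methods above
	return common.RebaseGlobalAutoID(ctx, newBase, r, dbID, tblInfo)
}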
