Skip to content

Commit

Permalink
Merge branch 'mx/recordMoreInformationBeforeOOM' of github.com:mengxi…
Browse files Browse the repository at this point in the history
…n9014/tidb into mx/recordMoreInformationBeforeOOM
  • Loading branch information
mengxin9014 committed Oct 11, 2022
2 parents 3d34222 + fdc6d37 commit 2618c09
Show file tree
Hide file tree
Showing 56 changed files with 1,289 additions and 376 deletions.
9 changes: 0 additions & 9 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,15 +274,6 @@ func (rc *Client) GetSupportPolicy() bool {
return rc.supportPolicy
}

// GetTruncateSafepoint read the truncate checkpoint from the storage bind to the client.
func (rc *Client) GetTruncateSafepoint(ctx context.Context) (uint64, error) {
ts, err := GetTSFromFile(ctx, rc.storage, TruncateSafePointFileName)
if err != nil {
log.Warn("failed to get truncate safepoint, using 0", logutil.ShortError(err))
}
return ts, err
}

func (rc *Client) GetDomain() *domain.Domain {
return rc.dom
}
Expand Down
35 changes: 3 additions & 32 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,40 +171,11 @@ func (ms *StreamMetadataSet) DoWriteBack(ctx context.Context, s storage.External
}

func truncateAndWrite(ctx context.Context, s storage.ExternalStorage, path string, data []byte) error {
switch s.(type) {
// Performance hack: the `Write` implemention for S3 and local would truncate the file if it exists.
case *storage.S3Storage, *storage.LocalStorage:
if err := s.WriteFile(ctx, path, data); err != nil {
return errors.Annotatef(err, "failed to save the file %s to %s", path, s.URI())
}
default:
if err := swapAndOverrideFile(ctx, s, path, data); err != nil {
return errors.Annotatef(err, "failed during rewriting the file at %s in %s", path, s.URI())
}
}
return nil
}

// swapAndOverrideFile is a slow but safe way for overriding a file in the external storage.
// Because there isn't formal definition of `WriteFile` over a existing file, this should be safe in generic external storage.
// It moves the origin file to a swap file and did the file write, finally remove the swap file.
func swapAndOverrideFile(ctx context.Context, s storage.ExternalStorage, path string, data []byte) error {
ok, err := s.FileExists(ctx, path)
if err != nil {
return err
}
if !ok {
return errors.Annotate(berrors.ErrInvalidArgument, "the origin file doesn't exist")
}

backup := path + ".override_swap"
if err := s.Rename(ctx, path, backup); err != nil {
return err
}
// Performance hack: the `Write` implemention would truncate the file if it exists.
if err := s.WriteFile(ctx, path, data); err != nil {
return err
return errors.Annotatef(err, "failed to save the file %s to %s", path, s.URI())
}
return s.DeleteFile(ctx, backup)
return nil
}

const (
Expand Down
41 changes: 41 additions & 0 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"testing"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -246,6 +247,46 @@ func TestTruncateSafepoint(t *testing.T) {
}
}

func TestTruncateSafepointForGCS(t *testing.T) {
ctx := context.Background()
opts := fakestorage.Options{
NoListener: true,
}
server, err := fakestorage.NewServerWithOptions(opts)
require.NoError(t, err)
bucketName := "testbucket"
server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})

gcs := &backuppb.GCS{
Bucket: bucketName,
Prefix: "a/b/",
StorageClass: "NEARLINE",
PredefinedAcl: "private",
CredentialsBlob: "Fake Credentials",
}

l, err := storage.NewGCSStorageForTest(ctx, gcs, &storage.ExternalStorageOptions{
SendCredentials: false,
CheckPermissions: []storage.Permission{storage.AccessBuckets},
HTTPClient: server.HTTPClient(),
})
require.NoError(t, err)
require.NoError(t, err)

ts, err := restore.GetTSFromFile(ctx, l, restore.TruncateSafePointFileName)
require.NoError(t, err)
require.Equal(t, int(ts), 0)

for i := 0; i < 100; i++ {
n := rand.Uint64()
require.NoError(t, restore.SetTSToFile(ctx, l, n, restore.TruncateSafePointFileName))

ts, err = restore.GetTSFromFile(ctx, l, restore.TruncateSafePointFileName)
require.NoError(t, err)
require.Equal(t, ts, n, "failed at %d round: truncate safepoint mismatch", i)
}
}

func fakeMetaDatas(t *testing.T, helper *stream.MetadataHelper, cf string) []*backuppb.Metadata {
ms := []*backuppb.Metadata{
{
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ func newGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
return &gcsStorage{gcs: gcs, bucket: bucket}, nil
}

// only for unit test
func NewGCSStorageForTest(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorageOptions) (*gcsStorage, error) {
return newGCSStorage(ctx, gcs, opts)
}

func hasSSTFiles(ctx context.Context, bucket *storage.BucketHandle, prefix string) bool {
query := storage.Query{Prefix: prefix}
_ = query.SetAttrSelection([]string{"Name"})
Expand Down
3 changes: 3 additions & 0 deletions build/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"util/tracing/": "util/tracing/ code",
"util/trxevents/": "util/trxevents/ code",
"util/watcher/": "util/watcher/ code",
"util/gctuner": "util/gctuner",
"store/mockstore/unistore/util": "store/mockstore/unistore/util code",
"ddl/util/": "ddl/util code"
}
Expand All @@ -197,6 +198,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"util/gctuner": "util/gctuner",
"br/pkg/lightning/mydump/": "br/pkg/lightning/mydump/",
"br/pkg/lightning/restore/opts": "br/pkg/lightning/restore/opts",
"executor/aggregate.go": "executor/aggregate.go",
Expand Down Expand Up @@ -743,6 +745,7 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"util/gctuner": "util/gctuner",
"br/pkg/lightning/mydump/": "br/pkg/lightning/mydump/",
"br/pkg/lightning/restore/opts": "br/pkg/lightning/restore/opts",
"executor/aggregate.go": "executor/aggregate.go",
Expand Down
2 changes: 1 addition & 1 deletion ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ go_library(

go_test(
name = "ddl_test",
timeout = "long",
timeout = "moderate",
srcs = [
"attributes_sql_test.go",
"callback_test.go",
Expand Down
7 changes: 4 additions & 3 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/infoschema"
Expand Down Expand Up @@ -50,7 +51,7 @@ type TestDDLCallback struct {
onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
Expand Down Expand Up @@ -98,8 +99,8 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if tc.OnJobUpdatedExported != nil {
tc.OnJobUpdatedExported(job)
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
Expand Down
4 changes: 2 additions & 2 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,14 @@ func TestCancel(t *testing.T) {

restHook := func(h *ddl.TestDDLCallback) {
h.OnJobRunBeforeExported = nil
h.OnJobUpdatedExported = nil
h.OnJobUpdatedExported.Store(nil)
dom.DDL().SetHook(h.Clone())
}
registHook := func(h *ddl.TestDDLCallback, onJobRunBefore bool) {
if onJobRunBefore {
h.OnJobRunBeforeExported = hookFunc
} else {
h.OnJobUpdatedExported = hookFunc
h.OnJobUpdatedExported.Store(&hookFunc)
}
dom.DDL().SetHook(h.Clone())
}
Expand Down
9 changes: 6 additions & 3 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestColumnAdd(t *testing.T) {
)
first := true
var jobID int64
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
jobID = job.ID
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
Expand All @@ -80,6 +80,7 @@ func TestColumnAdd(t *testing.T) {
require.NoError(t, checkAddPublic(ct, writeOnlyTable, publicTable))
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc.Clone())
tk.MustExec("alter table t add column c3 int default 3")
tb := publicTable
Expand All @@ -93,7 +94,7 @@ func TestColumnAdd(t *testing.T) {
dropCol = tbl.VisibleCols()[2]
}
}
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc2 := func(job *model.Job) {
if job.NotStarted() {
return
}
Expand All @@ -105,6 +106,7 @@ func TestColumnAdd(t *testing.T) {
}
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
d.SetHook(tc.Clone())
tk.MustExec("alter table t drop column c3")
v = getSchemaVer(t, tk.Session())
Expand All @@ -113,7 +115,7 @@ func TestColumnAdd(t *testing.T) {

// Add column not default.
first = true
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc3 := func(job *model.Job) {
jobID = job.ID
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
Expand All @@ -133,6 +135,7 @@ func TestColumnAdd(t *testing.T) {
require.NoError(t, err)
}
}
tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3)
d.SetHook(tc)
tk.MustExec("alter table t add column c3 int")
testCheckJobDone(t, store, jobID, true)
Expand Down
12 changes: 8 additions & 4 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ func TestAddGeneratedColumnAndInsert(t *testing.T) {
ctx.Store = store
times := 0
var checkErr error
hook.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkErr != nil {
return
}
Expand All @@ -903,6 +903,7 @@ func TestAddGeneratedColumnAndInsert(t *testing.T) {
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(hook)

tk.MustExec("alter table t1 add column gc int as ((a+1))")
Expand All @@ -920,7 +921,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
var checkErr error
assertChangingColName := "_col$_c2_0"
assertChangingIdxName := "_idx$_idx_0"
hook.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn {
var (
newCol *model.ColumnInfo
Expand All @@ -943,6 +944,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d := dom.DDL()
d.SetHook(hook)

Expand Down Expand Up @@ -975,7 +977,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
assertChangingColName2 := "_col$__col$__col$_c1_0_1"
query1 := "alter table t modify column _col$_c1 tinyint"
query2 := "alter table t modify column _col$__col$_c1_0 tinyint"
hook.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc2 := func(job *model.Job) {
if (job.Query == query1 || job.Query == query2) && job.SchemaState == model.StateDeleteOnly && job.Type == model.ActionModifyColumn {
var (
newCol *model.ColumnInfo
Expand All @@ -999,6 +1001,7 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
}
}
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2)
d.SetHook(hook)

tk.MustExec("drop table if exists t")
Expand Down Expand Up @@ -1062,7 +1065,7 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
tk1.MustExec("begin pessimistic;")
tk1.MustExec("insert into t1 values(101, 102, 103)")
}
hook.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
if job.Type != model.ActionModifyColumn || checkErr != nil || job.SchemaState != commitColState {
return
}
Expand All @@ -1071,6 +1074,7 @@ func TestWriteReorgForColumnTypeChangeOnAmendTxn(t *testing.T) {
}
times++
}
hook.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(hook)

tk.MustExec(sql)
Expand Down
16 changes: 8 additions & 8 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func TestAddColumn(t *testing.T) {
checkOK := false

tc := &ddl.TestDDLCallback{Do: dom}
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkOK {
return
}
Expand All @@ -690,7 +690,7 @@ func TestAddColumn(t *testing.T) {
checkOK = true
}
}

tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)

jobID := testCreateColumn(tk, t, testkit.NewTestKit(t, store).Session(), tableID, newColName, "", defaultColValue, dom)
Expand Down Expand Up @@ -741,7 +741,7 @@ func TestAddColumns(t *testing.T) {
require.NoError(t, err)

tc := &ddl.TestDDLCallback{Do: dom}
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -762,7 +762,7 @@ func TestAddColumns(t *testing.T) {
}
}
}

tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)

jobID := testCreateColumns(tk, t, testkit.NewTestKit(t, store).Session(), tableID, newColNames, positions, defaultColValue, dom)
Expand Down Expand Up @@ -811,7 +811,7 @@ func TestDropColumnInColumnTest(t *testing.T) {

d := dom.DDL()
tc := &ddl.TestDDLCallback{Do: dom}
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -824,7 +824,7 @@ func TestDropColumnInColumnTest(t *testing.T) {
return
}
}

tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)

jobID := testDropColumnInternal(tk, t, testkit.NewTestKit(t, store).Session(), tableID, colName, false, dom)
Expand Down Expand Up @@ -873,7 +873,7 @@ func TestDropColumns(t *testing.T) {

d := dom.DDL()
tc := &ddl.TestDDLCallback{Do: dom}
tc.OnJobUpdatedExported = func(job *model.Job) {
onJobUpdatedExportedFunc := func(job *model.Job) {
mu.Lock()
defer mu.Unlock()
if checkOK {
Expand All @@ -888,7 +888,7 @@ func TestDropColumns(t *testing.T) {
}
}
}

tc.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
d.SetHook(tc)

jobID := testDropColumns(tk, t, testkit.NewTestKit(t, store).Session(), tableID, colNames, false, dom)
Expand Down

0 comments on commit 2618c09

Please sign in to comment.