Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick 8949,8989,8983,9010,9074,9091 to release 6.5 #9100

Merged
Merged
10 changes: 10 additions & 0 deletions Makefile
Expand Up @@ -255,6 +255,16 @@ fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock go-generat
@echo "check log style"
scripts/check-log-style.sh

# fast_fmt formats Go sources (import grouping via gci, then gofumports) and
# shell scripts (shfmt), and finally runs the log-style check. Unlike the
# `fmt` target above, it does not depend on generate_mock / go-generate, so
# it gives a quicker turnaround for pure formatting runs.
fast_fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci
	@echo "run gci (format imports)"
	tools/bin/gci write $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
	@echo "run gofumports"
	tools/bin/gofumports -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
	@echo "run shfmt"
	tools/bin/shfmt -d -w .
	@echo "check log style"
	scripts/check-log-style.sh

errdoc: tools/bin/errdoc-gen
@echo "generator errors.toml"
# check-errdoc will skip DM directory.
Expand Down
12 changes: 12 additions & 0 deletions cdc/api/v2/changefeed.go
Expand Up @@ -718,11 +718,23 @@ func (h *OpenAPIV2) status(c *gin.Context) {
Message: info.Error.Message,
}
}
var lastWarning *RunningError
if info.Warning != nil &&
oracle.GetTimeFromTS(status.CheckpointTs).Before(info.Warning.Time) {
lastWarning = &RunningError{
Time: &info.Warning.Time,
Addr: info.Warning.Addr,
Code: info.Warning.Code,
Message: info.Warning.Message,
}
}

c.JSON(http.StatusOK, &ChangefeedStatus{
State: string(info.State),
CheckpointTs: status.CheckpointTs,
ResolvedTs: status.ResolvedTs,
LastError: lastError,
LastWarning: lastWarning,
})
}

Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/model.go
Expand Up @@ -689,4 +689,5 @@ type ChangefeedStatus struct {
ResolvedTs uint64 `json:"resolved_ts"`
CheckpointTs uint64 `json:"checkpoint_ts"`
LastError *RunningError `json:"last_error,omitempty"`
LastWarning *RunningError `json:"last_warning,omitempty"`
}
7 changes: 4 additions & 3 deletions cdc/model/changefeed.go
Expand Up @@ -141,9 +141,10 @@ type ChangeFeedInfo struct {
// but can be fetched for backward compatibility
SortDir string `json:"sort-dir"`

Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Error *RunningError `json:"error"`
Config *config.ReplicaConfig `json:"config"`
State FeedState `json:"state"`
Error *RunningError `json:"error"`
Warning *RunningError `json:"warning"`

CreatorVersion string `json:"creator-version"`
// Epoch is the epoch of a changefeed, changes on every restart.
Expand Down
12 changes: 11 additions & 1 deletion cdc/model/owner.go
Expand Up @@ -91,8 +91,10 @@ type TaskPosition struct {
// Deprecated: only used in API. TODO: remove API usage.
Count uint64 `json:"count"`

// Error when error happens
// Error when changefeed error happens
Error *RunningError `json:"error"`
// Warning when module error happens
Warning *RunningError `json:"warning"`
}

// Marshal returns the json marshal format of a TaskStatus
Expand Down Expand Up @@ -129,6 +131,14 @@ func (tp *TaskPosition) Clone() *TaskPosition {
Message: tp.Error.Message,
}
}
if tp.Warning != nil {
ret.Warning = &RunningError{
Time: tp.Warning.Time,
Addr: tp.Warning.Addr,
Code: tp.Warning.Code,
Message: tp.Warning.Message,
}
}
return ret
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/model/owner_test.go
Expand Up @@ -54,7 +54,7 @@ func TestTaskPositionMarshal(t *testing.T) {
ResolvedTs: 420875942036766723,
CheckPointTs: 420875940070686721,
}
expected := `{"checkpoint-ts":420875940070686721,"resolved-ts":420875942036766723,"count":0,"error":null}`
expected := `{"checkpoint-ts":420875940070686721,"resolved-ts":420875942036766723,"count":0,"error":null,"warning":null}`

data, err := pos.Marshal()
require.Nil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/sink.go
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"strconv"
"sync"
"sync/atomic"
"unsafe"

"github.com/pingcap/log"
Expand Down Expand Up @@ -634,7 +635,7 @@ type DDLEvent struct {
TableInfo *TableInfo `msg:"-"`
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
Done atomic.Bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
}
Expand Down
78 changes: 65 additions & 13 deletions cdc/owner/changefeed.go
Expand Up @@ -101,6 +101,7 @@ type changefeed struct {
// in every tick. Such as the changefeed that is stopped or encountered an error.
isReleased bool
errCh chan error
warningCh chan error
// cancel the running goroutine start by `DDLPuller`
cancel context.CancelFunc

Expand All @@ -125,7 +126,11 @@ type changefeed struct {
filter filter.Filter,
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink

newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error)
Expand All @@ -147,8 +152,9 @@ func newChangefeed(
feedStateManager: newFeedStateManager(up),
upstream: up,

errCh: make(chan error, defaultErrChSize),
cancel: func() {},
errCh: make(chan error, defaultErrChSize),
warningCh: make(chan error, defaultErrChSize),
cancel: func() {},

newDDLPuller: puller.NewDDLPuller,
newSink: newDDLSink,
Expand All @@ -167,7 +173,10 @@ func newChangefeed4Test(
schemaStorage entry.SchemaStorage,
filter filter.Filter,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newSink func(
changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo,
reportError func(err error), reportWarning func(err error),
) DDLSink,
newScheduler func(
ctx cdcContext.Context, pdClock pdutil.Clock, epoch uint64,
) (scheduler.Scheduler, error),
Expand All @@ -182,6 +191,17 @@ func newChangefeed4Test(
func (c *changefeed) Tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) {
startTime := time.Now()

// Handle all internal warnings.
noMoreWarnings := false
for !noMoreWarnings {
select {
case err := <-c.warningCh:
c.handleWarning(ctx, err)
default:
noMoreWarnings = true
}
}

if skip, err := c.checkUpstream(); skip {
if err != nil {
c.handleErr(ctx, err)
Expand All @@ -190,7 +210,10 @@ func (c *changefeed) Tick(ctx cdcContext.Context, captures map[model.CaptureID]*
}

ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
c.errCh <- errors.Trace(err)
select {
case <-ctx.Done():
case c.errCh <- errors.Trace(err):
}
return nil
})
c.state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
Expand Down Expand Up @@ -233,6 +256,25 @@ func (c *changefeed) handleErr(ctx cdcContext.Context, err error) {
c.releaseResources(ctx)
}

// handleWarning records a non-fatal error reported by one of the changefeed's
// modules. It logs the error, resolves its RFC error code (falling back to
// ErrOwnerUnknown when the error carries no RFC code), and hands the resulting
// RunningError to the feedStateManager for bookkeeping.
func (c *changefeed) handleWarning(ctx cdcContext.Context, err error) {
	// Fix: the original message read "an warning occurred".
	log.Warn("a warning occurred in Owner",
		zap.String("namespace", c.id.Namespace),
		zap.String("changefeed", c.id.ID), zap.Error(err))
	var code string
	if rfcCode, ok := cerror.RFCCode(err); ok {
		code = string(rfcCode)
	} else {
		code = string(cerror.ErrOwnerUnknown.RFCCode())
	}

	c.feedStateManager.handleWarning(&model.RunningError{
		Time:    time.Now(),
		Addr:    contextutil.CaptureAddrFromCtx(ctx),
		Code:    code,
		Message: err.Error(),
	})
}

func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
Expand Down Expand Up @@ -278,11 +320,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return errors.Trace(err)
default:
}
// we need to wait ddl ddlSink to be ready before we do the other things
// otherwise, we may cause a nil pointer panic when we try to write to the ddl ddlSink.
if !c.ddlSink.isInitialized() {
return nil
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -455,15 +493,24 @@ func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
return nil
}
c.isReleased = false

// clean the errCh
// When the changefeed is resumed after being stopped, the changefeed instance will be reused,
// So we should make sure that the errCh is empty when the changefeed is restarting
LOOP:
LOOP1:
for {
select {
case <-c.errCh:
default:
break LOOP
break LOOP1
}
}
LOOP2:
for {
select {
case <-c.warningCh:
default:
break LOOP2
}
}

Expand Down Expand Up @@ -566,7 +613,12 @@ LOOP:
zap.String("changefeed", c.id.ID),
)

c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw)
c.ddlSink = c.newSink(c.id, c.state.Info, ctx.Throw, func(err error) {
select {
case <-ctx.Done():
case c.warningCh <- err:
}
})
c.ddlSink.run(cancelCtx)

c.ddlPuller, err = c.newDDLPuller(cancelCtx,
Expand Down
6 changes: 1 addition & 5 deletions cdc/owner/changefeed_test.go
Expand Up @@ -150,10 +150,6 @@ func (m *mockDDLSink) close(ctx context.Context) error {
return nil
}

func (m *mockDDLSink) isInitialized() bool {
return true
}

// Barrier is a no-op in the mock DDL sink and always reports success.
func (m *mockDDLSink) Barrier(ctx context.Context) error {
	return nil
}
Expand Down Expand Up @@ -219,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
return &mockDDLPuller{resolvedTs: startTs - 1, schemaStorage: schemaStorage}, nil
},
// new ddl ddlSink
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(err error)) DDLSink {
func(_ model.ChangeFeedID, _ *model.ChangeFeedInfo, _ func(error), _ func(error)) DDLSink {
return &mockDDLSink{
resetDDLDone: true,
recordDDLHistory: false,
Expand Down