Skip to content

Commit

Permalink
sink(cdc): fix internal retry algorithm (#9530) (#9572)
Browse files Browse the repository at this point in the history
ref #9272, close #9518
  • Loading branch information
ti-chi-bot committed Aug 16, 2023
1 parent 28b0657 commit 425b10a
Show file tree
Hide file tree
Showing 25 changed files with 647 additions and 243 deletions.
7 changes: 6 additions & 1 deletion cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,18 @@ func (h *OpenAPI) ListChangefeed(c *gin.Context) {
}

resp.FeedState = cfInfo.State
resp.RunningError = cfInfo.Error
if cfInfo.Error != nil {
resp.RunningError = cfInfo.Error
} else {
resp.RunningError = cfInfo.Warning
}

if cfStatus != nil {
resp.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
resp.CheckpointTime = model.JSONTime(tm)
}
log.Info("List changefeed successfully!", zap.Any("info", resp))

resps = append(resps, resp)
}
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func (h *OpenAPIV2) listChangeFeeds(c *gin.Context) {
} else {
commonInfo.RunningError = cfInfo.Warning
}
log.Info("List changefeed successfully!", zap.Any("runningError", commonInfo.RunningError))

// if the state is normal, we shall not return the error info
// because the changefeed is retrying; errors would confuse the users
Expand Down
23 changes: 19 additions & 4 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,16 +521,24 @@ func TestListChangeFeeds(t *testing.T) {
State: model.StateNormal,
},
model.DefaultChangeFeedID("cf2"): {
State: model.StateError,
State: model.StateWarning,
},
model.DefaultChangeFeedID("cf3"): {
State: model.StateStopped,
},
model.DefaultChangeFeedID("cf4"): {
State: model.StatePending,
},
model.DefaultChangeFeedID("cf5"): {
State: model.StateFinished,
},
},
changefeedStatuses: map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI{
model.DefaultChangeFeedID("cf1"): {},
model.DefaultChangeFeedID("cf2"): {},
model.DefaultChangeFeedID("cf3"): {},
model.DefaultChangeFeedID("cf4"): {},
model.DefaultChangeFeedID("cf5"): {},
},
}
cp.EXPECT().StatusProvider().Return(provider1).AnyTimes()
Expand All @@ -549,10 +557,17 @@ func TestListChangeFeeds(t *testing.T) {
resp := ListResponse[model.ChangefeedCommonInfo]{}
err := json.NewDecoder(w.Body).Decode(&resp)
require.Nil(t, err)
require.Equal(t, 3, resp.Total)
require.Equal(t, 5, resp.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp.Items))

warningChangefeedCount := 0
for _, cf := range resp.Items {
if cf.FeedState == model.StateWarning {
warningChangefeedCount++
}
require.NotEqual(t, model.StatePending, cf.FeedState)
}
require.Equal(t, 2, warningChangefeedCount)
// case 2: only list changefeed with state 'normal', 'stopped' and 'failed'
metaInfo2 := testCase{
url: "/api/v2/changefeeds",
Expand All @@ -568,7 +583,7 @@ func TestListChangeFeeds(t *testing.T) {
resp2 := ListResponse[model.ChangefeedCommonInfo]{}
err = json.NewDecoder(w.Body).Decode(&resp2)
require.Nil(t, err)
require.Equal(t, 2, resp2.Total)
require.Equal(t, 4, resp2.Total)
// changefeed info must be sorted by ID
require.Equal(t, true, sorted(resp2.Items))
}
Expand Down
21 changes: 21 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,27 @@ type ChangefeedCommonInfo struct {
RunningError *model.RunningError `json:"error"`
}

// MarshalJSON implements json.Marshaler for ChangefeedCommonInfo.
// Two feed states are masked before encoding so that API users never see
// them: an uninitialized state is reported as normal, and a pending state
// is reported as warning. Every other state is encoded unchanged.
func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error) {
	// Alias has the same fields as ChangefeedCommonInfo but none of its
	// methods, so encoding it does not call MarshalJSON again.
	type Alias ChangefeedCommonInfo

	// c is a copy (value receiver); rewriting the state here leaves the
	// caller's value untouched.
	switch c.FeedState {
	case model.StateUnInitialized:
		c.FeedState = model.StateNormal
	case model.StatePending:
		c.FeedState = model.StateWarning
	}

	wrapped := struct {
		Alias
	}{
		Alias: Alias(c),
	}
	return json.Marshal(wrapped)
}

// ChangefeedConfig use by create changefeed api
type ChangefeedConfig struct {
Namespace string `json:"namespace"`
Expand Down
24 changes: 24 additions & 0 deletions cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -208,3 +209,26 @@ func TestEventFilterRuleConvert(t *testing.T) {
require.Equal(t, c.inRule, c.apiRule.ToInternalEventFilterRule())
}
}

func TestMarshalChangefeedCommonInfo(t *testing.T) {
t.Parallel()

cfInfo := &ChangefeedCommonInfo{
ID: "test-id",
FeedState: model.StatePending,
}

cfInfoJSON, err := json.Marshal(cfInfo)
require.Nil(t, err)
require.False(t, strings.Contains(string(cfInfoJSON), "pending"))
require.True(t, strings.Contains(string(cfInfoJSON), "warning"))

cfInfo = &ChangefeedCommonInfo{
ID: "test-id",
FeedState: model.StateUnInitialized,
}

cfInfoJSON, err = json.Marshal(cfInfo)
require.Nil(t, err)
require.True(t, strings.Contains(string(cfInfoJSON), "normal"))
}
2 changes: 1 addition & 1 deletion cdc/api/v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (h *OpenAPIV2) getProcessor(c *gin.Context) {
_ = c.Error(err)
return
}
if info.State != model.StateNormal {
if !info.State.IsRunning() {
_ = c.Error(
cerror.WrapError(
cerror.ErrAPIInvalidParam,
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestGetProcessor(t *testing.T) {
{
provider := &mockStatusProvider{
changefeedInfo: &model.ChangeFeedInfo{
State: model.StateError,
State: model.StatePending,
},
}
cp := mock_capture.NewMockCapture(gomock.NewController(t))
Expand Down
25 changes: 21 additions & 4 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,23 @@ type FeedState string
// All FeedStates
const (
StateNormal FeedState = "normal"
StateError FeedState = "error"
StatePending FeedState = "pending"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
StateWarning FeedState = "warning"
// StateUnInitialized is used for the changefeed that has not been initialized
// it only exists in memory for a short time and will not be persisted to storage
StateUnInitialized FeedState = ""
)

// ToInt return an int for each `FeedState`, only use this for metrics.
func (s FeedState) ToInt() int {
switch s {
case StateNormal:
return 0
case StateError:
case StatePending:
return 1
case StateFailed:
return 2
Expand All @@ -100,6 +104,10 @@ func (s FeedState) ToInt() int {
return 4
case StateRemoved:
return 5
case StateWarning:
return 6
case StateUnInitialized:
return 7
}
// -1 for unknown feed state
return -1
Expand All @@ -118,11 +126,20 @@ func (s FeedState) IsNeeded(need string) bool {
return true
case StateFailed:
return true
case StateWarning:
return true
case StatePending:
return true
}
}
return need == string(s)
}

// IsRunning reports whether the feed state counts as a running state.
// Only StateNormal and StateWarning do; every other state yields false.
func (s FeedState) IsRunning() bool {
	switch s {
	case StateNormal, StateWarning:
		return true
	default:
		return false
	}
}

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
UpstreamID uint64 `json:"upstream-id"`
Expand Down Expand Up @@ -183,7 +200,7 @@ func ValidateNamespace(namespace string) error {
// Note: if the changefeed is failed by GC, it should not block the GC safepoint.
func (info *ChangeFeedInfo) NeedBlockGC() bool {
switch info.State {
case StateNormal, StateStopped, StateError:
case StateNormal, StateStopped, StatePending, StateWarning:
return true
case StateFailed:
return !info.isFailedByGC()
Expand Down Expand Up @@ -356,7 +373,7 @@ func (info *ChangeFeedInfo) fixState() {
if cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
state = StateWarning
}
}
case AdminStop:
Expand Down
4 changes: 2 additions & 2 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func TestFixState(t *testing.T) {
Code: string(errors.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
expectedState: StateWarning,
},
{
info: &ChangeFeedInfo{
Expand All @@ -475,7 +475,7 @@ func TestFixState(t *testing.T) {
Code: string(errors.ErrClusterIDMismatch.RFCCode()),
},
},
expectedState: StateError,
expectedState: StateWarning,
},
{
info: &ChangeFeedInfo{
Expand Down
6 changes: 6 additions & 0 deletions cdc/model/http_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ func (c ChangefeedCommonInfo) MarshalJSON() ([]byte, error) {
if c.FeedState == StateNormal {
c.RunningError = nil
}
if c.FeedState == StateUnInitialized {
c.FeedState = StateNormal
}
if c.FeedState == StatePending {
c.FeedState = StateWarning
}
return json.Marshal(struct {
Alias
}{
Expand Down
4 changes: 2 additions & 2 deletions cdc/model/http_model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestChangefeedCommonInfoMarshalJSON(t *testing.T) {
require.NotContains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode()))

// when the state is not normal, the error code is present
cfInfo.FeedState = StateError
cfInfo.FeedState = StateWarning
cfInfoJSON, err = json.Marshal(cfInfo)
require.Nil(t, err)
require.Contains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode()))
Expand All @@ -68,7 +68,7 @@ func TestChangefeedDetailMarshalJSON(t *testing.T) {
require.NotContains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode()))

// when the state is not normal, the error code is present
cfDetail.FeedState = StateError
cfDetail.FeedState = StateWarning
cfInfoJSON, err = json.Marshal(cfDetail)
require.Nil(t, err)
require.Contains(t, string(cfInfoJSON), string(errors.ErrProcessorUnknown.RFCCode()))
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
}

func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*model.CaptureInfo) error {
adminJobPending := c.feedStateManager.Tick(c.state)
adminJobPending := c.feedStateManager.Tick(c.state, c.resolvedTs)
preCheckpointTs := c.state.Info.GetCheckpointTs(c.state.Status)
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
Expand Down

0 comments on commit 425b10a

Please sign in to comment.