gc (ticdc): Instead of repeatedly restarting an errored changefeed for 24 hours,
put the errored changefeed into the failed state, calculate the TiCDC global
GC safepoint based on the checkpoint ts of all changefeeds, and give users a
24-hour grace period to handle the failed changefeed.

e.g.,

Given two changefeeds:
cf1 (failed) with checkpointTs ts1
cf2 (normal) with checkpointTs ts2

the global GC safepoint will be:
min(ts2, max(ts1, currentPDTs - 24 hours))
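
The rule can be sketched in plain Go as follows; the changefeed struct, its fields, and the globalGCSafepoint function are illustrative only, not the actual TiCDC types:

package main

import (
    "fmt"
    "time"
)

// changefeed is a simplified stand-in for a TiCDC changefeed, keeping only
// the fields needed to illustrate the safepoint rule.
type changefeed struct {
    failed       bool
    checkpointTs time.Time // the real implementation uses a TSO (uint64)
}

// globalGCSafepoint applies the rule from the commit message: a failed
// changefeed holds back the GC safepoint for at most 24 hours.
func globalGCSafepoint(now time.Time, feeds []changefeed) time.Time {
    grace := now.Add(-24 * time.Hour)
    safepoint := now
    for _, cf := range feeds {
        ts := cf.checkpointTs
        if cf.failed && ts.Before(grace) {
            // a failed feed older than the grace period no longer blocks GC
            ts = grace
        }
        if ts.Before(safepoint) {
            safepoint = ts
        }
    }
    return safepoint
}

func main() {
    now := time.Now()
    feeds := []changefeed{
        {failed: true, checkpointTs: now.Add(-48 * time.Hour)}, // cf1: ts1
        {failed: false, checkpointTs: now.Add(-1 * time.Hour)}, // cf2: ts2
    }
    // prints min(ts2, max(ts1, now - 24h)), i.e. now - 24h in this example
    fmt.Println(globalGCSafepoint(now, feeds))
}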
charleszheng44 committed Mar 7, 2023
1 parent bf37f0b commit c938c5e
Showing 3 changed files with 49 additions and 28 deletions.
19 changes: 19 additions & 0 deletions cdc/owner/owner.go
@@ -780,6 +780,20 @@ func (o *ownerImpl) updateGCSafepoint(
return nil
}

// ignoreFailedFeedWhenGC checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
func (o *ownerImpl) ignoreFailedFeedWhenGC(
state *orchestrator.ChangefeedReactorState,
) bool {
upID := state.Info.UpstreamID
us, exist := o.upstreamManager.Get(upID)
if !exist {
log.Warn("upstream(%d) not found", zap.Uint64("ID", upID))
return false
}
return us.GCManager.IgnoreFailedFeed(state.Status.CheckpointTs)
}

// calculateGCSafepoint calculates GCSafepoint for different upstream.
// Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
// to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -794,8 +808,13 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
if changefeedState.Info == nil {
continue
}

switch changefeedState.Info.State {
case model.StateNormal, model.StateStopped, model.StateError:
case model.StateFailed:
if o.ignoreFailedFeedWhenGC(changefeedState) {
continue
}
default:
continue
}
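
For reference, the new filtering in calculateGCSafepoint can be summarized by the sketch below; the feedState type and blocksGCSafepoint function are illustrative stand-ins for the cdc/model constants and the owner code, not the actual implementation:

package main

import "fmt"

// feedState stands in for model.FeedState; only the states referenced in
// the switch above are listed.
type feedState string

const (
    stateNormal  feedState = "normal"
    stateStopped feedState = "stopped"
    stateError   feedState = "error"
    stateFailed  feedState = "failed"
)

// blocksGCSafepoint reports whether a changefeed in the given state should
// contribute its checkpointTs to the upstream's GC safepoint. ignoreFailed
// stands in for gcManager.IgnoreFailedFeed, i.e. whether the failed feed's
// checkpoint is already past the 24-hour grace period.
func blocksGCSafepoint(state feedState, ignoreFailed func() bool) bool {
    switch state {
    case stateNormal, stateStopped, stateError:
        return true
    case stateFailed:
        return !ignoreFailed()
    default:
        return false
    }
}

func main() {
    pastGrace := func() bool { return true }
    fmt.Println(blocksGCSafepoint(stateNormal, pastGrace)) // true
    fmt.Println(blocksGCSafepoint(stateFailed, pastGrace)) // false: ignored after 24h
}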
48 changes: 29 additions & 19 deletions pkg/txnutil/gc/gc_manager.go
@@ -17,7 +17,6 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
@@ -39,6 +38,9 @@ type Manager interface {
// Set `forceUpdate` to force Manager update.
TryUpdateGCSafePoint(ctx context.Context, checkpointTs model.Ts, forceUpdate bool) error
CheckStaleCheckpointTs(ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts) error
// IgnoreFailedFeed checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
IgnoreFailedFeed(checkpointTs uint64) bool
}

type gcManager struct {
@@ -50,7 +52,6 @@ type gcManager struct {
lastUpdatedTime time.Time
lastSucceededTime time.Time
lastSafePointTs uint64
isTiCDCBlockGC bool
}

// NewManager creates a new Manager.
@@ -97,9 +98,6 @@ func (m *gcManager) TryUpdateGCSafePoint(
log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
}
// if the min checkpoint ts is equal to the current gc safe point,
// it means that the service gc safe point set by TiCDC is the min service gc safe point
m.isTiCDCBlockGC = actual == checkpointTs
m.lastSafePointTs = actual
m.lastSucceededTime = time.Now()
return nil
@@ -109,20 +107,32 @@ func (m *gcManager) CheckStaleCheckpointTs(
ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
gcSafepointUpperBound := checkpointTs - 1
if m.isTiCDCBlockGC {
pdTime, err := m.pdClock.CurrentTime()
// TODO: should we return err here, or just log it?
if err != nil {
return errors.Trace(err)
}
if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, changefeedID)
}
} else {
// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
}
// another service GC safepoint may be smaller than the min checkpoint ts.
if gcSafepointUpperBound < m.lastSafePointTs {
return cerror.ErrSnapshotLostByGC.
GenWithStackByArgs(
checkpointTs,
m.lastSafePointTs,
)
}
return nil
}

func (m *gcManager) IgnoreFailedFeed(
checkpointTs uint64,
) bool {
pdTime, err := m.pdClock.CurrentTime()
if err != nil {
log.Warn("failed to get ts",
zap.String("GcManagerID", m.gcServiceID),
zap.Error(err),
)
return false
}
// ignore the changefeed if its current checkpoint TS is earlier
// than (current PD time - 24 hours).
gcSafepointUpperBound := checkpointTs - 1
return pdTime.Sub(
oracle.GetTimeFromTS(gcSafepointUpperBound),
) > 24*time.Hour
}
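
A minimal standalone sketch of the grace-period check above; it assumes the oracle helpers from github.com/tikv/client-go/v2/oracle that the file already uses, and the pastGracePeriod name is illustrative, not part of TiCDC:

package main

import (
    "fmt"
    "time"

    "github.com/tikv/client-go/v2/oracle"
)

// pastGracePeriod mirrors the core of IgnoreFailedFeed: a failed changefeed
// stops holding back GC once its safepoint upper bound (checkpointTs - 1)
// is more than 24 hours behind the current PD time.
func pastGracePeriod(pdTime time.Time, checkpointTs uint64) bool {
    gcSafepointUpperBound := checkpointTs - 1
    return pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > 24*time.Hour
}

func main() {
    now := time.Now()
    stale := oracle.GoTimeToTS(now.Add(-25 * time.Hour))
    fresh := oracle.GoTimeToTS(now.Add(-1 * time.Hour))
    fmt.Println(pastGracePeriod(now, stale)) // true: past the grace period, ignored
    fmt.Println(pastGracePeriod(now, fresh)) // false: still blocks GC
}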
10 changes: 1 addition & 9 deletions pkg/txnutil/gc/gc_manager_test.go
@@ -91,24 +91,16 @@ func TestCheckStaleCheckpointTs(t *testing.T) {
pdClock := pdutil.NewClock4Test()
gcManager := NewManager(etcd.GcServiceIDForTest(),
mockPDClient, pdClock).(*gcManager)
gcManager.isTiCDCBlockGC = true
ctx := context.Background()

time.Sleep(1 * time.Second)

cfID := model.DefaultChangeFeedID("cfID")
err := gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
require.True(t, cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)))
require.True(t, cerror.IsChangefeedFastFailError(err))

err = gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
err := gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
require.Nil(t, err)

gcManager.isTiCDCBlockGC = false
gcManager.lastSafePointTs = 20

err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)

require.True(t, cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)))
require.True(t, cerror.IsChangefeedFastFailError(err))
}
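
This commit does not add a test for the new IgnoreFailedFeed method; a possible sketch is shown below. It reuses the imports of the test file above and passes a nil PD client on the assumption that IgnoreFailedFeed only consults the PD clock; treat it as a starting point rather than part of the change:

func TestIgnoreFailedFeed(t *testing.T) {
    pdClock := pdutil.NewClock4Test()
    // nil PD client: IgnoreFailedFeed only consults the clock (assumption)
    gcManager := NewManager(etcd.GcServiceIDForTest(), nil, pdClock).(*gcManager)

    // a checkpoint more than 24 hours behind the current PD time is past
    // the grace period and should be ignored
    stale := oracle.GoTimeToTS(time.Now().Add(-25 * time.Hour))
    require.True(t, gcManager.IgnoreFailedFeed(stale))

    // a recent checkpoint is still within the grace period and keeps
    // holding back the GC safepoint
    fresh := oracle.GoTimeToTS(time.Now().Add(-time.Hour))
    require.False(t, gcManager.IgnoreFailedFeed(fresh))
}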
