gc (ticdc): Instead of repeatedly restarting an errored changefeed for 24 hours,
put the errored changefeed into the failed state, calculate the TiCDC global
GC safepoint based on the checkpoint ts of all changefeeds, and give users a
24-hour grace period to handle the failed changefeed.

e.g., given two changefeeds:

cf1 (failed) with checkpointTs ts1
cf2 (normal) with checkpointTs ts2

the global GC safepoint will be:
min(ts2, max(ts1, currentPDTs - 24 hours))
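
A minimal standalone sketch of how that formula could be evaluated (the helper names `calcGlobalGCSafepoint` and `physicalToTS` are illustrative assumptions, not the actual TiCDC implementation, which works on the changefeed states shown in the diff below):

```go
package main

import (
	"fmt"
	"time"
)

// physicalToTS converts a wall-clock time into a TSO-style timestamp
// (physical milliseconds shifted left by 18 logical bits). It mirrors the
// layout used by TiDB's oracle package but is re-implemented here so the
// sketch has no external dependencies.
func physicalToTS(t time.Time) uint64 {
	return uint64(t.UnixMilli()) << 18
}

// calcGlobalGCSafepoint is a hypothetical helper that evaluates
// min(ts2, max(ts1, currentPDTs - 24 hours)): the failed changefeed (ts1)
// holds back the safepoint only within the 24-hour grace period, while the
// normal changefeed (ts2) always does.
func calcGlobalGCSafepoint(ts1Failed, ts2Normal uint64, pdNow time.Time) uint64 {
	graceLowerBound := physicalToTS(pdNow.Add(-24 * time.Hour))
	held := ts1Failed
	if graceLowerBound > held {
		// the failed changefeed's checkpoint is older than the grace
		// period, so it no longer pins the safepoint
		held = graceLowerBound
	}
	if ts2Normal < held {
		return ts2Normal
	}
	return held
}

func main() {
	now := time.Now()
	ts1 := physicalToTS(now.Add(-30 * time.Hour)) // failed changefeed, past the grace period
	ts2 := physicalToTS(now.Add(-1 * time.Hour))  // normal changefeed
	fmt.Println(calcGlobalGCSafepoint(ts1, ts2, now))
}
```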
charleszheng44 committed Mar 7, 2023
1 parent bf37f0b commit 7c84ae3
Showing 3 changed files with 46 additions and 29 deletions.
38 changes: 38 additions & 0 deletions cdc/owner/owner.go
@@ -33,6 +33,7 @@ import (
	"github.com/pingcap/tiflow/pkg/orchestrator"
	"github.com/pingcap/tiflow/pkg/upstream"
	"github.com/pingcap/tiflow/pkg/version"
	"github.com/tikv/client-go/v2/oracle"
	"go.uber.org/zap"
	"golang.org/x/time/rate"
)
@@ -780,6 +781,38 @@ func (o *ownerImpl) updateGCSafepoint(
	return nil
}

// getUpstreamTs gets a tso from the desired upstream.
func (o *ownerImpl) getUpstreamTs(usid uint64) (time.Time, error) {
	us, exist := o.upstreamManager.Get(usid)
	if !exist {
		return time.Time{}, errors.Errorf("upstream(%d) not found", usid)
	}
	return us.PDClock.CurrentTime()
}

// ignoreFailedFeedWhenGC checks if a failed changefeed should be ignored
// when calculating the gc safepoint of the associated upstream.
func (o *ownerImpl) ignoreFailedFeedWhenGC(
	state *orchestrator.ChangefeedReactorState,
) bool {
	upID := state.Info.UpstreamID
	pdTime, err := o.getUpstreamTs(upID)
	if err != nil {
		log.Warn(
			"failed to get upstream ts",
			zap.Uint64("ID", upID),
			zap.Error(err),
		)
		return false
	}
	// ignore the change feed if its current checkpoint TS is earlier
	// than the (currentPDTso - 24 hours).
	gcSafepointUpperBound := state.Status.CheckpointTs - 1
	return pdTime.Sub(
		oracle.GetTimeFromTS(gcSafepointUpperBound),
	) > 24*time.Hour
}

// calculateGCSafepoint calculates GCSafepoint for different upstream.
// Note: we need to maintain a TiCDC service GC safepoint for each upstream TiDB cluster
// to prevent upstream TiDB GC from removing data that is still needed by TiCDC.
@@ -794,8 +827,13 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
		if changefeedState.Info == nil {
			continue
		}

		switch changefeedState.Info.State {
		case model.StateNormal, model.StateStopped, model.StateError:
		case model.StateFailed:
			if o.ignoreFailedFeedWhenGC(changefeedState) {
				continue
			}
		default:
			continue
		}
27 changes: 7 additions & 20 deletions pkg/txnutil/gc/gc_manager.go
@@ -17,14 +17,12 @@ import (
	"context"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/config"
	cerror "github.com/pingcap/tiflow/pkg/errors"
	"github.com/pingcap/tiflow/pkg/pdutil"
	"github.com/tikv/client-go/v2/oracle"
	pd "github.com/tikv/pd/client"
	"go.uber.org/zap"
)
@@ -50,7 +48,6 @@ type gcManager struct {
	lastUpdatedTime   time.Time
	lastSucceededTime time.Time
	lastSafePointTs   uint64
	isTiCDCBlockGC    bool
}

// NewManager creates a new Manager.
@@ -97,9 +94,6 @@ func (m *gcManager) TryUpdateGCSafePoint(
		log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs",
			zap.Uint64("actual", actual), zap.Uint64("checkpointTs", checkpointTs))
	}
	// if the min checkpoint ts is equal to the current gc safe point,
	// it means that the service gc safe point set by TiCDC is the min service gc safe point
	m.isTiCDCBlockGC = actual == checkpointTs
	m.lastSafePointTs = actual
	m.lastSucceededTime = time.Now()
	return nil
@@ -109,20 +103,13 @@ func (m *gcManager) CheckStaleCheckpointTs(
	ctx context.Context, changefeedID model.ChangeFeedID, checkpointTs model.Ts,
) error {
	gcSafepointUpperBound := checkpointTs - 1
	if m.isTiCDCBlockGC {
		pdTime, err := m.pdClock.CurrentTime()
		// TODO: should we return err here, or just log it?
		if err != nil {
			return errors.Trace(err)
		}
		if pdTime.Sub(oracle.GetTimeFromTS(gcSafepointUpperBound)) > time.Duration(m.gcTTL)*time.Second {
			return cerror.ErrGCTTLExceeded.GenWithStackByArgs(checkpointTs, changefeedID)
		}
	} else {
		// if `isTiCDCBlockGC` is false, it means there is another service gc point less than the min checkpoint ts.
		if gcSafepointUpperBound < m.lastSafePointTs {
			return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs)
		}
	// if there is another service gc point less than the min checkpoint ts.
	if gcSafepointUpperBound < m.lastSafePointTs {
		return cerror.ErrSnapshotLostByGC.
			GenWithStackByArgs(
				checkpointTs,
				m.lastSafePointTs,
			)
	}
	return nil
}
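
After this change the stale-checkpoint check reduces to a single comparison against the last service GC safepoint reported to PD. A minimal standalone sketch of that condition (the helper name `snapshotLostByGC` is purely illustrative; the test below exercises the same numbers, lastSafePointTs = 20 and checkpointTs = 10):

```go
package main

import "fmt"

// snapshotLostByGC mirrors the simplified check in CheckStaleCheckpointTs:
// once the changefeed's GC safepoint upper bound (checkpointTs - 1) falls
// below the last service GC safepoint reported to PD, the data needed to
// resume from that checkpoint may already have been garbage-collected.
func snapshotLostByGC(checkpointTs, lastSafePointTs uint64) bool {
	gcSafepointUpperBound := checkpointTs - 1
	return gcSafepointUpperBound < lastSafePointTs
}

func main() {
	fmt.Println(snapshotLostByGC(10, 20)) // true: 9 < 20, ErrSnapshotLostByGC would be returned
	fmt.Println(snapshotLostByGC(30, 20)) // false: 29 >= 20, the checkpoint is still safe
}
```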
10 changes: 1 addition & 9 deletions pkg/txnutil/gc/gc_manager_test.go
@@ -91,24 +91,16 @@ func TestCheckStaleCheckpointTs(t *testing.T) {
	pdClock := pdutil.NewClock4Test()
	gcManager := NewManager(etcd.GcServiceIDForTest(),
		mockPDClient, pdClock).(*gcManager)
	gcManager.isTiCDCBlockGC = true
	ctx := context.Background()

	time.Sleep(1 * time.Second)

	cfID := model.DefaultChangeFeedID("cfID")
	err := gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)
	require.True(t, cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)))
	require.True(t, cerror.IsChangefeedFastFailError(err))

	err = gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
	err := gcManager.CheckStaleCheckpointTs(ctx, cfID, oracle.GoTimeToTS(time.Now()))
	require.Nil(t, err)

	gcManager.isTiCDCBlockGC = false
	gcManager.lastSafePointTs = 20

	err = gcManager.CheckStaleCheckpointTs(ctx, cfID, 10)

	require.True(t, cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)))
	require.True(t, cerror.IsChangefeedFastFailError(err))
}
