Skip to content

Commit

Permalink
gc(ticdc): do not block gc if there is no changefeed (#9634) (#9636)
Browse files Browse the repository at this point in the history
close #9633
  • Loading branch information
ti-chi-bot committed Aug 23, 2023
1 parent 8196f8a commit dc709e7
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
15 changes: 15 additions & 0 deletions cdc/owner/owner.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -725,6 +726,20 @@ func (o *ownerImpl) calculateGCSafepoint(state *orchestrator.GlobalReactorState)
forceUpdateMap[upstreamID] = nil
}
}
// check if the upstream has a changefeed, if not we should update the gc safepoint
err := o.upstreamManager.Visit(func(up *upstream.Upstream) error {
if _, exist := minCheckpointTsMap[up.ID]; !exist {
ts, err := up.PDClock.CurrentTime()
if err != nil {
return errors.Annotatef(err, "upstream %d get pd time failed", up.ID)
}
minCheckpointTsMap[up.ID] = oracle.GoTimeToTS(ts)
}
return nil
})
if err != nil {
log.Warn("get pd time failed failed", zap.Error(err))
}
return minCheckpointTsMap, forceUpdateMap
}

Expand Down
20 changes: 16 additions & 4 deletions cdc/owner/owner_test.go
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"context"
"fmt"
"math"
"math/rand"
"testing"
"time"
Expand All @@ -34,6 +33,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/observer"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
Expand Down Expand Up @@ -452,9 +452,6 @@ func TestUpdateGCSafePoint(t *testing.T) {
mockPDClient.UpdateServiceGCSafePointFunc = func(
ctx context.Context, serviceID string, ttl int64, safePoint uint64,
) (uint64, error) {
// Owner will do a snapshot read at (checkpointTs - 1) from TiKV,
// set GC safepoint to (checkpointTs - 1)
require.Equal(t, safePoint, uint64(math.MaxUint64-1))
return 0, nil
}
err := o.updateGCSafepoint(ctx, state)
Expand Down Expand Up @@ -697,6 +694,7 @@ func TestCalculateGCSafepointTs(t *testing.T) {
expectMinTsMap := make(map[uint64]uint64)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
o.upstreamManager = upstream.NewManager4Test(nil)

stateMap := []model.FeedState{
model.StateNormal, model.StateStopped, model.StateError,
Expand Down Expand Up @@ -761,6 +759,20 @@ func TestCalculateGCSafepointTs(t *testing.T) {
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

func TestCalculateGCSafepointTsNoChangefeed(t *testing.T) {
state := orchestrator.NewGlobalState(etcd.DefaultCDCClusterID)
expectForceUpdateMap := make(map[uint64]interface{})
o := ownerImpl{changefeeds: make(map[model.ChangeFeedID]*changefeed)}
o.upstreamManager = upstream.NewManager4Test(nil)
up, err := o.upstreamManager.GetDefaultUpstream()
require.Nil(t, err)
up.PDClock = pdutil.NewClock4Test()

minCheckpoinTsMap, forceUpdateMap := o.calculateGCSafepoint(state)
require.Equal(t, 1, len(minCheckpoinTsMap))
require.Equal(t, expectForceUpdateMap, forceUpdateMap)
}

// AsyncStop should cleanup jobs and reject.
func TestAsyncStop(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit dc709e7

Please sign in to comment.