Skip to content

Commit

Permalink
fix after review
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR authored and ti-chi-bot committed Apr 12, 2024
1 parent 0f8af3d commit 9026f42
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return true, err
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
Expand Down
20 changes: 8 additions & 12 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,23 +161,19 @@ func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskE
zap.Int("remained", len(taskCh)))
defer log.Info("Finish collecting remaining events in the channel.", zap.String("category", "log backup advancer"))
for {
if taskCh == nil && pauseCh == nil {
return
}

select {
case resp, ok := <-taskCh:
if !ok {
return
}
if !handleResponse(resp) {
return
if !ok || !handleResponse(resp) {
taskCh = nil
}
case resp, ok := <-pauseCh:
if !ok {
return
if !ok || !handleResponse(resp) {
pauseCh = nil
}
if !handleResponse(resp) {
return
}
default:
return
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/advancer_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -48,8 +49,9 @@ func (c PDRegionScanner) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return c.UpdateServiceGCSafePoint(ctx, logBackupServiceID, int64(logBackupSafePointTTL.Seconds()), at)
}

// TODO: It should be able to synchoronize the current TS with the PD.
func (c PDRegionScanner) FetchCurrentTS(ctx context.Context) (uint64, error) {
return uint64(time.Now().Unix()), nil
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
Expand Down
1 change: 0 additions & 1 deletion br/pkg/streamhelper/regioniter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type TiKVClusterMeta interface {
// For now, all tasks (exactly one task in fact) use the same checkpoint.
BlockGCUntil(ctx context.Context, at uint64) (uint64, error)

//TODO: It should be able to synchoronize the current TS with the PD.
FetchCurrentTS(ctx context.Context) (uint64, error)
}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/streamhelper/regioniter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/tidb/br/pkg/streamhelper/spans"
"github.com/pingcap/tidb/pkg/kv"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -82,8 +83,9 @@ func (c constantRegions) BlockGCUntil(ctx context.Context, at uint64) (uint64, e
return 0, status.Error(codes.Unimplemented, "Unsupported operation")
}

// TODO: It should be able to synchoronize the current TS with the PD.
func (c constantRegions) FetchCurrentTS(ctx context.Context) (uint64, error) {
return uint64(time.Now().Unix()), nil
return oracle.ComposeTS(time.Now().UnixMilli(), 0), nil
}

func makeSubrangeRegions(keys ...string) constantRegions {
Expand Down

0 comments on commit 9026f42

Please sign in to comment.