Skip to content

Commit

Permalink
election, tso: fix data race in lease.go (tikv#6379)
Browse files Browse the repository at this point in the history
close tikv#6378

fix data race in lease.go

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and zeminzhou committed May 8, 2023
1 parent 6784c9e commit c04ac8c
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
finalCmps = append(finalCmps, clientv3.Compare(clientv3.CreateRevision(ls.leaderKey), "=", 0))
resp, err := kv.NewSlowLogTxn(ls.client).
If(finalCmps...).
Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID))).
Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID)))).
Commit()
log.Info("check campaign resp", zap.Any("resp", resp))
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type lease struct {
// etcd client and lease
client *clientv3.Client
lease clientv3.Lease
ID clientv3.LeaseID
ID atomic.Value // store as clientv3.LeaseID
// leaseTimeout and expireTime are used to control the lease's lifetime
leaseTimeout time.Duration
expireTime atomic.Value
Expand All @@ -64,7 +64,7 @@ func (l *lease) Grant(leaseTimeout int64) error {
log.Warn("lease grants too slow", zap.Duration("cost", cost), zap.String("purpose", l.Purpose))
}
log.Info("lease granted", zap.Int64("lease-id", int64(leaseResp.ID)), zap.Int64("lease-timeout", leaseTimeout), zap.String("purpose", l.Purpose))
l.ID = leaseResp.ID
l.ID.Store(leaseResp.ID)
l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second))
return nil
Expand All @@ -80,7 +80,11 @@ func (l *lease) Close() error {
// Try to revoke lease to make subsequent elections faster.
ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout)
defer cancel()
l.lease.Revoke(ctx, l.ID)
var leaseID clientv3.LeaseID
if l.ID.Load() != nil {
leaseID = l.ID.Load().(clientv3.LeaseID)
}
l.lease.Revoke(ctx, leaseID)
return l.lease.Close()
}

Expand Down Expand Up @@ -145,7 +149,11 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
start := time.Now()
ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout)
defer cancel()
res, err := l.lease.KeepAliveOnce(ctx1, l.ID)
var leaseID clientv3.LeaseID
if l.ID.Load() != nil {
leaseID = l.ID.Load().(clientv3.LeaseID)
}
res, err := l.lease.KeepAliveOnce(ctx1, leaseID)
if err != nil {
log.Warn("lease keep alive failed", zap.String("purpose", l.Purpose), errs.ZapError(err))
return
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
_, _, err := client.GetTS(ctx)
return err == nil
})
// Transfer leader to trigger the TSO resetting.
// Resign leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
oldLeaderName := suite.cluster.WaitLeader()
err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
Expand Down

0 comments on commit c04ac8c

Please sign in to comment.