Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

election, tso: fix data race in lease.go #6379

Merged
merged 3 commits into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
finalCmps = append(finalCmps, clientv3.Compare(clientv3.CreateRevision(ls.leaderKey), "=", 0))
resp, err := kv.NewSlowLogTxn(ls.client).
If(finalCmps...).
Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID))).
Then(clientv3.OpPut(ls.leaderKey, leaderData, clientv3.WithLease(newLease.ID.Load().(clientv3.LeaseID)))).
Commit()
log.Info("check campaign resp", zap.Any("resp", resp))
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type lease struct {
// etcd client and lease
client *clientv3.Client
lease clientv3.Lease
ID clientv3.LeaseID
ID atomic.Value // store as clientv3.LeaseID
// leaseTimeout and expireTime are used to control the lease's lifetime
leaseTimeout time.Duration
expireTime atomic.Value
Expand All @@ -64,7 +64,7 @@ func (l *lease) Grant(leaseTimeout int64) error {
log.Warn("lease grants too slow", zap.Duration("cost", cost), zap.String("purpose", l.Purpose))
}
log.Info("lease granted", zap.Int64("lease-id", int64(leaseResp.ID)), zap.Int64("lease-timeout", leaseTimeout), zap.String("purpose", l.Purpose))
l.ID = leaseResp.ID
l.ID.Store(leaseResp.ID)
l.leaseTimeout = time.Duration(leaseTimeout) * time.Second
l.expireTime.Store(start.Add(time.Duration(leaseResp.TTL) * time.Second))
return nil
Expand All @@ -80,7 +80,11 @@ func (l *lease) Close() error {
// Try to revoke lease to make subsequent elections faster.
ctx, cancel := context.WithTimeout(l.client.Ctx(), revokeLeaseTimeout)
defer cancel()
l.lease.Revoke(ctx, l.ID)
var leaseID clientv3.LeaseID
if l.ID.Load() != nil {
leaseID = l.ID.Load().(clientv3.LeaseID)
}
Comment on lines +83 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if l.ID is nil, what will happen? Do we need to add some tests about it?

Copy link
Contributor Author

@binshi-bing binshi-bing May 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if l.ID.Load() is nil, leaseID is initialized with the default value. Since clientv3.LeaseID is int64, so the initialized value is 0, which keeps exact the same as the existing logic.
Because the above reason, the logic is pretty strait forward and it's covered by the existing tests, so no need to add new test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I see

l.lease.Revoke(ctx, leaseID)
return l.lease.Close()
}

Expand Down Expand Up @@ -145,7 +149,11 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
start := time.Now()
ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout)
defer cancel()
res, err := l.lease.KeepAliveOnce(ctx1, l.ID)
var leaseID clientv3.LeaseID
if l.ID.Load() != nil {
leaseID = l.ID.Load().(clientv3.LeaseID)
}
res, err := l.lease.KeepAliveOnce(ctx1, leaseID)
if err != nil {
log.Warn("lease keep alive failed", zap.String("purpose", l.Purpose), errs.ZapError(err))
return
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (suite *tsoClientTestSuite) TestUpdateAfterResetTSO() {
_, _, err := client.GetTS(ctx)
return err == nil
})
// Transfer leader to trigger the TSO resetting.
// Resign leader to trigger the TSO resetting.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/updateAfterResetTSO", "return(true)"))
oldLeaderName := suite.cluster.WaitLeader()
err := suite.cluster.GetServer(oldLeaderName).ResignLeader()
Expand Down