Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix ticker stop in register #6876

Merged
merged 10 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 35 additions & 34 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,18 +55,12 @@ func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, se

// Register registers the service to etcd.
func (sr *ServiceRegister) Register() error {
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
id, err := sr.putWithTTL()
if err != nil {
sr.cancel()
return fmt.Errorf("grant lease failed: %v", err)
return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err)
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
sr.cancel()
return fmt.Errorf("put the key %s failed: %v", sr.key, err)
}

kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID)
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
sr.cancel()
return fmt.Errorf("keepalive failed: %v", err)
Expand All @@ -80,31 +75,7 @@ func (sr *ServiceRegister) Register() error {
case _, ok := <-kresp:
if !ok {
log.Error("keep alive failed", zap.String("key", sr.key))
// retry
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
t.Stop()
return
default:
}

<-t.C
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
if err != nil {
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}
}
kresp = sr.renewKeepalive()
}
}
}
Expand All @@ -113,6 +84,36 @@ func (sr *ServiceRegister) Register() error {
return nil
}

// renewKeepalive re-registers the service after its keepalive channel has been
// closed. Every half TTL it puts the key again under a fresh lease and then
// starts a keepalive on that lease; a failure of either step is logged and
// retried on the next tick. It returns the new keepalive channel on success,
// or nil once sr.ctx is done.
func (sr *ServiceRegister) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
	retryInterval := time.Duration(sr.ttl) * time.Second / 2
	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()
	for {
		// Wait for the next retry slot, or stop if the register is canceled.
		select {
		case <-sr.ctx.Done():
			log.Info("exit register process", zap.String("key", sr.key))
			return nil
		case <-ticker.C:
		}

		leaseID, err := sr.putWithTTL()
		if err != nil {
			log.Error("put the key with lease failed", zap.String("key", sr.key), zap.Error(err))
			continue
		}
		keepAliveCh, err := sr.cli.KeepAlive(sr.ctx, leaseID)
		if err != nil {
			log.Error("client keep alive failed", zap.String("key", sr.key), zap.Error(err))
			continue
		}
		return keepAliveCh
	}
}

// putWithTTL writes sr.key/sr.value to etcd with a TTL of sr.ttl seconds and
// returns the lease ID reported by etcdutil.EtcdKVPutWithTTL. The request runs
// under a child context of sr.ctx bounded by etcdutil.DefaultRequestTimeout.
func (sr *ServiceRegister) putWithTTL() (clientv3.LeaseID, error) {
	reqCtx, cancelReq := context.WithTimeout(sr.ctx, etcdutil.DefaultRequestTimeout)
	defer cancelReq()
	leaseID, err := etcdutil.EtcdKVPutWithTTL(reqCtx, sr.cli, sr.key, sr.value, sr.ttl)
	return leaseID, err
}

// Deregister deregisters the service from etcd.
func (sr *ServiceRegister) Deregister() error {
sr.cancel()
Expand Down
44 changes: 33 additions & 11 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand All @@ -29,17 +30,13 @@ func TestRegister(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.NewFromURL(ep)
re.NoError(err)

<-etcd.Server.ReadyNotify()
// with http prefix

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
re.NoError(err)
err = sr.Register()
Expand All @@ -49,20 +46,45 @@ func TestRegister(t *testing.T) {
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))

// Test deregister.
err = sr.Deregister()
re.NoError(err)
resp, err = client.Get(context.Background(), sr.key)
re.NoError(err)
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
re.NoError(err)
err = sr.Register()
re.NoError(err)
sr.cancel()
// ensure that the lease is expired
time.Sleep(3 * time.Second)
resp, err = client.Get(context.Background(), sr.key)
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
re.Empty(resp.Kvs)
for i := 0; i < 3; i++ {
re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key))
etcd.Server.HardStop() // close the etcd to make the keepalive failed
time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout
etcd.Close()
etcd, err = embed.StartEtcd(cfg)
re.NoError(err)
<-etcd.Server.ReadyNotify()
testutil.Eventually(re, func() bool {
return getKeyAfterLeaseExpired(re, client, sr.key) == "127.0.0.1:2"
})
}
etcd.Close()
}

// getKeyAfterLeaseExpired sleeps for three seconds so that a lease which is no
// longer kept alive has expired, then reads key from etcd. It asserts that the
// read succeeds and returns the first value found, or "" when the key is absent.
func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string {
	// Ensure that the lease is expired before reading.
	time.Sleep(3 * time.Second)
	getResp, err := client.Get(context.Background(), key)
	re.NoError(err)
	if kvs := getResp.Kvs; len(kvs) > 0 {
		return string(kvs[0].Value)
	}
	return ""
}
7 changes: 4 additions & 3 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op
}

// EtcdKVPutWithTTL puts (key, value) into etcd with a TTL of ttlSeconds.
func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) {
func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (clientv3.LeaseID, error) {
kv := clientv3.NewKV(c)
grantResp, err := c.Grant(ctx, ttlSeconds)
if err != nil {
return nil, err
return 0, err
}
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
_, err = kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
return grantResp.ID, err
}

const (
Expand Down
6 changes: 6 additions & 0 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
// Case1: restart the etcd server
suite.etcd.Close()
suite.startEtcd()
suite.put("TestWatcherBreak", "0")
checkCache("0")
suite.etcd.Server.Stop()
time.Sleep(DefaultRequestTimeout)
suite.etcd.Close()
suite.startEtcd()
suite.put("TestWatcherBreak", "1")
checkCache("1")

Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,12 @@ func (s *Server) initTSOPrimaryWatcher() {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
var oldPrimary string
v, ok := s.servicePrimaryMap.Load(serviceName)
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
Expand Down