Skip to content

Commit

Permalink
mcs: fix ticker stop in register (#6876)
Browse files Browse the repository at this point in the history
close #6875

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Aug 3, 2023
1 parent d664827 commit 6377b26
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 48 deletions.
69 changes: 35 additions & 34 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -54,18 +55,12 @@ func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, se

// Register registers the service to etcd.
func (sr *ServiceRegister) Register() error {
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
id, err := sr.putWithTTL()
if err != nil {
sr.cancel()
return fmt.Errorf("grant lease failed: %v", err)
return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err)
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
sr.cancel()
return fmt.Errorf("put the key %s failed: %v", sr.key, err)
}

kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID)
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
sr.cancel()
return fmt.Errorf("keepalive failed: %v", err)
Expand All @@ -80,31 +75,7 @@ func (sr *ServiceRegister) Register() error {
case _, ok := <-kresp:
if !ok {
log.Error("keep alive failed", zap.String("key", sr.key))
// retry
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
t.Stop()
return
default:
}

<-t.C
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
if err != nil {
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}
}
kresp = sr.renewKeepalive()
}
}
}
Expand All @@ -113,6 +84,36 @@ func (sr *ServiceRegister) Register() error {
return nil
}

func (sr *ServiceRegister) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
defer t.Stop()
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
return nil
case <-t.C:
id, err := sr.putWithTTL()
if err != nil {
log.Error("put the key with lease failed", zap.String("key", sr.key), zap.Error(err))
continue
}
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
log.Error("client keep alive failed", zap.String("key", sr.key), zap.Error(err))
continue
}
return kresp
}
}
}

func (sr *ServiceRegister) putWithTTL() (clientv3.LeaseID, error) {
ctx, cancel := context.WithTimeout(sr.ctx, etcdutil.DefaultRequestTimeout)
defer cancel()
return etcdutil.EtcdKVPutWithTTL(ctx, sr.cli, sr.key, sr.value, sr.ttl)
}

// Deregister deregisters the service from etcd.
func (sr *ServiceRegister) Deregister() error {
sr.cancel()
Expand Down
44 changes: 33 additions & 11 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)
Expand All @@ -29,17 +30,13 @@ func TestRegister(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.NewFromURL(ep)
re.NoError(err)

<-etcd.Server.ReadyNotify()
// with http prefix

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
re.NoError(err)
err = sr.Register()
Expand All @@ -49,20 +46,45 @@ func TestRegister(t *testing.T) {
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))

// Test deregister.
err = sr.Deregister()
re.NoError(err)
resp, err = client.Get(context.Background(), sr.key)
re.NoError(err)
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
re.NoError(err)
err = sr.Register()
re.NoError(err)
sr.cancel()
// ensure that the lease is expired
time.Sleep(3 * time.Second)
resp, err = client.Get(context.Background(), sr.key)
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
re.Empty(resp.Kvs)
for i := 0; i < 3; i++ {
re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key))
etcd.Server.HardStop() // close the etcd to make the keepalive failed
time.Sleep(etcdutil.DefaultDialTimeout) // ensure that the request is timeout
etcd.Close()
etcd, err = embed.StartEtcd(cfg)
re.NoError(err)
<-etcd.Server.ReadyNotify()
testutil.Eventually(re, func() bool {
return getKeyAfterLeaseExpired(re, client, sr.key) == "127.0.0.1:2"
})
}
etcd.Close()
}

func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string {
time.Sleep(3 * time.Second) // ensure that the lease is expired
resp, err := client.Get(context.Background(), key)
re.NoError(err)
if len(resp.Kvs) == 0 {
return ""
}
return string(resp.Kvs[0].Value)
}
7 changes: 4 additions & 3 deletions pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ func GetProtoMsgWithModRev(c *clientv3.Client, key string, msg proto.Message, op
}

// EtcdKVPutWithTTL put (key, value) into etcd with a ttl of ttlSeconds
func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (*clientv3.PutResponse, error) {
func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value string, ttlSeconds int64) (clientv3.LeaseID, error) {
kv := clientv3.NewKV(c)
grantResp, err := c.Grant(ctx, ttlSeconds)
if err != nil {
return nil, err
return 0, err
}
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
_, err = kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
return grantResp.ID, err
}

const (
Expand Down
6 changes: 6 additions & 0 deletions pkg/utils/etcdutil/etcdutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ func (suite *loopWatcherTestSuite) TestWatcherBreak() {
// Case1: restart the etcd server
suite.etcd.Close()
suite.startEtcd()
suite.put("TestWatcherBreak", "0")
checkCache("0")
suite.etcd.Server.Stop()
time.Sleep(DefaultRequestTimeout)
suite.etcd.Close()
suite.startEtcd()
suite.put("TestWatcherBreak", "1")
checkCache("1")

Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,12 @@ func (s *Server) initTSOPrimaryWatcher() {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
var oldPrimary string
v, ok := s.servicePrimaryMap.Load(serviceName)
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
Expand Down

0 comments on commit 6377b26

Please sign in to comment.