Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix ticker stop in register #6876

Merged
merged 10 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 31 additions & 25 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,7 @@
case _, ok := <-kresp:
if !ok {
log.Error("keep alive failed", zap.String("key", sr.key))
// retry
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
t.Stop()
return
default:
}

<-t.C
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
if err != nil {
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
t.Stop()
continue
}
}
kresp = sr.renewKeepalive()
}
}
}
Expand All @@ -113,6 +89,36 @@
return nil
}

func (sr *ServiceRegister) renewKeepalive() <-chan *clientv3.LeaseKeepAliveResponse {
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2)
defer t.Stop()
for {
select {
case <-sr.ctx.Done():
log.Info("exit register process", zap.String("key", sr.key))
return nil

Check warning on line 99 in pkg/mcs/discovery/register.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/discovery/register.go#L97-L99

Added lines #L97 - L99 were not covered by tests
case <-t.C:
resp, err := sr.cli.Grant(sr.ctx, sr.ttl)
if err != nil {
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err))
continue

Check warning on line 104 in pkg/mcs/discovery/register.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/discovery/register.go#L103-L104

Added lines #L103 - L104 were not covered by tests
}

if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil {
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err))
continue

Check warning on line 109 in pkg/mcs/discovery/register.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/discovery/register.go#L108-L109

Added lines #L108 - L109 were not covered by tests
}

kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID)
if err != nil {
log.Error("client keep alive failed", zap.String("key", sr.key), zap.Error(err))
continue

Check warning on line 115 in pkg/mcs/discovery/register.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/discovery/register.go#L114-L115

Added lines #L114 - L115 were not covered by tests
}
return kresp
}
}
}

// Deregister deregisters the service from etcd.
func (sr *ServiceRegister) Deregister() error {
sr.cancel()
Expand Down
38 changes: 27 additions & 11 deletions pkg/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,13 @@ func TestRegister(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.NewFromURL(ep)
re.NoError(err)

<-etcd.Server.ReadyNotify()
// with http prefix

// Test register with http prefix.
sr := NewServiceRegister(context.Background(), client, "12345", "test_service", "http://127.0.0.1:1", "http://127.0.0.1:1", 10)
re.NoError(err)
err = sr.Register()
Expand All @@ -49,20 +45,40 @@ func TestRegister(t *testing.T) {
re.NoError(err)
re.Equal("http://127.0.0.1:1", string(resp.Kvs[0].Value))

// Test deregister.
err = sr.Deregister()
re.NoError(err)
resp, err = client.Get(context.Background(), sr.key)
re.NoError(err)
re.Empty(resp.Kvs)

// Test the case that ctx is canceled.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
re.NoError(err)
err = sr.Register()
re.NoError(err)
sr.cancel()
// ensure that the lease is expired
time.Sleep(3 * time.Second)
resp, err = client.Get(context.Background(), sr.key)
re.Empty(getKeyAfterLeaseExpired(re, client, sr.key))

// Test the case that keepalive is failed when the etcd is restarted.
sr = NewServiceRegister(context.Background(), client, "12345", "test_service", "127.0.0.1:2", "127.0.0.1:2", 1)
err = sr.Register()
re.NoError(err)
re.Empty(resp.Kvs)
re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key))
etcd.Close() // close the etcd to make the keepalive failed
time.Sleep(3 * time.Second) // ensure that the lease is expired
etcd, err = embed.StartEtcd(cfg)
re.NoError(err)
<-etcd.Server.ReadyNotify()
defer etcd.Close()
re.Equal("127.0.0.1:2", getKeyAfterLeaseExpired(re, client, sr.key))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}

func getKeyAfterLeaseExpired(re *require.Assertions, client *clientv3.Client, key string) string {
time.Sleep(3 * time.Second) // ensure that the lease is expired
resp, err := client.Get(context.Background(), key)
re.NoError(err)
if len(resp.Kvs) == 0 {
return ""
}
return string(resp.Kvs[0].Value)
}
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,12 @@ func (s *Server) initTSOPrimaryWatcher() {
return nil
}
deleteFn := func(kv *mvccpb.KeyValue) error {
var oldPrimary string
v, ok := s.servicePrimaryMap.Load(serviceName)
if ok {
oldPrimary = v.(string)
}
log.Info("delete tso primary", zap.String("old-primary", oldPrimary))
s.servicePrimaryMap.Delete(serviceName)
return nil
}
Expand Down