From 2c43e7a109fb3a1e9f37db9e7e10b93d555488b1 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Fri, 7 Apr 2023 15:24:36 +0800 Subject: [PATCH 1/6] mcs: fix watch primary address revision Signed-off-by: lhy1024 --- pkg/tso/keyspace_group_manager.go | 4 +- server/server.go | 98 ++++++++++++++++++++++++------- 2 files changed, 80 insertions(+), 22 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2f97299fede..2f5aa289012 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -341,7 +341,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups( return revison, kgs, resp.More, nil } -// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution +// startKeyspaceGroupsMetaWatchLoop repeatedly watches any change in keyspace group membership/distribution // and apply the change dynamically. func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) { defer logutil.LogPanic() @@ -357,7 +357,7 @@ func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64 nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision) if err != nil { - log.Error("watcher canceled unexpectedly. Will start a new watcher after a while", + log.Error("watcher canceled unexpectedly and a new watcher will start after a while", zap.Int64("next-revision", nextRevision), zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), zap.Error(err)) diff --git a/server/server.go b/server/server.go index 06ad2198288..6f28a5799a1 100644 --- a/server/server.go +++ b/server/server.go @@ -104,6 +104,8 @@ const ( maxRetryTimesGetServicePrimary = 25 // retryIntervalGetServicePrimary is the retry interval for getting primary addr. retryIntervalGetServicePrimary = 100 * time.Millisecond + // TODO: move it to etcdutil + watchKEtcdChangeRetryInterval = 1 * time.Second ) // EtcdStartTimeout the timeout of the startup etcd. @@ -558,7 +560,7 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.encryptionKeyManagerLoop() if s.IsAPIServiceMode() { // disable tso service in api server s.serverLoopWg.Add(1) - go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName) + go s.startWatchServicePrimaryAddrLoop(mcs.TSOServiceName) } } @@ -1716,43 +1718,88 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string) return "", false } -func (s *Server) watchServicePrimaryAddrLoop(serviceName string) { +// startWatchServicePrimaryAddrLoop starts a loop to watch the primary address of a given service. +func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { defer logutil.LogPanic() defer s.serverLoopWg.Done() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() - serviceKey := fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") - log.Info("start to watch", zap.String("service-key", serviceKey)) - - primary := &tsopb.Participant{} - ok, rev, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) - if err != nil { - log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) + serviceKey := s.servicePrimaryKey(serviceName) + var revision int64 + for i := 0; i < maxRetryTimesGetServicePrimary; i++ { + select { + case <-ctx.Done(): + return + case <-time.After(retryIntervalGetServicePrimary): + } + primary := &tsopb.Participant{} + ok, r, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) + if err != nil { + log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) + continue + } + listenUrls := primary.GetListenUrls() + if ok && len(listenUrls) > 0 { + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + revision = r + log.Info("get service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) + break + } } - listenUrls := primary.GetListenUrls() - if ok && len(listenUrls) > 0 { - // listenUrls[0] is the primary service endpoint of the keyspace group - s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - } else { + if revision == 0 { log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey)) } - - watchChan := s.client.Watch(ctx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(rev)) + log.Info("start to watch", zap.String("service-key", serviceKey)) for { select { case <-ctx.Done(): log.Info("server is closed, exist watch service primary addr loop", zap.String("service", serviceName)) return - case res := <-watchChan: - for _, event := range res.Events { + default: + } + nextRevision, err := s.watchServicePrimaryAddr(ctx, serviceName, revision) + if err != nil { + log.Error("watcher canceled unexpectedly and a new watcher will start after a while", + zap.Int64("next-revision", nextRevision), + zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Error(err)) + revision = nextRevision + time.Sleep(watchKEtcdChangeRetryInterval) + } + } +} + +func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) { + serviceKey := s.servicePrimaryKey(serviceName) + watcher := clientv3.NewWatcher(s.client) + defer watcher.Close() + + for { + watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range watchChan { + if wresp.CompactRevision != 0 { + log.Warn("required revision has been compacted, use the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision + break + } + if wresp.Err() != nil { + log.Error("watcher is canceled with", + zap.Int64("revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return revision, wresp.Err() + } + for _, event := range wresp.Events { switch event.Type { case clientv3.EventTypePut: - primary.ListenUrls = nil // reset the field + primary := &tsopb.Participant{} if err := proto.Unmarshal(event.Kv.Value, primary); err != nil { log.Error("watch service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) } else { - listenUrls = primary.GetListenUrls() + listenUrls := primary.GetListenUrls() if len(listenUrls) > 0 { // listenUrls[0] is the primary service endpoint of the keyspace group s.servicePrimaryMap.Store(serviceName, listenUrls[0]) @@ -1761,13 +1808,24 @@ func (s *Server) watchServicePrimaryAddrLoop(serviceName string) { } } case clientv3.EventTypeDelete: + log.Warn("service primary is deleted", zap.String("service-key", serviceKey)) s.servicePrimaryMap.Delete(serviceName) } } + revision = wresp.Header.Revision + } + select { + case <-ctx.Done(): + return revision, nil + default: } } } +func (s *Server) servicePrimaryKey(serviceName string) string { + return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") +} + // RecoverAllocID recover alloc id. set current base id to input id func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error { return s.idAllocator.SetBase(id) From e34a074ef3d038fa1bc508305d06ff5864b5fad0 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sat, 8 Apr 2023 00:38:30 +0800 Subject: [PATCH 2/6] add update cache when meets not leader Signed-off-by: lhy1024 --- pkg/utils/tsoutil/tso_dispatcher.go | 12 +++- server/grpc_service.go | 2 +- server/server.go | 67 ++++++++++++------- .../mcs/discovery/register_test.go | 3 +- 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 351fe424b16..0f9c729ef26 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -16,6 +16,7 @@ package tsoutil import ( "context" + "strings" "sync" "time" @@ -57,12 +58,12 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo // DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host func (s *TSODispatcher) DispatchRequest( - ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) { + ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) { val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests)) reqCh := val.(chan Request) if !loaded { tsDeadlineCh := make(chan deadline, 1) - go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh) + go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...) go watchTSDeadline(ctx, tsDeadlineCh) } reqCh <- req @@ -70,7 +71,7 @@ func (s *TSODispatcher) DispatchRequest( func (s *TSODispatcher) dispatch( ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn, - tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) { + tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) { dispatcherCtx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() defer s.dispatchChs.Delete(forwardedHost) @@ -121,6 +122,11 @@ func (s *TSODispatcher) dispatch( log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err)) + if len(updateServicePrimaryAddrChs) > 0 { + if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { + updateServicePrimaryAddrChs[0] <- struct{}{} + } + } select { case <-dispatcherCtx.Done(): return diff --git a/server/grpc_service.go b/server/grpc_service.go index 42f85e64336..08aa33406b4 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -224,7 +224,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { } tsoRequest := tsoutil.NewPDProtoRequest(forwardedHost, clientConn, request, stream) - s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh) + s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh, s.updateServicePrimaryAddrCh) continue } diff --git a/server/server.go b/server/server.go index 6f28a5799a1..02d1ec42102 100644 --- a/server/server.go +++ b/server/server.go @@ -214,9 +214,10 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + updateServicePrimaryAddrCh chan struct{} } // HandlerBuilder builds a server HTTP handler. @@ -249,6 +250,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le }{ clients: make(map[string]tsopb.TSO_TsoClient), }, + updateServicePrimaryAddrCh: make(chan struct{}, 1), } s.handler = newHandler(s) @@ -1726,32 +1728,25 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { defer cancel() serviceKey := s.servicePrimaryKey(serviceName) - var revision int64 + var ( + revision int64 + err error + ) for i := 0; i < maxRetryTimesGetServicePrimary; i++ { select { case <-ctx.Done(): return case <-time.After(retryIntervalGetServicePrimary): } - primary := &tsopb.Participant{} - ok, r, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) - if err != nil { - log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) - continue - } - listenUrls := primary.GetListenUrls() - if ok && len(listenUrls) > 0 { - // listenUrls[0] is the primary service endpoint of the keyspace group - s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - revision = r - log.Info("get service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) + revision, err = s.updateServicePrimaryAddr(serviceName) + if err == nil { break } } if revision == 0 { log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey)) } - log.Info("start to watch", zap.String("service-key", serviceKey)) + log.Info("start to watch service primary addr", zap.String("service-key", serviceKey)) for { select { case <-ctx.Done(): @@ -1771,20 +1766,49 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { } } +// SetServicePrimaryAddr sets the primary address directly. +// Note: This function is only used for test. +func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { + s.servicePrimaryMap.Store(serviceName, addr) +} + +// updateServicePrimaryAddr updates the primary address from etcd with get operation. +func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) { + serviceKey := s.servicePrimaryKey(serviceName) + primary := &tsopb.Participant{} + ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) + listenUrls := primary.GetListenUrls() + if !ok || err != nil || len(listenUrls) == 0 { + return 0, err + } + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) + return revision, nil +} + +// watchServicePrimaryAddr watches the primary address on etcd. func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) { serviceKey := s.servicePrimaryKey(serviceName) watcher := clientv3.NewWatcher(s.client) defer watcher.Close() for { + WatchChan: watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision)) - for wresp := range watchChan { + select { + case <-ctx.Done(): + return revision, nil + case <-s.updateServicePrimaryAddrCh: + revision, err = s.updateServicePrimaryAddr(serviceName) + goto WatchChan + case wresp := <-watchChan: if wresp.CompactRevision != 0 { log.Warn("required revision has been compacted, use the compact revision", zap.Int64("required-revision", revision), zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision - break + goto WatchChan } if wresp.Err() != nil { log.Error("watcher is canceled with", @@ -1814,11 +1838,6 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string } revision = wresp.Header.Revision } - select { - case <-ctx.Done(): - return revision, nil - default: - } } } diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index ebb2a7fc59b..01db31dc6e8 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -96,9 +96,10 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) { re.Equal(addr, returnedEntry.ServiceAddr) // test primary when only one server + expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s}) primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName) re.True(exist) - re.Equal(primary, addr) + re.Equal(primary, expectedPrimary) // test API server discovery after unregister cleanup() From 1b09ef40eb90035c8f6e68f446dd2f6866d69528 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 10 Apr 2023 12:41:07 +0800 Subject: [PATCH 3/6] address comments Signed-off-by: lhy1024 --- pkg/tso/keyspace_group_manager.go | 6 ++-- pkg/utils/tsoutil/tso_dispatcher.go | 21 +++++++++--- server/server.go | 52 ++++++++++++++++------------- 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2f5aa289012..863c2365670 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -50,7 +50,7 @@ const ( defaultLoadKeyspaceGroupsBatchSize = int64(400) defaultLoadFromEtcdRetryInterval = 500 * time.Millisecond defaultLoadFromEtcdMaxRetryTimes = int(defaultLoadKeyspaceGroupsTimeout / defaultLoadFromEtcdRetryInterval) - watchKEtcdChangeRetryInterval = 1 * time.Second + watchEtcdChangeRetryInterval = 1 * time.Second ) // KeyspaceGroupManager manages the members of the keyspace groups assigned to this host. @@ -359,9 +359,9 @@ func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64 if err != nil { log.Error("watcher canceled unexpectedly and a new watcher will start after a while", zap.Int64("next-revision", nextRevision), - zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)), zap.Error(err)) - time.Sleep(watchKEtcdChangeRetryInterval) + time.Sleep(watchEtcdChangeRetryInterval) } } } diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 0f9c729ef26..9f0ce2df07d 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -58,7 +58,12 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo // DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host func (s *TSODispatcher) DispatchRequest( - ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) { + ctx context.Context, + req Request, + tsoProtoFactory ProtoFactory, + doneCh <-chan struct{}, + errCh chan<- error, + updateServicePrimaryAddrChs ...chan<- struct{}) { val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests)) reqCh := val.(chan Request) if !loaded { @@ -70,8 +75,15 @@ func (s *TSODispatcher) DispatchRequest( } func (s *TSODispatcher) dispatch( - ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn, - tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error, updateServicePrimaryAddrChs ...chan<- struct{}) { + ctx context.Context, + tsoProtoFactory ProtoFactory, + forwardedHost string, + clientConn *grpc.ClientConn, + tsoRequestCh <-chan Request, + tsDeadlineCh chan<- deadline, + doneCh <-chan struct{}, + errCh chan<- error, + updateServicePrimaryAddrChs ...chan<- struct{}) { dispatcherCtx, ctxCancel := context.WithCancel(ctx) defer ctxCancel() defer s.dispatchChs.Delete(forwardedHost) @@ -97,6 +109,7 @@ func (s *TSODispatcher) dispatch( defer cancel() requests := make([]Request, maxMergeRequests+1) + needUpdateServicePrimaryAddr := len(updateServicePrimaryAddrChs) > 0 && updateServicePrimaryAddrChs[0] != nil for { select { case first := <-tsoRequestCh: @@ -122,7 +135,7 @@ func (s *TSODispatcher) dispatch( log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err)) - if len(updateServicePrimaryAddrChs) > 0 { + if needUpdateServicePrimaryAddr { if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { updateServicePrimaryAddrChs[0] <- struct{}{} } diff --git a/server/server.go b/server/server.go index 02d1ec42102..c36fbcd9fb6 100644 --- a/server/server.go +++ b/server/server.go @@ -105,7 +105,7 @@ const ( // retryIntervalGetServicePrimary is the retry interval for getting primary addr. retryIntervalGetServicePrimary = 100 * time.Millisecond // TODO: move it to etcdutil - watchKEtcdChangeRetryInterval = 1 * time.Second + watchEtcdChangeRetryInterval = 1 * time.Second ) // EtcdStartTimeout the timeout of the startup etcd. @@ -214,9 +214,11 @@ type Server struct { auditBackends []audit.Backend - registry *registry.ServiceRegistry - mode string - servicePrimaryMap sync.Map /* Store as map[string]string */ + registry *registry.ServiceRegistry + mode string + servicePrimaryMap sync.Map /* Store as map[string]string */ + // updateServicePrimaryAddrCh is used to notify the server to update the service primary address. + // Note: it is only used in API service mode. updateServicePrimaryAddrCh chan struct{} } @@ -250,7 +252,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le }{ clients: make(map[string]tsopb.TSO_TsoClient), }, - updateServicePrimaryAddrCh: make(chan struct{}, 1), } s.handler = newHandler(s) @@ -1726,7 +1727,7 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { defer s.serverLoopWg.Done() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() - + s.updateServicePrimaryAddrCh = make(chan struct{}, 1) serviceKey := s.servicePrimaryKey(serviceName) var ( revision int64 @@ -1739,7 +1740,7 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { case <-time.After(retryIntervalGetServicePrimary): } revision, err = s.updateServicePrimaryAddr(serviceName) - if err == nil { + if err != nil { break } } @@ -1758,10 +1759,10 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { if err != nil { log.Error("watcher canceled unexpectedly and a new watcher will start after a while", zap.Int64("next-revision", nextRevision), - zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)), + zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)), zap.Error(err)) revision = nextRevision - time.Sleep(watchKEtcdChangeRetryInterval) + time.Sleep(watchEtcdChangeRetryInterval) } } } @@ -1772,21 +1773,6 @@ func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { s.servicePrimaryMap.Store(serviceName, addr) } -// updateServicePrimaryAddr updates the primary address from etcd with get operation. -func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) { - serviceKey := s.servicePrimaryKey(serviceName) - primary := &tsopb.Participant{} - ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) - listenUrls := primary.GetListenUrls() - if !ok || err != nil || len(listenUrls) == 0 { - return 0, err - } - // listenUrls[0] is the primary service endpoint of the keyspace group - s.servicePrimaryMap.Store(serviceName, listenUrls[0]) - log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) - return revision, nil -} - // watchServicePrimaryAddr watches the primary address on etcd. func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) { serviceKey := s.servicePrimaryKey(serviceName) @@ -1801,6 +1787,9 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string return revision, nil case <-s.updateServicePrimaryAddrCh: revision, err = s.updateServicePrimaryAddr(serviceName) + if err != nil { + log.Warn("update service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err)) + } goto WatchChan case wresp := <-watchChan: if wresp.CompactRevision != 0 { @@ -1841,6 +1830,21 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string } } +// updateServicePrimaryAddr updates the primary address from etcd with get operation. +func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) { + serviceKey := s.servicePrimaryKey(serviceName) + primary := &tsopb.Participant{} + ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary) + listenUrls := primary.GetListenUrls() + if !ok || err != nil || len(listenUrls) == 0 { + return 0, err + } + // listenUrls[0] is the primary service endpoint of the keyspace group + s.servicePrimaryMap.Store(serviceName, listenUrls[0]) + log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0])) + return revision, nil +} + func (s *Server) servicePrimaryKey(serviceName string) string { return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary") } From 642872e61c53e0c04e2d41b8ec55394656457762 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 10 Apr 2023 12:48:41 +0800 Subject: [PATCH 4/6] fix possible block Signed-off-by: lhy1024 --- pkg/utils/tsoutil/tso_dispatcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 9f0ce2df07d..18080307b94 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -137,7 +137,10 @@ func (s *TSODispatcher) dispatch( errs.ZapError(errs.ErrGRPCSend, err)) if needUpdateServicePrimaryAddr { if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) { - updateServicePrimaryAddrChs[0] <- struct{}{} + select { + case updateServicePrimaryAddrChs[0] <- struct{}{}: + default: + } } } select { From 719b6f958398428f78c849c662626bb09f850242 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 11 Apr 2023 17:18:08 +0800 Subject: [PATCH 5/6] address comments Signed-off-by: lhy1024 --- server/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/server.go b/server/server.go index c36fbcd9fb6..a053cf2be2a 100644 --- a/server/server.go +++ b/server/server.go @@ -1740,12 +1740,12 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { case <-time.After(retryIntervalGetServicePrimary): } revision, err = s.updateServicePrimaryAddr(serviceName) - if err != nil { + if revision != 0 && err == nil { // update success break } } - if revision == 0 { - log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey)) + if err != nil { + log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err)) } log.Info("start to watch service primary addr", zap.String("service-key", serviceKey)) for { From 4a6cb7fb835407b05abbed2710a1e7e4ddf35fe8 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 17 Apr 2023 15:16:02 +0800 Subject: [PATCH 6/6] address comments Signed-off-by: lhy1024 --- server/server.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/server/server.go b/server/server.go index a053cf2be2a..d0e8525e9e7 100644 --- a/server/server.go +++ b/server/server.go @@ -1734,15 +1734,15 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { err error ) for i := 0; i < maxRetryTimesGetServicePrimary; i++ { + revision, err = s.updateServicePrimaryAddr(serviceName) + if revision != 0 && err == nil { // update success + break + } select { case <-ctx.Done(): return case <-time.After(retryIntervalGetServicePrimary): } - revision, err = s.updateServicePrimaryAddr(serviceName) - if revision != 0 && err == nil { // update success - break - } } if err != nil { log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err)) @@ -1767,12 +1767,6 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) { } } -// SetServicePrimaryAddr sets the primary address directly. -// Note: This function is only used for test. -func (s *Server) SetServicePrimaryAddr(serviceName, addr string) { - s.servicePrimaryMap.Store(serviceName, addr) -} - // watchServicePrimaryAddr watches the primary address on etcd. func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) { serviceKey := s.servicePrimaryKey(serviceName) @@ -1781,7 +1775,7 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string for { WatchChan: - watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision)) + watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithRev(revision)) select { case <-ctx.Done(): return revision, nil