mcs: fix watch primary address revision and update cache when meets not leader #6279

Merged · 8 commits · Apr 17, 2023 · Changes shown from 5 commits
10 changes: 5 additions & 5 deletions pkg/tso/keyspace_group_manager.go
@@ -50,7 +50,7 @@ const (
defaultLoadKeyspaceGroupsBatchSize = int64(400)
defaultLoadFromEtcdRetryInterval = 500 * time.Millisecond
defaultLoadFromEtcdMaxRetryTimes = int(defaultLoadKeyspaceGroupsTimeout / defaultLoadFromEtcdRetryInterval)
watchKEtcdChangeRetryInterval = 1 * time.Second
watchEtcdChangeRetryInterval = 1 * time.Second
)

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
@@ -341,7 +341,7 @@ func (kgm *KeyspaceGroupManager) loadKeyspaceGroups(
return revison, kgs, resp.More, nil
}

// startKeyspaceGroupsMetaWatchLoop Repeatedly watches any change in keyspace group membership/distribution
// startKeyspaceGroupsMetaWatchLoop repeatedly watches any change in keyspace group membership/distribution
// and apply the change dynamically.
func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64) {
defer logutil.LogPanic()
@@ -357,11 +357,11 @@ func (kgm *KeyspaceGroupManager) startKeyspaceGroupsMetaWatchLoop(revision int64

nextRevision, err := kgm.watchKeyspaceGroupsMetaChange(revision)
if err != nil {
log.Error("watcher canceled unexpectedly. Will start a new watcher after a while",
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
time.Sleep(watchKEtcdChangeRetryInterval)
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}
30 changes: 26 additions & 4 deletions pkg/utils/tsoutil/tso_dispatcher.go
@@ -16,6 +16,7 @@ package tsoutil

import (
"context"
"strings"
"sync"
"time"

@@ -57,20 +58,32 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo

// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host
func (s *TSODispatcher) DispatchRequest(
ctx context.Context, req Request, tsoProtoFactory ProtoFactory, doneCh <-chan struct{}, errCh chan<- error) {
ctx context.Context,
req Request,
tsoProtoFactory ProtoFactory,
doneCh <-chan struct{},
errCh chan<- error,
updateServicePrimaryAddrChs ...chan<- struct{}) {
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan deadline, 1)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh)
go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, updateServicePrimaryAddrChs...)
go watchTSDeadline(ctx, tsDeadlineCh)
}
reqCh <- req
}

func (s *TSODispatcher) dispatch(
ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
ctx context.Context,
tsoProtoFactory ProtoFactory,
forwardedHost string,
clientConn *grpc.ClientConn,
tsoRequestCh <-chan Request,
tsDeadlineCh chan<- deadline,
doneCh <-chan struct{},
errCh chan<- error,
updateServicePrimaryAddrChs ...chan<- struct{}) {
dispatcherCtx, ctxCancel := context.WithCancel(ctx)
defer ctxCancel()
defer s.dispatchChs.Delete(forwardedHost)
@@ -96,6 +109,7 @@ func (s *TSODispatcher) dispatch(
defer cancel()

requests := make([]Request, maxMergeRequests+1)
needUpdateServicePrimaryAddr := len(updateServicePrimaryAddrChs) > 0 && updateServicePrimaryAddrChs[0] != nil
for {
select {
case first := <-tsoRequestCh:
@@ -121,6 +135,14 @@ func (s *TSODispatcher) dispatch(
log.Error("proxy forward tso error",
zap.String("forwarded-host", forwardedHost),
errs.ZapError(errs.ErrGRPCSend, err))
if needUpdateServicePrimaryAddr {
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
select {
case updateServicePrimaryAddrChs[0] <- struct{}{}:
default:
}
}
}
select {
case <-dispatcherCtx.Done():
return
2 changes: 1 addition & 1 deletion server/grpc_service.go
@@ -224,7 +224,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
}

tsoRequest := tsoutil.NewPDProtoRequest(forwardedHost, clientConn, request, stream)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh)
s.tsoDispatcher.DispatchRequest(ctx, tsoRequest, tsoProtoFactory, doneCh, errCh, s.updateServicePrimaryAddrCh)
Member: It seems that the channel is always passed to the function, so why use an optional parameter?

Contributor (author): DispatchRequest is also used by the TSO server, and the TSO server does not need to watch the API service primary key.
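For context on the variadic parameter discussed above, a minimal sketch (hypothetical names, not the PR's code) of how one caller can pass a notification channel while another omits it entirely:

package main

import "fmt"

// dispatch mirrors the shape of DispatchRequest: the trailing variadic
// channel is optional, so a caller like the TSO server can leave it out.
func dispatch(host string, notifyChs ...chan<- struct{}) {
	needNotify := len(notifyChs) > 0 && notifyChs[0] != nil
	if needNotify {
		select {
		case notifyChs[0] <- struct{}{}: // non-blocking notify
		default: // an update is already pending; drop this one
		}
	}
	fmt.Printf("dispatched to %s (notify=%v)\n", host, needNotify)
}

func main() {
	ch := make(chan struct{}, 1)
	dispatch("pd-api-server:2379", ch) // API server passes the channel
	dispatch("tso-server:3379")        // TSO server omits it
}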

continue
}

125 changes: 103 additions & 22 deletions server/server.go
@@ -104,6 +104,8 @@ const (
maxRetryTimesGetServicePrimary = 25
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
)

// EtcdStartTimeout the timeout of the startup etcd.
@@ -215,6 +217,9 @@ type Server struct {
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
// updateServicePrimaryAddrCh is used to notify the server to update the service primary address.
// Note: it is only used in API service mode.
updateServicePrimaryAddrCh chan struct{}
}

// HandlerBuilder builds a server HTTP handler.
@@ -558,7 +563,7 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.encryptionKeyManagerLoop()
if s.IsAPIServiceMode() { // disable tso service in api server
s.serverLoopWg.Add(1)
go s.watchServicePrimaryAddrLoop(mcs.TSOServiceName)
go s.startWatchServicePrimaryAddrLoop(mcs.TSOServiceName)
}
}

@@ -1716,43 +1721,98 @@ func (s *Server) GetServicePrimaryAddr(ctx context.Context, serviceName string)
return "", false
}

func (s *Server) watchServicePrimaryAddrLoop(serviceName string) {
// startWatchServicePrimaryAddrLoop starts a loop to watch the primary address of a given service.
func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

serviceKey := fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
log.Info("start to watch", zap.String("service-key", serviceKey))

primary := &tsopb.Participant{}
ok, rev, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
if err != nil {
log.Error("get service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
s.updateServicePrimaryAddrCh = make(chan struct{}, 1)
serviceKey := s.servicePrimaryKey(serviceName)
var (
revision int64
err error
)
for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryIntervalGetServicePrimary):
}
Member (on lines +1739 to +1743): Should we call s.updateServicePrimaryAddr(serviceName) at the beginning of the loop? Otherwise, we have to wait for retryIntervalGetServicePrimary before the first update.
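A minimal runnable sketch of the reordering the reviewer suggests, with the Server method stubbed out (assumed shape, not the PR's final code): attempt the update first, and only pay the retry interval between failed attempts.

package main

import (
	"context"
	"fmt"
	"time"
)

const (
	maxRetryTimesGetServicePrimary = 25
	retryIntervalGetServicePrimary = 100 * time.Millisecond
)

// updateServicePrimaryAddr is a stub standing in for the Server method.
func updateServicePrimaryAddr(serviceName string) (int64, error) {
	return 42, nil // pretend the etcd get succeeded at revision 42
}

func main() {
	ctx := context.Background()
	var (
		revision int64
		err      error
	)
	for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
		// Update first, so the common case does not wait for the interval.
		revision, err = updateServicePrimaryAddr("tso")
		if revision != 0 && err == nil {
			break
		}
		select {
		case <-ctx.Done():
			return
		case <-time.After(retryIntervalGetServicePrimary):
		}
	}
	fmt.Println("primary cached at revision", revision, err)
}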

revision, err = s.updateServicePrimaryAddr(serviceName)
if revision != 0 && err == nil { // update success
break
}
}
listenUrls := primary.GetListenUrls()
if ok && len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
} else {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey))
if err != nil {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err))
}

watchChan := s.client.Watch(ctx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(rev))
log.Info("start to watch service primary addr", zap.String("service-key", serviceKey))
for {
select {
case <-ctx.Done():
log.Info("server is closed, exist watch service primary addr loop", zap.String("service", serviceName))
return
case res := <-watchChan:
for _, event := range res.Events {
default:
}
nextRevision, err := s.watchServicePrimaryAddr(ctx, serviceName, revision)
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
revision = nextRevision
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}

// SetServicePrimaryAddr sets the primary address directly.
// Note: This function is only used for test.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
Member: Where do we use it?

s.servicePrimaryMap.Store(serviceName, addr)
}

// watchServicePrimaryAddr watches the primary address on etcd.
func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
watcher := clientv3.NewWatcher(s.client)
Member: There is a lot of code with the same logic; the only difference is the key. How about abstracting a function for it?

Contributor (author): OK, I will try it later.
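A hypothetical sketch of the abstraction the reviewer asks for: one generic etcd watch loop, parameterized by key, with compaction and cancellation handling factored out. The signature and names are illustrative, not from the PR.

// Package watchutil sketches a reusable etcd watch helper.
package watchutil

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// WatchKey watches key (as a prefix) starting from revision, invokes onEvent
// for every event, and returns the revision to resume from plus a fatal error.
func WatchKey(ctx context.Context, client *clientv3.Client, key string, revision int64,
	onEvent func(*clientv3.Event)) (int64, error) {
	watcher := clientv3.NewWatcher(client)
	defer watcher.Close()
	for {
		watchChan := watcher.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(revision))
		for wresp := range watchChan {
			if wresp.CompactRevision != 0 {
				// The requested revision was compacted away: resume from
				// the compact revision instead of treating it as fatal.
				revision = wresp.CompactRevision
				break
			}
			if err := wresp.Err(); err != nil {
				return revision, err
			}
			for _, event := range wresp.Events {
				onEvent(event)
			}
			revision = wresp.Header.Revision
		}
		select {
		case <-ctx.Done():
			return revision, nil
		default: // channel closed for another reason; start a new watch
		}
	}
}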

defer watcher.Close()

for {
WatchChan:
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision))
Member: Do we still need the prefix?

select {
case <-ctx.Done():
return revision, nil
case <-s.updateServicePrimaryAddrCh:
revision, err = s.updateServicePrimaryAddr(serviceName)
Contributor: Most likely, even after we update the service primary address, we still have one more problem: s.clientConns stores the forwarded hosts' grpc.ClientConn, and we never update the broken connections. If a forwarded host's connection breaks, e.g. the forwarded host restarts and kills the existing connection, we will keep retrieving the broken connection for that forwarded host.

Contributor (author): It will try to create a new connection automatically when the existing connection is closed.
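A small, hypothetical probe of the behavior the author relies on here: grpc-go re-establishes a broken ClientConn on its own, and the connectivity API exposes the state transitions. The endpoint and setup below are illustrative only.

package main

import (
	"context"
	"fmt"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("tso-primary:3379",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Watch the connection cycle through states such as
	// CONNECTING -> READY -> TRANSIENT_FAILURE -> CONNECTING -> READY
	// when the remote side restarts; no manual re-dial is required.
	for {
		state := conn.GetState()
		fmt.Println("conn state:", state)
		if !conn.WaitForStateChange(context.Background(), state) {
			return // context canceled
		}
	}
}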

if err != nil {
log.Warn("update service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
}
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
log.Warn("required revision has been compacted, use the compact revision",
zap.Int64("required-revision", revision),
zap.Int64("compact-revision", wresp.CompactRevision))
revision = wresp.CompactRevision
goto WatchChan
}
if wresp.Err() != nil {
log.Error("watcher is canceled with",
zap.Int64("revision", revision),
errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err()))
return revision, wresp.Err()
}
for _, event := range wresp.Events {
switch event.Type {
case clientv3.EventTypePut:
primary.ListenUrls = nil // reset the field
primary := &tsopb.Participant{}
if err := proto.Unmarshal(event.Kv.Value, primary); err != nil {
log.Error("watch service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
} else {
listenUrls = primary.GetListenUrls()
listenUrls := primary.GetListenUrls()
if len(listenUrls) > 0 {
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
@@ -1761,13 +1821,34 @@ func (s *Server) watchServicePrimaryAddrLoop(serviceName string) {
}
}
case clientv3.EventTypeDelete:
log.Warn("service primary is deleted", zap.String("service-key", serviceKey))
s.servicePrimaryMap.Delete(serviceName)
}
}
revision = wresp.Header.Revision
}
}
}

// updateServicePrimaryAddr updates the primary address from etcd with get operation.
func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
primary := &tsopb.Participant{}
ok, revision, err := etcdutil.GetProtoMsgWithModRev(s.client, serviceKey, primary)
listenUrls := primary.GetListenUrls()
if !ok || err != nil || len(listenUrls) == 0 {
return 0, err
}
// listenUrls[0] is the primary service endpoint of the keyspace group
s.servicePrimaryMap.Store(serviceName, listenUrls[0])
log.Info("update service primary addr", zap.String("service-key", serviceKey), zap.String("primary-addr", listenUrls[0]))
return revision, nil
}

func (s *Server) servicePrimaryKey(serviceName string) string {
return fmt.Sprintf("/ms/%d/%s/%s/%s", s.clusterID, serviceName, fmt.Sprintf("%05d", 0), "primary")
}

// RecoverAllocID recover alloc id. set current base id to input id
func (s *Server) RecoverAllocID(ctx context.Context, id uint64) error {
return s.idAllocator.SetBase(id)
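For reference, the servicePrimaryKey helper above builds the etcd key that the new watch loop observes. With an assumed cluster ID of 7001234 and the tso service, it yields /ms/7001234/tso/00000/primary, where the 00000 component is the zero value formatted with %05d (presumably the default keyspace group ID, zero-padded to five digits).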
3 changes: 2 additions & 1 deletion tests/integrations/mcs/discovery/register_test.go
@@ -96,9 +96,10 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(primary, addr)
re.Equal(primary, expectedPrimary)

// test API server discovery after unregister
cleanup()