Skip to content

Commit

Permalink
mcs: pick some tso forward fix (tikv#6341)(tikv#6279) (tikv#77)
Browse files Browse the repository at this point in the history
* mcs: update client when meet transport is closing (tikv#6341)

* mcs: update client when meet transport is closing

Signed-off-by: lhy1024 <admin@liudos.us>

* address comments

Signed-off-by: lhy1024 <admin@liudos.us>

* add retry

Signed-off-by: lhy1024 <admin@liudos.us>

---------

Signed-off-by: lhy1024 <admin@liudos.us>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Signed-off-by: lhy1024 <admin@liudos.us>

* mcs: fix watch primary address revision and update cache when meets not leader (tikv#6279)

ref tikv#5895

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: lhy1024 <admin@liudos.us>

---------

Signed-off-by: lhy1024 <admin@liudos.us>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3 people committed Apr 24, 2023
1 parent 5a11b83 commit e050e0e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 51 deletions.
49 changes: 35 additions & 14 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ const (
heartbeatSendTimeout = 5 * time.Second

// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
maxRetryTimesGetGlobalTSOFromTSOServer = 3
)

// gRPC errors
Expand Down Expand Up @@ -339,7 +340,11 @@ errHandling:
if err != nil {
log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
if strings.Contains(err.Error(), errs.NotLeaderErr) || strings.Contains(err.Error(), errs.MismatchLeaderErr) {
s.updateServicePrimaryAddrCh <- struct{}{}
select {
case s.updateServicePrimaryAddrCh <- struct{}{}:
log.Info("update service primary address")
default:
}
}
select {
case <-dispatcherCtx.Done():
Expand Down Expand Up @@ -1993,24 +1998,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest
if !ok || forwardedHost == "" {
return pdpb.Timestamp{}, ErrNotFoundTSOAddr
}
forwardStream, err := s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(&tsopb.TsoRequest{
request := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: s.clusterID,
KeyspaceId: utils.DefaultKeyspaceID,
KeyspaceGroupId: utils.DefaultKeySpaceGroupID,
},
Count: 1,
})
ts, err := forwardStream.Recv()
if err != nil {
log.Error("get global tso from tso server failed", zap.Error(err))
return pdpb.Timestamp{}, err
}
return *ts.GetTimestamp(), nil
var (
forwardStream tsopb.TSO_TsoClient
ts *tsopb.TsoResponse
err error
)
for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ {
forwardStream, err = s.getTSOForwardStream(forwardedHost)
if err != nil {
return pdpb.Timestamp{}, err
}
forwardStream.Send(request)
ts, err = forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), codes.Unavailable.String()) {
s.tsoClientPool.Lock()
delete(s.tsoClientPool.clients, forwardedHost)
s.tsoClientPool.Unlock()
continue
}
log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}
return *ts.GetTimestamp(), nil
}
log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost))
return pdpb.Timestamp{}, err
}

func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {
Expand Down
72 changes: 35 additions & 37 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ const (
// retryIntervalGetServicePrimary is the retry interval for getting primary addr.
retryIntervalGetServicePrimary = 100 * time.Millisecond
// TODO: move it to etcdutil
watchKEtcdChangeRetryInterval = 1 * time.Second
watchEtcdChangeRetryInterval = 1 * time.Second
)

// EtcdStartTimeout the timeout of the startup etcd.
Expand Down Expand Up @@ -207,9 +207,11 @@ type Server struct {

auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
registry *registry.ServiceRegistry
mode string
servicePrimaryMap sync.Map /* Store as map[string]string */
// updateServicePrimaryAddrCh is used to notify the server to update the service primary address.
// Note: it is only used in API service mode.
updateServicePrimaryAddrCh chan struct{}
}

Expand Down Expand Up @@ -244,7 +246,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
}{
clients: make(map[string]tsopb.TSO_TsoClient),
},
updateServicePrimaryAddrCh: make(chan struct{}, 1),
}
s.handler = newHandler(s)

Expand Down Expand Up @@ -1757,25 +1758,25 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
defer s.serverLoopWg.Done()
ctx, cancel := context.WithCancel(s.serverLoopCtx)
defer cancel()

s.updateServicePrimaryAddrCh = make(chan struct{}, 1)
serviceKey := s.servicePrimaryKey(serviceName)
var (
revision int64
err error
)
for i := 0; i < maxRetryTimesGetServicePrimary; i++ {
revision, err = s.updateServicePrimaryAddr(serviceName)
if revision != 0 && err == nil { // update success
break
}
select {
case <-ctx.Done():
return
case <-time.After(retryIntervalGetServicePrimary):
}
revision, err = s.updateServicePrimaryAddr(serviceName)
if err == nil {
break
}
}
if revision == 0 {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey))
if err != nil {
log.Warn("service primary addr doesn't exist", zap.String("service-key", serviceKey), zap.Error(err))
}
log.Info("start to watch service primary addr", zap.String("service-key", serviceKey))
for {
Expand All @@ -1789,35 +1790,14 @@ func (s *Server) startWatchServicePrimaryAddrLoop(serviceName string) {
if err != nil {
log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
zap.Int64("next-revision", nextRevision),
zap.Time("retry-at", time.Now().Add(watchKEtcdChangeRetryInterval)),
zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
zap.Error(err))
revision = nextRevision
time.Sleep(watchKEtcdChangeRetryInterval)
time.Sleep(watchEtcdChangeRetryInterval)
}
}
}

// SetServicePrimaryAddr sets the primary address directly.
// It stores addr in servicePrimaryMap under serviceName, replacing any
// address previously stored for that service.
// Note: This function is only used in tests.
func (s *Server) SetServicePrimaryAddr(serviceName, addr string) {
s.servicePrimaryMap.Store(serviceName, addr)
}

// updateServicePrimaryAddr refreshes the cached primary address of serviceName
// by reading the service primary key from etcd with a single get operation.
//
// On success the first listen URL of the stored participant is saved in
// servicePrimaryMap and the revision returned by
// etcdutil.GetProtoMsgWithModRev is handed back to the caller. When the get
// fails, the key is not found, or the participant carries no listen URLs, the
// cache is left untouched and 0 is returned together with the get error
// (which is nil in the latter two cases).
func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) {
	key := s.servicePrimaryKey(serviceName)
	participant := &tsopb.Participant{}
	found, modRevision, err := etcdutil.GetProtoMsgWithModRev(s.client, key, participant)
	if err != nil || !found {
		return 0, err
	}
	urls := participant.GetListenUrls()
	if len(urls) == 0 {
		return 0, nil
	}
	// The first listen URL is the primary service endpoint of the keyspace group.
	primaryAddr := urls[0]
	s.servicePrimaryMap.Store(serviceName, primaryAddr)
	log.Info("update service primary addr", zap.String("service-key", key), zap.String("primary-addr", primaryAddr))
	return modRevision, nil
}

// watchServicePrimaryAddr watches the primary address on etcd.
func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string, revision int64) (nextRevision int64, err error) {
serviceKey := s.servicePrimaryKey(serviceName)
Expand All @@ -1826,12 +1806,15 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string

for {
WatchChan:
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithPrefix(), clientv3.WithRev(revision))
watchChan := watcher.Watch(s.serverLoopCtx, serviceKey, clientv3.WithRev(revision))
select {
case <-ctx.Done():
return revision, nil
case <-s.updateServicePrimaryAddrCh:
revision, err = s.updateServicePrimaryAddr(serviceName)
if err != nil {
log.Warn("update service primary addr failed", zap.String("service-key", serviceKey), zap.Error(err))
}
goto WatchChan
case wresp := <-watchChan:
if wresp.CompactRevision != 0 {
Expand Down Expand Up @@ -1867,11 +1850,26 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string
s.servicePrimaryMap.Delete(serviceName)
}
}
revision = wresp.Header.Revision
revision = wresp.Header.Revision + 1
}
}
}

// updateServicePrimaryAddr refreshes the cached primary address of serviceName
// by reading the service primary key from etcd with a single get operation.
//
// On success the first listen URL of the stored participant is saved in
// servicePrimaryMap and the revision returned by
// etcdutil.GetProtoMsgWithModRev is handed back to the caller. When the get
// fails, the key is not found, or the participant carries no listen URLs, the
// cache is left untouched and 0 is returned together with the get error
// (which is nil in the latter two cases).
func (s *Server) updateServicePrimaryAddr(serviceName string) (nextRevision int64, err error) {
	key := s.servicePrimaryKey(serviceName)
	participant := &tsopb.Participant{}
	found, modRevision, err := etcdutil.GetProtoMsgWithModRev(s.client, key, participant)
	if err != nil || !found {
		return 0, err
	}
	urls := participant.GetListenUrls()
	if len(urls) == 0 {
		return 0, nil
	}
	// The first listen URL is the primary service endpoint of the keyspace group.
	primaryAddr := urls[0]
	s.servicePrimaryMap.Store(serviceName, primaryAddr)
	log.Info("update service primary addr", zap.String("service-key", key), zap.String("primary-addr", primaryAddr))
	return modRevision, nil
}

// servicePrimaryKey builds the etcd key under which the primary of serviceName
// is stored. The key has the form "/ms/<clusterID>/<serviceName>/00000/primary".
func (s *Server) servicePrimaryKey(serviceName string) string {
	// The third segment is the number 0 zero-padded to five digits
	// (presumably the default keyspace group ID; confirm against the writer side).
	return fmt.Sprintf("/ms/%d/%s/%05d/%s", s.clusterID, serviceName, 0, "primary")
}
Expand Down

0 comments on commit e050e0e

Please sign in to comment.