diff --git a/client/client_test.go b/client/client_test.go index e82fe861a0e8..075e0c876723 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -92,11 +92,10 @@ func TestGRPCDialOption(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 500*time.Millisecond) defer cancel() cli := &pdServiceDiscovery{ - checkMembershipCh: make(chan struct{}, 1), - ctx: ctx, - cancel: cancel, - tlsCfg: &tlsutil.TLSConfig{}, - option: newOption(), + ctx: ctx, + cancel: cancel, + tlsCfg: &tlsutil.TLSConfig{}, + option: newOption(), } cli.urls.Store([]string{testClientURL}) cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index 4499c9e17c0d..0b85e59e31f4 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/client/tlsutil" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" ) const ( @@ -39,6 +40,7 @@ const ( memberUpdateInterval = time.Minute serviceModeUpdateInterval = 3 * time.Second updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation. + requestTimeout = 2 * time.Second ) type serviceType int @@ -61,7 +63,7 @@ type ServiceDiscovery interface { GetKeyspaceID() uint32 // GetKeyspaceGroupID returns the ID of the keyspace group GetKeyspaceGroupID() uint32 - // DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. + // DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. DiscoverMicroservice(svcType serviceType) ([]string, error) // GetServiceURLs returns the URLs of the servers providing the service GetServiceURLs() []string @@ -141,8 +143,6 @@ type pdServiceDiscovery struct { // leader is updated. 
tsoGlobalAllocLeaderUpdatedCb tsoGlobalServAddrUpdatedFunc - checkMembershipCh chan struct{} - wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc @@ -153,6 +153,8 @@ type pdServiceDiscovery struct { tlsCfg *tlsutil.TLSConfig // Client option. option *option + + successReConnect chan struct{} } // newPDServiceDiscovery returns a new PD service discovery-based client. @@ -165,7 +167,7 @@ func newPDServiceDiscovery( urls []string, tlsCfg *tlsutil.TLSConfig, option *option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ - checkMembershipCh: make(chan struct{}, 1), + successReConnect: make(chan struct{}, 1), ctx: ctx, cancel: cancel, wg: wg, @@ -207,7 +208,7 @@ func (c *pdServiceDiscovery) Init() error { } c.wg.Add(2) - go c.updateMemberLoop() + go c.reconnectMemberLoop() go c.updateServiceModeLoop() c.isInitialized = true @@ -231,30 +232,107 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { return errors.WithStack(err) } -func (c *pdServiceDiscovery) updateMemberLoop() { +func (c *pdServiceDiscovery) reconnectMemberLoop() { defer c.wg.Done() ctx, cancel := context.WithCancel(c.ctx) defer cancel() ticker := time.NewTicker(memberUpdateInterval) defer ticker.Stop() + failpoint.Inject("acceleratedMemberUpdateInterval", func() { + ticker.Stop() + ticker = time.NewTicker(time.Millisecond * 100) + }) for { select { case <-ctx.Done(): return case <-ticker.C: - case <-c.checkMembershipCh: } + failpoint.Inject("skipUpdateMember", func() { failpoint.Continue() }) + if err := c.updateMember(); err != nil { - log.Error("[pd] failed to update member", zap.Strings("urls", c.GetServiceURLs()), errs.ZapError(err)) + log.Error("[pd] failed to update member", errs.ZapError(err)) + } else { + c.SuccessReconnect() + } } } +func (c *pdServiceDiscovery) waitForReady() error { + if e1 := c.waitForLeaderReady(); e1 != nil { + log.Error("[pd.waitForReady] failed to wait for leader ready", errs.ZapError(e1)) + return errors.WithStack(e1) + } else if e2 := c.loadMembers(); e2 != nil { + 
log.Error("[pd.waitForReady] failed to load members", errs.ZapError(e2)) + } else { + return nil + } + + deadline := time.Now().Add(requestTimeout) + for { + select { + case <-c.successReConnect: + return nil + case <-time.After(time.Until(deadline)): + log.Error("[pd.waitForReady] timeout") + return errors.New("wait for ready timeout") + } + } +} + +// waitForLeaderReady waits for the leader to be ready. +func (c *pdServiceDiscovery) waitForLeaderReady() error { + ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) + for { + old, ok := c.clientConns.Load(c.getLeaderAddr()) + if !ok { + cancel() + return errors.New("no leader") + } + cc := old.(*grpc.ClientConn) + + s := cc.GetState() + if s == connectivity.Ready { + cancel() + return nil + } + if !cc.WaitForStateChange(ctx, s) { + cancel() + // ctx got timeout or canceled. + return ctx.Err() + } + } +} + +func (c *pdServiceDiscovery) loadMembers() error { + ctx, cancel := context.WithCancel(c.ctx) + defer cancel() + + members, err := c.getMembers(ctx, c.getLeaderAddr(), updateMemberTimeout) + if err != nil { + log.Error("[pd.loadMembers] failed to load members ", zap.String("url", c.getLeaderAddr()), errs.ZapError(err)) + return errors.WithStack(err) + } else if members.GetHeader() == nil || members.GetLeader() == nil || len(members.GetLeader().GetClientUrls()) == 0 { + err = errs.ErrClientGetLeader.FastGenByArgs("leader address don't exist") + log.Error("[pd.loadMembers] leader address don't exist. 
", zap.String("url", c.getLeaderAddr()), errs.ZapError(err)) + return errors.WithStack(err) + } + + return nil +} + +func (c *pdServiceDiscovery) SuccessReconnect() { + select { + case c.successReConnect <- struct{}{}: + default: + } +} + func (c *pdServiceDiscovery) updateServiceModeLoop() { defer c.wg.Done() failpoint.Inject("skipUpdateServiceMode", func() { @@ -319,7 +397,7 @@ func (c *pdServiceDiscovery) GetKeyspaceGroupID() uint32 { return defaultKeySpaceGroupID } -// DiscoverServiceURLs discovers the microservice with the specified type and returns the server urls. +// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls. func (c *pdServiceDiscovery) DiscoverMicroservice(svcType serviceType) (urls []string, err error) { switch svcType { case apiService: @@ -380,13 +458,12 @@ func (c *pdServiceDiscovery) GetBackupAddrs() []string { // ScheduleCheckMemberChanged is used to check if there is any membership // change among the leader and the followers. func (c *pdServiceDiscovery) ScheduleCheckMemberChanged() { - select { - case c.checkMembershipCh <- struct{}{}: - default: + if err := c.waitForReady(); err != nil { + log.Error("[pd] failed to wait for ready", errs.ZapError(err)) } } -// Immediately check if there is any membership change among the leader/followers in a +// CheckMemberChanged Immediately check if there is any membership change among the leader/followers in a // quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster. 
func (c *pdServiceDiscovery) CheckMemberChanged() error { return c.updateMember() diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index 2aeb49e15230..92f951299511 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -351,7 +351,7 @@ func (c *tsoServiceDiscovery) ScheduleCheckMemberChanged() { } } -// Immediately check if there is any membership change among the primary/secondaries in +// CheckMemberChanged Immediately check if there is any membership change among the primary/secondaries in // a primary/secondary configured cluster. func (c *tsoServiceDiscovery) CheckMemberChanged() error { c.apiSvcDiscovery.CheckMemberChanged() diff --git a/pkg/utils/testutil/leak.go b/pkg/utils/testutil/leak.go index d1329aef0e60..1bc70855a3e3 100644 --- a/pkg/utils/testutil/leak.go +++ b/pkg/utils/testutil/leak.go @@ -28,4 +28,5 @@ var LeakOptions = []goleak.Option{ goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"), // natefinch/lumberjack#56, It's a goroutine leak bug. 
Another ignore option PR https://github.com/pingcap/tidb/pull/27405/ goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"), + goleak.IgnoreTopFunction("google.golang.org/grpc.(*ClientConn).WaitForStateChange"), } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 41e7e650261d..55feceb3d041 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -96,6 +96,8 @@ func TestClientClusterIDCheck(t *testing.T) { func TestClientLeaderChange(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster, err := tests.NewTestCluster(ctx, 3) @@ -312,6 +314,8 @@ func TestTSOFollowerProxy(t *testing.T) { // TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207 func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") ctx, cancel := context.WithCancel(context.Background()) defer cancel() cluster, err := tests.NewTestCluster(ctx, 3) @@ -375,6 +379,8 @@ func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) { // TODO: migrate the Local/Global TSO tests to TSO integration test folder. func TestGlobalAndLocalTSO(t *testing.T) { re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval", `return(true)`)) + defer failpoint.Disable("github.com/tikv/pd/client/acceleratedMemberUpdateInterval") ctx, cancel := context.WithCancel(context.Background()) defer cancel() dcLocationConfig := map[string]string{