diff --git a/server/grpc_service.go b/server/grpc_service.go index 7229b085d0b..7f038b55101 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -63,6 +63,7 @@ var ( ErrNotLeader = status.Errorf(codes.Unavailable, "not leader") ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout") + ErrEtcdNotStarted = status.Errorf(codes.Unavailable, "server is started, but etcd not started") ) // GrpcServer wraps Server to provide grpc service. @@ -1896,6 +1897,9 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan // StoreGlobalConfig store global config into etcd by transaction func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } ops := make([]clientv3.Op, len(request.Changes)) for i, item := range request.Changes { name := globalConfigPath + item.GetName() @@ -1915,6 +1919,9 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo // LoadGlobalConfig load global config from etcd func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlobalConfigRequest) (*pdpb.LoadGlobalConfigResponse, error) { + if s.client == nil { + return nil, ErrEtcdNotStarted + } names := request.Names res := make([]*pdpb.GlobalConfigItem, len(names)) for i, name := range names { @@ -1935,6 +1942,9 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo // or stoped by whatever reason // just reconnect to it. func (s *GrpcServer) WatchGlobalConfig(_ *pdpb.WatchGlobalConfigRequest, server pdpb.PD_WatchGlobalConfigServer) error { + if s.client == nil { + return ErrEtcdNotStarted + } ctx, cancel := context.WithCancel(s.Context()) defer cancel() err := s.sendAllGlobalConfig(ctx, server) diff --git a/server/server.go b/server/server.go index 1ef364f79ab..ee0359b9f74 100644 --- a/server/server.go +++ b/server/server.go @@ -1770,3 +1770,9 @@ func (s *Server) SetExternalTS(externalTS uint64) error { s.GetRaftCluster().SetExternalTS(externalTS) return nil } + +// SetClient sets the etcd client. +// Notes: it is only used for test. +func (s *Server) SetClient(client *clientv3.Client) { + s.client = client +} diff --git a/tests/server/global_config/global_config_test.go b/tests/server/global_config/global_config_test.go index f821d664b7a..83f49165311 100644 --- a/tests/server/global_config/global_config_test.go +++ b/tests/server/global_config/global_config_test.go @@ -18,6 +18,7 @@ import ( "context" "strconv" "strings" + "sync" "testing" "time" @@ -57,6 +58,7 @@ type globalConfigTestSuite struct { server *server.GrpcServer client *grpc.ClientConn cleanup server.CleanupFunc + mu sync.Mutex } func TestGlobalConfigTestSuite(t *testing.T) { @@ -237,3 +239,27 @@ func (suite *globalConfigTestSuite) TestClientWatch() { } } } + +func (suite *globalConfigTestSuite) TestEtcdNotStart() { + cli := suite.server.GetClient() + defer func() { + suite.mu.Lock() + suite.server.SetClient(cli) + suite.mu.Unlock() + }() + suite.mu.Lock() + suite.server.SetClient(nil) + suite.mu.Unlock() + err := suite.server.WatchGlobalConfig(&pdpb.WatchGlobalConfigRequest{}, testReceiver{re: suite.Require()}) + suite.Error(err) + + _, err = suite.server.StoreGlobalConfig(suite.server.Context(), &pdpb.StoreGlobalConfigRequest{ + Changes: []*pdpb.GlobalConfigItem{{Name: "0", Value: "0"}}, + }) + suite.Error(err) + + _, err = suite.server.LoadGlobalConfig(suite.server.Context(), &pdpb.LoadGlobalConfigRequest{ + Names: []string{"test_etcd"}, + }) + suite.Error(err) +}