diff --git a/client/client.go b/client/client.go index 3467ce05c20..41de5e8a684 100644 --- a/client/client.go +++ b/client/client.go @@ -150,23 +150,25 @@ var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. type serviceModeKeeper struct { sync.RWMutex - serviceMode pdpb.ServiceMode - tsoClient *tso.Cli - tsoSvcDiscovery sd.ServiceDiscovery - routerClient *router.Cli + serviceMode pdpb.ServiceMode + tsoClient *tso.Cli + tsoSvcDiscovery sd.ServiceDiscovery + routerClient *router.Cli + resourceManagerDiscovery *sd.ResourceManagerDiscovery } func (k *serviceModeKeeper) close() { k.Lock() defer k.Unlock() - switch k.serviceMode { - case pdpb.ServiceMode_API_SVC_MODE: + if k.tsoSvcDiscovery != nil { k.tsoSvcDiscovery.Close() - fallthrough - case pdpb.ServiceMode_PD_SVC_MODE: - k.tsoClient.Close() - case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + k.tsoSvcDiscovery = nil } + if k.resourceManagerDiscovery != nil { + k.resourceManagerDiscovery.Close() + k.resourceManagerDiscovery = nil + } + k.tsoClient.Close() } type client struct { @@ -430,6 +432,19 @@ func (c *client) GetServiceDiscovery() sd.ServiceDiscovery { return c.inner.serviceDiscovery } +// GetResourceManagerServiceURL returns the discovered standalone resource manager +// service URL. It is an optional extension method (not part of the Client +// interface) mainly intended for integration tests. +// +// When it returns an empty string, the client will fall back to PD-provided +// resource manager service. +func (c *client) GetResourceManagerServiceURL() string { + if ds := c.inner.getResourceManagerDiscovery(); ds != nil { + return ds.GetServiceURL() + } + return "" +} + // GetTSOServiceDiscovery returns the TSO service discovery object. Only used for testing. func (c *client) GetTSOServiceDiscovery() sd.ServiceDiscovery { return c.inner.tsoSvcDiscovery @@ -451,7 +466,7 @@ func (c *client) UpdateOption(option opt.DynamicOption, value any) error { if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } - if c.inner.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE && enable { + if c.inner.getTSOProvider() != tsoProviderPD && enable { return errors.New("[pd] tso follower proxy is only supported when PD provides TSO") } c.inner.option.SetEnableTSOFollowerProxy(enable) @@ -599,19 +614,17 @@ func (c *client) GetLocalTS(ctx context.Context, _ string) (physical int64, logi // GetMinTS implements the TSOClient interface. func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) { - // Handle compatibility issue in case of PD doesn't support GetMinTS API. - serviceMode := c.inner.getServiceMode() - switch serviceMode { - case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + if c.inner.serviceMode == pdpb.ServiceMode_UNKNOWN_SVC_MODE { return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode") - case pdpb.ServiceMode_PD_SVC_MODE: + } + + // Handle compatibility issue in case of PD doesn't support GetMinTS API. + if c.inner.getTSOProvider() == tsoProviderPD { // If the service mode is switched to API during GetTS() call, which happens during migration, // returning the default timeline should be fine. return c.GetTS(ctx) - case pdpb.ServiceMode_API_SVC_MODE: - default: - return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } + ctx, cancel := context.WithTimeout(ctx, c.inner.option.Timeout) defer cancel() // Call GetMinTS API to get the minimal TS from the API leader. 
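Note on the new optional method above: GetResourceManagerServiceURL is deliberately kept off the Client interface, so callers reach it through a type assertion, which is also how the integration tests later in this diff do it (see waitResourceManagerServiceURL). A minimal sketch of that usage, assuming a pd.Client built with pd.NewClientWithContext; the package and helper names are illustrative and not part of this patch:

	package example // illustrative only

	import pd "github.com/tikv/pd/client"

	// resourceManagerURLOf returns the discovered standalone resource manager URL,
	// or "" when the caller should fall back to the PD-provided resource manager.
	func resourceManagerURLOf(cli pd.Client) string {
		// The method is an optional extension, so probe for it with a type assertion
		// instead of depending on the concrete client type.
		if g, ok := cli.(interface{ GetResourceManagerServiceURL() string }); ok {
			return g.GetResourceManagerServiceURL()
		}
		return ""
	}

An empty result simply means no standalone endpoint has been discovered yet, matching the fallback behavior described in the doc comment above.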
diff --git a/client/inner_client.go b/client/inner_client.go index 458a81c0dd9..a4032a641e7 100644 --- a/client/inner_client.go +++ b/client/inner_client.go @@ -22,6 +22,8 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/keyspacepb" @@ -79,7 +81,6 @@ func (c *innerClient) init(updateKeyspaceIDCb sd.UpdateKeyspaceIDFunc) error { } c.wg.Add(1) go c.routerClientInitializer() - return nil } @@ -141,40 +142,22 @@ func (c *innerClient) disableRouterClient() { func (c *innerClient) setServiceMode(newMode pdpb.ServiceMode) { c.Lock() defer c.Unlock() - - if c.option.UseTSOServerProxy { - // If we are using TSO server proxy, we always use PD_SVC_MODE. - newMode = pdpb.ServiceMode_PD_SVC_MODE - } if newMode == c.serviceMode { return } - log.Info("[pd] changing TSO provider", - zap.String("old", convertToString(c.serviceMode)), - zap.String("new", convertToString(newMode))) c.resetTSOClientLocked(newMode) - oldMode := c.serviceMode + c.resetResourceManagerDiscoveryLocked(newMode) c.serviceMode = newMode - log.Info("[pd] TSO provider changed", - zap.String("old", convertToString(oldMode)), - zap.String("new", convertToString(newMode))) -} - -func convertToString(mode pdpb.ServiceMode) string { - switch mode { - case pdpb.ServiceMode_PD_SVC_MODE: - return "pd" - case pdpb.ServiceMode_API_SVC_MODE: - return "tso server" - case pdpb.ServiceMode_UNKNOWN_SVC_MODE: - return "unknown" - default: - return "invalid" - } + log.Info("[pd] service mode changed", zap.String("new-mode", newMode.String())) } // Reset a new TSO client. func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { + // `UseTSOServerProxy` is intended to force using PD as the TSO provider, + // but should not block other components (e.g. RM) from switching service mode. + if c.option.UseTSOServerProxy { + mode = pdpb.ServiceMode_PD_SVC_MODE + } // Re-create a new TSO client. 
var ( newTSOCli *tso.Cli @@ -184,6 +167,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { case pdpb.ServiceMode_PD_SVC_MODE: newTSOCli = tso.NewClient(c.ctx, c.option, c.serviceDiscovery, &tso.PDStreamBuilderFactory{}) + log.Info("[pd] tso provider changed to pd") case pdpb.ServiceMode_API_SVC_MODE: newTSOSvcDiscovery = sd.NewTSOServiceDiscovery( c.ctx, c, c.serviceDiscovery, @@ -198,6 +182,7 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { zap.Error(err)) return } + log.Info("[pd] tso provider changed to tso server") case pdpb.ServiceMode_UNKNOWN_SVC_MODE: log.Warn("[pd] intend to switch to unknown service mode, just return") return @@ -219,6 +204,29 @@ func (c *innerClient) resetTSOClientLocked(mode pdpb.ServiceMode) { } } +func (c *innerClient) resetResourceManagerDiscoveryLocked(mode pdpb.ServiceMode) { + switch mode { + case pdpb.ServiceMode_PD_SVC_MODE: + if c.resourceManagerDiscovery != nil { + c.resourceManagerDiscovery.Close() + c.resourceManagerDiscovery = nil + } + case pdpb.ServiceMode_API_SVC_MODE: + c.resourceManagerDiscovery = sd.NewResourceManagerDiscovery( + c.ctx, c.serviceDiscovery.GetClusterID(), c, c.tlsCfg, c.option, c.scheduleUpdateTokenConnection) + c.resourceManagerDiscovery.Init() + case pdpb.ServiceMode_UNKNOWN_SVC_MODE: + log.Warn("[pd] intend to switch to unknown service mode, just return") + return + } +} + +func (c *innerClient) getResourceManagerDiscovery() *sd.ResourceManagerDiscovery { + c.RLock() + defer c.RUnlock() + return c.resourceManagerDiscovery +} + func (c *innerClient) scheduleUpdateTokenConnection(string) error { select { case c.updateTokenConnectionCh <- struct{}{}: @@ -227,10 +235,20 @@ func (c *innerClient) scheduleUpdateTokenConnection(string) error { return nil } -func (c *innerClient) getServiceMode() pdpb.ServiceMode { +type tsoProvider int + +const ( + tsoProviderPD tsoProvider = iota + tsoProviderTSOServer +) + +func (c *innerClient) getTSOProvider() tsoProvider { c.RLock() defer c.RUnlock() - return c.serviceMode + if c.tsoSvcDiscovery != nil { + return tsoProviderTSOServer + } + return tsoProviderPD } func (c *innerClient) getTSOClient() *tso.Cli { @@ -297,6 +315,36 @@ func (c *innerClient) gRPCErrorHandler(err error) { } } +func shouldUpdateRMURL(err error) bool { + if err == nil { + return false + } + if errs.IsLeaderChange(err) { + return true + } + if s, ok := status.FromError(err); ok { + // If the rm instance stops, we can reset the connection and + // use pd instance url instead of it. + if errs.IsNetworkError(s.Code()) || s.Code() == codes.Canceled { + return true + } + } + return false +} + +func (c *innerClient) resourceManagerErrorHandler(err error) { + if !shouldUpdateRMURL(err) { + return + } + + c.RLock() + defer c.RUnlock() + log.Warn("[resource-manager] resource manager error", zap.Error(err)) + if c.resourceManagerDiscovery != nil { + c.resourceManagerDiscovery.ScheduleUpdateServiceURL() + } +} + func (c *innerClient) getOrCreateGRPCConn() (*grpc.ClientConn, error) { cc, err := c.serviceDiscovery.GetOrCreateGRPCConn(c.serviceDiscovery.GetServingURL()) if err != nil { diff --git a/client/pkg/retry/interval_retry.go b/client/pkg/retry/interval_retry.go new file mode 100644 index 00000000000..69b8980dbe8 --- /dev/null +++ b/client/pkg/retry/interval_retry.go @@ -0,0 +1,56 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package retry + +import ( + "context" + "time" + + "github.com/pingcap/errors" +) + +var ( + // QueryRetryMaxTimes is the max retry times for querying microservice. + queryRetryMaxTimes = 10 + // queryRetryInterval is the retry interval for querying microservice. + queryRetryInterval = 500 * time.Millisecond +) + +// WithConfig retries the given function with a fixed interval. +func WithConfig( + ctx context.Context, f func() error, +) error { + return Retry(ctx, queryRetryMaxTimes, queryRetryInterval, f) +} + +// Retry retries the given function with a fixed interval. +func Retry( + ctx context.Context, maxTimes int, interval time.Duration, f func() error, +) error { + var err error + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range maxTimes { + if err = f(); err == nil { + return nil + } + select { + case <-ctx.Done(): + return err + case <-ticker.C: + } + } + return errors.WithStack(err) +} diff --git a/client/resource_group/controller/global_controller.go b/client/resource_group/controller/global_controller.go index b32d3d268fc..a5e88a090a4 100644 --- a/client/resource_group/controller/global_controller.go +++ b/client/resource_group/controller/global_controller.go @@ -163,7 +163,7 @@ type ResourceGroupsController struct { run struct { responseDeadline *time.Timer - inDegradedMode bool + inDegradedMode atomic.Bool // currentRequests is used to record the request and resource group. // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest @@ -175,6 +175,8 @@ type ResourceGroupsController struct { safeRuConfig atomic.Pointer[RUConfig] degradedRUSettings *rmpb.GroupRequestUnitSettings + + wg sync.WaitGroup } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -248,7 +250,9 @@ const ( // Start starts ResourceGroupController service. 
func (c *ResourceGroupsController) Start(ctx context.Context) { c.loopCtx, c.loopCancel = context.WithCancel(ctx) + c.wg.Add(1) go func() { + defer c.wg.Done() if c.ruConfig.DegradedModeWaitDuration > 0 { c.run.responseDeadline = time.NewTimer(c.ruConfig.DegradedModeWaitDuration) c.run.responseDeadline.Stop() @@ -335,7 +339,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { metrics.ResourceGroupStatusGauge.Reset() return case <-c.responseDeadlineCh: - c.run.inDegradedMode = true + c.run.inDegradedMode.Store(true) c.executeOnAllGroups((*groupCostController).applyDegradedMode) log.Warn("[resource group controller] enter degraded mode") case resp := <-c.tokenResponseChan: @@ -348,7 +352,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) - if c.run.inDegradedMode { + if c.IsDegraded() { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } case resp, ok := <-watchMetaChannel: @@ -449,6 +453,7 @@ func (c *ResourceGroupsController) Stop() error { return errors.Errorf("resource groups controller does not start") } c.loopCancel() + c.wg.Wait() return nil } @@ -607,7 +612,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB } c.responseDeadlineCh = nil } - c.run.inDegradedMode = false + c.run.inDegradedMode.Store(false) for _, res := range resp { name := res.GetResourceGroupName() gc, ok := c.loadGroupController(name) @@ -760,3 +765,8 @@ func (c *ResourceGroupsController) ReportConsumption(resourceGroupName string, c gc.addRUConsumption(consumption) } + +// IsDegraded returns whether the controller is in degraded mode. +func (c *ResourceGroupsController) IsDegraded() bool { + return c.run.inDegradedMode.Load() +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 11013677499..651c7cd54b9 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -80,6 +80,12 @@ func WithRUStats(op *GetResourceGroupOp) { // resourceManagerClient gets the ResourceManager client of current PD leader. func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) { + if ds := c.getResourceManagerDiscovery(); ds != nil { + // If the discovery has not established the connection, using PD server connection. 
+ if cc := ds.GetConn(); cc != nil { + return rmpb.NewResourceManagerClient(cc), nil + } + } cc, err := c.getOrCreateGRPCConn() if err != nil { return nil, err @@ -106,6 +112,7 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup resp, err := cc.ListResourceGroups(ctx, req) if err != nil { c.inner.gRPCErrorHandler(err) + c.inner.resourceManagerErrorHandler(err) return nil, errs.ErrClientListResourceGroup.FastGenByArgs(err.Error()) } resErr := resp.GetError() @@ -135,6 +142,7 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, resp, err := cc.GetResourceGroup(ctx, req) if err != nil { c.inner.gRPCErrorHandler(err) + c.inner.resourceManagerErrorHandler(err) return nil, &errs.ErrClientGetResourceGroup{ResourceGroupName: resourceGroupName, Cause: err.Error()} } resErr := resp.GetError() @@ -180,6 +188,7 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG } if err != nil { c.inner.gRPCErrorHandler(err) + c.inner.resourceManagerErrorHandler(err) return "", err } resErr := resp.GetError() @@ -204,6 +213,7 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri resp, err := cc.DeleteResourceGroup(ctx, req) if err != nil { c.inner.gRPCErrorHandler(err) + c.inner.resourceManagerErrorHandler(err) return "", err } resErr := resp.GetError() @@ -386,6 +396,7 @@ func (c *innerClient) processTokenRequests(stream rmpb.ResourceManager_AcquireTo resp, err := stream.Recv() if err != nil { c.gRPCErrorHandler(err) + c.resourceManagerErrorHandler(err) err = errors.WithStack(err) t.done <- err return err @@ -418,6 +429,9 @@ func (c *innerClient) tryResourceManagerConnect(ctx context.Context, connection connection.stream = stream return nil } + if err != nil { + c.resourceManagerErrorHandler(err) + } cancel() select { case <-ctx.Done(): diff --git a/client/servicediscovery/resource_manager_service_discovery.go b/client/servicediscovery/resource_manager_service_discovery.go new file mode 100644 index 00000000000..08a5a9a7400 --- /dev/null +++ b/client/servicediscovery/resource_manager_service_discovery.go @@ -0,0 +1,270 @@ +// Copyright 2025 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicediscovery + +import ( + "context" + "crypto/tls" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" + + "github.com/tikv/pd/client/clients/metastorage" + "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/retry" + "github.com/tikv/pd/client/pkg/utils/grpcutil" +) + +const ( + resourceManagerServiceName = "resource_manager" + // resourceManagerSvcDiscoveryFormat defines the key prefix for keyspace group primary election. + // The entire key is in the format of "/ms//resource-manager/primary". 
+ resourceManagerSvcDiscoveryFormat = "/ms/%d/" + resourceManagerServiceName + "/primary" + resourceManagerInitRetryTime = 3 +) + +// ResourceManagerDiscovery is used to discover the resource manager service. +type ResourceManagerDiscovery struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + started atomic.Bool + clusterID uint64 + metaCli metastorage.Client + discoveryKey string + tlsCfg *tls.Config + // Client option. + option *opt.Option + onLeaderChanged func(string) error + updateServiceURLCh chan struct{} + + mu sync.RWMutex + serviceURL string + conn *grpc.ClientConn +} + +// NewResourceManagerDiscovery creates a new ResourceManagerDiscovery. +func NewResourceManagerDiscovery(ctx context.Context, clusterID uint64, metaCli metastorage.Client, tlsCfg *tls.Config, opt *opt.Option, leaderChangedCb func(string) error) *ResourceManagerDiscovery { + ctx, cancel := context.WithCancel(ctx) + d := &ResourceManagerDiscovery{ + ctx: ctx, + cancel: cancel, + clusterID: clusterID, + metaCli: metaCli, + discoveryKey: fmt.Sprintf(resourceManagerSvcDiscoveryFormat, clusterID), + tlsCfg: tlsCfg, + option: opt, + onLeaderChanged: leaderChangedCb, + updateServiceURLCh: make(chan struct{}, 1), + } + + log.Info("[resource-manager] created resource manager discovery", + zap.Uint64("cluster-id", clusterID), + zap.String("discovery-key", d.discoveryKey)) + return d +} + +// Init implements ServiceDiscovery. +func (r *ResourceManagerDiscovery) Init() { + if !r.started.CompareAndSwap(false, true) { + return + } + r.wg.Add(1) + go r.initAndUpdateLoop() +} + +func (r *ResourceManagerDiscovery) initAndUpdateLoop() { + defer r.wg.Done() + log.Info("[resource-manager] initializing service discovery", + zap.Int("max-retry-times", resourceManagerInitRetryTime), + zap.Duration("retry-interval", initRetryInterval)) + + var ( + url string + revision int64 + err error + ) + if err := retry.Retry(r.ctx, resourceManagerInitRetryTime, initRetryInterval, func() error { + url, revision, err = r.discoverServiceURL() + return err + }); err != nil { + log.Error("[resource-manager] failed to discover service. initialization failed.", zap.Error(err)) + } + r.resetConn(url) + r.updateServiceURLLoop(revision) +} + +func (r *ResourceManagerDiscovery) resetConn(url string) { + r.mu.Lock() + defer r.mu.Unlock() + if len(url) == 0 { + if r.conn != nil { + r.conn.Close() + r.conn = nil + } + r.serviceURL = "" + _ = r.onLeaderChanged("") + return + } + if r.serviceURL == url { + return + } + newConn, err := grpcutil.GetClientConn(r.ctx, url, r.tlsCfg, r.option.GRPCDialOptions...) + if err != nil { + // Dial without `WithBlock`, normally it should not fail. + log.Error("[resource-manager] failed to create gRPC connection", + zap.String("url", url), + zap.Error(err)) + return + } + if r.conn != nil { + r.conn.Close() + } + r.serviceURL, r.conn = url, newConn + log.Info("[resource-manager] updated service URL", + zap.String("new-url", url)) + _ = r.onLeaderChanged("") +} + +// GetServiceURL returns the currently discovered resource manager service URL. +// It returns an empty string when there is no standalone service endpoint and +// the client should fall back to PD-provided resource manager. +func (r *ResourceManagerDiscovery) GetServiceURL() string { + r.mu.RLock() + defer r.mu.RUnlock() + return r.serviceURL +} + +// GetConn returns the gRPC connection to the resource manager service. 
+func (r *ResourceManagerDiscovery) GetConn() *grpc.ClientConn { + r.mu.RLock() + defer r.mu.RUnlock() + if r.conn == nil { + log.Warn("[resource-manager] gRPC connection is not established yet", + zap.String("discovery-key", r.discoveryKey)) + return nil + } + return r.conn +} + +func (r *ResourceManagerDiscovery) discoverServiceURL() (string, int64, error) { + resp, err := r.metaCli.Get(r.ctx, []byte(r.discoveryKey)) + if err != nil { + log.Error("[resource-manager] failed to get resource-manager serving endpoint", + zap.String("discovery-key", r.discoveryKey), + zap.Error(err)) + return "", 0, err + } + if resp == nil || len(resp.Kvs) == 0 { + log.Warn("[resource-manager] no resource-manager serving endpoint found", + zap.String("discovery-key", r.discoveryKey)) + return "", 0, errs.ErrClientGetServingEndpoint + } else if resp.Count > 1 { + return "", 0, errs.ErrClientGetMultiResponse.FastGenByArgs(resp.Kvs) + } + + url, err := r.parseURLFromStorageValue(resp.Kvs[0].Value) + if err != nil { + return "", 0, err + } + + return url, resp.Header.Revision, nil +} + +func (r *ResourceManagerDiscovery) parseURLFromStorageValue(value []byte) (string, error) { + primary := &resource_manager.Participant{} + if err := proto.Unmarshal(value, primary); err != nil { + return "", errs.ErrClientProtoUnmarshal.Wrap(err).GenWithStackByCause() + } + listenUrls := primary.GetListenUrls() + if len(listenUrls) == 0 { + log.Warn("[resource-manager] the keyspace serving endpoint list is empty", + zap.String("discovery-key", r.discoveryKey)) + return "", errs.ErrClientGetServingEndpoint + } + // TODO: only support 1 listen url for now. + return listenUrls[0], nil +} + +// ScheduleUpdateServiceURL schedules an update of the service URL. +func (r *ResourceManagerDiscovery) ScheduleUpdateServiceURL() { + select { + case r.updateServiceURLCh <- struct{}{}: + default: + } +} + +func (r *ResourceManagerDiscovery) updateServiceURLLoop(revision int64) { + // If no service URL exists (e.g. running in PD-provided mode), we still need to + // periodically check whether a standalone resource-manager appears later. + // This enables runtime switching between deployment modes. + ticker := time.NewTicker(initRetryInterval) + defer ticker.Stop() + + discoverAndUpdate := func() { + url, newRevision, err := r.discoverServiceURL() + if err != nil { + log.Warn("[resource-manager] failed to discover service URL", + zap.String("discovery-key", r.discoveryKey), + zap.Error(err)) + // Endpoint is absent; keep connection cleared so the client can fall back to PD. + if errors.ErrorEqual(err, errs.ErrClientGetServingEndpoint) { + r.resetConn("") + } + return + } + if newRevision > revision { + r.resetConn(url) + revision = newRevision + } + } + for { + select { + case <-r.ctx.Done(): + log.Info("[resource-manager] exit update service URL loop due to context canceled") + return + case <-ticker.C: + if len(r.GetServiceURL()) != 0 { + continue + } + discoverAndUpdate() + case <-r.updateServiceURLCh: + log.Info("[resource-manager] updating service URL", zap.String("old-url", r.serviceURL)) + discoverAndUpdate() + } + } +} + +// Close closes the resource manager discovery and its gRPC connection. 
+func (r *ResourceManagerDiscovery) Close() { + r.cancel() + r.wg.Wait() + r.mu.Lock() + defer r.mu.Unlock() + if r.conn != nil { + r.conn.Close() + r.conn = nil + } +} diff --git a/client/servicediscovery/tso_service_discovery.go b/client/servicediscovery/tso_service_discovery.go index c398a5aef90..2c8222aeb58 100644 --- a/client/servicediscovery/tso_service_discovery.go +++ b/client/servicediscovery/tso_service_discovery.go @@ -40,6 +40,7 @@ import ( "github.com/tikv/pd/client/constants" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/retry" "github.com/tikv/pd/client/pkg/utils/grpcutil" ) @@ -50,10 +51,6 @@ const ( tsoSvcDiscoveryFormat = "/ms/%d/tso/%05d/primary" // initRetryInterval is the rpc retry interval during the initialization phase. initRetryInterval = time.Second - // tsoQueryRetryMaxTimes is the max retry times for querying TSO. - tsoQueryRetryMaxTimes = 10 - // tsoQueryRetryInterval is the retry interval for querying TSO. - tsoQueryRetryInterval = 500 * time.Millisecond ) var _ ServiceDiscovery = (*tsoServiceDiscovery)(nil) @@ -200,7 +197,7 @@ func (c *tsoServiceDiscovery) Init() error { log.Info("initializing tso service discovery", zap.Int("max-retry-times", c.option.MaxRetryTimes), zap.Duration("retry-interval", initRetryInterval)) - if err := c.retry(c.option.MaxRetryTimes, initRetryInterval, c.updateMember); err != nil { + if err := retry.Retry(c.ctx, c.option.MaxRetryTimes, initRetryInterval, c.updateMember); err != nil { log.Error("failed to update member. initialization failed.", zap.Error(err)) c.cancel() return err @@ -210,25 +207,6 @@ func (c *tsoServiceDiscovery) Init() error { return nil } -func (c *tsoServiceDiscovery) retry( - maxRetryTimes int, retryInterval time.Duration, f func() error, -) error { - var err error - ticker := time.NewTicker(retryInterval) - defer ticker.Stop() - for range maxRetryTimes { - if err = f(); err == nil { - return nil - } - select { - case <-c.ctx.Done(): - return err - case <-ticker.C: - } - } - return errors.WithStack(err) -} - // Close releases all resources func (c *tsoServiceDiscovery) Close() { if c == nil { @@ -266,10 +244,10 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() { log.Info("[tso] exit check member loop") return } - // Make sure tsoQueryRetryMaxTimes * tsoQueryRetryInterval is far less than memberUpdateInterval, + // Make sure queryRetryMaxTimes * queryRetryInterval is far less than memberUpdateInterval, // so that we can speed up the process of tso service discovery when failover happens on the // tso service side and also ensures it won't call updateMember too frequently during normal time. 
- if err := c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember); err != nil { + if err := retry.Retry(c.ctx, c.option.MaxRetryTimes, initRetryInterval, c.updateMember); err != nil { log.Error("[tso] failed to update member", errs.ZapError(err)) } } @@ -355,7 +333,7 @@ func (c *tsoServiceDiscovery) CheckMemberChanged() error { if err := c.serviceDiscovery.CheckMemberChanged(); err != nil { log.Warn("[tso] failed to check member changed", errs.ZapError(err)) } - if err := c.retry(tsoQueryRetryMaxTimes, tsoQueryRetryInterval, c.updateMember); err != nil { + if err := retry.WithConfig(c.ctx, c.updateMember); err != nil { log.Error("[tso] failed to update member", errs.ZapError(err)) return err } diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index 56150b01643..16d3d2d2a72 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -75,6 +75,7 @@ func (suite *serverRegisterTestSuite) TearDownSuite() { func (suite *serverRegisterTestSuite) TestServerRegister() { for range 3 { suite.checkServerRegister(constant.TSOServiceName) + suite.checkServerRegister(constant.ResourceManagerServiceName) } } @@ -110,6 +111,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) { func (suite *serverRegisterTestSuite) TestServerPrimaryChange() { suite.checkServerPrimaryChange(constant.TSOServiceName, 3) + suite.checkServerPrimaryChange(constant.ResourceManagerServiceName, 3) } func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName string, serverNum int) { diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index 2f4cadc5232..6c2bcc35d9b 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "net/http" + "strings" "testing" "time" @@ -55,9 +56,10 @@ type memberTestSuite struct { // We only test `DefaultKeyspaceGroupID` here. // tsoAvailMembers is used to check the tso members which in the DefaultKeyspaceGroupID. 
- tsoAvailMembers map[string]bool - tsoNodes map[string]bs.Server - schedulingNodes map[string]bs.Server + tsoAvailMembers map[string]bool + tsoNodes map[string]bs.Server + schedulingNodes map[string]bs.Server + resourceManagerNodes map[string]bs.Server } func TestMemberTestSuite(t *testing.T) { @@ -112,6 +114,18 @@ func (suite *memberTestSuite) SetupTest() { tests.WaitForPrimaryServing(re, nodes) suite.schedulingNodes = nodes + // resource manager + nodes = make(map[string]bs.Server) + for range 3 { + s, cleanup := tests.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc()) + nodes[s.GetAddr()] = s + suite.cleanupFunc = append(suite.cleanupFunc, func() { + cleanup() + }) + } + tests.WaitForPrimaryServing(re, nodes) + suite.resourceManagerNodes = nodes + suite.cleanupFunc = append(suite.cleanupFunc, func() { cancel() }) @@ -138,6 +152,10 @@ func (suite *memberTestSuite) TestMembers() { members, err = suite.pdClient.GetMicroserviceMembers(suite.ctx, "scheduling") re.NoError(err) re.Len(members, 3) + + members, err = suite.pdClient.GetMicroserviceMembers(suite.ctx, "resource_manager") + re.NoError(err) + re.Len(members, 3) } func (suite *memberTestSuite) TestPrimary() { @@ -149,6 +167,10 @@ func (suite *memberTestSuite) TestPrimary() { primary, err = suite.pdClient.GetMicroservicePrimary(suite.ctx, "scheduling") re.NoError(err) re.NotEmpty(primary) + + primary, err = suite.pdClient.GetMicroservicePrimary(suite.ctx, "resource_manager") + re.NoError(err) + re.NotEmpty(primary) } func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() { @@ -157,7 +179,7 @@ func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() { re.NoError(err) re.NotEmpty(primary) - supportedServices := []string{"tso", "scheduling"} + supportedServices := []string{"tso", "scheduling", "resource_manager"} for _, service := range supportedServices { var nodes map[string]bs.Server switch service { @@ -165,6 +187,8 @@ func (suite *memberTestSuite) TestPrimaryWorkWhileOtherServerClose() { nodes = suite.tsoNodes case "scheduling": nodes = suite.schedulingNodes + case "resource_manager": + nodes = suite.resourceManagerNodes } primary, err := suite.pdClient.GetMicroservicePrimary(suite.ctx, service) @@ -205,7 +229,7 @@ func (suite *memberTestSuite) TestTransferPrimary() { newPrimaryData["new_primary"] = "" data, err := json.Marshal(newPrimaryData) re.NoError(err) - resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) @@ -237,7 +261,7 @@ func (suite *memberTestSuite) TestTransferPrimary() { newPrimaryData["new_primary"] = newPrimary data, err = json.Marshal(newPrimaryData) re.NoError(err) - resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) @@ -256,7 +280,7 @@ func (suite *memberTestSuite) TestTransferPrimary() { newPrimaryData["new_primary"] = newPrimary data, err = json.Marshal(newPrimaryData) re.NoError(err) - resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, 
service), + resp, err = tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusInternalServerError, resp.StatusCode) @@ -294,14 +318,14 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { newPrimaryData["new_primary"] = newPrimary data, err := json.Marshal(newPrimaryData) re.NoError(err) - resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) resp.Body.Close() tests.WaitForPrimaryServing(re, nodes) - newPrimary, err = suite.pdClient.GetMicroservicePrimary(suite.ctx, service) + newPrimary, err = suite.pdClient.GetMicroservicePrimary(suite.ctx, strings.ReplaceAll(service, "_", "-")) re.NoError(err) re.NotEqual(primary, newPrimary) @@ -309,7 +333,7 @@ func (suite *memberTestSuite) TestCampaignPrimaryAfterTransfer() { nodes[newPrimary].Close() tests.WaitForPrimaryServing(re, nodes) // Primary should be different with before - anotherPrimary, err := suite.pdClient.GetMicroservicePrimary(suite.ctx, service) + anotherPrimary, err := suite.pdClient.GetMicroservicePrimary(suite.ctx, strings.ReplaceAll(service, "_", "-")) re.NoError(err) re.NotEqual(newPrimary, anotherPrimary) } @@ -351,7 +375,7 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() { newPrimaryData["new_primary"] = newPrimary data, err := json.Marshal(newPrimaryData) re.NoError(err) - resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) @@ -407,7 +431,7 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown( newPrimaryData["new_primary"] = "" data, err := json.Marshal(newPrimaryData) re.NoError(err) - resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, service), + resp, err := tests.TestDialClient.Post(fmt.Sprintf("%s/%s/api/v1/primary/transfer", primary, strings.ReplaceAll(service, "_", "-")), "application/json", bytes.NewBuffer(data)) re.NoError(err) re.Equal(http.StatusOK, resp.StatusCode) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index fd840aa6013..f2f52316129 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -23,6 +23,7 @@ import ( "net/http" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -42,10 +43,14 @@ import ( "github.com/tikv/pd/client/pkg/caller" "github.com/tikv/pd/client/resource_group/controller" sd "github.com/tikv/pd/client/servicediscovery" + bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/resourcemanager/server" + mcsconstant "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" + srvconfig "github.com/tikv/pd/server/config" 
"github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs/utils" ) @@ -61,10 +66,87 @@ type resourceManagerClientTestSuite struct { cluster *tests.TestCluster client pd.Client initGroups []*rmpb.ResourceGroup + + mode resourceManagerDeployMode + + fastUpdateServiceModeEnabled bool + + rmCleanup func() + tsoCleanup func() } +func (suite *resourceManagerClientTestSuite) setupPDClient(re *require.Assertions) pd.Client { + cli, err := pd.NewClientWithContext(suite.ctx, + caller.TestComponent, + suite.cluster.GetConfig().GetClientURLs(), + pd.SecurityOption{}, + ) + re.NoError(err) + return cli +} + +func (suite *resourceManagerClientTestSuite) setupKeyspaceClient(re *require.Assertions, keyspaceID uint32) pd.Client { + cli := utils.SetupClientWithKeyspaceID( + suite.ctx, + re, + keyspaceID, + suite.cluster.GetConfig().GetClientURLs(), + ) + // In standalone RM mode, ensure the newly created client has already discovered + // and connected to the RM microservice; otherwise RM RPCs may temporarily fall + // back to PD and split state across backends. + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + waitResourceManagerServiceURL(re, cli, true) + } + return cli +} + +type resourceManagerDeployMode int + +const ( + resourceManagerProvidedByPD resourceManagerDeployMode = iota + resourceManagerStandaloneWithClientDiscovery +) + func TestResourceManagerClientTestSuite(t *testing.T) { - suite.Run(t, new(resourceManagerClientTestSuite)) + for _, tc := range []struct { + name string + mode resourceManagerDeployMode + }{ + {name: "pd-resource-manager", mode: resourceManagerProvidedByPD}, + {name: "standalone-resource-manager-with-client-discovery", mode: resourceManagerStandaloneWithClientDiscovery}, + } { + t.Run(tc.name, func(t *testing.T) { + suite.Run(t, &resourceManagerClientTestSuite{mode: tc.mode}) + }) + } +} + +func (suite *resourceManagerClientTestSuite) startMicroservicesForDiscovery(re *require.Assertions) { + leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) + re.NotNil(leader) + backendEndpoints := leader.GetAddr() + + // Start Resource Manager microservice and wait it is serving. + rmServer, cleanup := tests.StartSingleResourceManagerTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) + _ = tests.WaitForPrimaryServing(re, map[string]bs.Server{rmServer.GetAddr(): rmServer}) + suite.rmCleanup = cleanup + + // Start TSO microservice and wait it is serving. 
+ tsoServer, tsoCleanup := tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc()) + _ = tests.WaitForPrimaryServing(re, map[string]bs.Server{tsoServer.GetAddr(): tsoServer}) + suite.tsoCleanup = tsoCleanup +} + +func (suite *resourceManagerClientTestSuite) stopMicroservicesForDiscovery() { + if suite.rmCleanup != nil { + suite.rmCleanup() + suite.rmCleanup = nil + } + if suite.tsoCleanup != nil { + suite.tsoCleanup() + suite.tsoCleanup = nil + } } func (suite *resourceManagerClientTestSuite) SetupSuite() { @@ -73,22 +155,44 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedModeAndTraceLog", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)")) + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) + suite.fastUpdateServiceModeEnabled = true + } suite.ctx, suite.clean = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestCluster(suite.ctx, 2) + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 2, func(conf *srvconfig.Config, _ string) { + conf.Microservice.EnableResourceManagerFallback = false + }) + } else { + suite.cluster, err = tests.NewTestCluster(suite.ctx, 2) + } re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) - suite.client, err = pd.NewClientWithContext(suite.ctx, - caller.TestComponent, - suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) - re.NoError(err) leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) re.NotNil(leader) + + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + // Bootstrap the cluster so that the microservices (now or later) can work normally. + re.NoError(leader.BootstrapCluster()) + } + + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + suite.startMicroservicesForDiscovery(re) + suite.client = suite.setupPDClient(re) + } else { + suite.client = suite.setupPDClient(re) + } waitLeaderServingClient(re, suite.client, leader.GetAddr()) + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + // Ensure RM service discovery has picked up the standalone endpoint before running tests. 
+ waitResourceManagerServiceURL(re, suite.client, true) + } suite.initGroups = []*rmpb.ResourceGroup{ { @@ -155,17 +259,267 @@ func waitLeaderServingClient(re *require.Assertions, cli pd.Client, leaderAddr s }) } +func waitResourceManagerServiceURL(re *require.Assertions, cli pd.Client, wantNonEmpty bool) { + innerCli, ok := cli.(interface{ GetResourceManagerServiceURL() string }) + re.True(ok) + testutil.Eventually(re, func() bool { + url := innerCli.GetResourceManagerServiceURL() + if wantNonEmpty { + return url != "" + } + return url == "" + }) +} + +func TestSwitchModeDuringWorkload(t *testing.T) { + for _, tc := range []struct { + name string + startMode resourceManagerDeployMode + }{ + {name: "pd-to-standalone", startMode: resourceManagerProvidedByPD}, + {name: "standalone-to-pd", startMode: resourceManagerStandaloneWithClientDiscovery}, + } { + t.Run(tc.name, func(t *testing.T) { + re := require.New(t) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedModeAndTraceLog", `return(true)`)) + t.Cleanup(func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedModeAndTraceLog")) + }) + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck", "return(true)")) + t.Cleanup(func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck")) + }) + // This test switches in/out of standalone mode, so always enable faster SD updates. + re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) + t.Cleanup(func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode")) + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 2) + re.NoError(err) + defer cluster.Destroy() + + re.NoError(cluster.RunInitialServers()) + leader := cluster.GetServer(cluster.WaitLeader()) + re.NotNil(leader) + re.NoError(leader.BootstrapCluster()) + + restartPDWithFallback := func(enableFallback bool) { + // Restart all PD servers so that handler builders respect the updated config. 
+ servers := cluster.GetServers() + for _, s := range servers { + if s.State() == tests.Running { + re.NoError(s.Stop()) + } + } + newServers := make([]*tests.TestServer, 0, len(servers)) + for name, s := range servers { + cfg := s.GetConfig() + cfg.Microservice.EnableResourceManagerFallback = enableFallback + newServer, err := tests.NewTestServer(ctx, cfg, []string{mcsconstant.PDServiceName}) + re.NoError(err) + servers[name] = newServer + newServers = append(newServers, newServer) + } + re.NoError(tests.RunServersWithRetry(newServers, 3)) + leader = cluster.GetServer(cluster.WaitLeader()) + re.NotNil(leader) + } + + startMicroservices := func() (rmCleanup func(), tsoCleanup func()) { + backendEndpoints := leader.GetAddr() + rmServer, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + _ = tests.WaitForPrimaryServing(re, map[string]bs.Server{rmServer.GetAddr(): rmServer}) + rmCleanup = cleanup + + tsoServer, cleanupTSO := tests.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc()) + _ = tests.WaitForPrimaryServing(re, map[string]bs.Server{tsoServer.GetAddr(): tsoServer}) + tsoCleanup = cleanupTSO + return rmCleanup, tsoCleanup + } + + var rmCleanup func() + var tsoCleanup func() + stopMicroservices := func() { + if rmCleanup != nil { + rmCleanup() + rmCleanup = nil + } + if tsoCleanup != nil { + tsoCleanup() + tsoCleanup = nil + } + } + defer stopMicroservices() + + if tc.startMode == resourceManagerStandaloneWithClientDiscovery { + // In standalone mode, PD should redirect RM requests to the standalone service. + restartPDWithFallback(false) + rmCleanup, tsoCleanup = startMicroservices() + } + + cli, err := pd.NewClientWithContext(ctx, + caller.TestComponent, + cluster.GetConfig().GetClientURLs(), + pd.SecurityOption{}, + ) + re.NoError(err) + defer cli.Close() + + waitLeaderServingClient(re, cli, leader.GetAddr()) + if tc.startMode == resourceManagerStandaloneWithClientDiscovery { + waitResourceManagerServiceURL(re, cli, true) + } else { + waitResourceManagerServiceURL(re, cli, false) + } + + group := &rmpb.ResourceGroup{ + Name: "switch_workload", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{FillRate: 10000}, + Tokens: 100000, + }, + }, + } + resp, err := cli.AddResourceGroup(ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + + cfg := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 1, + WriteBaseCost: 1, + WriteCostPerByte: 1, + CPUMsCost: 1, + } + rgController, err := controller.NewResourceGroupController( + ctx, + 1, + cli, + cfg, + constants.NullKeyspaceID, + controller.EnableSingleGroupByKeyspace(), + ) + re.NoError(err) + rgController.Start(ctx) + defer func() { + re.NoError(rgController.Stop()) + }() + + workCtx, workCancel := context.WithCancel(ctx) + defer workCancel() + + var ( + switched atomic.Bool + okBefore, okAfter int64 + errBefore, errAfter int64 + ) + + go func() { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-workCtx.Done(): + return + case <-ticker.C: + } + req := controller.NewTestRequestInfo(false, 0, 0, controller.AccessUnknown) + res := controller.NewTestResponseInfo(0, 0, true) + _, _, _, _, err := rgController.OnRequestWait(workCtx, group.Name, req) + if err != nil { + if switched.Load() { + atomic.AddInt64(&errAfter, 1) + } else { + atomic.AddInt64(&errBefore, 1) + } + continue + } + if switched.Load() { + if 
!rgController.IsDegraded() { + atomic.AddInt64(&okAfter, 1) + } + } else { + atomic.AddInt64(&okBefore, 1) + } + _, _ = rgController.OnResponse(group.Name, req, res) + } + }() + + testutil.Eventually(re, func() bool { + return atomic.LoadInt64(&okBefore) >= 10 + }, testutil.WithTickInterval(20*time.Millisecond)) + + log.Info("switching mode during workload", + zap.Int("fromMode", int(tc.startMode)), + zap.Int64("okBefore", atomic.LoadInt64(&okBefore)), + zap.Int64("errBefore", atomic.LoadInt64(&errBefore)), + ) + + if tc.startMode == resourceManagerProvidedByPD { + // Switch to standalone RM: disable fallback on PD and restart it, + // then start microservices and wait for client discovery. + restartPDWithFallback(false) + rmCleanup, tsoCleanup = startMicroservices() + waitResourceManagerServiceURL(re, cli, true) + testutil.Eventually(re, func() bool { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + _, err := cli.ListResourceGroups(ctx) + return err == nil + }) + } else { + // Switch back to PD-provided RM: stop microservices, enable fallback on PD and restart it, + // then ensure client falls back to PD. + stopMicroservices() + restartPDWithFallback(true) + testutil.Eventually(re, func() bool { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + _, err := cli.ListResourceGroups(ctx) + return err == nil + }) + waitResourceManagerServiceURL(re, cli, false) + } + waitLeaderServingClient(re, cli, leader.GetAddr()) + + switched.Store(true) + testutil.Eventually(re, func() bool { + return atomic.LoadInt64(&okAfter) >= 10 + }, testutil.WithTickInterval(20*time.Millisecond)) + + log.Info("switch during workload finished", + zap.Int("startMode", int(tc.startMode)), + zap.Int64("okBefore", atomic.LoadInt64(&okBefore)), + zap.Int64("errBefore", atomic.LoadInt64(&errBefore)), + zap.Int64("okAfter", atomic.LoadInt64(&okAfter)), + zap.Int64("errAfter", atomic.LoadInt64(&errAfter)), + ) + }) + } +} + func (suite *resourceManagerClientTestSuite) TearDownSuite() { re := suite.Require() + suite.stopMicroservicesForDiscovery() suite.client.Close() suite.cluster.Destroy() suite.clean() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/enableDegradedModeAndTraceLog")) re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck")) + if suite.fastUpdateServiceModeEnabled { + re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode")) + } } func (suite *resourceManagerClientTestSuite) TearDownTest() { - suite.cleanupResourceGroups(suite.Require()) + re := suite.Require() + suite.cleanupResourceGroups(re) } func (suite *resourceManagerClientTestSuite) cleanupResourceGroups(re *require.Assertions) { @@ -1109,6 +1463,9 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { groups, err := cli.ListResourceGroups(suite.ctx) re.NoError(err) servers := suite.cluster.GetServers() + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + suite.stopMicroservicesForDiscovery() + } re.NoError(suite.cluster.StopAll()) cli.Close() serverList := make([]*tests.TestServer, 0, len(servers)) @@ -1117,11 +1474,11 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { } re.NoError(tests.RunServers(serverList)) re.NotEmpty(suite.cluster.WaitLeader()) + if suite.mode == resourceManagerStandaloneWithClientDiscovery { + suite.startMicroservicesForDiscovery(re) + } // re-connect client as well - suite.client, err = 
pd.NewClientWithContext(suite.ctx, - caller.TestComponent, - suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) - re.NoError(err) + suite.client = suite.setupPDClient(re) cli = suite.client var newGroups []*rmpb.ResourceGroup testutil.Eventually(re, func() bool { @@ -1194,10 +1551,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupRUConsumption() { suite.cluster.WaitLeader() // re-connect client as cli.Close() - suite.client, err = pd.NewClientWithContext(suite.ctx, - caller.TestComponent, - suite.cluster.GetConfig().GetClientURLs(), pd.SecurityOption{}) - re.NoError(err) + suite.client = suite.setupPDClient(re) cli = suite.client // check ru stats not loss after restart g, err = cli.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) @@ -1512,6 +1866,10 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, constants.NullKeyspaceID, controller.WithMaxWaitDuration(time.Hour)) re.NoError(err) c2.Start(suite.ctx) + defer func() { + err := c2.Stop() + re.NoError(err) + }() // helper function for sending HTTP requests and checking responses sendRequest := func(method, url string, body io.Reader) []byte { req, err := http.NewRequest(method, url, body) @@ -1620,12 +1978,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() re := suite.Require() cli := suite.client keyspaceID := uint32(1) - clientKeyspace := utils.SetupClientWithKeyspaceID( - suite.ctx, - re, - keyspaceID, - suite.cluster.GetConfig().GetClientURLs(), - ) + clientKeyspace := suite.setupKeyspaceClient(re, keyspaceID) defer clientKeyspace.Close() // Add keyspace meta. @@ -1765,9 +2118,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucketsWithMultiKey keyspaceName := fmt.Sprintf("keyspace%d_test", keyspaceID) groupName := fmt.Sprintf("rg_multi_%d", keyspaceID) // Create a specific client for this keyspace - client := utils.SetupClientWithKeyspaceID( - ctx, re, keyspaceID, suite.cluster.GetConfig().GetClientURLs(), - ) + client := suite.setupKeyspaceClient(re, keyspaceID) clients[i] = client // Create and save keyspace metadata keyspaceMeta := &keyspacepb.KeyspaceMeta{Id: keyspaceID, Name: keyspaceName} @@ -1811,12 +2162,20 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucketsWithMultiKey re.NoError(err) re.NotNil(resp) re.Len(resp, numKeyspaces) - for i, tbResp := range resp { - expectedGroup := groups[i] - re.Equal(expectedGroup.Name, tbResp.ResourceGroupName) + expectedByKeyspace := make(map[uint32]*rmpb.ResourceGroup, numKeyspaces) + for _, g := range groups { + re.NotNil(g.KeyspaceId) + expectedByKeyspace[g.KeyspaceId.GetValue()] = g + } + for _, tbResp := range resp { re.NotNil(tbResp.KeyspaceId) - re.Equal(expectedGroup.KeyspaceId.GetValue(), tbResp.KeyspaceId.GetValue()) + keyspaceID := tbResp.KeyspaceId.GetValue() + expectedGroup, ok := expectedByKeyspace[keyspaceID] + re.True(ok, "unexpected keyspace id in response: %d", keyspaceID) + re.Equal(expectedGroup.Name, tbResp.ResourceGroupName) + delete(expectedByKeyspace, keyspaceID) } + re.Empty(expectedByKeyspace, "some keyspaces did not appear in response") // Verify state change using the keyspace-specific clients for i := range numKeyspaces { @@ -1865,12 +2224,7 @@ func (suite *resourceManagerClientTestSuite) TestLoadAndWatchWithDifferentKeyspa clients[keyspace] = suite.client continue } - cli := utils.SetupClientWithKeyspaceID( - suite.ctx, - re, - keyspace, - 
suite.cluster.GetConfig().GetClientURLs(), - ) + cli := suite.setupKeyspaceClient(re, keyspace) clients[keyspace] = cli } @@ -1981,7 +2335,7 @@ func (suite *resourceManagerClientTestSuite) TestCannotModifyKeyspaceOfResourceG re.NoError(err) // Create clients for keyspaceA - clientA := utils.SetupClientWithKeyspaceID(ctx, re, keyspaceA, suite.cluster.GetConfig().GetClientURLs()) + clientA := suite.setupKeyspaceClient(re, keyspaceA) defer clientA.Close() // Add a resource group in Keyspace A and check