From f1d1a80feb955f0521c011bb133076c012873e85 Mon Sep 17 00:00:00 2001
From: Yongbo Jiang
Date: Wed, 19 Apr 2023 23:33:20 +0800
Subject: [PATCH 1/3] resource_manager/client: fine tune trickle param (#6261)

ref tikv/pd#6113

Signed-off-by: Cabinfever_B
Co-authored-by: Ti Chi Robot
---
 client/resource_group/controller/controller.go | 10 +++++++---
 client/resource_group/controller/metrics.go    |  2 +-
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go
index 7a3c575fb2ec..457a946faae5 100755
--- a/client/resource_group/controller/controller.go
+++ b/client/resource_group/controller/controller.go
@@ -38,6 +38,7 @@ const (
 	retryInterval           = 50 * time.Millisecond
 	maxNotificationChanLen  = 200
 	needTokensAmplification = 1.1
+	trickleReserveDuration  = 1250 * time.Millisecond
 )
 
 type selectType int
@@ -763,7 +764,10 @@ func (gc *groupCostController) shouldReportConsumption() bool {
 	failpoint.Inject("acceleratedReportingPeriod", func() {
 		timeSinceLastRequest = extendedReportingPeriodFactor * defaultTargetPeriod
 	})
-	if timeSinceLastRequest >= defaultTargetPeriod {
+	// `gc.run.lastRequestTime` is updated late in this logic, so `timeSinceLastRequest` can be a little
+	// less than the real elapsed time (by up to defaultGroupStateUpdateInterval), which makes the actual report period longer than defaultTargetPeriod.
+	// Add defaultGroupStateUpdateInterval/2 as a buffer to avoid it.
+	if timeSinceLastRequest+defaultGroupStateUpdateInterval/2 >= defaultTargetPeriod {
 		if timeSinceLastRequest >= extendedReportingPeriodFactor*defaultTargetPeriod {
 			return true
 		}
@@ -882,9 +886,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
 		deadline := gc.run.now.Add(trickleDuration)
 		cfg.NewRate = float64(bucket.GetSettings().FillRate) + granted/trickleDuration.Seconds()
 
-		timerDuration := trickleDuration - time.Second
+		timerDuration := trickleDuration - trickleReserveDuration
 		if timerDuration <= 0 {
-			timerDuration = (trickleDuration + time.Second) / 2
+			timerDuration = (trickleDuration + trickleReserveDuration) / 2
 		}
 		counter.notify.mu.Lock()
 		counter.notify.setupNotificationTimer = time.NewTimer(timerDuration)
diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go
index fc34ae0fb0cf..68eb26d03121 100644
--- a/client/resource_group/controller/metrics.go
+++ b/client/resource_group/controller/metrics.go
@@ -38,7 +38,7 @@ var (
 			Namespace: namespace,
 			Subsystem: requestSubsystem,
 			Name:      "success",
-			Buckets:   prometheus.ExponentialBuckets(0.001, 4, 8), // 0.001 ~ 40.96
+			Buckets:   []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
 			Help:      "Bucketed histogram of wait duration of successful request.",
 		}, []string{resourceGroupNameLabel})
 

From 5f99e0c04f1cc9614a99c5f1ba3568f8159df48e Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
Date: Wed, 19 Apr 2023 23:51:19 +0800
Subject: [PATCH 2/3] server: add accelerate-schedule/batch api (#6327)

close tikv/pd#6326

add batch-accelerate-schedule api

Signed-off-by: Lloyd-Pottiger
Co-authored-by: Ti Chi Robot
---
 server/api/region.go      | 56 +++++++++++++++++++++++++++++++++++++++
 server/api/region_test.go | 20 ++++++++++++++
 server/api/router.go      |  1 +
 3 files changed, 77 insertions(+)

diff --git a/server/api/region.go b/server/api/region.go
index 69618b1a82ce..b28506956913 100644
--- a/server/api/region.go
+++ b/server/api/region.go
@@ -22,6 +22,7 @@ import (
"net/url" "sort" "strconv" + "strings" "github.com/gorilla/mux" "github.com/pingcap/failpoint" @@ -836,6 +837,61 @@ func (h *regionsHandler) AccelerateRegionsScheduleInRange(w http.ResponseWriter, h.rd.Text(w, http.StatusOK, fmt.Sprintf("Accelerate regions scheduling in a given range [%s,%s)", rawStartKey, rawEndKey)) } +// @Tags region +// @Summary Accelerate regions scheduling in given ranges, only receive hex format for keys +// @Accept json +// @Param body body object true "json params" +// @Param limit query integer false "Limit count" default(256) +// @Produce json +// @Success 200 {string} string "Accelerate regions scheduling in given ranges [startKey1, endKey1), [startKey2, endKey2), ..." +// @Failure 400 {string} string "The input is invalid." +// @Router /regions/accelerate-schedule/batch [post] +func (h *regionsHandler) AccelerateRegionsScheduleInRanges(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + var input []map[string]interface{} + if err := apiutil.ReadJSONRespondError(h.rd, w, r.Body, &input); err != nil { + return + } + limit := 256 + if limitStr := r.URL.Query().Get("limit"); limitStr != "" { + var err error + limit, err = strconv.Atoi(limitStr) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + } + if limit > maxRegionLimit { + limit = maxRegionLimit + } + var msgBuilder strings.Builder + msgBuilder.Grow(128) + msgBuilder.WriteString("Accelerate regions scheduling in given ranges: ") + var regions []*core.RegionInfo + for _, rg := range input { + startKey, rawStartKey, err := apiutil.ParseKey("start_key", rg) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + endKey, rawEndKey, err := apiutil.ParseKey("end_key", rg) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + regions = append(regions, rc.ScanRegions(startKey, endKey, limit)...) + msgBuilder.WriteString(fmt.Sprintf("[%s,%s), ", rawStartKey, rawEndKey)) + } + if len(regions) > 0 { + regionsIDList := make([]uint64, 0, len(regions)) + for _, region := range regions { + regionsIDList = append(regionsIDList, region.GetID()) + } + rc.AddSuspectRegions(regionsIDList...) 
+	}
+	h.rd.Text(w, http.StatusOK, msgBuilder.String())
+}
+
 func (h *regionsHandler) GetTopNRegions(w http.ResponseWriter, r *http.Request, less func(a, b *core.RegionInfo) bool) {
 	rc := getCluster(r)
 	limit := defaultRegionLimit
diff --git a/server/api/region_test.go b/server/api/region_test.go
index aaca24138fc4..18a5abe78d68 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -332,6 +332,26 @@ func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRange() {
 	suite.Len(idList, 2)
 }
 
+func (suite *regionTestSuite) TestAccelerateRegionsScheduleInRanges() {
+	re := suite.Require()
+	r1 := core.NewTestRegionInfo(557, 13, []byte("a1"), []byte("a2"))
+	r2 := core.NewTestRegionInfo(558, 14, []byte("a2"), []byte("a3"))
+	r3 := core.NewTestRegionInfo(559, 15, []byte("a3"), []byte("a4"))
+	r4 := core.NewTestRegionInfo(560, 16, []byte("a4"), []byte("a5"))
+	r5 := core.NewTestRegionInfo(561, 17, []byte("a5"), []byte("a6"))
+	mustRegionHeartbeat(re, suite.svr, r1)
+	mustRegionHeartbeat(re, suite.svr, r2)
+	mustRegionHeartbeat(re, suite.svr, r3)
+	mustRegionHeartbeat(re, suite.svr, r4)
+	mustRegionHeartbeat(re, suite.svr, r5)
+	body := fmt.Sprintf(`[{"start_key":"%s", "end_key": "%s"}, {"start_key":"%s", "end_key": "%s"}]`, hex.EncodeToString([]byte("a1")), hex.EncodeToString([]byte("a3")), hex.EncodeToString([]byte("a4")), hex.EncodeToString([]byte("a6")))
+
+	err := tu.CheckPostJSON(testDialClient, fmt.Sprintf("%s/regions/accelerate-schedule/batch", suite.urlPrefix), []byte(body), tu.StatusOK(re))
+	suite.NoError(err)
+	idList := suite.svr.GetRaftCluster().GetSuspectRegions()
+	suite.Len(idList, 4)
+}
+
 func (suite *regionTestSuite) TestScatterRegions() {
 	re := suite.Require()
 	r1 := core.NewTestRegionInfo(601, 13, []byte("b1"), []byte("b2"))
diff --git a/server/api/router.go b/server/api/router.go
index a35d4b391f41..5962bb45e0f1 100644
--- a/server/api/router.go
+++ b/server/api/router.go
@@ -267,6 +267,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
 	registerFunc(clusterRouter, "/regions/check/hist-keys", regionsHandler.GetKeysHistogram, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/sibling/{id}", regionsHandler.GetRegionSiblings, setMethods(http.MethodGet), setAuditBackend(prometheus))
 	registerFunc(clusterRouter, "/regions/accelerate-schedule", regionsHandler.AccelerateRegionsScheduleInRange, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
+	registerFunc(clusterRouter, "/regions/accelerate-schedule/batch", regionsHandler.AccelerateRegionsScheduleInRanges, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
 	registerFunc(clusterRouter, "/regions/scatter", regionsHandler.ScatterRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
 	registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
 	registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))

From 3cc745a6715d77d2eacad87ba85316278da7050b Mon Sep 17 00:00:00 2001
From: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Date: Wed, 19 Apr 2023 19:55:20 -0700
Subject: [PATCH 3/3] Fixed bugs in tso service registry watching loop. (#6346)

ref tikv/pd#6343

Fixed the following two bugs:
1. When re-watching a range, the watch must resume from where the last one left off, so the
   revision to watch from is wresp.Header.Revision + 1 rather than wresp.Header.Revision,
   where wresp.Header.Revision is the revision carried in the response of the last watch.
   Because of this bug, the loop kept processing the same events endlessly.
2. In the tso service watch loop in pkg/keyspace/tso_keyspace_group.go, a delete event carries
   an empty value, so json.Unmarshal(event.Kv.Value, s) fails with "unexpected end of JSON
   input" and s.ServiceAddr cannot be recovered from the event; the address is now looked up
   from serviceRegistryMap, which is keyed by the service registry key, instead.

Signed-off-by: Bin Shi
---
 pkg/keyspace/tso_keyspace_group.go | 25 ++++++++++++++++++++-----
 pkg/tso/keyspace_group_manager.go  |  3 ++-
 server/server.go                   |  2 +-
 3 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go
index f18b80ed3050..bb0f413fb2de 100644
--- a/pkg/keyspace/tso_keyspace_group.go
+++ b/pkg/keyspace/tso_keyspace_group.go
@@ -76,6 +76,8 @@ type GroupManager struct {
 	// TODO: add user kind with different balancer
 	// when we ensure where the correspondence between tso node and user kind will be found
 	nodesBalancer balancer.Balancer[string]
+	// serviceRegistryMap stores the mapping from the service registry key to the service address.
+	serviceRegistryMap map[string]string
 }
 
 // NewKeyspaceGroupManager creates a Manager of keyspace group related data.
@@ -131,6 +133,7 @@ func (m *GroupManager) Bootstrap() error {
 	// If the etcd client is not nil, start the watch loop.
 	if m.client != nil {
 		m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
+		m.serviceRegistryMap = make(map[string]string)
 		m.wg.Add(1)
 		go m.startWatchLoop()
 	}
@@ -169,6 +172,7 @@ func (m *GroupManager) startWatchLoop() {
 				continue
 			}
 			m.nodesBalancer.Put(s.ServiceAddr)
+			m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr
 		}
 		break
 	}
@@ -219,17 +223,28 @@ func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (i
 			return revision, wresp.Err()
 		}
 		for _, event := range wresp.Events {
-			s := &discovery.ServiceRegistryEntry{}
-			if err := json.Unmarshal(event.Kv.Value, s); err != nil {
-				log.Warn("failed to unmarshal service registry entry", zap.Error(err))
-			}
 			switch event.Type {
 			case clientv3.EventTypePut:
+				s := &discovery.ServiceRegistryEntry{}
+				if err := json.Unmarshal(event.Kv.Value, s); err != nil {
+					log.Warn("failed to unmarshal service registry entry",
+						zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err))
+					break
+				}
 				m.nodesBalancer.Put(s.ServiceAddr)
+				m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr
 			case clientv3.EventTypeDelete:
-				m.nodesBalancer.Delete(s.ServiceAddr)
+				key := string(event.Kv.Key)
+				if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
+					delete(m.serviceRegistryMap, key)
+					m.nodesBalancer.Delete(serviceAddr)
+				} else {
+					log.Warn("can't retrieve service addr from service registry map",
+						zap.String("event-kv-key", key))
+				}
 			}
 		}
+		revision = wresp.Header.Revision + 1
 	}
 }
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index 8aba66c54128..febbbdabe412 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -485,6 +485,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
 				log.Warn("failed to unmarshal keyspace group",
 					zap.Uint32("keyspace-group-id", groupID),
 					zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause()))
+				break
 			}
 			kgm.updateKeyspaceGroup(group)
 		case clientv3.EventTypeDelete:
@@ -499,7 +500,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) (
 				}
 			}
 		}
-		revision = wresp.Header.Revision
+		revision = wresp.Header.Revision + 1
 	}
 
 	select {
diff --git a/server/server.go b/server/server.go
index 44c6d265e2f7..e83831364c7f 100644
--- a/server/server.go
+++ b/server/server.go
@@ -1820,7 +1820,7 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string
 					s.servicePrimaryMap.Delete(serviceName)
 				}
 			}
-			revision = wresp.Header.Revision
+			revision = wresp.Header.Revision + 1
 		}
 	}
 }
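
Note on the watch-resume pattern fixed by PATCH 3/3: the sketch below is illustrative only, not PD's implementation. The entry type, the watchAddrs helper, and the addrs map are hypothetical names introduced here; the clientv3 calls are the upstream etcd client API already used in the patched files. It shows the two points the fix relies on: the next watch resumes from Header.Revision + 1 (resuming at Header.Revision replays the events that were just handled), and a delete event carries an empty value, so the address must come from a locally maintained map rather than from json.Unmarshal.

package registrywatch

import (
	"context"
	"encoding/json"
	"log"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// entry is a stand-in for a service registry value stored as JSON.
type entry struct {
	ServiceAddr string
}

// watchAddrs keeps addrs in sync with the keys under prefix (hypothetical helper).
func watchAddrs(ctx context.Context, cli *clientv3.Client, prefix string, addrs map[string]string) {
	var revision int64
	for ctx.Err() == nil {
		opts := []clientv3.OpOption{clientv3.WithPrefix()}
		if revision > 0 {
			// Resume from where the previous watch left off.
			opts = append(opts, clientv3.WithRev(revision))
		}
		for wresp := range cli.Watch(ctx, prefix, opts...) {
			if wresp.Err() != nil {
				log.Println("watch error:", wresp.Err())
				break
			}
			for _, ev := range wresp.Events {
				key := string(ev.Kv.Key)
				switch ev.Type {
				case clientv3.EventTypePut:
					e := &entry{}
					if err := json.Unmarshal(ev.Kv.Value, e); err != nil {
						log.Println("bad registry value for", key, err)
						break // skip this event, keep watching
					}
					addrs[key] = e.ServiceAddr
				case clientv3.EventTypeDelete:
					// The value is empty for delete events, so the address
					// must come from the local map, not from Unmarshal.
					delete(addrs, key)
				}
			}
			// Header.Revision is the revision already observed; start the next
			// watch one past it, or these events would be delivered again.
			revision = wresp.Header.Revision + 1
		}
	}
}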