diff --git a/client/http/api.go b/client/http/api.go index a1ca96b38f17..3376a48770d5 100644 --- a/client/http/api.go +++ b/client/http/api.go @@ -41,6 +41,7 @@ const ( membersPrefix = "/pd/api/v1/members" leaderPrefix = "/pd/api/v1/leader" transferLeader = "/pd/api/v1/leader/transfer" + health = "/pd/api/v1/health" // Config Config = "/pd/api/v1/config" ClusterVersion = "/pd/api/v1/config/cluster-version" diff --git a/client/http/interface.go b/client/http/interface.go index 7b15291d9e7e..11c24beaefd8 100644 --- a/client/http/interface.go +++ b/client/http/interface.go @@ -50,6 +50,7 @@ type Client interface { GetStores(context.Context) (*StoresInfo, error) GetStore(context.Context, uint64) (*StoreInfo, error) SetStoreLabels(context.Context, int64, map[string]string) error + GetHealthStatus(context.Context) ([]Health, error) /* Config-related interfaces */ GetConfig(context.Context) (map[string]any, error) SetConfig(context.Context, map[string]any, ...float64) error @@ -337,6 +338,20 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels WithBody(jsonInput)) } +// GetHealthStatus gets the health status of the cluster. +func (c *client) GetHealthStatus(ctx context.Context) ([]Health, error) { + var healths []Health + err := c.request(ctx, newRequestInfo(). + WithName(getHealthStatusName). + WithURI(health). + WithMethod(http.MethodGet). + WithResp(&healths)) + if err != nil { + return nil, err + } + return healths, nil +} + // GetConfig gets the configurations. func (c *client) GetConfig(ctx context.Context) (map[string]any, error) { var config map[string]any diff --git a/client/http/request_info.go b/client/http/request_info.go index 0ce7072d1ba0..202eab1150fe 100644 --- a/client/http/request_info.go +++ b/client/http/request_info.go @@ -39,6 +39,7 @@ const ( getStoresName = "GetStores" getStoreName = "GetStore" setStoreLabelsName = "SetStoreLabels" + getHealthStatusName = "GetHealthStatus" getConfigName = "GetConfig" setConfigName = "SetConfig" getScheduleConfigName = "GetScheduleConfig" diff --git a/client/http/types.go b/client/http/types.go index 31b2bfdaea7e..f7273068b8cd 100644 --- a/client/http/types.go +++ b/client/http/types.go @@ -661,3 +661,12 @@ func stringToKeyspaceState(str string) (keyspacepb.KeyspaceState, error) { return keyspacepb.KeyspaceState(0), fmt.Errorf("invalid KeyspaceState string: %s", str) } } + +// Health reflects the cluster's health. +// NOTE: This type is moved from `server/api/health.go`; we may unify the two definitions in one place later.
+type Health struct { + Name string `json:"name"` + MemberID uint64 `json:"member_id"` + ClientUrls []string `json:"client_urls"` + Health bool `json:"health"` +} diff --git a/go.mod b/go.mod index ff0cb20069a0..90c5639c9367 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/prometheus/common v0.51.1 github.com/sasha-s/go-deadlock v0.2.0 github.com/shirou/gopsutil/v3 v3.23.3 - github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 + github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 8c77a4b84da9..6ec1baa72c47 100644 --- a/go.sum +++ b/go.sum @@ -432,8 +432,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 h1:Txo4SXVJq/OgEjwgkWoxkMoTjGlcrgsQE/XSghjmu0w= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 h1:fmanhZtn5RKRljCjX46H+Q9/PECsHbflXm0RdrnK9e4= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index e6d314c2e00d..69afb93f531c 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -11170,10 +11170,15 @@ "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The region heartbeat handle duration in .99", + "description": "The region heartbeat handle duration by levels", "editable": true, "error": false, + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, "fill": 0, + "fillGradient": 0, "grid": {}, "gridPos": { "h": 8, @@ -11181,7 +11186,8 @@ "x": 12, "y": 23 }, - "id": 1302, + "hiddenSeries": false, + "id": 1610, "legend": { "alignAsTable": true, "avg": false, @@ -11199,8 +11205,12 @@ "linewidth": 1, "links": [], "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, "paceLength": 10, "percentage": false, + "pluginVersion": "7.5.17", "pointradius": 5, "points": false, "renderer": "flot", @@ -11210,20 +11220,46 @@ "steppedLine": false, "targets": [ { - "expr": "histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) by (address, store, le))", + "exemplar": true, + "expr": "histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) by (le))", "format": "time_series", "hide": false, + "interval": "", "intervalFactor": 2, - "legendFormat": "{{address}}-store-{{store}}", + "legendFormat": "0.99", "refId": "A", "step": 4 + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.9, 
sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "0.9", + "refId": "B" + }, + { + "exemplar": true, + "expr": "histogram_quantile(0.8, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) by (le))", + "hide": false, + "interval": "", + "legendFormat": "0.8", + "refId": "C" + }, + { + "exemplar": true, + "expr": "sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m]))", + "hide": false, + "interval": "", + "legendFormat": "avg", + "refId": "D" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "99% Region heartbeat handle latency", + "title": "Region heartbeat handle latency overview", "tooltip": { "msResolution": false, "shared": true, @@ -11381,15 +11417,14 @@ }, { "aliasColors": {}, - "bars": true, + "bars": false, "dashLength": 10, "dashes": false, "datasource": "${DS_TEST-CLUSTER}", - "description": "The breakdown metric about heartbeat", + "description": "The region heartbeat handle duration in .99 by store", "editable": true, "error": false, "fill": 0, - "fillGradient": 0, "grid": {}, "gridPos": { "h": 8, @@ -11397,77 +11432,49 @@ "x": 12, "y": 31 }, - "hiddenSeries": false, - "id": 1335, + "id": 1302, "legend": { "alignAsTable": true, - "avg": true, + "avg": false, "current": true, "hideEmpty": true, - "hideZero": true, + "hideZero": false, "max": true, "min": false, "rightSide": true, "show": true, - "sort": "current", - "sortDesc": true, "total": false, "values": true }, - "lines": false, + "lines": true, "linewidth": 1, "links": [], "nullPointMode": "null as zero", - "options": { - "alertThreshold": true - }, "paceLength": 10, "percentage": false, - "pluginVersion": "8.5.27", "pointradius": 5, "points": false, "renderer": "flot", - "seriesOverrides": [ - { - "alias": "WaitRegionsLock", - "bars": false, - "lines": true, - "linewidth": 2, - "stack": false - }, - { - "alias": "WaitSubRegionsLock", - "bars": false, - "lines": true, - "linewidth": 2, - "stack": false - } - ], + "seriesOverrides": [], "spaceLength": 10, - "stack": true, + "stack": false, "steppedLine": false, "targets": [ { - "expr": "sum(rate(pd_core_region_heartbeat_breakdown_handle_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=~\"$tidb_cluster.*\"}[1m])) by (name)", + "expr": "histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store\"}[1m])) by (address, store, le))", "format": "time_series", "hide": false, "intervalFactor": 2, - "legendFormat": "{{name}}", - "range": true, + "legendFormat": "{{address}}-store-{{store}}", "refId": "A", "step": 4 - }, - { - "expr": "sum(rate(pd_core_acquire_regions_lock_wait_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=~\"$tidb_cluster.*\"}[1m])) by (type)", - "hide": false, - "legendFormat": "{{type}}", - "range": true, - "refId": "B" } ], "thresholds": [], + "timeFrom": null, "timeRegions": [], - "title": "Heartbeat Performance Duration BreakDown (Accumulation)", + 
"timeShift": null, + "title": "99% Region heartbeat handle latency by store", "tooltip": { "msResolution": false, "shared": true, @@ -11476,25 +11483,33 @@ }, "type": "graph", "xaxis": { + "buckets": null, "mode": "time", + "name": null, "show": true, "values": [] }, "yaxes": [ { "format": "s", + "label": null, "logBase": 1, + "max": null, "min": "0", "show": true }, { "format": "s", + "label": null, "logBase": 1, + "max": null, + "min": null, "show": true } ], "yaxis": { - "align": false + "align": false, + "alignLevel": null } }, { @@ -11594,6 +11609,124 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": true, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The breakdown metric about heartbeat", + "editable": true, + "error": false, + "fill": 0, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 39 + }, + "hiddenSeries": false, + "id": 1335, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": false, + "linewidth": 1, + "links": [], + "nullPointMode": "null as zero", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "8.5.27", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "WaitRegionsLock", + "bars": false, + "lines": true, + "linewidth": 2, + "stack": false + }, + { + "alias": "WaitSubRegionsLock", + "bars": false, + "lines": true, + "linewidth": 2, + "stack": false + } + ], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(pd_core_region_heartbeat_breakdown_handle_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=~\"$tidb_cluster.*\"}[1m])) by (name)", + "format": "time_series", + "hide": false, + "intervalFactor": 2, + "legendFormat": "{{name}}", + "range": true, + "refId": "A", + "step": 4 + }, + { + "expr": "sum(rate(pd_core_acquire_regions_lock_wait_duration_seconds_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=~\"$tidb_cluster.*\"}[1m])) by (type)", + "hide": false, + "legendFormat": "{{type}}", + "range": true, + "refId": "B" + } + ], + "thresholds": [], + "timeRegions": [], + "title": "Heartbeat Performance Duration BreakDown (Accumulation)", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "mode": "time", + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "logBase": 1, + "min": "0", + "show": true + }, + { + "format": "s", + "logBase": 1, + "show": true + } + ], + "yaxis": { + "align": false + } + }, { "aliasColors": {}, "bars": false, @@ -11613,11 +11746,11 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 39 + "x": 0, + "y": 47 }, "hiddenSeries": false, - "id": 1608, + "id": 1609, "legend": { "alignAsTable": true, "avg": true, @@ -11644,28 +11777,44 @@ "pointradius": 5, "points": false, "renderer": "flot", - "seriesOverrides": [], + "seriesOverrides": [ + { + "alias": "/max-wait-duration.*/", + "bars": true, + "lines": false, + "transform": "negative-Y", + "yaxis": 2 + } + ], "spaceLength": 10, "stack": false, "steppedLine": false, "targets": [ { "exemplar": true, - "expr": "pd_ratelimit_runner_task_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "expr": 
"pd_ratelimit_runner_pending_tasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "{{task_type}}_({{runner_name}})", + "legendFormat": "{{task_type}}_{{runner_name}}", "refId": "A", "step": 4 + }, + { + "exemplar": true, + "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", + "hide": false, + "interval": "", + "legendFormat": "max-wait-duration-{{runner_name}}", + "refId": "B" } ], "thresholds": [], "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Heartbeat Runner Pending Task", + "title": "Concurrent Runner Pending Task", "tooltip": { "msResolution": false, "shared": true, @@ -11682,8 +11831,9 @@ }, "yaxes": [ { - "format": "opm", - "label": null, + "decimals": null, + "format": "none", + "label": "", "logBase": 1, "max": null, "min": "0", @@ -11722,11 +11872,11 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 47 }, "hiddenSeries": false, - "id": 1609, + "id": 1608, "legend": { "alignAsTable": true, "avg": true, @@ -11753,37 +11903,21 @@ "pointradius": 5, "points": false, "renderer": "flot", - "seriesOverrides": [ - { - "alias": "/max-wait-duration.*/", - "bars": true, - "lines": false, - "transform": "negative-Y", - "yaxis": 2 - } - ], + "seriesOverrides": [], "spaceLength": 10, "stack": false, "steppedLine": false, "targets": [ { "exemplar": true, - "expr": "rate(pd_ratelimit_runner_task_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", + "expr": "rate(pd_ratelimit_runner_failed_tasks_total{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[1m])*60", "format": "time_series", "hide": false, "interval": "", "intervalFactor": 2, - "legendFormat": "failed-tasks-({{runner_name}})", + "legendFormat": "failed-tasks-{{runner_name}}", "refId": "A", "step": 4 - }, - { - "exemplar": true, - "expr": "pd_ratelimit_runner_task_max_waiting_duration_seconds{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}", - "hide": false, - "interval": "", - "legendFormat": "max-wait-duration-({{runner_name}})", - "refId": "B" } ], "thresholds": [], @@ -11807,9 +11941,8 @@ }, "yaxes": [ { - "decimals": null, "format": "opm", - "label": "", + "label": null, "logBase": 1, "max": null, "min": "0", @@ -11843,8 +11976,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 47 + "x": 0, + "y": 55 }, "id": 1305, "legend": { @@ -11937,7 +12070,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 55 }, "id": 1306, @@ -12027,8 +12160,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 55 + "x": 0, + "y": 63 }, "id": 1307, "legend": { @@ -12120,7 +12253,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 63 }, "id": 1308, @@ -12217,8 +12350,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 63 + "x": 0, + "y": 71 }, "id": 1309, "legend": { @@ -12314,7 +12447,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 71 }, "id": 1310, @@ -12411,8 +12544,8 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 71 + "x": 0, + "y": 79 }, "id": 1311, "legend": { @@ -12508,7 +12641,7 @@ "gridPos": { "h": 8, "w": 12, - "x": 0, + "x": 12, "y": 79 }, "id": 1312, diff --git a/pkg/core/context.go b/pkg/core/context.go index a0f51e556806..7410f8394c2c 100644 --- a/pkg/core/context.go +++ b/pkg/core/context.go @@ -25,6 +25,7 @@ type MetaProcessContext struct { context.Context Tracer RegionHeartbeatProcessTracer TaskRunner ratelimit.Runner + 
MiscRunner ratelimit.Runner LogRunner ratelimit.Runner } @@ -35,6 +36,7 @@ func ContextTODO() *MetaProcessContext { Context: context.TODO(), Tracer: NewNoopHeartbeatProcessTracer(), TaskRunner: ratelimit.NewSyncRunner(), + MiscRunner: ratelimit.NewSyncRunner(), LogRunner: ratelimit.NewSyncRunner(), // Limit default is nil } diff --git a/pkg/core/region.go b/pkg/core/region.go index c9a8455d4de0..19c1d0d4794d 100644 --- a/pkg/core/region.go +++ b/pkg/core/region.go @@ -727,9 +727,15 @@ func (r *RegionInfo) isRegionRecreated() bool { return r.GetRegionEpoch().GetVersion() == 1 && r.GetRegionEpoch().GetConfVer() == 1 && (len(r.GetStartKey()) != 0 || len(r.GetEndKey()) != 0) } +// Contains checks whether the given key is within the region's key range. +func (r *RegionInfo) Contains(key []byte) bool { + start, end := r.GetStartKey(), r.GetEndKey() + return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) +} + // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin // and new region information. -type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) +type RegionGuideFunc func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function. // nil means do not print the log. @@ -742,7 +747,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { } // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. - return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync bool) { + return func(ctx *MetaProcessContext, region, origin *RegionInfo) (saveKV, saveCache, needSync, retained bool) { logRunner := ctx.LogRunner // print log asynchronously debug, info := d, i @@ -772,7 +777,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("region-id", region.GetID()), logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta()))) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } else { r := region.GetRegionEpoch() o := origin.GetRegionEpoch() @@ -785,7 +790,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("new-version", r.GetVersion()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } if r.GetConfVer() > o.GetConfVer() { if log.GetLevel() <= zap.InfoLevel { @@ -796,7 +801,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc { zap.Uint64("old-confver", o.GetConfVer()), zap.Uint64("new-confver", r.GetConfVer()), ) } - saveKV, saveCache = true, true + saveKV, saveCache, retained = true, true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel { @@ -1673,13 +1678,6 @@ func (r *RegionsInfo) GetStoreWitnessCount(storeID uint64) int { return r.witnesses[storeID].length() } -// RandPendingRegion randomly gets a store's region with a pending peer. -func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange) *RegionInfo { - r.st.RLock() - defer r.st.RUnlock() - return r.pendingPeers[storeID].RandomRegion(ranges) -} - // RandPendingRegions randomly gets a store's n regions with a pending peer.
func (r *RegionsInfo) RandPendingRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { r.st.RLock() @@ -1687,11 +1685,11 @@ func (r *RegionsInfo) RandPendingRegions(storeID uint64, ranges []KeyRange) []*R return r.pendingPeers[storeID].RandomRegions(randomRegionMaxRetry, ranges) } -// RandLeaderRegion randomly gets a store's leader region. -func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange) *RegionInfo { +// randLeaderRegion randomly gets a store's leader region; it is only used in tests. +func (r *RegionsInfo) randLeaderRegion(storeID uint64, ranges []KeyRange) { r.st.RLock() defer r.st.RUnlock() - return r.leaders[storeID].RandomRegion(ranges) + _ = r.leaders[storeID].randomRegion(ranges) } // RandLeaderRegions randomly gets a store's n leader regions. @@ -1701,13 +1699,6 @@ func (r *RegionsInfo) RandLeaderRegions(storeID uint64, ranges []KeyRange) []*Re return r.leaders[storeID].RandomRegions(randomRegionMaxRetry, ranges) } -// RandFollowerRegion randomly gets a store's follower region. -func (r *RegionsInfo) RandFollowerRegion(storeID uint64, ranges []KeyRange) *RegionInfo { - r.st.RLock() - defer r.st.RUnlock() - return r.followers[storeID].RandomRegion(ranges) -} - // RandFollowerRegions randomly gets a store's n follower regions. func (r *RegionsInfo) RandFollowerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { r.st.RLock() @@ -1715,13 +1706,6 @@ func (r *RegionsInfo) RandFollowerRegions(storeID uint64, ranges []KeyRange) []* return r.followers[storeID].RandomRegions(randomRegionMaxRetry, ranges) } -// RandLearnerRegion randomly gets a store's learner region. -func (r *RegionsInfo) RandLearnerRegion(storeID uint64, ranges []KeyRange) *RegionInfo { - r.st.RLock() - defer r.st.RUnlock() - return r.learners[storeID].RandomRegion(ranges) -} - // RandLearnerRegions randomly gets a store's n learner regions. func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { r.st.RLock() @@ -1729,13 +1713,6 @@ func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange) []*R return r.learners[storeID].RandomRegions(randomRegionMaxRetry, ranges) } -// RandWitnessRegion randomly gets a store's witness region. -func (r *RegionsInfo) RandWitnessRegion(storeID uint64, ranges []KeyRange) *RegionInfo { - r.st.RLock() - defer r.st.RUnlock() - return r.witnesses[storeID].RandomRegion(ranges) -} - // RandWitnessRegions randomly gets a store's n witness regions. func (r *RegionsInfo) RandWitnessRegions(storeID uint64, ranges []KeyRange) []*RegionInfo { r.st.RLock() diff --git a/pkg/core/region_test.go b/pkg/core/region_test.go index 1b8f20cf9b28..aaf440eeeea2 100644 --- a/pkg/core/region_test.go +++ b/pkg/core/region_test.go @@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) { for _, testCase := range testCases { regionA := region.Clone(testCase.optionsA...) regionB := region.Clone(testCase.optionsB...)
- _, _, needSync := RegionGuide(ContextTODO(), regionA, regionB) + _, _, needSync, _ := RegionGuide(ContextTODO(), regionA, regionB) re.Equal(testCase.needSync, needSync) } } @@ -642,21 +642,64 @@ func BenchmarkUpdateBuckets(b *testing.B) { } func BenchmarkRandomRegion(b *testing.B) { - regions := NewRegionsInfo() - for i := 0; i < 5000000; i++ { - peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} - region := NewRegionInfo(&metapb.Region{ - Id: uint64(i + 1), - Peers: []*metapb.Peer{peer}, - StartKey: []byte(fmt.Sprintf("%20d", i)), - EndKey: []byte(fmt.Sprintf("%20d", i+1)), - }, peer) - origin, overlaps, rangeChanged := regions.SetRegion(region) - regions.UpdateSubTree(region, origin, overlaps, rangeChanged) - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - regions.RandLeaderRegion(1, nil) + for _, size := range []int{10, 100, 1000, 10000, 100000, 1000000, 10000000} { + regions := NewRegionsInfo() + for i := 0; i < size; i++ { + peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)} + region := NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + Peers: []*metapb.Peer{peer}, + StartKey: []byte(fmt.Sprintf("%20d", i)), + EndKey: []byte(fmt.Sprintf("%20d", i+1)), + }, peer) + origin, overlaps, rangeChanged := regions.SetRegion(region) + regions.UpdateSubTree(region, origin, overlaps, rangeChanged) + } + b.Run(fmt.Sprintf("random region whole range with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.randLeaderRegion(1, nil) + } + }) + b.Run(fmt.Sprintf("random regions whole range with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.RandLeaderRegions(1, nil) + } + }) + ranges := []KeyRange{ + NewKeyRange(fmt.Sprintf("%20d", size/4), fmt.Sprintf("%20d", size*3/4)), + } + b.Run(fmt.Sprintf("random region single range with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.randLeaderRegion(1, ranges) + } + }) + b.Run(fmt.Sprintf("random regions single range with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.RandLeaderRegions(1, ranges) + } + }) + ranges = []KeyRange{ + NewKeyRange(fmt.Sprintf("%20d", 0), fmt.Sprintf("%20d", size/4)), + NewKeyRange(fmt.Sprintf("%20d", size/4), fmt.Sprintf("%20d", size/2)), + NewKeyRange(fmt.Sprintf("%20d", size/2), fmt.Sprintf("%20d", size*3/4)), + NewKeyRange(fmt.Sprintf("%20d", size*3/4), fmt.Sprintf("%20d", size)), + } + b.Run(fmt.Sprintf("random region multiple ranges with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.randLeaderRegion(1, ranges) + } + }) + b.Run(fmt.Sprintf("random regions multiple ranges with size %d", size), func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + regions.RandLeaderRegions(1, ranges) + } + }) } } @@ -1031,7 +1074,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { regionsOld.AtomicCheckAndPutRegion(ctx, regionPendingItemA) re.Equal(int32(2), regionPendingItemA.GetRef()) // check new item - saveKV, saveCache, needSync := regionGuide(ctx, regionItemA, regionPendingItemA) + saveKV, saveCache, needSync, _ := regionGuide(ctx, regionItemA, regionPendingItemA) re.True(needSync) re.True(saveCache) re.False(saveKV) @@ -1060,7 +1103,7 @@ func TestUpdateRegionEventualConsistency(t *testing.T) { re.Equal(int32(1), regionPendingItemB.GetRef()) // heartbeat again, no need updates root tree - saveKV, saveCache, needSync := regionGuide(ctx, regionItemB, regionItemB) + saveKV, saveCache, 
needSync, _ := regionGuide(ctx, regionItemB, regionItemB) re.False(needSync) re.False(saveCache) re.False(saveKV) diff --git a/pkg/core/region_tree.go b/pkg/core/region_tree.go index 6c3c71c51588..d4ef4a880fc8 100644 --- a/pkg/core/region_tree.go +++ b/pkg/core/region_tree.go @@ -52,11 +52,6 @@ func (r *regionItem) Less(other *regionItem) bool { return bytes.Compare(left, right) < 0 } -func (r *regionItem) Contains(key []byte) bool { - start, end := r.GetStartKey(), r.GetEndKey() - return bytes.Compare(key, start) >= 0 && (len(end) == 0 || bytes.Compare(key, end) < 0) -} - const ( defaultBTreeDegree = 64 ) @@ -328,62 +323,115 @@ func (t *regionTree) getAdjacentItem(item *regionItem) (prev *regionItem, next * return prev, next } -// RandomRegion is used to get a random region within ranges. -func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { - if t.length() == 0 { +func (t *regionTree) randomRegion(ranges []KeyRange) *RegionInfo { + regions := t.RandomRegions(1, ranges) + if len(regions) == 0 { return nil } + return regions[0] +} - if len(ranges) == 0 { - ranges = []KeyRange{NewKeyRange("", "")} +// RandomRegions gets n random regions within the given ranges. +func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { + treeLen := t.length() + if treeLen == 0 || n < 1 { + return nil } - - for _, i := range rand.Perm(len(ranges)) { - var endIndex int - startKey, endKey := ranges[i].StartKey, ranges[i].EndKey - startRegion, startIndex := t.tree.GetWithIndex(&regionItem{RegionInfo: &RegionInfo{meta: &metapb.Region{StartKey: startKey}}}) - - if len(endKey) != 0 { - _, endIndex = t.tree.GetWithIndex(&regionItem{RegionInfo: &RegionInfo{meta: &metapb.Region{StartKey: endKey}}}) - } else { - endIndex = t.tree.Len() - } - - // Consider that the item in the tree may not be continuous, - // we need to check if the previous item contains the key. - if startIndex != 0 && startRegion == nil && t.tree.GetAt(startIndex-1).Contains(startKey) { - startIndex-- + // Pre-allocate the variables to reduce the temporary memory allocations. + var ( + startKey, endKey []byte + // By default, we set the `startIndex` and `endIndex` to the whole tree range. + startIndex, endIndex = 0, treeLen + randIndex int + startItem *regionItem + pivotItem = &regionItem{&RegionInfo{meta: &metapb.Region{}}} + region *RegionInfo + regions = make([]*RegionInfo, 0, n) + rangeLen, curLen = len(ranges), len(regions) + // setAndCheckStartEndIndices is a helper function to set `startIndex` and `endIndex` + // according to the `startKey` and `endKey` and check if the range is invalid + // to skip the iteration. + // TODO: maybe we could cache the `startIndex` and `endIndex` for each range. + setAndCheckStartEndIndices = func() (skip bool) { + startKeyLen, endKeyLen := len(startKey), len(endKey) + if startKeyLen == 0 && endKeyLen == 0 { + startIndex, endIndex = 0, treeLen + return false + } + pivotItem.meta.StartKey = startKey + startItem, startIndex = t.tree.GetWithIndex(pivotItem) + if endKeyLen > 0 { + pivotItem.meta.StartKey = endKey + _, endIndex = t.tree.GetWithIndex(pivotItem) + } else { + endIndex = treeLen + } + // Consider that the item in the tree may not be continuous, + // we need to check if the previous item contains the key. + if startIndex != 0 && startItem == nil { + region = t.tree.GetAt(startIndex - 1).RegionInfo + if region.Contains(startKey) { + startIndex-- + } + } + // Check whether the `startIndex` and `endIndex` are valid.
+ if endIndex <= startIndex { + if endKeyLen > 0 && bytes.Compare(startKey, endKey) > 0 { + log.Error("wrong range keys", + logutil.ZapRedactString("start-key", string(HexRegionKey(startKey))), + logutil.ZapRedactString("end-key", string(HexRegionKey(endKey))), + errs.ZapError(errs.ErrWrongRangeKeys)) + } + return true + } + return false } - - if endIndex <= startIndex { - if len(endKey) > 0 && bytes.Compare(startKey, endKey) > 0 { - log.Error("wrong range keys", - logutil.ZapRedactString("start-key", string(HexRegionKey(startKey))), - logutil.ZapRedactString("end-key", string(HexRegionKey(endKey))), - errs.ZapError(errs.ErrWrongRangeKeys)) + ) + // This is a fast path to reduce the unnecessary iterations when we only have one range. + if rangeLen <= 1 { + if rangeLen == 1 { + startKey, endKey = ranges[0].StartKey, ranges[0].EndKey + if setAndCheckStartEndIndices() { + return regions } - continue } - index := rand.Intn(endIndex-startIndex) + startIndex - region := t.tree.GetAt(index).RegionInfo - if region.isInvolved(startKey, endKey) { - return region + for curLen < n { + randIndex = rand.Intn(endIndex-startIndex) + startIndex + region = t.tree.GetAt(randIndex).RegionInfo + if region.isInvolved(startKey, endKey) { + regions = append(regions, region) + curLen++ + } + // No region found, directly break to avoid infinite loop. + if curLen == 0 { + break + } } - } - - return nil -} - -func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { - if t.length() == 0 { - return nil - } - - regions := make([]*RegionInfo, 0, n) + return regions + } + // When there are multiple ranges provided, + // keep retrying until we get enough regions. + for curLen < n { + // Shuffle the ranges to increase the randomness. + for _, i := range rand.Perm(rangeLen) { + startKey, endKey = ranges[i].StartKey, ranges[i].EndKey + if setAndCheckStartEndIndices() { + continue + } - for i := 0; i < n; i++ { - if region := t.RandomRegion(ranges); region != nil { - regions = append(regions, region) + randIndex = rand.Intn(endIndex-startIndex) + startIndex + region = t.tree.GetAt(randIndex).RegionInfo + if region.isInvolved(startKey, endKey) { + regions = append(regions, region) + curLen++ + if curLen == n { + return regions + } + } + } + // No region found, directly break to avoid infinite loop. 
+ if curLen == 0 { + break } } return regions diff --git a/pkg/core/region_tree_test.go b/pkg/core/region_tree_test.go index 3f2ca0c1fb8f..5886103191c7 100644 --- a/pkg/core/region_tree_test.go +++ b/pkg/core/region_tree_test.go @@ -274,13 +274,19 @@ func TestRegionTreeSplitAndMerge(t *testing.T) { func TestRandomRegion(t *testing.T) { re := require.New(t) tree := newRegionTree() - r := tree.RandomRegion(nil) + r := tree.randomRegion(nil) re.Nil(r) regionA := NewTestRegionInfo(1, 1, []byte(""), []byte("g")) updateNewItem(tree, regionA) - ra := tree.RandomRegion([]KeyRange{NewKeyRange("", "")}) + ra := tree.randomRegion([]KeyRange{NewKeyRange("", "")}) re.Equal(regionA, ra) + ra = tree.randomRegion(nil) + re.Equal(regionA, ra) + ra2 := tree.RandomRegions(2, []KeyRange{NewKeyRange("", "")}) + re.Equal([]*RegionInfo{regionA, regionA}, ra2) + ra2 = tree.RandomRegions(2, nil) + re.Equal([]*RegionInfo{regionA, regionA}, ra2) regionB := NewTestRegionInfo(2, 2, []byte("g"), []byte("n")) regionC := NewTestRegionInfo(3, 3, []byte("n"), []byte("t")) @@ -289,22 +295,23 @@ func TestRandomRegion(t *testing.T) { updateNewItem(tree, regionC) updateNewItem(tree, regionD) - rb := tree.RandomRegion([]KeyRange{NewKeyRange("g", "n")}) + rb := tree.randomRegion([]KeyRange{NewKeyRange("g", "n")}) re.Equal(regionB, rb) - rc := tree.RandomRegion([]KeyRange{NewKeyRange("n", "t")}) + rc := tree.randomRegion([]KeyRange{NewKeyRange("n", "t")}) re.Equal(regionC, rc) - rd := tree.RandomRegion([]KeyRange{NewKeyRange("t", "")}) + rd := tree.randomRegion([]KeyRange{NewKeyRange("t", "")}) re.Equal(regionD, rd) - rf := tree.RandomRegion([]KeyRange{NewKeyRange("", "a")}) + rf := tree.randomRegion([]KeyRange{NewKeyRange("", "a")}) re.Nil(rf) - rf = tree.RandomRegion([]KeyRange{NewKeyRange("o", "s")}) + rf = tree.randomRegion([]KeyRange{NewKeyRange("o", "s")}) re.Nil(rf) - rf = tree.RandomRegion([]KeyRange{NewKeyRange("", "a")}) + rf = tree.randomRegion([]KeyRange{NewKeyRange("", "a")}) re.Nil(rf) - rf = tree.RandomRegion([]KeyRange{NewKeyRange("z", "")}) + rf = tree.randomRegion([]KeyRange{NewKeyRange("z", "")}) re.Nil(rf) + checkRandomRegion(re, tree, []*RegionInfo{regionA, regionB, regionC, regionD}, nil) checkRandomRegion(re, tree, []*RegionInfo{regionA, regionB, regionC, regionD}, []KeyRange{NewKeyRange("", "")}) checkRandomRegion(re, tree, []*RegionInfo{regionA, regionB}, []KeyRange{NewKeyRange("", "n")}) checkRandomRegion(re, tree, []*RegionInfo{regionC, regionD}, []KeyRange{NewKeyRange("n", "")}) @@ -315,45 +322,46 @@ func TestRandomRegion(t *testing.T) { func TestRandomRegionDiscontinuous(t *testing.T) { re := require.New(t) tree := newRegionTree() - r := tree.RandomRegion([]KeyRange{NewKeyRange("c", "f")}) + r := tree.randomRegion([]KeyRange{NewKeyRange("c", "f")}) re.Nil(r) // test for single region regionA := NewTestRegionInfo(1, 1, []byte("c"), []byte("f")) updateNewItem(tree, regionA) - ra := tree.RandomRegion([]KeyRange{NewKeyRange("c", "e")}) + ra := tree.randomRegion([]KeyRange{NewKeyRange("c", "e")}) re.Nil(ra) - ra = tree.RandomRegion([]KeyRange{NewKeyRange("c", "f")}) + ra = tree.randomRegion([]KeyRange{NewKeyRange("c", "f")}) re.Equal(regionA, ra) - ra = tree.RandomRegion([]KeyRange{NewKeyRange("c", "g")}) + ra = tree.randomRegion([]KeyRange{NewKeyRange("c", "g")}) re.Equal(regionA, ra) - ra = tree.RandomRegion([]KeyRange{NewKeyRange("a", "e")}) + ra = tree.randomRegion([]KeyRange{NewKeyRange("a", "e")}) re.Nil(ra) - ra = tree.RandomRegion([]KeyRange{NewKeyRange("a", "f")}) + ra = 
tree.randomRegion([]KeyRange{NewKeyRange("a", "f")}) re.Equal(regionA, ra) - ra = tree.RandomRegion([]KeyRange{NewKeyRange("a", "g")}) + ra = tree.randomRegion([]KeyRange{NewKeyRange("a", "g")}) re.Equal(regionA, ra) regionB := NewTestRegionInfo(2, 2, []byte("n"), []byte("x")) updateNewItem(tree, regionB) - rb := tree.RandomRegion([]KeyRange{NewKeyRange("g", "x")}) + rb := tree.randomRegion([]KeyRange{NewKeyRange("g", "x")}) re.Equal(regionB, rb) - rb = tree.RandomRegion([]KeyRange{NewKeyRange("g", "y")}) + rb = tree.randomRegion([]KeyRange{NewKeyRange("g", "y")}) re.Equal(regionB, rb) - rb = tree.RandomRegion([]KeyRange{NewKeyRange("n", "y")}) + rb = tree.randomRegion([]KeyRange{NewKeyRange("n", "y")}) re.Equal(regionB, rb) - rb = tree.RandomRegion([]KeyRange{NewKeyRange("o", "y")}) + rb = tree.randomRegion([]KeyRange{NewKeyRange("o", "y")}) re.Nil(rb) regionC := NewTestRegionInfo(3, 3, []byte("z"), []byte("")) updateNewItem(tree, regionC) - rc := tree.RandomRegion([]KeyRange{NewKeyRange("y", "")}) + rc := tree.randomRegion([]KeyRange{NewKeyRange("y", "")}) re.Equal(regionC, rc) regionD := NewTestRegionInfo(4, 4, []byte(""), []byte("a")) updateNewItem(tree, regionD) - rd := tree.RandomRegion([]KeyRange{NewKeyRange("", "b")}) + rd := tree.randomRegion([]KeyRange{NewKeyRange("", "b")}) re.Equal(regionD, rd) + checkRandomRegion(re, tree, []*RegionInfo{regionA, regionB, regionC, regionD}, nil) checkRandomRegion(re, tree, []*RegionInfo{regionA, regionB, regionC, regionD}, []KeyRange{NewKeyRange("", "")}) } @@ -365,7 +373,7 @@ func updateNewItem(tree *regionTree, region *RegionInfo) { func checkRandomRegion(re *require.Assertions, tree *regionTree, regions []*RegionInfo, ranges []KeyRange) { keys := make(map[string]struct{}) for i := 0; i < 10000 && len(keys) < len(regions); i++ { - re := tree.RandomRegion(ranges) + re := tree.randomRegion(ranges) if re == nil { continue } diff --git a/pkg/dashboard/uiserver/embedded_assets_rewriter.go b/pkg/dashboard/uiserver/embedded_assets_rewriter.go index 2a5b4a5b3b6c..d19db01936f8 100644 --- a/pkg/dashboard/uiserver/embedded_assets_rewriter.go +++ b/pkg/dashboard/uiserver/embedded_assets_rewriter.go @@ -28,6 +28,7 @@ import ( var once sync.Once // Assets returns the Assets FileSystem of the dashboard UI +// NOTE: if you see "undefined: assets" error, please run `make dashboard-ui` in the root directory of the repository. func Assets(cfg *config.Config) http.FileSystem { once.Do(func() { resPath := distroutil.MustGetResPath() diff --git a/pkg/mcs/scheduling/server/cluster.go b/pkg/mcs/scheduling/server/cluster.go index d3691516868e..c6c365b03ad8 100644 --- a/pkg/mcs/scheduling/server/cluster.go +++ b/pkg/mcs/scheduling/server/cluster.go @@ -54,8 +54,12 @@ type Cluster struct { clusterID uint64 running atomic.Bool - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. 
+ logRunner ratelimit.Runner } const ( @@ -64,8 +68,9 @@ const ( collectWaitTime = time.Minute // heartbeat relative const - heartbeatTaskRunner = "heartbeat-task-runner" - logTaskRunner = "log-task-runner" + heartbeatTaskRunner = "heartbeat-task-runner" + statisticsTaskRunner = "statistics-task-runner" + logTaskRunner = "log-task-runner" ) var syncRunner = ratelimit.NewSyncRunner() @@ -93,8 +98,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig, clusterID: clusterID, checkMembershipCh: checkMembershipCh, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams) err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel()) @@ -531,7 +537,8 @@ func (c *Cluster) StartBackgroundJobs() { go c.runUpdateStoreStats() go c.runCoordinator() go c.runMetricsCollectionJob() - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() c.running.Store(true) } @@ -543,7 +550,8 @@ func (c *Cluster) StopBackgroundJobs() { } c.running.Store(false) c.coordinator.Stop() - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.cancel() c.wg.Wait() @@ -560,16 +568,18 @@ func (c *Cluster) HandleRegionHeartbeat(region *core.RegionInfo) error { if c.persistConfig.GetScheduleConfig().EnableHeartbeatBreakdownMetrics { tracer = core.NewHeartbeatProcessTracer() } - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.persistConfig.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } ctx := &core.MetaProcessContext{ Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin() @@ -591,19 +601,12 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c return err } region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) - - ctx.TaskRunner.RunTask( - ctx, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. 
- _, saveCache, _ := core.GenerateRegionGuideFunc(true)(ctx, region, origin) + _, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin) if !saveCache { // Due to some config changes need to update the region stats as well, @@ -627,6 +630,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -650,6 +654,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() ctx.TaskRunner.RunTask( diff --git a/pkg/ratelimit/metrics.go b/pkg/ratelimit/metrics.go index 5d4443a1cc4a..c5510e66b263 100644 --- a/pkg/ratelimit/metrics.go +++ b/pkg/ratelimit/metrics.go @@ -31,25 +31,41 @@ var ( Name: "runner_task_max_waiting_duration_seconds", Help: "The duration of tasks waiting in the runner.", }, []string{nameStr}) - - RunnerTaskPendingTasks = prometheus.NewGaugeVec( + RunnerPendingTasks = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_pending_tasks", + Name: "runner_pending_tasks", Help: "The number of pending tasks in the runner.", }, []string{nameStr, taskStr}) - RunnerTaskFailedTasks = prometheus.NewCounterVec( + RunnerFailedTasks = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "pd", Subsystem: "ratelimit", - Name: "runner_task_failed_tasks_total", + Name: "runner_failed_tasks_total", Help: "The number of failed tasks in the runner.", - }, []string{nameStr}) + }, []string{nameStr, taskStr}) + RunnerSucceededTasks = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_success_tasks_total", + Help: "The number of succeeded tasks in the runner.", + }, []string{nameStr, taskStr}) + RunnerTaskExecutionDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "ratelimit", + Name: "runner_task_execution_duration_seconds", + Help: "Bucketed histogram of processing time (s) of finished tasks.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{nameStr, taskStr}) ) func init() { prometheus.MustRegister(RunnerTaskMaxWaitingDuration) - prometheus.MustRegister(RunnerTaskPendingTasks) - prometheus.MustRegister(RunnerTaskFailedTasks) + prometheus.MustRegister(RunnerPendingTasks) + prometheus.MustRegister(RunnerFailedTasks) + prometheus.MustRegister(RunnerTaskExecutionDuration) + prometheus.MustRegister(RunnerSucceededTasks) } diff --git a/pkg/ratelimit/runner.go b/pkg/ratelimit/runner.go index 07233af238b9..17a45067f3dd 100644 --- a/pkg/ratelimit/runner.go +++ b/pkg/ratelimit/runner.go @@ -35,7 +35,10 @@ const ( SaveRegionToKV = "SaveRegionToKV" ) -const initialCapacity = 100 +const ( + initialCapacity = 10000 + maxPendingTaskNum = 20000000 +) // Runner is the interface for running tasks. type Runner interface { @@ -48,9 +51,10 @@ type Runner interface { type Task struct { ctx context.Context submittedAt time.Time - opts *TaskOpts f func(context.Context) name string + // retained indicates whether the task should be kept (not dropped) even when the oldest pending task has waited longer than maxPendingDuration. + retained bool } // ErrMaxWaitingTasksExceeded is returned when the number of waiting tasks exceeds the maximum.
@@ -67,7 +71,6 @@ type ConcurrentRunner struct { stopChan chan struct{} wg sync.WaitGroup pendingTaskCount map[string]int64 - failedTaskCount prometheus.Counter maxWaitingDuration prometheus.Gauge } @@ -79,18 +82,19 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur maxPendingDuration: maxPendingDuration, taskChan: make(chan *Task), pendingTasks: make([]*Task, 0, initialCapacity), - failedTaskCount: RunnerTaskFailedTasks.WithLabelValues(name), pendingTaskCount: make(map[string]int64), maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name), } return s } -// TaskOpts is the options for RunTask. -type TaskOpts struct{} - // TaskOption configures a Task -type TaskOption func(opts *TaskOpts) +type TaskOption func(opts *Task) + +// WithRetained sets whether the task should be retained. +func WithRetained(retained bool) TaskOption { + return func(opts *Task) { opts.retained = retained } +} // Start starts the runner. func (cr *ConcurrentRunner) Start() { @@ -123,8 +127,8 @@ func (cr *ConcurrentRunner) Start() { if len(cr.pendingTasks) > 0 { maxDuration = time.Since(cr.pendingTasks[0].submittedAt) } - for name, cnt := range cr.pendingTaskCount { - RunnerTaskPendingTasks.WithLabelValues(cr.name, name).Set(float64(cnt)) + for taskName, cnt := range cr.pendingTaskCount { + RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt)) } cr.pendingMu.Unlock() cr.maxWaitingDuration.Set(maxDuration.Seconds()) @@ -134,26 +138,28 @@ } func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) { + start := time.Now() task.f(task.ctx) if token != nil { cr.limiter.ReleaseToken(token) cr.processPendingTasks() } + RunnerTaskExecutionDuration.WithLabelValues(cr.name, task.name).Observe(time.Since(start).Seconds()) + RunnerSucceededTasks.WithLabelValues(cr.name, task.name).Inc() } func (cr *ConcurrentRunner) processPendingTasks() { cr.pendingMu.Lock() defer cr.pendingMu.Unlock() - for len(cr.pendingTasks) > 0 { + if len(cr.pendingTasks) > 0 { task := cr.pendingTasks[0] select { case cr.taskChan <- task: cr.pendingTasks = cr.pendingTasks[1:] cr.pendingTaskCount[task.name]-- - return default: - return } + return } } @@ -165,34 +171,40 @@ func (cr *ConcurrentRunner) Stop() { // RunTask runs the task asynchronously. func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error { - taskOpts := &TaskOpts{} - for _, opt := range opts { - opt(taskOpts) - } task := &Task{ ctx: ctx, name: name, f: f, - opts: taskOpts, } - + for _, opt := range opts { + opt(task) + } cr.processPendingTasks() - select { - case cr.taskChan <- task: - default: - cr.pendingMu.Lock() - defer cr.pendingMu.Unlock() - if len(cr.pendingTasks) > 0 { + cr.pendingMu.Lock() + defer func() { + cr.pendingMu.Unlock() + cr.processPendingTasks() + }() + + pendingTaskNum := len(cr.pendingTasks) + if pendingTaskNum > 0 { + if !task.retained { maxWait := time.Since(cr.pendingTasks[0].submittedAt) if maxWait > cr.maxPendingDuration { - cr.failedTaskCount.Inc() + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() return ErrMaxWaitingTasksExceeded } } - task.submittedAt = time.Now() - cr.pendingTasks = append(cr.pendingTasks, task) - cr.pendingTaskCount[task.name]++ + // We use the max task number to limit the memory usage. + // It occupies around 1.5GB memory when there are 20,000,000 pending tasks.
+ if len(cr.pendingTasks) > maxPendingTaskNum { + RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc() + return ErrMaxWaitingTasksExceeded + } } + task.submittedAt = time.Now() + cr.pendingTasks = append(cr.pendingTasks, task) + cr.pendingTaskCount[task.name]++ return nil } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index e4c159cf22d9..30197dd43ea4 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -222,61 +222,93 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store // Better to make sure once any of these conditions changes, it will trigger the heartbeat `save_cache`. // Otherwise, the state may be out-of-date for a long time, which needs another way to apply the change ASAP. // For example, see `RegionStatsNeedUpdate` above to know how `OversizedRegion` and `UndersizedRegion` are updated. - conditions := map[RegionStatisticType]bool{ - MissPeer: len(peers) < desiredReplicas, - ExtraPeer: len(peers) > desiredReplicas, - DownPeer: len(downPeers) > 0, - PendingPeer: len(pendingPeers) > 0, - OfflinePeer: func() bool { - for _, store := range stores { - if store.IsRemoving() { - peer := region.GetStorePeer(store.GetID()) - if peer != nil { - return true - } - } + var conditions RegionStatisticType + if len(peers) < desiredReplicas { + conditions |= MissPeer + } + if len(peers) > desiredReplicas { + conditions |= ExtraPeer + } + if len(downPeers) > 0 { + conditions |= DownPeer + } + if len(pendingPeers) > 0 { + conditions |= PendingPeer + } + for _, store := range stores { + if store.IsRemoving() { + peer := region.GetStorePeer(store.GetID()) + if peer != nil { + conditions |= OfflinePeer + break } - return false - }(), - LearnerPeer: len(learners) > 0, - EmptyRegion: regionSize <= core.EmptyRegionApproximateSize, - OversizedRegion: region.IsOversized(regionMaxSize, regionMaxKeys), - UndersizedRegion: region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys), - WitnessLeader: leaderIsWitness, + } + } + if len(learners) > 0 { + conditions |= LearnerPeer + } + if regionSize <= core.EmptyRegionApproximateSize { + conditions |= EmptyRegion + } + if region.IsOversized(regionMaxSize, regionMaxKeys) { + conditions |= OversizedRegion + } + if region.NeedMerge(maxMergeRegionSize, maxMergeRegionKeys) { + conditions |= UndersizedRegion + } + if leaderIsWitness { + conditions |= WitnessLeader } // Check if the region meets any of the conditions and update the corresponding info. 
regionID := region.GetID() - for typ, c := range conditions { - if c { - info := r.stats[typ][regionID] - if typ == DownPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if info.(*RegionInfoWithTS).startDownPeerTS != 0 { - regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) + for i := 0; i < len(regionStatisticTypes); i++ { + condition := RegionStatisticType(1 << i) + if conditions&condition == 0 { + continue + } + info := r.stats[condition][regionID] + // The condition is met + switch condition { + case MissPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if len(voters) < desiredVoters { + if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { + regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) } else { - info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() - logDownPeerWithNoDisconnectedStore(region, stores) - } - } else if typ == MissPeer { - if info == nil { - info = &RegionInfoWithTS{} - } - if len(voters) < desiredVoters { - if info.(*RegionInfoWithTS).startMissVoterPeerTS != 0 { - regionMissVoterPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startMissVoterPeerTS)) - } else { - info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() - } + info.(*RegionInfoWithTS).startMissVoterPeerTS = time.Now().Unix() } + } + case DownPeer: + if info == nil { + info = &RegionInfoWithTS{} + } + if info.(*RegionInfoWithTS).startDownPeerTS != 0 { + regionDownPeerDuration.Observe(float64(time.Now().Unix() - info.(*RegionInfoWithTS).startDownPeerTS)) } else { - info = struct{}{} + info.(*RegionInfoWithTS).startDownPeerTS = time.Now().Unix() + logDownPeerWithNoDisconnectedStore(region, stores) } - - r.stats[typ][regionID] = info - peerTypeIndex |= typ + case ExtraPeer: + fallthrough + case PendingPeer: + fallthrough + case OfflinePeer: + fallthrough + case LearnerPeer: + fallthrough + case EmptyRegion: + fallthrough + case OversizedRegion: + fallthrough + case UndersizedRegion: + fallthrough + case WitnessLeader: + info = struct{}{} } + r.stats[condition][regionID] = info + peerTypeIndex |= condition } // Remove the info if any of the conditions are not met any more. if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic { diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 8a2e757d5cde..00fa8dc389b2 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -212,7 +212,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { Tracer: core.NewNoopHeartbeatProcessTracer(), // no limit for followers. 
} - saveKV, _, _ := regionGuide(ctx, region, origin) + saveKV, _, _, _ := regionGuide(ctx, region, origin) overlaps := bc.PutRegion(region) if hasBuckets { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2930357e2b4f..b2af48f08da1 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -1439,7 +1439,7 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() { defer kgm.wg.Done() patrolInterval := groupPatrolInterval failpoint.Inject("fastGroupSplitPatroller", func() { - patrolInterval = time.Second + patrolInterval = 3 * time.Second }) ticker := time.NewTicker(patrolInterval) defer ticker.Stop() diff --git a/pkg/utils/tempurl/tempurl.go b/pkg/utils/tempurl/tempurl.go index 421513ff0016..cd5cd498f952 100644 --- a/pkg/utils/tempurl/tempurl.go +++ b/pkg/utils/tempurl/tempurl.go @@ -16,7 +16,10 @@ package tempurl import ( "fmt" + "io" "net" + "net/http" + "os" "time" "github.com/pingcap/log" @@ -29,6 +32,9 @@ var ( testAddrMap = make(map[string]struct{}) ) +// AllocURLFromUT is the environment variable used to fetch a test URL from the allocation server started by pd-ut; see /pd/tools/pd-ut/alloc/server.go. +const AllocURLFromUT = "allocURLFromUT" + // Alloc allocates a local URL for testing. func Alloc() string { for i := 0; i < 10; i++ { @@ -42,6 +48,9 @@ func tryAllocTestURL() string { + if url := getFromUT(); url != "" { + return url + } l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { log.Fatal("listen failed", errs.ZapError(err)) } @@ -63,3 +72,26 @@ func tryAllocTestURL() string { testAddrMap[addr] = struct{}{} return addr } + +func getFromUT() string { + addr := os.Getenv(AllocURLFromUT) + if addr == "" { + return "" + } + + req, err := http.NewRequest(http.MethodGet, addr, nil) + if err != nil { + return "" + } + resp, err := http.DefaultClient.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { + return "" + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return "" + } + url := string(body) + return url +} diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index a8558051dfa6..148b43541a23 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -107,8 +107,9 @@ const ( minSnapshotDurationSec = 5 // heartbeat relative const - heartbeatTaskRunner = "heartbeat-async" - logTaskRunner = "log-async" + heartbeatTaskRunner = "heartbeat-async" + statisticsTaskRunner = "statistics-async" + logTaskRunner = "log-async" ) // Server is the interface for cluster. @@ -173,8 +174,12 @@ type RaftCluster struct { independentServices sync.Map hbstreams *hbstream.HeartbeatStreams - heartbeatRunnner ratelimit.Runner - logRunner ratelimit.Runner + // heartbeatRunner is used to process the subtree update task asynchronously. + heartbeatRunner ratelimit.Runner + // miscRunner is used to process the statistics and persistent tasks asynchronously. + miscRunner ratelimit.Runner + // logRunner is used to process the log asynchronously. + logRunner ratelimit.Runner } // Status saves some state information.
@@ -191,15 +196,16 @@ type Status struct { func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.BasicCluster, storage storage.Storage, regionSyncer *syncer.RegionSyncer, etcdClient *clientv3.Client, httpClient *http.Client) *RaftCluster { return &RaftCluster{ - serverCtx: ctx, - clusterID: clusterID, - regionSyncer: regionSyncer, - httpClient: httpClient, - etcdClient: etcdClient, - core: basicCluster, - storage: storage, - heartbeatRunnner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), - logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + serverCtx: ctx, + clusterID: clusterID, + regionSyncer: regionSyncer, + httpClient: httpClient, + etcdClient: etcdClient, + core: basicCluster, + storage: storage, + heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + miscRunner: ratelimit.NewConcurrentRunner(statisticsTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), + logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute), } } @@ -357,7 +363,8 @@ func (c *RaftCluster) Start(s Server) error { go c.startGCTuner() c.running = true - c.heartbeatRunnner.Start() + c.heartbeatRunner.Start() + c.miscRunner.Start() c.logRunner.Start() return nil } @@ -752,7 +759,8 @@ func (c *RaftCluster) Stop() { if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { c.stopSchedulingJobs() } - c.heartbeatRunnner.Stop() + c.heartbeatRunner.Stop() + c.miscRunner.Stop() c.logRunner.Stop() c.Unlock() @@ -1024,19 +1032,13 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio region.Inherit(origin, c.GetStoreConfig().IsEnableRegionBucket()) if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( - ctx.Context, - ratelimit.HandleStatsAsync, - func(_ context.Context) { - cluster.HandleStatsAsync(c, region) - }, - ) + cluster.HandleStatsAsync(c, region) } tracer.OnAsyncHotStatsFinished() hasRegionStats := c.regionStats != nil // Save to storage if meta is updated, except for flashback. // Save to cache if meta or leader is updated, or contains any down/pending peer. - saveKV, saveCache, needSync := regionGuide(ctx, region, origin) + saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin) tracer.OnRegionGuideFinished() if !saveKV && !saveCache { // Due to some config changes, the region stats may still need to be updated, @@ -1045,7 +1047,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio // e.g. region stats need to be collected in API mode. // We need to think of a better way to reduce this part of the cost in the future.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.ObserveRegionStatsAsync, func(_ context.Context) { @@ -1063,6 +1065,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(true), ) } return nil @@ -1090,11 +1093,12 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio func(_ context.Context) { c.CheckAndPutSubTree(region) }, + ratelimit.WithRetained(retained), ) tracer.OnUpdateSubTreeFinished() if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.HandleOverlaps, func(_ context.Context) { @@ -1107,7 +1111,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnSaveCacheFinished() // handle region stats - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.CollectRegionStatsAsync, func(_ context.Context) { @@ -1121,7 +1125,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio tracer.OnCollectRegionStatsFinished() if c.storage != nil { if saveKV { - ctx.TaskRunner.RunTask( + ctx.MiscRunner.RunTask( ctx.Context, ratelimit.SaveRegionToKV, func(_ context.Context) { diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go index 43602dbb68d0..39720e7d7650 100644 --- a/server/cluster/cluster_worker.go +++ b/server/cluster/cluster_worker.go @@ -40,10 +40,11 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { tracer = core.NewHeartbeatProcessTracer() } defer tracer.Release() - var taskRunner, logRunner ratelimit.Runner - taskRunner, logRunner = syncRunner, syncRunner + var taskRunner, miscRunner, logRunner ratelimit.Runner + taskRunner, miscRunner, logRunner = syncRunner, syncRunner, syncRunner if c.GetScheduleConfig().EnableHeartbeatConcurrentRunner { - taskRunner = c.heartbeatRunnner + taskRunner = c.heartbeatRunner + miscRunner = c.miscRunner logRunner = c.logRunner } @@ -51,6 +52,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { Context: c.ctx, Tracer: tracer, TaskRunner: taskRunner, + MiscRunner: miscRunner, LogRunner: logRunner, } tracer.Begin() diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index d35b7f005848..9e712b808f38 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -811,3 +811,22 @@ func (suite *httpClientTestSuite) checkUpdateKeyspaceGCManagementType(mode mode, re.True(ok) re.Equal(expectGCManagementType, val) } + +func (suite *httpClientTestSuite) TestGetHealthStatus() { + suite.RunTestInTwoModes(suite.checkGetHealthStatus) +} + +func (suite *httpClientTestSuite) checkGetHealthStatus(mode mode, client pd.Client) { + re := suite.Require() + env := suite.env[mode] + + healths, err := client.GetHealthStatus(env.ctx) + re.NoError(err) + re.Len(healths, 2) + sort.Slice(healths, func(i, j int) bool { + return healths[i].Name < healths[j].Name + }) + re.Equal("pd1", healths[0].Name) + re.Equal("pd2", healths[1].Name) + re.True(healths[0].Health && healths[1].Health) +} diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index bb231f747b75..7d07b668c800 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -139,7 +139,7 @@ require ( 
github.com/shoenig/go-m1cpu v0.1.5 // indirect github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 // indirect + github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index eeb2d73ba7fa..0701b42aea71 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -427,8 +427,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 h1:Txo4SXVJq/OgEjwgkWoxkMoTjGlcrgsQE/XSghjmu0w= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 h1:fmanhZtn5RKRljCjX46H+Q9/PECsHbflXm0RdrnK9e4= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 909972f03154..f7b892ce77d7 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/mock/mockid" "github.com/tikv/pd/pkg/storage/endpoint" tsopkg "github.com/tikv/pd/pkg/tso" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -56,6 +57,13 @@ type tsoKeyspaceGroupManagerTestSuite struct { pdLeaderServer *tests.TestServer // tsoCluster is the TSO service cluster. 
tsoCluster *tests.TestTSOCluster + + allocator *mockid.IDAllocator +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) allocID() uint32 { + id, _ := suite.allocator.Alloc() + return uint32(id) } func TestTSOKeyspaceGroupManager(t *testing.T) { @@ -77,6 +85,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { re.NoError(suite.pdLeaderServer.BootstrapCluster()) suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr()) re.NoError(err) + suite.allocator = mockid.NewIDAllocator() + suite.allocator.SetBase(uint64(time.Now().Second())) } func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { @@ -166,9 +176,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe keyspaceGroupID uint32 keyspaceIDs []uint32 }{ - {0, []uint32{0, 10}}, - {1, []uint32{1, 11}}, - {2, []uint32{2, 12}}, + {suite.allocID(), []uint32{0, 10}}, + {suite.allocID(), []uint32{1, 11}}, + {suite.allocID(), []uint32{2, 12}}, } for _, param := range params { @@ -242,51 +252,53 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Get a TSO from the keyspace group 1. + // Get a TSO from the keyspace group `oldID`. var ( ts pdpb.Timestamp err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 222, 1) + ts, err = suite.requestTSO(re, 222, oldID) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() - // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + // Set the TSO of the keyspace group `oldID` to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, oldID).ResetTS(tsoutil.GenerateTS(&ts), false, true, oldID) re.NoError(err) - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + // Split the keyspace group `oldID` into `newID`. + newID := suite.allocID() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{222, 333}, }) // Wait for the split to complete automatically even if there is no TSO request from the outside.
testutil.Eventually(re, func() bool { - kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID) if code != http.StatusOK { return false } - re.Equal(uint32(2), kg2.ID) + re.Equal(newID, kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) return !kg2.IsSplitting() }) - // Check the split TSO from keyspace group 2 now. - splitTS, err := suite.requestTSO(re, 222, 2) + // Check the split TSO from keyspace group `newID` now. + splitTS, err := suite.requestTSO(re, 222, newID) re.NoError(err) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) } @@ -304,60 +316,62 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `oldID` with keyspaces [111, 222, 333]. + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + // Split the keyspace group `oldID` into `newID`. + newID := suite.allocID() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{222, 333}, }) - kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) - re.Equal(uint32(2), kg2.ID) + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID) + re.Equal(newID, kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) re.True(kg2.IsSplitTarget()) // Check the leadership. - member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1) + member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, oldID).GetMember(111, oldID) re.NoError(err) re.NotNil(member1) - member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2) + member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, newID).GetMember(222, newID) re.NoError(err) re.NotNil(member2) - // Wait for the leader of the keyspace group 1 and 2 to be elected. + // Wait for the leaders of the keyspace groups `oldID` and `newID` to be elected. testutil.Eventually(re, func() bool { return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 }) - // Check if the leader of the keyspace group 1 and 2 are the same. + // Check whether the leaders of the keyspace groups `oldID` and `newID` are the same. re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) - // Resign and block the leader of the keyspace group 1 from being elected. + // Resign the leader of the keyspace group `oldID` and block it from being re-elected.
member1.(*member.Participant).SetCampaignChecker(func(*election.Leadership) bool { return false }) member1.ResetLeader() - // The leader of the keyspace group 2 should be resigned also. + // The leader of the keyspace group `newID` should step down as well. testutil.Eventually(re, func() bool { return member2.IsLeader() == false }) - // Check if the leader of the keyspace group 1 and 2 are the same again. + // Check whether the leaders of the keyspace groups `oldID` and `newID` are the same again. member1.(*member.Participant).SetCampaignChecker(nil) testutil.Eventually(re, func() bool { return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 }) re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333}) + waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{111}, []uint32{222, 333}) } func waitFinishSplit( @@ -390,30 +404,32 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() re := suite.Require() // Enable the failpoint to slow down the system time to test whether the TSO is monotonic. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) - // Create the keyspace group 1 with keyspaces [444, 555, 666]. + // Create the keyspace group `oldID` with keyspaces [444, 555, 666]. + oldID := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: oldID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{444, 555, 666}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID) + re.Equal(oldID, kg1.ID) re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) // Request the TSO for keyspace 555 concurrently via client. - cancel := suite.dispatchClient(re, 555, 1) - // Split the keyspace group 1 to 2. - handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ - NewID: 2, + cancel := suite.dispatchClient(re, 555, oldID) + // Split the keyspace group `oldID` into `newID`. + newID := suite.allocID() + handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{ + NewID: newID, Keyspaces: []uint32{555, 666}, }) // Wait for the keyspace groups to finish the split. - waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) + waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{444}, []uint32{555, 666}) // Stop the client. cancel() re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) @@ -569,48 +585,49 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { re := suite.Require() - // Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333]. + // Create the keyspace groups `firstID` and `secondID` with keyspaces [111, 222] and [333].
+ firstID, secondID := suite.allocID(), suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: firstID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222}, }, { - ID: 2, + ID: secondID, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{333}, }, }, }) - // Get a TSO from the keyspace group 1. + // Get a TSO from the keyspace group `firstID`. var ( ts pdpb.Timestamp err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 222, 1) + ts, err = suite.requestTSO(re, 222, firstID) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() - // Set the TSO of the keyspace group 1 to a large value. - err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + // Set the TSO of the keyspace group `firstID` to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, firstID).ResetTS(tsoutil.GenerateTS(&ts), false, true, firstID) re.NoError(err) - // Merge the keyspace group 1 and 2 to the default keyspace group. + // Merge the keyspace groups `firstID` and `secondID` into the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1, 2}, + MergeList: []uint32{firstID, secondID}, }) - // Check the keyspace group 1 and 2 are merged to the default keyspace group. + // Check that the keyspace groups `firstID` and `secondID` are merged into the default keyspace group. kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) for _, keyspaceID := range []uint32{111, 222, 333} { re.Contains(kg.Keyspaces, keyspaceID) } re.True(kg.IsMergeTarget()) - // Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1. + // Check that the merged TSO from the default keyspace group is greater than the TSO from the keyspace group `firstID`. var mergedTS pdpb.Timestamp testutil.Eventually(re, func() bool { mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID) @@ -624,26 +641,27 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Create the keyspace group `id` with keyspaces [111, 222, 333]. + id := suite.allocID() handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: id, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) - re.Equal(uint32(1), kg1.ID) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, id) + re.Equal(id, kg1.ID) re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) re.False(kg1.IsMerging()) // Request the TSO for keyspace 222 concurrently via client.
- cancel := suite.dispatchClient(re, 222, 1) + cancel := suite.dispatchClient(re, 222, id) // Merge the keyspace group `id` into the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1}, + MergeList: []uint32{id}, }) // Wait for the default keyspace group to finish the merge. waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) @@ -671,24 +689,25 @@ func waitFinishMerge( func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() { re := suite.Require() - // Make sure the TSO of keyspace group 1 won't be initialized before it's merged. + // Make sure the TSO of keyspace group `id` won't be initialized before it's merged. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`)) // Request the TSO for the default keyspace concurrently via client. + id := suite.allocID() cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID) // Create the keyspace group `id` with keyspaces [111, 222, 333]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { - ID: 1, + ID: id, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), Keyspaces: []uint32{111, 222, 333}, }, }, }) - // Merge the keyspace group 1 to the default keyspace group. + // Merge the keyspace group `id` into the default keyspace group. handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ - MergeList: []uint32{1}, + MergeList: []uint32{id}, }) // Wait for the default keyspace group to finish the merge. waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333}) @@ -775,12 +794,13 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault keyspaces = make([]uint32, 0, keyspaceGroupNum) ) for i := 1; i <= keyspaceGroupNum; i++ { + id := suite.allocID() keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{ - ID: uint32(i), + ID: id, UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(), - Keyspaces: []uint32{uint32(i)}, + Keyspaces: []uint32{id}, }) - keyspaces = append(keyspaces, uint32(i)) + keyspaces = append(keyspaces, id) if i != keyspaceGroupNum { continue } @@ -797,7 +817,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault re.NotNil(svr) for i := 1; i < keyspaceGroupNum; i++ { // Check if the keyspace group is served. - svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(i), uint32(i)) + svr = suite.tsoCluster.WaitForPrimaryServing(re, keyspaceGroups[i].ID, keyspaceGroups[i].ID) re.NotNil(svr) } // Merge all the keyspace groups into the default keyspace group.
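These test rewrites all lean on the same trick: instead of hard-coding keyspace group IDs like 1 and 2, each test draws fresh IDs from a suite-level allocator whose base is seeded from the wall clock, so reruns against shared state do not collide. A rough sketch of the idea in isolation (a hypothetical minimal allocator, not the real pkg/mock/mockid implementation):

```go
package main

import (
	"fmt"
	"time"
)

// idAllocator is a minimal stand-in for a monotonic ID source.
type idAllocator struct{ base uint64 }

// SetBase seeds the allocator; a time-derived base keeps IDs from
// different test runs from landing on the same small fixed values.
func (a *idAllocator) SetBase(base uint64) { a.base = base }

// Alloc hands out the next ID.
func (a *idAllocator) Alloc() uint64 {
	a.base++
	return a.base
}

func main() {
	alloc := &idAllocator{}
	alloc.SetBase(uint64(time.Now().Second()))
	oldID, newID := uint32(alloc.Alloc()), uint32(alloc.Alloc())
	fmt.Println(oldID, newID) // two distinct, run-dependent keyspace group IDs
}
```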
diff --git a/tools/go.mod b/tools/go.mod index 2febbe1ad688..eb2c279e7fa9 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -9,6 +9,7 @@ replace ( require ( github.com/BurntSushi/toml v0.3.1 + github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e github.com/coreos/go-semver v0.3.1 github.com/docker/go-units v0.4.0 @@ -64,7 +65,6 @@ require ( github.com/bitly/go-simplejson v0.5.0 // indirect github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch // indirect github.com/bytedance/sonic v1.9.1 // indirect - github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 // indirect github.com/cenkalti/backoff/v4 v4.0.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect @@ -141,7 +141,7 @@ require ( github.com/shoenig/go-m1cpu v0.1.5 // indirect github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 // indirect + github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/stretchr/objx v0.5.0 // indirect github.com/swaggo/files v0.0.0-20210815190702-a29dd2bc99b2 // indirect diff --git a/tools/go.sum b/tools/go.sum index a3c41c16420a..535ea668b970 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -426,8 +426,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072 h1:Txo4SXVJq/OgEjwgkWoxkMoTjGlcrgsQE/XSghjmu0w= -github.com/smallnest/chanx v0.0.0-20221229104322-eb4c998d2072/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99 h1:fmanhZtn5RKRljCjX46H+Q9/PECsHbflXm0RdrnK9e4= +github.com/smallnest/chanx v1.2.1-0.20240521153536-01121e21ff99/go.mod h1:+4nWMF0+CqEcU74SnX2NxaGqZ8zX4pcQ8Jcs77DbX5A= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index fa77df6a1013..f7c04c3ca5cc 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -33,7 +33,7 @@ import ( ) const ( - pdControlCallerID = "pd-ctl" + PDControlCallerID = "pd-ctl" clusterPrefix = "pd/api/v1/cluster" ) @@ -107,7 +107,7 @@ func initNewPDClient(cmd *cobra.Command, opts ...pd.ClientOption) error { if PDCli != nil { PDCli.Close() } - PDCli = pd.NewClient(pdControlCallerID, getEndpoints(cmd), opts...) + PDCli = pd.NewClient(PDControlCallerID, getEndpoints(cmd), opts...).WithCallerID(PDControlCallerID) return nil } @@ -122,7 +122,7 @@ func initNewPDClientWithTLS(cmd *cobra.Command, caPath, certPath, keyPath string // TODO: replace dialClient with the PD HTTP client completely. 
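Exporting PDControlCallerID matters because the caller ID is attached on two paths: by the PD HTTP client via WithCallerID, and by the raw dialClient defined just below through a wrapping RoundTripper. A self-contained sketch of that wrapping pattern (the header name here is an assumption for illustration; the real logic lives in pkg/utils/apiutil):

```go
package main

import "net/http"

// callerIDRoundTripper stamps a caller-ID header onto every outgoing request,
// the same pattern apiutil.NewCallerIDRoundTripper wraps around a transport.
type callerIDRoundTripper struct {
	inner    http.RoundTripper
	callerID string
}

func (rt callerIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req = req.Clone(req.Context())
	// "X-Caller-ID" is an assumed header name, used here only for illustration.
	req.Header.Set("X-Caller-ID", rt.callerID)
	return rt.inner.RoundTrip(req)
}

func main() {
	client := &http.Client{
		Transport: callerIDRoundTripper{inner: http.DefaultTransport, callerID: "pd-ctl"},
	}
	_ = client // every request sent with this client now carries the caller ID
}
```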
var dialClient = &http.Client{ - Transport: apiutil.NewCallerIDRoundTripper(http.DefaultTransport, pdControlCallerID), + Transport: apiutil.NewCallerIDRoundTripper(http.DefaultTransport, PDControlCallerID), } // RequireHTTPSClient creates a HTTPS client if the related flags are set @@ -153,7 +153,7 @@ func initHTTPSClient(caPath, certPath, keyPath string) error { } dialClient = &http.Client{ Transport: apiutil.NewCallerIDRoundTripper( - &http.Transport{TLSClientConfig: tlsConfig}, pdControlCallerID), + &http.Transport{TLSClientConfig: tlsConfig}, PDControlCallerID), } return nil } diff --git a/tools/pd-ctl/pdctl/command/health_command.go b/tools/pd-ctl/pdctl/command/health_command.go index 50ac7763d28d..a10ee1183970 100644 --- a/tools/pd-ctl/pdctl/command/health_command.go +++ b/tools/pd-ctl/pdctl/command/health_command.go @@ -15,30 +15,25 @@ package command import ( - "net/http" - "github.com/spf13/cobra" ) -var ( - healthPrefix = "pd/api/v1/health" -) - // NewHealthCommand returns a health subcommand of rootCmd func NewHealthCommand() *cobra.Command { m := &cobra.Command{ - Use: "health", - Short: "show all node's health information of the pd cluster", - Run: showHealthCommandFunc, + Use: "health", + Short: "show all nodes' health information of the PD cluster", + PersistentPreRunE: requirePDClient, + Run: showHealthCommandFunc, } return m } func showHealthCommandFunc(cmd *cobra.Command, _ []string) { - r, err := doRequest(cmd, healthPrefix, http.MethodGet, http.Header{}) + health, err := PDCli.GetHealthStatus(cmd.Context()) if err != nil { cmd.Println(err) return } - cmd.Println(r) + jsonPrint(cmd, health) } diff --git a/tools/pd-ctl/tests/global_test.go b/tools/pd-ctl/tests/global_test.go index f4f55e2af894..766e357088e8 100644 --- a/tools/pd-ctl/tests/global_test.go +++ b/tools/pd-ctl/tests/global_test.go @@ -16,33 +16,39 @@ package tests import ( "context" + "encoding/json" "fmt" "net/http" "testing" - "github.com/pingcap/log" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server" cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" - "go.uber.org/zap" + "github.com/tikv/pd/tools/pd-ctl/pdctl/command" ) -const pdControlCallerID = "pd-ctl" - func TestSendAndGetComponent(t *testing.T) { re := require.New(t) handler := func(context.Context, *server.Server) (http.Handler, apiutil.APIServiceGroup, error) { mux := http.NewServeMux() - mux.HandleFunc("/pd/api/v1/health", func(w http.ResponseWriter, r *http.Request) { + // check the PD HTTP SDK API + mux.HandleFunc("/pd/api/v1/cluster", func(w http.ResponseWriter, r *http.Request) { + callerID := apiutil.GetCallerIDOnHTTP(r) + re.Equal(command.PDControlCallerID, callerID) + cluster := &metapb.Cluster{Id: 1} + clusterBytes, err := json.Marshal(cluster) + re.NoError(err) + w.Write(clusterBytes) + }) + // check the raw HTTP client API + // TODO: remove this check after replacing dialClient with the PD HTTP client completely.
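With the health endpoint now exposed through the PD HTTP client, a caller no longer needs to build the request by hand the way the old healthPrefix code did. A hedged sketch of standalone usage (the endpoint is a placeholder; NewClient, Close, and GetHealthStatus follow the shapes used elsewhere in this diff):

```go
package main

import (
	"context"
	"fmt"

	pd "github.com/tikv/pd/client/http"
)

func main() {
	// "pd-ctl" matches the caller ID convention above; the endpoint is a placeholder.
	client := pd.NewClient("pd-ctl", []string{"http://127.0.0.1:2379"})
	defer client.Close()

	healths, err := client.GetHealthStatus(context.Background())
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, h := range healths {
		fmt.Printf("member %s healthy=%v\n", h.Name, h.Health)
	}
}
```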
+ mux.HandleFunc("/pd/api/v1/stores", func(w http.ResponseWriter, r *http.Request) { callerID := apiutil.GetCallerIDOnHTTP(r) - for k := range r.Header { - log.Info("header", zap.String("key", k)) - } - log.Info("caller id", zap.String("caller-id", callerID)) - re.Equal(pdControlCallerID, callerID) + re.Equal(command.PDControlCallerID, callerID) fmt.Fprint(w, callerID) }) info := apiutil.APIServiceGroup{ @@ -64,8 +70,15 @@ func TestSendAndGetComponent(t *testing.T) { }() cmd := cmd.GetRootCmd() - args := []string{"-u", pdAddr, "health"} + args := []string{"-u", pdAddr, "cluster"} output, err := ExecuteCommand(cmd, args...) re.NoError(err) - re.Equal(fmt.Sprintf("%s\n", pdControlCallerID), string(output)) + re.Equal(fmt.Sprintf("%s\n", `{ + "id": 1 +}`), string(output)) + + args = []string{"-u", pdAddr, "store"} + output, err = ExecuteCommand(cmd, args...) + re.NoError(err) + re.Equal(fmt.Sprintf("%s\n", command.PDControlCallerID), string(output)) } diff --git a/tools/pd-ut/alloc/check_env_dummy.go b/tools/pd-ut/alloc/check_env_dummy.go new file mode 100644 index 000000000000..b9b8eb4827aa --- /dev/null +++ b/tools/pd-ut/alloc/check_env_dummy.go @@ -0,0 +1,21 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//go:build !linux +// +build !linux + +package alloc + +func environmentCheck(_ string) bool { + return true +} diff --git a/tools/pd-ut/alloc/check_env_linux.go b/tools/pd-ut/alloc/check_env_linux.go new file mode 100644 index 000000000000..1a51f8075cfe --- /dev/null +++ b/tools/pd-ut/alloc/check_env_linux.go @@ -0,0 +1,42 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//go:build linux +// +build linux + +package alloc + +import ( + "github.com/cakturk/go-netstat/netstat" + "github.com/pingcap/log" + "go.uber.org/zap" +) + +func environmentCheck(addr string) bool { + valid, err := checkAddr(addr[len("http://"):]) + if err != nil { + log.Error("check port status failed", zap.Error(err)) + return false + } + return valid +} + +func checkAddr(addr string) (bool, error) { + tabs, err := netstat.TCPSocks(func(s *netstat.SockTabEntry) bool { + return s.RemoteAddr.String() == addr || s.LocalAddr.String() == addr + }) + if err != nil { + return false, err + } + return len(tabs) < 1, nil +} diff --git a/tools/pd-ut/alloc/server.go b/tools/pd-ut/alloc/server.go new file mode 100644 index 000000000000..aced73467fb7 --- /dev/null +++ b/tools/pd-ut/alloc/server.go @@ -0,0 +1,56 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alloc + +import ( + "errors" + "flag" + "fmt" + "net/http" + "os" + "time" + + "github.com/gin-gonic/gin" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/utils/tempurl" + "go.uber.org/zap" +) + +var statusAddress = flag.String("status-addr", "0.0.0.0:20180", "status address") + +func RunHTTPServer() *http.Server { + err := os.Setenv(tempurl.AllocURLFromUT, fmt.Sprintf("http://%s/alloc", *statusAddress)) + if err != nil { + fmt.Println(err) + } + + gin.SetMode(gin.ReleaseMode) + engine := gin.New() + engine.Use(gin.Recovery()) + + engine.GET("alloc", func(c *gin.Context) { + addr := Alloc() + c.String(http.StatusOK, addr) + }) + + srv := &http.Server{Addr: *statusAddress, Handler: engine.Handler(), ReadHeaderTimeout: 3 * time.Second} + go func() { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Fatal("server listen error", zap.Error(err)) + } + }() + + return srv +} diff --git a/tools/pd-ut/alloc/tempurl.go b/tools/pd-ut/alloc/tempurl.go new file mode 100644 index 000000000000..6be69dfe056e --- /dev/null +++ b/tools/pd-ut/alloc/tempurl.go @@ -0,0 +1,65 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alloc + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" +) + +var ( + testAddrMutex sync.Mutex + testAddrMap = make(map[string]struct{}) +) + +// Alloc allocates a local URL for testing. 
+func Alloc() string { + for i := 0; i < 50; i++ { + if u := tryAllocTestURL(); u != "" { + return u + } + time.Sleep(200 * time.Millisecond) + } + log.Fatal("failed to alloc test URL") + return "" +} + +func tryAllocTestURL() string { + l, err := net.Listen("tcp", "127.0.0.1:") + if err != nil { + return "" + } + addr := fmt.Sprintf("http://%s", l.Addr()) + err = l.Close() + if err != nil { + log.Fatal("close failed", errs.ZapError(err)) + } + + testAddrMutex.Lock() + defer testAddrMutex.Unlock() + if _, ok := testAddrMap[addr]; ok { + return "" + } + if !environmentCheck(addr) { + return "" + } + testAddrMap[addr] = struct{}{} + return addr +} diff --git a/tools/pd-ut/ut.go b/tools/pd-ut/ut.go index 9419363c1524..fbf2a6406515 100644 --- a/tools/pd-ut/ut.go +++ b/tools/pd-ut/ut.go @@ -16,6 +16,7 @@ package main import ( "bytes" + "context" "encoding/xml" "errors" "fmt" @@ -32,6 +33,9 @@ import ( "sync" "time" + "github.com/tikv/pd/tools/pd-ut/alloc" + "go.uber.org/zap" + // Set the correct value when it runs inside docker. _ "go.uber.org/automaxprocs" ) @@ -91,8 +95,7 @@ var ( // runtime - p int - buildParallel int + parallel int workDir string coverFileTempDir string // arguments @@ -104,6 +107,7 @@ var ( func main() { race = handleFlag("--race") + parallelStr := stripFlag("--parallel") junitFile = stripFlag("--junitfile") coverProfile = stripFlag("--coverprofile") ignoreDir = stripFlag("--ignore") @@ -118,16 +122,33 @@ func main() { defer os.RemoveAll(coverFileTempDir) } - // Get the correct count of CPU if it's in docker. - p = runtime.GOMAXPROCS(0) - // We use 2 * p for `go build` to make it faster. - buildParallel = p * 2 var err error + procs := runtime.GOMAXPROCS(0) + if parallelStr == "" { + // Get the correct count of CPU if it's in docker. + parallel = procs + } else { + parallel, err = strconv.Atoi(parallelStr) + if err != nil { + fmt.Println("parse parallel error", err) + return + } + if parallel > procs { + fmt.Printf("it is recommended to set --parallel to at most GOMAXPROCS=%d\n", procs) + } + } workDir, err = os.Getwd() if err != nil { fmt.Println("os.Getwd() error", err) } + srv := alloc.RunHTTPServer() + defer func() { + if err := srv.Shutdown(context.Background()); err != nil { + log.Fatal("server shutdown error", zap.Error(err)) + } + }() + var isSucceed bool // run all tests if len(os.Args) == 1 { @@ -342,12 +363,12 @@ func cmdRun(args ...string) bool { } } - fmt.Printf("building task finish, parallelism=%d, count=%d, takes=%v\n", buildParallel, len(tasks), time.Since(start)) + fmt.Printf("building tasks finished, parallelism=%d, count=%d, takes=%v\n", parallel*2, len(tasks), time.Since(start)) taskCh := make(chan task, 100) - works := make([]numa, p) + works := make([]numa, parallel) var wg sync.WaitGroup - for i := 0; i < p; i++ { + for i := 0; i < parallel; i++ { wg.Add(1) go works[i].worker(&wg, taskCh) } @@ -389,7 +410,7 @@ func cmdRun(args ...string) bool { // stripFlag strips the '--flag xxx' from the command line os.Args // Example of the os.Args changes -// Before: ut run pkg TestXXX --coverprofile xxx --junitfile yyy +// Before: ut run pkg TestXXX --coverprofile xxx --junitfile yyy --parallel 16 // After: ut run pkg TestXXX // The value of the flag is returned.
func stripFlag(flag string) string { @@ -625,7 +646,7 @@ func (*numa) testCommand(pkg string, fn string) *exec.Cmd { args = append(args, "-test.coverprofile", tmpFile) } if strings.Contains(fn, "Suite") { - args = append(args, "-test.cpu", fmt.Sprint(p/2)) + args = append(args, "-test.cpu", fmt.Sprint(parallel/2)) } else { args = append(args, "-test.cpu", "1") } @@ -684,7 +705,7 @@ func buildTestBinaryMulti(pkgs []string) error { } // go test --exec=xprog --tags=tso_function_test,deadlock -vet=off --count=0 $(pkgs) - // workPath just like `/data/nvme0n1/husharp/proj/pd/tests/integrations` + // workPath just like `/pd/tests/integrations` xprogPath := path.Join(workDir, "bin/xprog") if strings.Contains(workDir, integrationsTestPath) { xprogPath = path.Join(workDir[:strings.LastIndex(workDir, integrationsTestPath)], "bin/xprog") @@ -694,7 +715,8 @@ func buildTestBinaryMulti(pkgs []string) error { packages = append(packages, path.Join(modulePath, pkg)) } - p := strconv.Itoa(buildParallel) + // We use 2 * parallel for `go build` to make it faster. + p := strconv.Itoa(parallel * 2) cmd := exec.Command("go", "test", "-p", p, "--exec", xprogPath, "-vet", "off", "--tags=tso_function_test,deadlock") if coverProfile != "" { coverpkg := "./..."
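Taken together, the new alloc package and the tempurl hook form a small allocation protocol: the pd-ut parent process serves /alloc and advertises its address through the allocURLFromUT environment variable, and each test process asks it for a fresh, run-wide-unique URL before falling back to opening a local listener itself. A condensed sketch of both halves of that handshake (httptest stands in for the real gin server, and the hard-coded address is illustrative only):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"os"
)

func main() {
	// Parent side: a stand-in for alloc.RunHTTPServer, handing out one
	// address per GET (a fixed fake address here for brevity).
	coordinator := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "http://127.0.0.1:34567") // a freshly allocated test URL
	}))
	defer coordinator.Close()
	os.Setenv("allocURLFromUT", coordinator.URL)

	// Child side: what tempurl.Alloc now tries first, before falling back
	// to listening on an ephemeral local port.
	resp, err := http.Get(os.Getenv("allocURLFromUT"))
	if err != nil {
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println("allocated:", string(body))
}
```

Centralizing allocation in one coordinator process is what lets `ut run ... --parallel N` spawn many test binaries without two of them racing to claim the same port.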