diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 3a95191747c..e70c1eb89ef 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -10066,6 +10066,188 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The time consumed of etcd endpoint health check in .99", + "editable": true, + "error": false, + "fill": 1, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 53 + }, + "hiddenSeries": false, + "id": 1607, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": true, + "min": false, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": false, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "8.5.27", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "editorMode": "code", + "expr": "histogram_quantile(0.99, sum(rate(pd_server_etcd_endpoint_latency_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}[30s])) by (instance, endpoint, le))", + "intervalFactor": 2, + "legendFormat": "{{instance}} -> {{endpoint}}", + "range": true, + "refId": "A", + "step": 4 + } + ], + "thresholds": [], + "timeRegions": [], + "title": "99% Endpoint health check latency", + "tooltip": { + "msResolution": false, + "shared": true, + "sort": 0, + "value_type": "cumulative" + }, + "type": "graph", + "xaxis": { + "mode": "time", + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "logBase": 1, + "show": true + }, + { + "format": "short", + "logBase": 1, + "show": true + } + ], + "yaxis": { + "align": false + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The state of the endpoint health.", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 61 + }, + "hiddenSeries": false, + "id": 1608, + "legend": { + "alignAsTable": true, + "avg": false, + "current": true, + "hideEmpty": true, + "hideZero": true, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "paceLength": 10, + "percentage": false, + "pluginVersion": "8.5.27", + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "editorMode": "code", + "expr": "pd_server_etcd_client{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "{{instance}} - {{type}}", + "range": true, + "refId": "A" + } + ], + "thresholds": [], + "timeRegions": [], + "title": "Endpoint health state", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "mode": "time", + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "logBase": 1, + "show": true + }, + { + 
"format": "short", + "logBase": 1, + "show": true + } + ], + "yaxis": { + "align": false + } + }, { "aliasColors": {}, "bars": false, @@ -10078,7 +10260,7 @@ "h": 8, "w": 8, "x": 0, - "y": 53 + "y": 69 }, "id": 1109, "legend": { @@ -10169,7 +10351,7 @@ "h": 8, "w": 8, "x": 8, - "y": 53 + "y": 69 }, "id": 1110, "legend": { @@ -10261,7 +10443,7 @@ "h": 8, "w": 8, "x": 16, - "y": 53 + "y": 69 }, "id": 1111, "legend": { diff --git a/pkg/mcs/utils/util.go b/pkg/mcs/utils/util.go index 0e587688fce..f8e5f5df548 100644 --- a/pkg/mcs/utils/util.go +++ b/pkg/mcs/utils/util.go @@ -177,7 +177,7 @@ func InitClient(s server) error { if err != nil { return err } - etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls) + etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls, "mcs-etcd-client") if err != nil { return err } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 7db4d1ea56a..d56d0e662e3 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -253,7 +253,7 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er } // CreateEtcdClient creates etcd v3 client with detecting endpoints. -func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) { +func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, sourceOpt ...string) (*clientv3.Client, error) { urls := make([]string, 0, len(acURLs)) for _, u := range acURLs { urls = append(urls, u.String()) @@ -270,7 +270,11 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client failpoint.Inject("closeTick", func() { failpoint.Return(client, err) }) - initHealthChecker(tickerInterval, tlsConfig, client) + source := "default-etcd-client" + if len(sourceOpt) > 0 { + source = sourceOpt[0] + } + initHealthChecker(tickerInterval, tlsConfig, client, source) return client, err } diff --git a/pkg/utils/etcdutil/etcdutil_test.go b/pkg/utils/etcdutil/etcdutil_test.go index 55af4c92a2d..8fb8bc59f88 100644 --- a/pkg/utils/etcdutil/etcdutil_test.go +++ b/pkg/utils/etcdutil/etcdutil_test.go @@ -177,10 +177,8 @@ func TestEtcdClientSync(t *testing.T) { etcd2 := MustAddEtcdMember(t, &cfg1, client1) defer etcd2.Close() checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2}) - testutil.Eventually(re, func() bool { - // wait for etcd client sync endpoints - return len(client1.Endpoints()) == 2 - }) + // wait for etcd client sync endpoints + checkEtcdEndpointNum(re, client1, 2) // Remove the first member and close the etcd1. _, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID())) @@ -188,12 +186,21 @@ func TestEtcdClientSync(t *testing.T) { etcd1.Close() // Check the client can get the new member with the new endpoints. + checkEtcdEndpointNum(re, client1, 1) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) +} + +func checkEtcdEndpointNum(re *require.Assertions, client *clientv3.Client, num int) { testutil.Eventually(re, func() bool { - // wait for etcd client sync endpoints - return len(client1.Endpoints()) == 1 + return len(client.Endpoints()) == num }) +} - re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick")) +func checkEtcdClientHealth(re *require.Assertions, client *clientv3.Client) { + testutil.Eventually(re, func() bool { + return IsHealthy(context.Background(), client) + }) } func TestEtcdScaleInAndOut(t *testing.T) { @@ -228,25 +235,21 @@ func TestRandomKillEtcd(t *testing.T) { // Start a etcd server. 
etcds, client1, clean := NewTestEtcdCluster(t, 3) defer clean() - testutil.Eventually(re, func() bool { - return len(client1.Endpoints()) == 3 - }) + checkEtcdEndpointNum(re, client1, 3) // Randomly kill an etcd server and restart it cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()} for i := 0; i < 10; i++ { killIndex := rand.Intn(len(etcds)) etcds[killIndex].Close() - testutil.Eventually(re, func() bool { - return IsHealthy(context.Background(), client1) - }) + checkEtcdEndpointNum(re, client1, 2) + checkEtcdClientHealth(re, client1) etcd, err := embed.StartEtcd(&cfgs[killIndex]) re.NoError(err) <-etcd.Server.ReadyNotify() etcds[killIndex] = etcd - testutil.Eventually(re, func() bool { - return IsHealthy(context.Background(), client1) - }) + checkEtcdEndpointNum(re, client1, 3) + checkEtcdClientHealth(re, client1) } for _, etcd := range etcds { if etcd != nil { diff --git a/pkg/utils/etcdutil/health_checker.go b/pkg/utils/etcdutil/health_checker.go index 9ba7efa5903..f18d9c71479 100644 --- a/pkg/utils/etcdutil/health_checker.go +++ b/pkg/utils/etcdutil/health_checker.go @@ -22,6 +22,7 @@ import ( "time" "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -29,34 +30,52 @@ import ( "go.uber.org/zap" ) +const pickedCountThreshold = 3 + // healthyClient will wrap an etcd client and record its last health time. // The etcd client inside will only maintain one connection to the etcd server // to make sure each healthyClient could be used to check the health of a certain // etcd endpoint without involving the load balancer of etcd client. type healthyClient struct { *clientv3.Client - lastHealth time.Time + lastHealth time.Time + healthState prometheus.Gauge + latency prometheus.Observer } // healthChecker is used to check the health of etcd endpoints. Inside the checker, // we will maintain a map from each available etcd endpoint to its healthyClient. type healthChecker struct { + source string tickerInterval time.Duration tlsConfig *tls.Config // Store as endpoint(string) -> *healthyClient healthyClients sync.Map + // evictedEps records the endpoints evicted during the last health patrol; + // the value is the number of times the endpoint has been picked continuously since it was evicted. + // Store as endpoint(string) -> pickedCount(int) + evictedEps sync.Map // client is the etcd client the health checker is guarding, it will be set with // the checked healthy endpoints dynamically and periodically. client *clientv3.Client + + endpointCountState prometheus.Gauge } // initHealthChecker initializes the health checker for etcd client. -func initHealthChecker(tickerInterval time.Duration, tlsConfig *tls.Config, client *clientv3.Client) { +func initHealthChecker( + tickerInterval time.Duration, + tlsConfig *tls.Config, + client *clientv3.Client, + source string, +) { healthChecker := &healthChecker{ - tickerInterval: tickerInterval, - tlsConfig: tlsConfig, - client: client, + source: source, + tickerInterval: tickerInterval, + tlsConfig: tlsConfig, + client: client, + endpointCountState: etcdStateGauge.WithLabelValues(source, endpointLabel), } // A health checker has the same lifetime with the given etcd client. 
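+ // The checker goroutines derive their context from the wrapped client below, so the + // syncer and the inspector exit as soon as the etcd client itself is closed.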
ctx := client.Ctx() @@ -74,7 +93,8 @@ func (checker *healthChecker) syncer(ctx context.Context) { for { select { case <-ctx.Done(): - log.Info("etcd client is closed, exit update endpoint goroutine") + log.Info("etcd client is closed, exit update endpoint goroutine", + zap.String("source", checker.source)) return case <-ticker.C: checker.update() @@ -90,13 +110,13 @@ func (checker *healthChecker) inspector(ctx context.Context) { for { select { case <-ctx.Done(): - log.Info("etcd client is closed, exit health check goroutine") + log.Info("etcd client is closed, exit health check goroutine", + zap.String("source", checker.source)) checker.close() return case <-ticker.C: - lastEps := checker.client.Endpoints() - healthyEps := checker.patrol(ctx) - if len(healthyEps) == 0 { + lastEps, pickedEps, changed := checker.patrol(ctx) + if len(pickedEps) == 0 { // when no endpoint could be used, try to reset endpoints to update connect rather // than delete them to avoid there is no any endpoint in client. // Note: reset endpoints will trigger sub-connection closed, and then trigger reconnection. @@ -104,29 +124,32 @@ func (checker *healthChecker) inspector(ctx context.Context) { // and it cannot recover as soon as possible. if time.Since(lastAvailable) > etcdServerDisconnectedTimeout { log.Info("no available endpoint, try to reset endpoints", - zap.Strings("last-endpoints", lastEps)) - resetClientEndpoints(checker.client, lastEps...) - } - } else { - if !typeutil.AreStringSlicesEquivalent(healthyEps, lastEps) { - oldNum, newNum := len(lastEps), len(healthyEps) - checker.client.SetEndpoints(healthyEps...) - etcdStateGauge.WithLabelValues("endpoints").Set(float64(newNum)) - log.Info("update endpoints", - zap.String("num-change", fmt.Sprintf("%d->%d", oldNum, newNum)), zap.Strings("last-endpoints", lastEps), - zap.Strings("endpoints", checker.client.Endpoints())) + zap.String("source", checker.source)) + resetClientEndpoints(checker.client, lastEps...) } - lastAvailable = time.Now() + continue + } + if changed { + oldNum, newNum := len(lastEps), len(pickedEps) + checker.client.SetEndpoints(pickedEps...) + checker.endpointCountState.Set(float64(newNum)) + log.Info("update endpoints", + zap.String("num-change", fmt.Sprintf("%d->%d", oldNum, newNum)), + zap.Strings("last-endpoints", lastEps), + zap.Strings("endpoints", checker.client.Endpoints()), + zap.String("source", checker.source)) } + lastAvailable = time.Now() } } } func (checker *healthChecker) close() { checker.healthyClients.Range(func(key, value interface{}) bool { - client := value.(*healthyClient) - client.Close() + healthyCli := value.(*healthyClient) + healthyCli.healthState.Set(0) + healthyCli.Client.Close() return true }) } @@ -137,13 +160,17 @@ func resetClientEndpoints(client *clientv3.Client, endpoints ...string) { client.SetEndpoints(endpoints...) 
} +type healthProbe struct { + ep string + took time.Duration +} + // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/etcdctl/ctlv3/command/ep_command.go#L105-L145 -func (checker *healthChecker) patrol(ctx context.Context) []string { +func (checker *healthChecker) patrol(ctx context.Context) ([]string, []string, bool) { var ( - count = checker.clientCount() - hch = make(chan string, count) - healthyList = make([]string, 0, count) - wg sync.WaitGroup + count = checker.clientCount() + probeCh = make(chan healthProbe, count) + wg sync.WaitGroup ) checker.healthyClients.Range(func(key, value interface{}) bool { wg.Add(1) @@ -151,32 +178,165 @@ defer wg.Done() defer logutil.LogPanic() var ( - ep = key.(string) - client = value.(*healthyClient) + ep = key.(string) + healthyCli = value.(*healthyClient) + client = healthyCli.Client + healthState = healthyCli.healthState + latency = healthyCli.latency + start = time.Now() ) - if IsHealthy(ctx, client.Client) { - hch <- ep - checker.storeClient(ep, &healthyClient{ - Client: client.Client, - lastHealth: time.Now(), - }) + // Check the health of the endpoint. + healthy := IsHealthy(ctx, client) + took := time.Since(start) + latency.Observe(took.Seconds()) + if !healthy { + healthState.Set(0) + log.Warn("etcd endpoint is unhealthy", + zap.String("endpoint", ep), + zap.Duration("took", took), + zap.String("source", checker.source)) return } + healthState.Set(1) + // If the endpoint is healthy, update its last health time. + checker.storeClient(ep, client, start) + // Send the health probe result to the channel. + probeCh <- healthProbe{ep, took} }(key, value) return true }) wg.Wait() - close(hch) - for h := range hch { - healthyList = append(healthyList, h) + close(probeCh) + var ( + lastEps = checker.client.Endpoints() + pickedEps = checker.pickEps(probeCh) + ) + if len(pickedEps) > 0 { + checker.updateEvictedEps(lastEps, pickedEps) + pickedEps = checker.filterEps(pickedEps) } - return healthyList + return lastEps, pickedEps, !typeutil.AreStringSlicesEquivalent(lastEps, pickedEps) +} + +// Divide the acceptable latency range into several parts, and pick the endpoints which +// are in the first acceptable latency range. Currently, we only take the latency of the +// last health check into consideration, and maybe in the future we could introduce more +// factors to help improve the selection strategy. +func (checker *healthChecker) pickEps(probeCh <-chan healthProbe) []string { + var ( + count = len(probeCh) + pickedEps = make([]string, 0, count) + ) + if count == 0 { + return pickedEps + } + // Consume the `probeCh` to build a reusable slice. + probes := make([]healthProbe, 0, count) + for probe := range probeCh { + probes = append(probes, probe) + } + // Take the default value as an example: if we have 3 endpoints with latency like: + // - A: 175ms + // - B: 50ms + // - C: 2.5s + // the distribution will be like: + // - [0, 1s) -> {A, B} + // - [1s, 2s) + // - [2s, 3s) -> {C} + // - ... + // - [9s, 10s) + // Then the picked endpoints will be {A, B} and if C is in the last used endpoints, it will be evicted later. 
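+ // With the default values (DefaultRequestTimeout of 10s and DefaultSlowRequestTime of 1s), + // `factor` below is 10, i.e. the probes are classified into ten 1s-wide latency buckets, + // and only the lowest non-empty bucket is picked.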
+ factor := int(DefaultRequestTimeout / DefaultSlowRequestTime) + for i := 0; i < factor; i++ { + minLatency, maxLatency := DefaultSlowRequestTime*time.Duration(i), DefaultSlowRequestTime*time.Duration(i+1) + for _, probe := range probes { + if minLatency <= probe.took && probe.took < maxLatency { + log.Debug("pick healthy etcd endpoint within acceptable latency range", + zap.Duration("min-latency", minLatency), + zap.Duration("max-latency", maxLatency), + zap.Duration("took", probe.took), + zap.String("endpoint", probe.ep), + zap.String("source", checker.source)) + pickedEps = append(pickedEps, probe.ep) + } + } + if len(pickedEps) > 0 { + break + } + } + return pickedEps +} + +func (checker *healthChecker) updateEvictedEps(lastEps, pickedEps []string) { + // Create a set of the picked endpoints for faster lookup. + pickedSet := make(map[string]bool, len(pickedEps)) + for _, ep := range pickedEps { + pickedSet[ep] = true + } + // Reset the count to 0 if it's in evictedEps but not in the pickedEps. + checker.evictedEps.Range(func(key, _ interface{}) bool { + ep := key.(string) + if !pickedSet[ep] { + checker.evictedEps.Store(ep, 0) + log.Info("reset evicted etcd endpoint picked count", + zap.String("endpoint", ep), + zap.String("source", checker.source)) + } + return true + }) + // Find all endpoints which are in the lastEps but not in the pickedEps, + // and add them to the evictedEps. + for _, ep := range lastEps { + if pickedSet[ep] { + continue + } + checker.evictedEps.Store(ep, 0) + log.Info("evicted etcd endpoint found", + zap.String("endpoint", ep), + zap.String("source", checker.source)) + } + // Find all endpoints which are in both pickedEps and evictedEps to + // increase their picked count. + for _, ep := range pickedEps { + if count, ok := checker.evictedEps.Load(ep); ok { + // Increase the count of times the endpoint has been picked continuously. + checker.evictedEps.Store(ep, count.(int)+1) + log.Info("evicted etcd endpoint picked again", + zap.Int("picked-count-threshold", pickedCountThreshold), + zap.Int("picked-count", count.(int)+1), + zap.String("endpoint", ep), + zap.String("source", checker.source)) + } + } +} + +// Filter out the endpoints that are in evictedEps and have not yet been picked continuously +// for `pickedCountThreshold` times; this is to ensure the evicted endpoints have truly +// become available again before adding them back to the client. 
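+// For example, with pickedCountThreshold = 3, an endpoint evicted in a previous patrol +// must be picked in 3 consecutive patrols before it is handed back to the etcd client.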
+func (checker *healthChecker) filterEps(eps []string) []string { + pickedEps := make([]string, 0, len(eps)) + for _, ep := range eps { + if count, ok := checker.evictedEps.Load(ep); ok { + if count.(int) < pickedCountThreshold { + continue + } + checker.evictedEps.Delete(ep) + log.Info("add evicted etcd endpoint back", + zap.Int("picked-count-threshold", pickedCountThreshold), + zap.Int("picked-count", count.(int)), + zap.String("endpoint", ep), + zap.String("source", checker.source)) + } + pickedEps = append(pickedEps, ep) + } + return pickedEps } func (checker *healthChecker) update() { - eps := syncUrls(checker.client) + eps := checker.syncURLs() if len(eps) == 0 { - log.Warn("no available etcd endpoint returned by etcd cluster") + log.Warn("no available etcd endpoint returned by etcd cluster", + zap.String("source", checker.source)) return } epMap := make(map[string]struct{}, len(eps)) @@ -189,7 +349,7 @@ func (checker *healthChecker) update() { for ep := range epMap { client := checker.loadClient(ep) if client == nil { - checker.addClient(ep, time.Now()) + checker.initClient(ep) continue } since := time.Since(client.lastHealth) @@ -197,7 +357,8 @@ func (checker *healthChecker) update() { if since > etcdServerOfflineTimeout { log.Info("etcd server might be offline, try to remove it", zap.Duration("since-last-health", since), - zap.String("endpoint", ep)) + zap.String("endpoint", ep), + zap.String("source", checker.source)) checker.removeClient(ep) continue } @@ -205,7 +366,8 @@ func (checker *healthChecker) update() { if since > etcdServerDisconnectedTimeout { log.Info("etcd server might be disconnected, try to reconnect", zap.Duration("since-last-health", since), - zap.String("endpoint", ep)) + zap.String("endpoint", ep), + zap.String("source", checker.source)) resetClientEndpoints(client.Client, ep) } } @@ -213,7 +375,9 @@ func (checker *healthChecker) update() { checker.healthyClients.Range(func(key, value interface{}) bool { ep := key.(string) if _, ok := epMap[ep]; !ok { - log.Info("remove stale etcd client", zap.String("endpoint", ep)) + log.Info("remove stale etcd client", + zap.String("endpoint", ep), + zap.String("source", checker.source)) checker.removeClient(ep) } return true @@ -236,40 +400,48 @@ func (checker *healthChecker) loadClient(ep string) *healthyClient { return nil } -func (checker *healthChecker) addClient(ep string, lastHealth time.Time) { +func (checker *healthChecker) initClient(ep string) { client, err := newClient(checker.tlsConfig, ep) if err != nil { log.Error("failed to create etcd healthy client", zap.String("endpoint", ep), + zap.String("source", checker.source), zap.Error(err)) return } - checker.healthyClients.Store(ep, &healthyClient{ - Client: client, - lastHealth: lastHealth, - }) + checker.storeClient(ep, client, time.Now()) } -func (checker *healthChecker) storeClient(ep string, client *healthyClient) { - checker.healthyClients.Store(ep, client) +func (checker *healthChecker) storeClient(ep string, client *clientv3.Client, lastHealth time.Time) { + checker.healthyClients.Store(ep, &healthyClient{ + Client: client, + lastHealth: lastHealth, + healthState: etcdStateGauge.WithLabelValues(checker.source, ep), + latency: etcdEndpointLatency.WithLabelValues(checker.source, ep), + }) } func (checker *healthChecker) removeClient(ep string) { if client, ok := checker.healthyClients.LoadAndDelete(ep); ok { - err := client.(*healthyClient).Close() - if err != nil { + healthyCli := client.(*healthyClient) + healthyCli.healthState.Set(0) + if err := 
healthyCli.Close(); err != nil { log.Error("failed to close etcd healthy client", zap.String("endpoint", ep), + zap.String("source", checker.source), zap.Error(err)) } } + checker.evictedEps.Delete(ep) } // See https://github.com/etcd-io/etcd/blob/85b640cee793e25f3837c47200089d14a8392dc7/clientv3/client.go#L170-L183 -func syncUrls(client *clientv3.Client) (eps []string) { - resp, err := ListEtcdMembers(clientv3.WithRequireLeader(client.Ctx()), client) +func (checker *healthChecker) syncURLs() (eps []string) { + resp, err := ListEtcdMembers(clientv3.WithRequireLeader(checker.client.Ctx()), checker.client) if err != nil { - log.Error("failed to list members", errs.ZapError(err)) + log.Error("failed to list members", + zap.String("source", checker.source), + errs.ZapError(err)) return nil } for _, m := range resp.Members { diff --git a/pkg/utils/etcdutil/health_checker_test.go b/pkg/utils/etcdutil/health_checker_test.go new file mode 100644 index 00000000000..3891fbfce7a --- /dev/null +++ b/pkg/utils/etcdutil/health_checker_test.go @@ -0,0 +1,325 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type testCase struct { + healthProbes []healthProbe + expectedEvictedEps map[string]int + expectedPickedEps []string +} + +func check(re *require.Assertions, testCases []*testCase) { + checker := &healthChecker{} + lastEps := []string{} + for idx, tc := range testCases { + // Send the health probes to the channel. + probeCh := make(chan healthProbe, len(tc.healthProbes)) + for _, probe := range tc.healthProbes { + probeCh <- probe + } + close(probeCh) + // Pick and filter the endpoints. + pickedEps := checker.pickEps(probeCh) + checker.updateEvictedEps(lastEps, pickedEps) + pickedEps = checker.filterEps(pickedEps) + // Check the evicted states after finishing picking. + count := 0 + checker.evictedEps.Range(func(key, value interface{}) bool { + count++ + ep := key.(string) + times := value.(int) + re.Equal(tc.expectedEvictedEps[ep], times, "case %d ep %s", idx, ep) + return true + }) + re.Len(tc.expectedEvictedEps, count, "case %d", idx) + re.Equal(tc.expectedPickedEps, pickedEps, "case %d", idx) + lastEps = pickedEps + } +} + +// Test the endpoint picking and evicting logic. 
+func TestPickEps(t *testing.T) { + re := require.New(t) + testCases := []*testCase{ + // {} -> {A, B} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + }, + map[string]int{}, + []string{"A", "B"}, + }, + // {A, B} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{}, + []string{"A", "B", "C"}, + }, + // {A, B, C} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{}, + []string{"A", "B", "C"}, + }, + // {A, B, C} -> {C} + { + []healthProbe{ + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 0, "B": 0}, + []string{"C"}, + }, + // {C} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 1, "B": 1}, + []string{"C"}, + }, + // {C} -> {B, C} + { + []healthProbe{ + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 0, "B": 2}, + []string{"C"}, + }, + // {C} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 1}, + []string{"B", "C"}, + }, + // {B, C} -> {D} + { + []healthProbe{ + { + ep: "D", + took: time.Millisecond, + }, + }, + map[string]int{"A": 0, "B": 0, "C": 0}, + []string{"D"}, + }, + // {D} -> {B, C} + { + []healthProbe{ + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 0, "B": 1, "C": 1, "D": 0}, + []string{}, + }, + } + check(re, testCases) +} + +func TestLatencyPick(t *testing.T) { + re := require.New(t) + testCases := []*testCase{ + // {} -> {A, B} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + }, + map[string]int{}, + []string{"A", "B"}, + }, + // {A, B} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Second, + }, + }, + map[string]int{}, + []string{"A", "B"}, + }, + // {A, B} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Second, + }, + { + ep: "B", + took: time.Second, + }, + { + ep: "C", + took: 2 * time.Second, + }, + }, + map[string]int{}, + []string{"A", "B"}, + }, + // {A, B} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Second, + }, + { + ep: "B", + took: 2 * time.Second, + }, + { + ep: "C", + took: 3 * time.Second, + }, + }, + map[string]int{"B": 0}, + []string{"A"}, + }, + // {A} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Second, + }, + { + ep: "B", + took: time.Second, + }, + { + ep: "C", + took: time.Millisecond, + }, + }, + map[string]int{"A": 0, "B": 0}, + []string{"C"}, + }, + // {C} -> {A, B, C} + { + []healthProbe{ + { + ep: "A", + took: time.Millisecond, + }, + { + ep: "B", + took: time.Millisecond, + }, + { + ep: "C", + took: time.Second, + }, + }, + map[string]int{"A": 1, "B": 1, "C": 0}, + []string{}, + }, + } + check(re, testCases) +} diff --git a/pkg/utils/etcdutil/metrics.go b/pkg/utils/etcdutil/metrics.go index f78e0864ba2..5d0eae1607e 100644 --- 
a/pkg/utils/etcdutil/metrics.go +++ b/pkg/utils/etcdutil/metrics.go @@ -16,14 +16,32 @@ package etcdutil import "github.com/prometheus/client_golang/prometheus" -var etcdStateGauge = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: "pd", - Subsystem: "server", - Name: "etcd_client", - Help: "Etcd client states.", - }, []string{"type"}) +var ( + sourceLabel = "source" + typeLabel = "type" + endpointLabel = "endpoint" +) + +var ( + etcdStateGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "etcd_client", + Help: "Etcd client states.", + }, []string{sourceLabel, typeLabel}) + + etcdEndpointLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "server", + Name: "etcd_endpoint_latency_seconds", + Help: "Bucketed histogram of the etcd endpoint health check latency.", + Buckets: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, []string{sourceLabel, endpointLabel}) +) func init() { prometheus.MustRegister(etcdStateGauge) + prometheus.MustRegister(etcdEndpointLatency) } diff --git a/pkg/utils/typeutil/comparison_test.go b/pkg/utils/typeutil/comparison_test.go index 9f6f832b1e8..b296405b3d5 100644 --- a/pkg/utils/typeutil/comparison_test.go +++ b/pkg/utils/typeutil/comparison_test.go @@ -53,3 +53,21 @@ func TestEqualFloat(t *testing.T) { re.True(Float64Equal(f1, f1*1.000)) re.True(Float64Equal(f1, f1/1.000)) } + +func TestAreStringSlicesEquivalent(t *testing.T) { + t.Parallel() + re := require.New(t) + re.True(AreStringSlicesEquivalent(nil, nil)) + re.True(AreStringSlicesEquivalent([]string{}, nil)) + re.True(AreStringSlicesEquivalent(nil, []string{})) + re.True(AreStringSlicesEquivalent([]string{}, []string{})) + re.True(AreStringSlicesEquivalent([]string{"a", "b"}, []string{"b", "a"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{"a", "b", "c"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b", "c"}, []string{"a", "b"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{"a", "c"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{"c", "d"})) + re.False(AreStringSlicesEquivalent(nil, []string{"a", "b"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b"}, nil)) + re.False(AreStringSlicesEquivalent([]string{}, []string{"a", "b"})) + re.False(AreStringSlicesEquivalent([]string{"a", "b"}, []string{})) +} diff --git a/server/server.go b/server/server.go index c37a0ea5fe3..0e9c3d84074 100644 --- a/server/server.go +++ b/server/server.go @@ -384,12 +384,12 @@ func (s *Server) startClient() error { } /* Starting two different etcd clients here is to avoid the throttling. */ // This etcd client will be used to access the etcd cluster to read and write all kinds of meta data. - s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls) + s.client, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls, "server-etcd-client") if err != nil { return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() } // This etcd client will only be used to read and write the election-related data, such as leader key. - s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls) + s.electionClient, err = etcdutil.CreateEtcdClient(tlsConfig, etcdCfg.ACUrls, "election-etcd-client") if err != nil { return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause() }