Skip to content

Commit

Permalink
etcdutil: consider the latency while patrolling the healthy endpoints (
Browse files Browse the repository at this point in the history
…#7737)

ref #7730

Consider the latency while patrolling the healthy endpoints to reduce the effect of slow nodes.
The following strategies are now used to select healthy endpoints and evict unhealthy ones:

- Choose only the healthy endpoints within the lowest acceptable latency range.
- An evicted endpoint can rejoin only after it has been selected again three consecutive times.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jan 30, 2024
1 parent e8da033 commit 1c54865
Show file tree
Hide file tree
Showing 9 changed files with 812 additions and 90 deletions.
188 changes: 185 additions & 3 deletions metrics/grafana/pd.json
Expand Up @@ -10066,6 +10066,188 @@
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The time consumed of etcd endpoint health check in .99",
"editable": true,
"error": false,
"fill": 1,
"fillGradient": 0,
"grid": {},
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 53
},
"hiddenSeries": false,
"id": 1607,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": false,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "8.5.27",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(pd_server_etcd_endpoint_latency_seconds_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}[30s])) by (instance, endpoint, le))",
"intervalFactor": 2,
"legendFormat": "{{instance}} -> {{endpoint}}",
"range": true,
"refId": "A",
"step": 4
}
],
"thresholds": [],
"timeRegions": [],
"title": "99% Endpoint health check latency",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "cumulative"
},
"type": "graph",
"xaxis": {
"mode": "time",
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"logBase": 1,
"show": true
},
{
"format": "short",
"logBase": 1,
"show": true
}
],
"yaxis": {
"align": false
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The state of the endpoint health.",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 24,
"x": 0,
"y": 61
},
"hiddenSeries": false,
"id": 1110,
"legend": {
"alignAsTable": true,
"avg": false,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"paceLength": 10,
"percentage": false,
"pluginVersion": "8.5.27",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"editorMode": "code",
"expr": "pd_server_etcd_client{k8s_cluster=\"$k8s_cluster\", tidb_cluster=~\"$tidb_cluster.*\", source=\"server-etcd-client\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}} - {{type}}",
"range": true,
"refId": "A"
}
],
"thresholds": [],
"timeRegions": [],
"title": "Endpoint health state",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"mode": "time",
"show": true,
"values": []
},
"yaxes": [
{
"format": "none",
"logBase": 1,
"show": true
},
{
"format": "short",
"logBase": 1,
"show": true
}
],
"yaxis": {
"align": false
}
},
{
"aliasColors": {},
"bars": false,
Expand All @@ -10078,7 +10260,7 @@
"h": 8,
"w": 8,
"x": 0,
"y": 53
"y": 69
},
"id": 1109,
"legend": {
Expand Down Expand Up @@ -10169,7 +10351,7 @@
"h": 8,
"w": 8,
"x": 8,
"y": 53
"y": 69
},
"id": 1110,
"legend": {
Expand Down Expand Up @@ -10261,7 +10443,7 @@
"h": 8,
"w": 8,
"x": 16,
"y": 53
"y": 69
},
"id": 1111,
"legend": {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/utils/util.go
Expand Up @@ -177,7 +177,7 @@ func InitClient(s server) error {
if err != nil {
return err
}
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls)
etcdClient, err := etcdutil.CreateEtcdClient(tlsConfig, backendUrls, "mcs-etcd-client")
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/utils/etcdutil/etcdutil.go
Expand Up @@ -253,7 +253,7 @@ func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, er
}

// CreateEtcdClient creates etcd v3 client with detecting endpoints.
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client, error) {
func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, sourceOpt ...string) (*clientv3.Client, error) {
urls := make([]string, 0, len(acURLs))
for _, u := range acURLs {
urls = append(urls, u.String())
Expand All @@ -270,7 +270,11 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL) (*clientv3.Client
failpoint.Inject("closeTick", func() {
failpoint.Return(client, err)
})
initHealthChecker(tickerInterval, tlsConfig, client)
source := "default-etcd-client"
if len(sourceOpt) > 0 {
source = sourceOpt[0]
}
initHealthChecker(tickerInterval, tlsConfig, client, source)

return client, err
}
Expand Down
35 changes: 19 additions & 16 deletions pkg/utils/etcdutil/etcdutil_test.go
Expand Up @@ -177,23 +177,30 @@ func TestEtcdClientSync(t *testing.T) {
etcd2 := MustAddEtcdMember(t, &cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
testutil.Eventually(re, func() bool {
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 2
})
// wait for etcd client sync endpoints
checkEtcdEndpointNum(re, client1, 2)

// Remove the first member and close the etcd1.
_, err := RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
etcd1.Close()

// Check the client can get the new member with the new endpoints.
checkEtcdEndpointNum(re, client1, 1)

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
}

func checkEtcdEndpointNum(re *require.Assertions, client *clientv3.Client, num int) {
testutil.Eventually(re, func() bool {
// wait for etcd client sync endpoints
return len(client1.Endpoints()) == 1
return len(client.Endpoints()) == num
})
}

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/fastTick"))
// checkEtcdClientHealth hands testutil.Eventually a probe that reports whether
// IsHealthy returns true for client. Each probe uses a fresh background
// context.
// NOTE(review): presumably testutil.Eventually retries the probe until it
// succeeds or fails the test on timeout — confirm against the helper.
func checkEtcdClientHealth(re *require.Assertions, client *clientv3.Client) {
	healthy := func() bool {
		return IsHealthy(context.Background(), client)
	}
	testutil.Eventually(re, healthy)
}

func TestEtcdScaleInAndOut(t *testing.T) {
Expand Down Expand Up @@ -228,25 +235,21 @@ func TestRandomKillEtcd(t *testing.T) {
// Start a etcd server.
etcds, client1, clean := NewTestEtcdCluster(t, 3)
defer clean()
testutil.Eventually(re, func() bool {
return len(client1.Endpoints()) == 3
})
checkEtcdEndpointNum(re, client1, 3)

// Randomly kill an etcd server and restart it
cfgs := []embed.Config{etcds[0].Config(), etcds[1].Config(), etcds[2].Config()}
for i := 0; i < 10; i++ {
killIndex := rand.Intn(len(etcds))
etcds[killIndex].Close()
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
checkEtcdEndpointNum(re, client1, 2)
checkEtcdClientHealth(re, client1)
etcd, err := embed.StartEtcd(&cfgs[killIndex])
re.NoError(err)
<-etcd.Server.ReadyNotify()
etcds[killIndex] = etcd
testutil.Eventually(re, func() bool {
return IsHealthy(context.Background(), client1)
})
checkEtcdEndpointNum(re, client1, 3)
checkEtcdClientHealth(re, client1)
}
for _, etcd := range etcds {
if etcd != nil {
Expand Down

0 comments on commit 1c54865

Please sign in to comment.