Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdutil: support multi backends client #6046

Merged
merged 4 commits into from Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/base_client.go
Expand Up @@ -143,7 +143,7 @@ func (c *baseClient) memberLoop() {
failpoint.Continue()
})
if err := c.updateMember(); err != nil {
log.Error("[pd] failed updateMember", errs.ZapError(err))
log.Error("[pd] failed to update member", errs.ZapError(err))
}
}
}
Expand Down Expand Up @@ -303,7 +303,7 @@ func (c *baseClient) updateMember() error {
errTSO = c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders())
}

// Failed to get PD leader
// Failed to get members
if err != nil {
log.Info("[pd] cannot update member from this address",
zap.String("address", u),
Expand All @@ -327,7 +327,7 @@ func (c *baseClient) updateMember() error {
// the error of `switchTSOAllocatorLeader` will be returned.
return errTSO
}
return errs.ErrClientGetLeader.FastGenByArgs(c.GetURLs())
return errs.ErrClientGetMember.FastGenByArgs(c.GetURLs())
}

func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Duration) (*pdpb.GetMembersResponse, error) {
Expand Down
1 change: 0 additions & 1 deletion pkg/mcs/resource_manager/server/server.go
Expand Up @@ -151,7 +151,6 @@ func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
}

func (s *Server) initClient() error {
// TODO: We need to keep all backend endpoints and keep updating them to the latest. Once one of them failed, need to try another one.
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand Down
69 changes: 32 additions & 37 deletions pkg/utils/etcdutil/etcdutil.go
Expand Up @@ -17,21 +17,18 @@ package etcdutil
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"net/http"
"net/url"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/typeutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/types"
"go.uber.org/zap"
Expand All @@ -41,6 +38,9 @@ const (
// defaultEtcdClientTimeout is the default timeout for etcd client.
defaultEtcdClientTimeout = 3 * time.Second

// defaultAutoSyncInterval is the interval to sync etcd cluster.
defaultAutoSyncInterval = 60 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, Is there some releationship between the timeouts, like defaultEtcdClientTimeout ,DefaultDialTimeout,DefaultRequestTimeout?


// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second
Expand Down Expand Up @@ -186,53 +186,48 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}

// NewTestSingleConfig is used to create a etcd config for the unit test purpose.
func NewTestSingleConfig(t *testing.T) *embed.Config {
cfg := embed.NewConfig()
cfg.Name = "test_etcd"
cfg.Dir = t.TempDir()
cfg.WalDir = ""
cfg.Logger = "zap"
cfg.LogOutputs = []string{"stdout"}

pu, _ := url.Parse(tempurl.Alloc())
cfg.LPUrls = []url.URL{*pu}
cfg.APUrls = cfg.LPUrls
cu, _ := url.Parse(tempurl.Alloc())
cfg.LCUrls = []url.URL{*cu}
cfg.ACUrls = cfg.LCUrls

cfg.StrictReconfigCheck = false
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}

// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
endpoints := []string{acUrls[0].String()}
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
TLS: tlsConfig,
LogConfig: &lgc,
})
client, err := createEtcdClient(tlsConfig, acUrls)
if err != nil {
return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
}

httpClient := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: tlsConfig,
},
}
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
return client, httpClient, nil
}

func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, error) {
if len(acUrls) == 0 {
return nil, errs.ErrNewEtcdClient.FastGenByArgs("no available etcd address")
}
endpoints := make([]string, 0, len(acUrls))
for _, u := range acUrls {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
endpoints = append(endpoints, u.String())
}
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
autoSyncInterval := defaultAutoSyncInterval
failpoint.Inject("autoSyncInterval", func() {
autoSyncInterval = 10 * time.Millisecond
})
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
})
if err != nil {
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
}
return client, err
}

// InitClusterID creates a cluster ID for the given key if it hasn't existed.
// This function assumes the cluster ID has already existed and always use a
// cheaper read to retrieve it; if it doesn't exist, invoke the more expensive
Expand Down
64 changes: 63 additions & 1 deletion pkg/utils/etcdutil/etcdutil_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
Expand Down Expand Up @@ -78,7 +79,6 @@ func TestMemberHelpers(t *testing.T) {
re.NoError(err)

<-etcd2.Server.ReadyNotify()
re.NoError(err)

listResp2, err := ListEtcdMembers(client2)
re.NoError(err)
Expand Down Expand Up @@ -232,3 +232,65 @@ func TestInitClusterID(t *testing.T) {
re.NoError(err)
re.Equal(clusterID, clusterID1)
}

func TestEtcdClientSync(t *testing.T) {
t.Parallel()
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))

// Start a etcd server.
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
re.NoError(err)

// Create a etcd client with etcd1 as endpoint.
ep1 := cfg1.LCUrls[0].String()
urls, err := types.NewURLs([]string{ep1})
re.NoError(err)
client1, err := createEtcdClient(nil, urls)
re.NoError(err)
<-etcd1.Server.ReadyNotify()

// Add a new member.
cfg2 := NewTestSingleConfig(t)
cfg2.Name = "etcd2"
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := AddEtcdMember(client1, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
defer func() {
etcd2.Close()
}()
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()

// Check the client can get the new member.
listResp2, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
}
}

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
etcd1.Close()

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}
46 changes: 46 additions & 0 deletions pkg/utils/etcdutil/testutil.go
@@ -0,0 +1,46 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdutil

import (
"fmt"
"net/url"
"testing"

"github.com/tikv/pd/pkg/utils/tempurl"
"go.etcd.io/etcd/embed"
)

// NewTestSingleConfig is used to create a etcd config for the unit test purpose.
func NewTestSingleConfig(t *testing.T) *embed.Config {
cfg := embed.NewConfig()
cfg.Name = "test_etcd"
cfg.Dir = t.TempDir()
cfg.WalDir = ""
cfg.Logger = "zap"
cfg.LogOutputs = []string{"stdout"}

pu, _ := url.Parse(tempurl.Alloc())
cfg.LPUrls = []url.URL{*pu}
cfg.APUrls = cfg.LPUrls
cu, _ := url.Parse(tempurl.Alloc())
cfg.LCUrls = []url.URL{*cu}
cfg.ACUrls = cfg.LCUrls

cfg.StrictReconfigCheck = false
cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
cfg.ClusterState = embed.ClusterStateFlagNew
return cfg
}
2 changes: 1 addition & 1 deletion tests/client/client_test.go
Expand Up @@ -91,7 +91,7 @@ func TestClientClusterIDCheck(t *testing.T) {
pd.SecurityOption{}, pd.WithMaxErrorRetry(1),
)
re.Error(err)
re.Contains(err.Error(), "ErrClientGetLeader")
re.Contains(err.Error(), "ErrClientGetMember")
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipFirstUpdateMember"))
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipClusterIDCheck"))
}
Expand Down