Skip to content

Commit

Permalink
etcdutil: add dial keep alive params to switch connect as soon as pos…
Browse files Browse the repository at this point in the history
…sible (#6059)

ref #6046, close #6053, ref #6059

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot committed Mar 7, 2023
1 parent aed8a88 commit 253c798
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 93 deletions.
27 changes: 2 additions & 25 deletions pkg/storage/kv/kv_test.go
Expand Up @@ -16,22 +16,20 @@ package kv

import (
"context"
"fmt"
"net/url"
"path"
"sort"
"strconv"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

func TestEtcd(t *testing.T) {
re := require.New(t)
cfg := newTestSingleConfig(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
re.NoError(err)
defer etcd.Close()
Expand Down Expand Up @@ -122,27 +120,6 @@ func testRange(re *require.Assertions, kv Base) {
}
}

// newTestSingleConfig builds an embed.Config for a single-member etcd used in
// tests. Data lives in a per-test temporary directory, logs go to stdout via
// zap, and the peer and client URLs are freshly allocated by tempurl.Alloc.
// The test fails immediately if an allocated URL cannot be parsed.
func newTestSingleConfig(t *testing.T) *embed.Config {
	cfg := embed.NewConfig()
	cfg.Name = "test_etcd"
	cfg.Dir = t.TempDir()
	cfg.WalDir = ""
	cfg.Logger = "zap"
	cfg.LogOutputs = []string{"stdout"}

	// Check the parse errors instead of discarding them: a nil *url.URL would
	// otherwise surface as a nil-pointer panic on the dereference below.
	pu, err := url.Parse(tempurl.Alloc())
	require.NoError(t, err)
	cfg.LPUrls = []url.URL{*pu}
	cfg.APUrls = cfg.LPUrls
	cu, err := url.Parse(tempurl.Alloc())
	require.NoError(t, err)
	cfg.LCUrls = []url.URL{*cu}
	cfg.ACUrls = cfg.LCUrls

	cfg.StrictReconfigCheck = false
	// The cluster initially consists of this member only.
	cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0])
	cfg.ClusterState = embed.ClusterStateFlagNew
	return cfg
}

func testSaveMultiple(re *require.Assertions, kv Base, count int) {
err := kv.RunInTxn(context.Background(), func(txn Txn) error {
var saveErr error
Expand Down
28 changes: 22 additions & 6 deletions pkg/utils/etcdutil/etcdutil.go
Expand Up @@ -41,6 +41,13 @@ const (
// defaultAutoSyncInterval is the interval to sync etcd cluster.
defaultAutoSyncInterval = 60 * time.Second

// defaultDialKeepAliveTime is the time after which client pings the server to see if transport is alive.
defaultDialKeepAliveTime = 10 * time.Second

// defaultDialKeepAliveTimeout is the time that the client waits for a response for the
// keep-alive probe. If the response is not received in this time, the connection is closed.
defaultDialKeepAliveTimeout = 3 * time.Second

// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second
Expand Down Expand Up @@ -212,17 +219,26 @@ func createEtcdClient(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client
lgc := zap.NewProductionConfig()
lgc.Encoding = log.ZapEncodingName
autoSyncInterval := defaultAutoSyncInterval
dialKeepAliveTime := defaultDialKeepAliveTime
dialKeepAliveTimeout := defaultDialKeepAliveTimeout
failpoint.Inject("autoSyncInterval", func() {
autoSyncInterval = 10 * time.Millisecond
})
failpoint.Inject("closeKeepAliveCheck", func() {
autoSyncInterval = 0
dialKeepAliveTime = 0
dialKeepAliveTimeout = 0
})
client, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
Endpoints: endpoints,
DialTimeout: defaultEtcdClientTimeout,
AutoSyncInterval: autoSyncInterval,
TLS: tlsConfig,
LogConfig: &lgc,
DialKeepAliveTime: dialKeepAliveTime,
DialKeepAliveTimeout: dialKeepAliveTimeout,
})
if err != nil {
if err == nil {
log.Info("create etcd v3 client", zap.Strings("endpoints", endpoints))
}
return client, err
Expand Down
209 changes: 148 additions & 61 deletions pkg/utils/etcdutil/etcdutil_test.go
Expand Up @@ -18,18 +18,23 @@ import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/pkg/types"
)

func TestMemberHelpers(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg1 := NewTestSingleConfig(t)
etcd1, err := embed.StartEtcd(cfg1)
Expand All @@ -54,43 +59,15 @@ func TestMemberHelpers(t *testing.T) {
re.Equal(uint64(etcd1.Server.ID()), listResp1.Members[0].ID)

// Test AddEtcdMember
// Make a new etcd config.
cfg2 := NewTestSingleConfig(t)
cfg2.Name = "etcd2"
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting

// Add it to the cluster above.
peerURL := cfg2.LPUrls[0].String()
addResp, err := AddEtcdMember(client1, []string{peerURL})
re.NoError(err)

etcd2, err := embed.StartEtcd(cfg2)
defer func() {
etcd2.Close()
}()
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)

etcd2 := checkAddEtcdMember(t, cfg1, client1)
cfg2 := etcd2.Config()
defer etcd2.Close()
ep2 := cfg2.LCUrls[0].String()
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep2},
})
re.NoError(err)

<-etcd2.Server.ReadyNotify()

listResp2, err := ListEtcdMembers(client2)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
}
}
checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2})

// Test CheckClusterID
urlsMap, err := types.NewURLsMap(cfg2.InitialCluster)
Expand All @@ -109,7 +86,6 @@ func TestMemberHelpers(t *testing.T) {
}

func TestEtcdKVGet(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -160,7 +136,6 @@ func TestEtcdKVGet(t *testing.T) {
}

func TestEtcdKVPutWithTTL(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -201,7 +176,6 @@ func TestEtcdKVPutWithTTL(t *testing.T) {
}

func TestInitClusterID(t *testing.T) {
t.Parallel()
re := require.New(t)
cfg := NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
Expand Down Expand Up @@ -234,7 +208,6 @@ func TestInitClusterID(t *testing.T) {
}

func TestEtcdClientSync(t *testing.T) {
t.Parallel()
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval", "return(true)"))

Expand All @@ -252,45 +225,159 @@ func TestEtcdClientSync(t *testing.T) {
<-etcd1.Server.ReadyNotify()

// Add a new member.
etcd2 := checkAddEtcdMember(t, cfg1, client1)
defer etcd2.Close()
checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
re.NoError(err)
time.Sleep(20 * time.Millisecond) // wait for etcd client sync endpoints and client will be connected to etcd2
etcd1.Close()

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
}

// TestEtcdWithHangLeaderEnableCheck runs checkEtcdWithHangLeader twice: once
// with the autoSyncInterval failpoint, where the request after the hang is
// expected to succeed, and once with the closeKeepAliveCheck failpoint, where
// it is expected to fail.
func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
	re := require.New(t)

	// runWithFailpoint enables the named failpoint for exactly one
	// checkEtcdWithHangLeader run. Disabling is deferred so the failpoint is
	// released even if the check aborts the test, and cannot leak into the
	// second run or into other tests of the package.
	runWithFailpoint := func(name string) error {
		re.NoError(failpoint.Enable(name, "return(true)"))
		defer func() {
			if err := failpoint.Disable(name); err != nil {
				t.Errorf("disable failpoint %q: %v", name, err)
			}
		}()
		return checkEtcdWithHangLeader(t)
	}

	// Test with enable check.
	re.NoError(runWithFailpoint("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))

	// Test with disable check.
	re.Error(runWithFailpoint("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
}

// checkEtcdWithHangLeader starts an etcd server, puts a TCP proxy in front of
// it, and connects a client through the proxy. It then adds a second member,
// makes the proxy drop all traffic to simulate a hung first member, and
// returns the error of a KV read issued after the keep-alive window elapsed.
//
// Setup failures abort the test through require; only the result of the final
// read is returned to the caller.
func checkEtcdWithHangLeader(t *testing.T) error {
	re := require.New(t)
	// Start an etcd server.
	cfg1 := NewTestSingleConfig(t)
	etcd1, err := embed.StartEtcd(cfg1)
	re.NoError(err)
	// Close the server on exit so it does not outlive this check.
	defer etcd1.Close()
	ep1 := cfg1.LCUrls[0].String()
	<-etcd1.Server.ReadyNotify()

	// Create a proxy to etcd1.
	// NOTE(review): the proxy goroutine has no stop signal and keeps
	// listening after this function returns.
	proxyAddr := tempurl.Alloc()
	var enableDiscard atomic.Bool
	go proxyWithDiscard(re, ep1, proxyAddr, &enableDiscard)

	// Create an etcd client with the proxy in front of etcd1 as its endpoint.
	urls, err := types.NewURLs([]string{proxyAddr})
	re.NoError(err)
	client1, err := createEtcdClient(nil, urls)
	re.NoError(err)
	// Release the client's connections on exit.
	defer client1.Close()

	// Add a new member and set the client endpoints to etcd1 and etcd2.
	etcd2 := checkAddEtcdMember(t, cfg1, client1)
	defer etcd2.Close()
	checkMembers(re, client1, []*embed.Etcd{etcd1, etcd2})
	time.Sleep(1 * time.Second) // wait for etcd client sync endpoints

	// Hang the etcd1 and wait for the client to connect to etcd2.
	enableDiscard.Store(true)
	// One keep-alive period plus twice the probe timeout.
	time.Sleep(defaultDialKeepAliveTime + defaultDialKeepAliveTimeout*2)
	_, err = EtcdKVGet(client1, "test/key1")
	return err
}

// checkAddEtcdMember adds a new member to the cluster described by cfg1 and
// returns the embedded etcd started for it.
//
// The member is registered through client before its server starts. The
// function asserts that the started server has the ID the cluster assigned,
// and waits until the server is ready. The caller owns the returned server
// and must close it.
func checkAddEtcdMember(t *testing.T, cfg1 *embed.Config, client *clientv3.Client) *embed.Etcd {
	re := require.New(t)
	cfg2 := NewTestSingleConfig(t)
	// genRandName is defined outside this view; presumably it gives each added
	// member a distinct name — TODO confirm.
	cfg2.Name = genRandName()
	cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
	cfg2.ClusterState = embed.ClusterStateFlagExisting
	peerURL := cfg2.LPUrls[0].String()
	addResp, err := AddEtcdMember(client, []string{peerURL})
	re.NoError(err)
	// No deferred Close here: the server is handed back to the caller, so
	// closing it in this function would return an already stopped member.
	etcd2, err := embed.StartEtcd(cfg2)
	re.NoError(err)
	re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
	<-etcd2.Server.ReadyNotify()
	return etcd2
}

func checkMembers(re *require.Assertions, client *clientv3.Client, etcds []*embed.Etcd) {
// Check the client can get the new member.
listResp2, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp2.Members, 2)
for _, m := range listResp2.Members {
switch m.ID {
case uint64(etcd1.Server.ID()):
case uint64(etcd2.Server.ID()):
default:
t.Fatalf("unknown member: %v", m)
listResp, err := ListEtcdMembers(client)
re.NoError(err)
re.Len(listResp.Members, len(etcds))
inList := func(m *etcdserverpb.Member) bool {
for _, etcd := range etcds {
if m.ID == uint64(etcd.Server.ID()) {
return true
}
}
return false
}
for _, m := range listResp.Members {
re.True(inList(m))
}
}

// Remove the first member and close the etcd1.
_, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
// proxyWithDiscard listens on proxy and relays every accepted TCP connection
// to server. Both addresses may carry an "http://" prefix, which is stripped
// before use. Each connection is relayed by pipe, which stops forwarding data
// once enableDiscard is set.
//
// The accept loop has no stop condition: the function only returns when an
// assertion on re aborts the calling goroutine.
//
// NOTE(review): the visible caller starts this function with `go`, and each
// connection is served on a further goroutine. The testing package requires
// FailNow to be called from the goroutine running the test, so a failing
// re.NoError here may not stop the test as intended.
func proxyWithDiscard(re *require.Assertions, server, proxy string, enableDiscard *atomic.Bool) {
	server = strings.TrimPrefix(server, "http://")
	proxy = strings.TrimPrefix(proxy, "http://")
	l, err := net.Listen("tcp", proxy)
	re.NoError(err)
	// Release the listening socket if the loop is aborted by a failed assertion.
	defer l.Close()
	for {
		connect, err := l.Accept()
		re.NoError(err)
		go func(connect net.Conn) {
			serverConnect, err := net.Dial("tcp", server)
			if err != nil {
				// Close the accepted connection first; the assertion below
				// aborts this goroutine and would otherwise leak it.
				connect.Close()
			}
			re.NoError(err)
			pipe(connect, serverConnect, enableDiscard)
		}(connect)
	}
}

// Check the client can get the new member with the new endpoints.
listResp3, err := ListEtcdMembers(client1)
re.NoError(err)
re.Len(listResp3.Members, 1)
re.Equal(uint64(etcd2.Server.ID()), listResp3.Members[0].ID)
// pipe relays traffic between src and dst in both directions, each direction
// on its own goroutine, until either direction stops. Both connections are
// then closed, which also ends the other direction.
func pipe(src net.Conn, dst net.Conn, enableDiscard *atomic.Bool) {
	// Capacity 1: only the first result is received below; the second
	// direction's result lands in the buffer so its goroutine can still exit.
	done := make(chan error, 1)
	relay := func(to, from net.Conn) {
		done <- ioCopy(to, from, enableDiscard)
	}
	go relay(src, dst)
	go relay(dst, src)

	// The error value is not inspected; it only signals that one side ended.
	<-done
	dst.Close()
	src.Close()
}

require.NoError(t, failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/autoSyncInterval"))
// ioCopy forwards bytes from src to dst until src ends or an error occurs.
//
// enableDiscard is checked before every read. Once it is set, the remainder
// of src is drained into io.Discard and nothing further reaches dst. The flag
// is only consulted between reads, so a read already in progress when the
// flag flips is still forwarded.
//
// The returned error is never nil: io.EOF when src ends cleanly, the
// underlying error when reading, draining or writing fails, or
// io.ErrShortWrite when dst accepts fewer bytes than were read.
func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) error {
	buffer := make([]byte, 32*1024)
	for {
		if enableDiscard.Load() {
			// Report a failed drain right away rather than dropping the error
			// and reading from the broken source once more.
			if _, err := io.Copy(io.Discard, src); err != nil {
				return err
			}
			// io.Copy reports a clean end of stream as nil; keep this
			// function's convention of returning io.EOF in that case.
			return io.EOF
		}
		readNum, errRead := src.Read(buffer)
		if readNum > 0 {
			writeNum, errWrite := dst.Write(buffer[:readNum])
			if errWrite != nil {
				return errWrite
			}
			if readNum != writeNum {
				return io.ErrShortWrite
			}
		}
		if errRead != nil {
			return errRead
		}
	}
}

0 comments on commit 253c798

Please sign in to comment.