Commit

This is an automated cherry-pick of tikv#6409
ref tikv#6403

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
rleungx authored and ti-chi-bot committed May 12, 2023
1 parent e77b5ad commit fab219f
Showing 3 changed files with 360 additions and 14 deletions.
16 changes: 3 additions & 13 deletions pkg/utils/etcdutil/etcdutil.go
@@ -193,19 +193,9 @@ func EtcdKVPutWithTTL(ctx context.Context, c *clientv3.Client, key string, value
    return kv.Put(ctx, key, value, clientv3.WithLease(grantResp.ID))
}

-// CreateClientsWithMultiEndpoint creates etcd v3 client and http client.
-func CreateClientsWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL) (*clientv3.Client, *http.Client, error) {
-    client, err := createEtcdClientWithMultiEndpoint(tlsConfig, acUrls)
-    if err != nil {
-        return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
-    }
-    httpClient := createHTTPClient(tlsConfig)
-    return client, httpClient, nil
-}
-
// CreateClients creates etcd v3 client and http client.
func CreateClients(tlsConfig *tls.Config, acUrls url.URL) (*clientv3.Client, *http.Client, error) {
-    client, err := createEtcdClient(tlsConfig, acUrls)
+    client, err := CreateEtcdClient(tlsConfig, acUrls)
    if err != nil {
        return nil, nil, errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
    }
@@ -252,9 +242,9 @@ func createEtcdClientWithMultiEndpoint(tlsConfig *tls.Config, acUrls []url.URL)
    return client, err
}

-// createEtcdClient creates etcd v3 client.
+// CreateEtcdClient creates etcd v3 client.
// Note: it will be used by the legacy pd-server, and only connects to the leader.
-func createEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
+func CreateEtcdClient(tlsConfig *tls.Config, acURL url.URL) (*clientv3.Client, error) {
    lgc := zap.NewProductionConfig()
    lgc.Encoding = log.ZapEncodingName
    client, err := clientv3.New(clientv3.Config{
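The hunk above renames createEtcdClient to CreateEtcdClient, exporting the single-endpoint constructor so other packages (and the tests below) can create separate clients. The following is a minimal editorial sketch of a caller, not part of this commit; the endpoint address is hypothetical, and it assumes the tikv/pd module layout shown in the diff header plus a plaintext etcd listening locally:

package main

import (
    "context"
    "log"
    "net/url"
    "time"

    "github.com/tikv/pd/pkg/utils/etcdutil"
)

func main() {
    // Hypothetical endpoint; CreateEtcdClient takes a single URL by value
    // and a *tls.Config that may be nil for plaintext connections.
    u, err := url.Parse("http://127.0.0.1:2379")
    if err != nil {
        log.Fatal(err)
    }
    client, err := etcdutil.CreateEtcdClient(nil, *u)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Basic sanity check that the client works.
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    if _, err := client.Put(ctx, "sample-key", "sample-value"); err != nil {
        log.Fatal(err)
    }
}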
331 changes: 331 additions & 0 deletions pkg/utils/etcdutil/etcdutil_test.go
@@ -257,7 +257,50 @@ func TestEtcdWithHangLeaderEnableCheck(t *testing.T) {
    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck", "return(true)"))
    err = checkEtcdWithHangLeader(t)
    re.Error(err)
    re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/closeKeepAliveCheck"))
}
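// Editorial note, not part of this commit: the failpoint enabled and
// disabled above is consumed inside the keep-alive code under test via
// the usual pingcap/failpoint pattern, roughly like the sketch below;
// the callback body here is hypothetical.
//
//    failpoint.Inject("closeKeepAliveCheck", func() {
//        // Simulate a dead keep-alive stream so that the error path
//        // asserted by re.Error(err) above is taken.
//    })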

func TestEtcdScaleInAndOutWithoutMultiPoint(t *testing.T) {
    re := require.New(t)
    // Start an etcd server.
    cfg1 := NewTestSingleConfig(t)
    etcd1, err := embed.StartEtcd(cfg1)
    defer func() {
        etcd1.Close()
    }()
    re.NoError(err)
    ep1 := cfg1.LCUrls[0].String()
    <-etcd1.Server.ReadyNotify()

    // Create two etcd clients with etcd1 as endpoint.
    urls, err := types.NewURLs([]string{ep1})
    re.NoError(err)
    client1, err := CreateEtcdClient(nil, urls[0]) // execute member change operation with this client
    defer func() {
        client1.Close()
    }()
    re.NoError(err)
    client2, err := CreateEtcdClient(nil, urls[0]) // check member change with this client
    defer func() {
        client2.Close()
    }()
    re.NoError(err)

    // Add a new member and check members.
    etcd2 := checkAddEtcdMember(t, cfg1, client1)
    defer func() {
        etcd2.Close()
    }()
    checkMembers(re, client2, []*embed.Etcd{etcd1, etcd2})

    // Scale in etcd1.
    _, err = RemoveEtcdMember(client1, uint64(etcd1.Server.ID()))
    re.NoError(err)
    checkMembers(re, client2, []*embed.Etcd{etcd2})
}
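// Editorial sketch, not part of this commit: RemoveEtcdMember itself is
// not shown in this diff. Assuming it is a thin wrapper over the clientv3
// membership API, the scale-in step above amounts to roughly the following
// (helper name hypothetical; relies on the file's existing imports plus "time"):
func removeEtcdMemberSketch(client *clientv3.Client, id uint64) (*clientv3.MemberRemoveResponse, error) {
    ctx, cancel := context.WithTimeout(client.Ctx(), 5*time.Second)
    defer cancel()
    // MemberRemove asks the cluster to drop the member with the given ID;
    // the removed server process should be shut down afterwards.
    return client.MemberRemove(ctx, id)
}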

func checkEtcdWithHangLeader(t *testing.T) error {
@@ -381,3 +424,291 @@ func ioCopy(dst io.Writer, src io.Reader, enableDiscard *atomic.Bool) (err error
    }
    return err
}

type loopWatcherTestSuite struct {
    suite.Suite
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    cleans []func()
    etcd   *embed.Etcd
    client *clientv3.Client
    config *embed.Config
}

func TestLoopWatcherTestSuite(t *testing.T) {
    suite.Run(t, new(loopWatcherTestSuite))
}
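// Editorial note, not part of this commit: judging from the call sites
// below, NewLoopWatcher appears to take, in order, a context, a
// *sync.WaitGroup, an etcd client, a human-readable watcher name, the key
// to watch, a callback invoked on puts, a callback invoked on deletes, a
// callback invoked after each load/batch of events, and optional clientv3
// options such as clientv3.WithPrefix().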

func (suite *loopWatcherTestSuite) SetupSuite() {
    t := suite.T()
    suite.ctx, suite.cancel = context.WithCancel(context.Background())
    suite.cleans = make([]func(), 0)
    // Start an etcd server and create a client with etcd1 as endpoint.
    suite.config = NewTestSingleConfig(t)
    suite.startEtcd()
    ep1 := suite.config.LCUrls[0].String()
    urls, err := types.NewURLs([]string{ep1})
    suite.NoError(err)
    suite.client, err = CreateEtcdClient(nil, urls[0])
    suite.NoError(err)
    suite.cleans = append(suite.cleans, func() {
        suite.client.Close()
    })
}

func (suite *loopWatcherTestSuite) TearDownSuite() {
    suite.cancel()
    suite.wg.Wait()
    for _, clean := range suite.cleans {
        clean()
    }
}

func (suite *loopWatcherTestSuite) TestLoadWithoutKey() {
    cache := struct {
        sync.RWMutex
        data map[string]struct{}
    }{
        data: make(map[string]struct{}),
    }
    watcher := NewLoopWatcher(
        suite.ctx,
        &suite.wg,
        suite.client,
        "test",
        "TestLoadWithoutKey",
        func(kv *mvccpb.KeyValue) error {
            cache.Lock()
            defer cache.Unlock()
            cache.data[string(kv.Key)] = struct{}{}
            return nil
        },
        func(kv *mvccpb.KeyValue) error { return nil },
        func() error { return nil },
    )

    suite.wg.Add(1)
    go watcher.StartWatchLoop()
    err := watcher.WaitLoad()
    suite.NoError(err) // although there is no key, the watcher returns no error
    cache.RLock()
    defer cache.RUnlock()
    suite.Len(cache.data, 0)
}

func (suite *loopWatcherTestSuite) TestCallBack() {
    cache := struct {
        sync.RWMutex
        data map[string]struct{}
    }{
        data: make(map[string]struct{}),
    }
    result := make([]string, 0)
    watcher := NewLoopWatcher(
        suite.ctx,
        &suite.wg,
        suite.client,
        "test",
        "TestCallBack",
        func(kv *mvccpb.KeyValue) error {
            result = append(result, string(kv.Key))
            return nil
        },
        func(kv *mvccpb.KeyValue) error {
            cache.Lock()
            defer cache.Unlock()
            delete(cache.data, string(kv.Key))
            return nil
        },
        func() error {
            cache.Lock()
            defer cache.Unlock()
            for _, r := range result {
                cache.data[r] = struct{}{}
            }
            result = result[:0]
            return nil
        },
        clientv3.WithPrefix(),
    )

    suite.wg.Add(1)
    go watcher.StartWatchLoop()
    err := watcher.WaitLoad()
    suite.NoError(err)

    // put 10 keys
    for i := 0; i < 10; i++ {
        suite.put(fmt.Sprintf("TestCallBack%d", i), "")
    }
    time.Sleep(time.Second)
    cache.RLock()
    suite.Len(cache.data, 10)
    cache.RUnlock()

    // delete 10 keys
    for i := 0; i < 10; i++ {
        key := fmt.Sprintf("TestCallBack%d", i)
        _, err = suite.client.Delete(suite.ctx, key)
        suite.NoError(err)
    }
    time.Sleep(time.Second)
    cache.RLock()
    suite.Empty(cache.data)
    cache.RUnlock()
}

func (suite *loopWatcherTestSuite) TestWatcherLoadLimit() {
    for count := 1; count < 10; count++ {
        for limit := 0; limit < 10; limit++ {
            ctx, cancel := context.WithCancel(suite.ctx)
            for i := 0; i < count; i++ {
                suite.put(fmt.Sprintf("TestWatcherLoadLimit%d", i), "")
            }
            cache := struct {
                sync.RWMutex
                data []string
            }{
                data: make([]string, 0),
            }
            watcher := NewLoopWatcher(
                ctx,
                &suite.wg,
                suite.client,
                "test",
                "TestWatcherLoadLimit",
                func(kv *mvccpb.KeyValue) error {
                    cache.Lock()
                    defer cache.Unlock()
                    cache.data = append(cache.data, string(kv.Key))
                    return nil
                },
                func(kv *mvccpb.KeyValue) error {
                    return nil
                },
                func() error {
                    return nil
                },
                clientv3.WithPrefix(),
            )
            suite.wg.Add(1)
            go watcher.StartWatchLoop()
            err := watcher.WaitLoad()
            suite.NoError(err)
            cache.RLock()
            suite.Len(cache.data, count)
            cache.RUnlock()
            cancel()
        }
    }
}
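// Editorial sketch, not part of this commit: TestWatcherLoadLimit above
// exercises loading a prefix in pages. With the plain clientv3 API, a
// limit-based page scan looks roughly like this (helper name hypothetical;
// relies on the file's existing imports):
func loadByPageSketch(ctx context.Context, c *clientv3.Client, prefix string, limit int64) ([]*mvccpb.KeyValue, error) {
    kvs := make([]*mvccpb.KeyValue, 0)
    key := prefix
    for {
        // Scan [key, end-of-prefix), returning at most `limit` keys per
        // response; limit == 0 means no limit in etcd.
        resp, err := c.Get(ctx, key,
            clientv3.WithRange(clientv3.GetPrefixRangeEnd(prefix)),
            clientv3.WithLimit(limit))
        if err != nil {
            return nil, err
        }
        kvs = append(kvs, resp.Kvs...)
        if !resp.More {
            return kvs, nil
        }
        // Resume immediately after the last key of this page.
        key = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
    }
}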

func (suite *loopWatcherTestSuite) TestWatcherBreak() {
    cache := struct {
        sync.RWMutex
        data string
    }{}
    checkCache := func(expect string) {
        testutil.Eventually(suite.Require(), func() bool {
            cache.RLock()
            defer cache.RUnlock()
            return cache.data == expect
        }, testutil.WithWaitFor(time.Second))
    }

    watcher := NewLoopWatcher(
        suite.ctx,
        &suite.wg,
        suite.client,
        "test",
        "TestWatcherBreak",
        func(kv *mvccpb.KeyValue) error {
            if string(kv.Key) == "TestWatcherBreak" {
                cache.Lock()
                defer cache.Unlock()
                cache.data = string(kv.Value)
            }
            return nil
        },
        func(kv *mvccpb.KeyValue) error { return nil },
        func() error { return nil },
    )
    watcher.watchChangeRetryInterval = 100 * time.Millisecond

    suite.wg.Add(1)
    go watcher.StartWatchLoop()
    err := watcher.WaitLoad()
    suite.NoError(err)
    checkCache("")

    // Close and update the client in the failpoint to simulate a network
    // error and its recovery.
    failpoint.Enable("github.com/tikv/pd/pkg/utils/etcdutil/updateClient", "return(true)")

    // Case1: restart the etcd server
    suite.etcd.Close()
    suite.startEtcd()
    suite.put("TestWatcherBreak", "1")
    checkCache("1")

    // Case2: close the etcd client and put a new value after watcher restarts
    suite.client.Close()
    suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
    suite.NoError(err)
    watcher.updateClientCh <- suite.client
    suite.put("TestWatcherBreak", "2")
    checkCache("2")

    // Case3: close the etcd client and put a new value before watcher restarts
    suite.client.Close()
    suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
    suite.NoError(err)
    suite.put("TestWatcherBreak", "3")
    watcher.updateClientCh <- suite.client
    checkCache("3")

    // Case4: close the etcd client and put a new value with compact
    suite.client.Close()
    suite.client, err = CreateEtcdClient(nil, suite.config.LCUrls[0])
    suite.NoError(err)
    suite.put("TestWatcherBreak", "4")
    resp, err := EtcdKVGet(suite.client, "TestWatcherBreak")
    suite.NoError(err)
    revision := resp.Header.Revision
    resp2, err := suite.etcd.Server.Compact(suite.ctx, &etcdserverpb.CompactionRequest{Revision: revision})
    suite.NoError(err)
    suite.Equal(revision, resp2.Header.Revision)
    watcher.updateClientCh <- suite.client
    checkCache("4")

    // Case5: there is error data in the cache
    cache.Lock()
    cache.data = "error"
    cache.Unlock()
    watcher.ForceLoad()
    checkCache("4")

    failpoint.Disable("github.com/tikv/pd/pkg/utils/etcdutil/updateClient")
}

func (suite *loopWatcherTestSuite) startEtcd() {
    etcd1, err := embed.StartEtcd(suite.config)
    suite.NoError(err)
    suite.etcd = etcd1
    <-etcd1.Server.ReadyNotify()
    suite.cleans = append(suite.cleans, func() {
        suite.etcd.Close()
    })
}

func (suite *loopWatcherTestSuite) put(key, value string) {
    kv := clientv3.NewKV(suite.client)
    _, err := kv.Put(suite.ctx, key, value)
    suite.NoError(err)
    resp, err := kv.Get(suite.ctx, key)
    suite.NoError(err)
    suite.Equal(value, string(resp.Kvs[0].Value))
}
