Skip to content

Commit

Permalink
test: reduce redundant code about etcd server and logger (tikv#7007)
Browse files Browse the repository at this point in the history
close tikv#6890

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 authored and rleungx committed Dec 1, 2023
1 parent 1e968b6 commit 74e95e0
Show file tree
Hide file tree
Showing 16 changed files with 253 additions and 563 deletions.
18 changes: 3 additions & 15 deletions pkg/audit/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"testing"
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/requestutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestLabelMatcher(t *testing.T) {
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestLocalLogBackendUsingFile(t *testing.T) {
t.Parallel()
re := require.New(t)
backend := NewLocalLogBackend(true)
fname := initLog()
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
re.False(backend.ProcessHTTPRequest(req))
Expand Down Expand Up @@ -125,7 +125,7 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) {
func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
b.StopTimer()
backend := NewLocalLogBackend(true)
fname := initLog()
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
b.StartTimer()
Expand All @@ -135,15 +135,3 @@ func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
backend.ProcessHTTPRequest(req)
}
}

func initLog() string {
cfg := &log.Config{}
f, _ := os.CreateTemp("/tmp", "pd_tests")
fname := f.Name()
f.Close()
cfg.File.Filename = fname
cfg.Level = "info"
lg, p, _ := log.InitLogger(cfg)
log.ReplaceGlobals(lg, p)
return fname
}
132 changes: 25 additions & 107 deletions pkg/election/leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package election

import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
Expand All @@ -35,27 +33,15 @@ const defaultLeaseTimeout = 1

func TestLeadership(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Campaign the same leadership
leadership1 := NewLeadership(client, "/test_leader", "test_leader_1")
leadership2 := NewLeadership(client, "/test_leader", "test_leader_2")

// leadership1 starts first and get the leadership
err = leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
err := leadership1.Campaign(defaultLeaseTimeout, "test_leader_1")
re.NoError(err)
// leadership2 starts then and can not get the leadership
err = leadership2.Campaign(defaultLeaseTimeout, "test_leader_2")
Expand Down Expand Up @@ -168,60 +154,24 @@ func TestExitWatch(t *testing.T) {
// Case6: transfer leader without client reconnection.
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
cfg1 := server.Config()
cfg2 := etcdutil.NewTestSingleConfig(t)
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
client2, err := etcdutil.CreateEtcdClient(nil, etcd2.Config().LCUrls)
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()
ep := cfg2.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

// close the original leader
server.Server.HardStop()
client1.Delete(context.Background(), leaderKey)
// delete the leader key with the new client
client2.Delete(context.Background(), leaderKey)
return func() {
etcd2.Close()
client2.Close()
}
})
// Case7: loss the quorum when the watch loop is running
checkExitWatch(t, leaderKey, func(server *embed.Etcd, client *clientv3.Client) func() {
tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests")
defer os.RemoveAll(tempStdoutFile.Name())
logCfg := &log.Config{}
logCfg.File.Filename = tempStdoutFile.Name()
logCfg.Level = "info"
lg, p, _ := log.InitLogger(logCfg)
log.ReplaceGlobals(lg, p)

cfg1 := server.Config()
cfg2 := etcdutil.NewTestSingleConfig(t)
cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.LPUrls[0])
cfg2.ClusterState = embed.ClusterStateFlagExisting
peerURL := cfg2.LPUrls[0].String()
addResp, err := etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd2, err := embed.StartEtcd(cfg2)
re.NoError(err)
re.Equal(uint64(etcd2.Server.ID()), addResp.Member.ID)
<-etcd2.Server.ReadyNotify()

cfg3 := etcdutil.NewTestSingleConfig(t)
cfg3.InitialCluster = cfg2.InitialCluster + fmt.Sprintf(",%s=%s", cfg3.Name, &cfg3.LPUrls[0])
cfg3.ClusterState = embed.ClusterStateFlagExisting
peerURL = cfg3.LPUrls[0].String()
addResp, err = etcdutil.AddEtcdMember(client, []string{peerURL})
re.NoError(err)
etcd3, err := embed.StartEtcd(cfg3)
re.NoError(err)
re.Equal(uint64(etcd3.Server.ID()), addResp.Member.ID)
<-etcd3.Server.ReadyNotify()
etcd2 := etcdutil.MustAddEtcdMember(t, &cfg1, client)
cfg2 := etcd2.Config()
etcd3 := etcdutil.MustAddEtcdMember(t, &cfg2, client)

resp2, err := client.MemberList(context.Background())
re.NoError(err)
Expand All @@ -237,24 +187,11 @@ func TestExitWatch(t *testing.T) {

func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embed.Etcd, client *clientv3.Client) func()) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)

<-etcd.Server.ReadyNotify()
defer client2.Close()

leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
leadership2 := NewLeadership(client2, leaderKey, "test_leader_2")
Expand All @@ -268,7 +205,7 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe
done <- struct{}{}
}()

cleanFunc := injectFunc(etcd, client2)
cleanFunc := injectFunc(servers[0], client2)
defer cleanFunc()

testutil.Eventually(re, func() bool {
Expand All @@ -283,33 +220,14 @@ func checkExitWatch(t *testing.T, leaderKey string, injectFunc func(server *embe

func TestRequestProgress(t *testing.T) {
checkWatcherRequestProgress := func(injectWatchChanBlock bool) {
tempStdoutFile, _ := os.CreateTemp("/tmp", "pd_tests")
defer os.RemoveAll(tempStdoutFile.Name())
logCfg := &log.Config{}
logCfg.File.Filename = tempStdoutFile.Name()
logCfg.Level = "debug"
lg, p, _ := log.InitLogger(logCfg)
log.ReplaceGlobals(lg, p)

re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
fname := testutil.InitTempFileLogger("debug")
defer os.RemoveAll(fname)
servers, client1, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
client2, err := etcdutil.CreateEtcdClient(nil, servers[0].Config().LCUrls)
re.NoError(err)

ep := cfg.LCUrls[0].String()
client1, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)
client2, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
defer client2.Close()

leaderKey := "/test_leader"
leadership1 := NewLeadership(client1, leaderKey, "test_leader_1")
Expand All @@ -328,14 +246,14 @@ func TestRequestProgress(t *testing.T) {
if injectWatchChanBlock {
failpoint.Enable("github.com/tikv/pd/pkg/election/watchChanBlock", "return(true)")
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(tempStdoutFile.Name())
b, _ := os.ReadFile(fname)
l := string(b)
return strings.Contains(l, "watch channel is blocked for a long time")
})
failpoint.Disable("github.com/tikv/pd/pkg/election/watchChanBlock")
} else {
testutil.Eventually(re, func() bool {
b, _ := os.ReadFile(tempStdoutFile.Name())
b, _ := os.ReadFile(fname)
l := string(b)
return strings.Contains(l, "watcher receives progress notify in watch loop")
})
Expand Down
36 changes: 21 additions & 15 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,12 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
)

func TestLease(t *testing.T) {
re := require.New(t)
cfg := etcdutil.NewTestSingleConfig(t)
etcd, err := embed.StartEtcd(cfg)
defer func() {
etcd.Close()
}()
re.NoError(err)

ep := cfg.LCUrls[0].String()
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{ep},
})
re.NoError(err)

<-etcd.Server.ReadyNotify()
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Create the lease.
lease1 := &lease{
Expand Down Expand Up @@ -101,3 +88,22 @@ func TestLease(t *testing.T) {
time.Sleep((defaultLeaseTimeout + 1) * time.Second)
re.True(lease1.IsExpired())
}

func TestLeaseKeepAlive(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()

// Create the lease.
lease := &lease{
Purpose: "test_lease",
client: client,
lease: clientv3.NewLease(client),
}

re.NoError(lease.Grant(defaultLeaseTimeout))
ch := lease.keepAliveWorker(context.Background(), 2*time.Second)
time.Sleep(2 * time.Second)
<-ch
re.NoError(lease.Close())
}

0 comments on commit 74e95e0

Please sign in to comment.