From 107b2a02b3adbae846ffbff7c9b30e5a7aa73447 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 5 Jun 2023 18:07:40 +0800 Subject: [PATCH] *: fix the split problem caused by no enough replicas (#6555) close tikv/pd#6550 Signed-off-by: Ryan Leung --- pkg/keyspace/tso_keyspace_group.go | 9 ++++- pkg/mcs/discovery/discover.go | 2 +- pkg/mcs/discovery/registry_entry.go | 2 +- server/server.go | 7 ++-- tests/cluster.go | 33 +++++++++++++++ tests/integrations/mcs/tso/api_test.go | 4 +- tests/pdctl/keyspace/keyspace_group_test.go | 45 +++++++++++++++++++++ 7 files changed, 94 insertions(+), 8 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 4012f13a909..2a477c3dea1 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -155,9 +155,11 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { ticker = time.NewTicker(time.Millisecond * 100) }) defer ticker.Stop() + log.Info("start to alloc nodes to all keyspace groups") for { select { case <-m.ctx.Done(): + log.Info("stop to alloc nodes to all keyspace groups") return case <-ticker.C: } @@ -170,6 +172,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { log.Error("failed to load all keyspace groups", zap.Error(err)) continue } + // if the default keyspace is not initialized, we should wait for the default keyspace to be initialized. + if len(groups) == 0 { + continue + } withError := false for _, group := range groups { if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount { @@ -184,6 +190,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { } if !withError { // all keyspace groups have equal or more than default replica count + log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node") return } } @@ -191,7 +198,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() { func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) { tsoServiceKey := discovery.TSOPath(clusterID) - tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/" + tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index 362652dbe5d..d3c06ad2cc8 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -22,7 +22,7 @@ import ( // Discover is used to get all the service instances of the specified service name. func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) { key := discoveryPath(clusterID, serviceName) + "/" - endKey := clientv3.GetPrefixRangeEnd(key) + "/" + endKey := clientv3.GetPrefixRangeEnd(key) withRange := clientv3.WithRange(endKey) resp, err := etcdutil.EtcdKVGet(cli, key, withRange) diff --git a/pkg/mcs/discovery/registry_entry.go b/pkg/mcs/discovery/registry_entry.go index 00a87502e4a..52751b430c4 100644 --- a/pkg/mcs/discovery/registry_entry.go +++ b/pkg/mcs/discovery/registry_entry.go @@ -23,7 +23,7 @@ import ( // ServiceRegistryEntry is the registry entry of a service type ServiceRegistryEntry struct { - ServiceAddr string `json:"serviceAddr"` + ServiceAddr string `json:"service-addr"` } // Serialize this service registry entry diff --git a/server/server.go b/server/server.go index 94f8915b961..b6b9dd2b238 100644 --- a/server/server.go +++ b/server/server.go @@ -559,13 +559,14 @@ func (s *Server) Run() error { if err := s.startEtcd(s.ctx); err != nil { return err } - failpoint.Inject("delayStartServer", func() { - time.Sleep(2 * time.Second) - }) + if err := s.startServer(s.ctx); err != nil { return err } + failpoint.Inject("delayStartServerLoop", func() { + time.Sleep(2 * time.Second) + }) s.startServerLoop(s.ctx) return nil diff --git a/tests/cluster.go b/tests/cluster.go index be7ec156628..7fa029487fe 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" + tsoserver "github.com/tikv/pd/pkg/mcs/tso/server" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" @@ -83,6 +84,38 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err return createTestServer(ctx, cfg, []string{utils.APIServiceName}) } +// StartSingleTSOTestServer creates and starts a tso server with default config for testing. +func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) { + cfg := tsoserver.NewConfig() + cfg.BackendEndpoints = backendEndpoints + cfg.ListenAddr = listenAddrs + cfg, err := tsoserver.GenerateConfig(cfg) + re.NoError(err) + // Setup the logger. + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err != nil { + return nil, nil, err + } + zapLogOnce.Do(func() { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + }) + re.NoError(err) + return NewTSOTestServer(ctx, cfg) +} + +// NewTSOTestServer creates a tso server with given config for testing. +func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) { + s := tsoserver.CreateServer(ctx, cfg) + if err := s.Run(); err != nil { + return nil, nil, err + } + cleanup := func() { + s.Close() + os.RemoveAll(cfg.DataDir) + } + return s, cleanup, nil +} + func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) if err != nil { diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index c4064445a4a..67ef3bfadce 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -112,7 +112,7 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map func TestTSOServerStartFirst(t *testing.T) { re := require.New(t) - re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -171,5 +171,5 @@ func TestTSOServerStartFirst(t *testing.T) { re.Len(group.Keyspaces, 2) re.Len(group.Members, 2) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go index 7c0862a5d96..3429eb56072 100644 --- a/tests/pdctl/keyspace/keyspace_group_test.go +++ b/tests/pdctl/keyspace/keyspace_group_test.go @@ -18,12 +18,18 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" + "time" + "github.com/pingcap/failpoint" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/utils/tempurl" + "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/apiv2/handlers" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers" @@ -82,3 +88,42 @@ func TestKeyspaceGroup(t *testing.T) { re.Equal(uint32(2), keyspaceGroup.ID) re.Equal(keyspaceGroup.Keyspaces, []uint32{222, 333}) } + +func TestSplitKeyspaceGroup(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) + re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) + tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) { + conf.Keyspace.PreAlloc = []string{"keyspace_a", "keyspace_b"} + }) + re.NoError(err) + err = tc.RunInitialServers() + re.NoError(err) + pdAddr := tc.GetConfig().GetClientURL() + + _, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup1() + re.NoError(err) + _, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc()) + defer tsoServerCleanup2() + re.NoError(err) + cmd := pdctlCmd.GetRootCmd() + + time.Sleep(2 * time.Second) + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + re.NoError(leaderServer.BootstrapCluster()) + + // split keyspace group. + testutil.Eventually(re, func() bool { + args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"} + output, err := pdctl.ExecuteCommand(cmd, args...) + re.NoError(err) + return strings.Contains(string(output), "Success") + }) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) + re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) +}