Skip to content

Commit

Permalink
fix the split failed due to no enough replicas
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jun 5, 2023
1 parent 7b1893d commit 02d1507
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 8 deletions.
10 changes: 9 additions & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,11 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
ticker = time.NewTicker(time.Millisecond * 100)
})
defer ticker.Stop()
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-m.ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-ticker.C:
}
Expand All @@ -170,6 +172,10 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Error("failed to load all keyspace groups", zap.Error(err))
continue
}
// if the default keyspace is not initialized, we should wait for the default keyspace to be initialized.
if len(groups) == 0 {
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
Expand All @@ -180,18 +186,20 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
continue
}
group.Members = nodes

}
}
if !withError {
// all keyspace groups have equal or more than default replica count
log.Info("all keyspace groups have equal or more than default replica count, stop to alloc node")
return
}
}
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
tsoServiceKey := discovery.TSOPath(clusterID)
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"
tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// Discover is used to get all the service instances of the specified service name.
func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) {
key := discoveryPath(clusterID, serviceName) + "/"
endKey := clientv3.GetPrefixRangeEnd(key) + "/"
endKey := clientv3.GetPrefixRangeEnd(key)

withRange := clientv3.WithRange(endKey)
resp, err := etcdutil.EtcdKVGet(cli, key, withRange)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/discovery/registry_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
ServiceAddr string `json:"serviceAddr"`
ServiceAddr string `json:"service-addr"`
}

// Serialize this service registry entry
Expand Down
7 changes: 4 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,14 @@ func (s *Server) Run() error {
if err := s.startEtcd(s.ctx); err != nil {
return err
}
failpoint.Inject("delayStartServer", func() {
time.Sleep(2 * time.Second)
})

if err := s.startServer(s.ctx); err != nil {
return err
}

failpoint.Inject("delayStartServerLoop", func() {
time.Sleep(2 * time.Second)
})
s.startServerLoop(s.ctx)

return nil
Expand Down
33 changes: 33 additions & 0 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -83,6 +84,38 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) {
cfg := tsoserver.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tsoserver.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return nil, nil, err
}
zapLogOnce.Do(func() {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
})
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) {
s := tsoserver.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func mustGetKeyspaceGroupMembers(re *require.Assertions, server *tso.Server) map

func TestTSOServerStartFirst(t *testing.T) {
re := require.New(t)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServer", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -171,5 +171,5 @@ func TestTSOServerStartFirst(t *testing.T) {
re.Len(group.Keyspaces, 2)
re.Len(group.Members, 2)

re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServer"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}
45 changes: 45 additions & 0 deletions tests/pdctl/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/pdctl"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -82,3 +88,42 @@ func TestKeyspaceGroup(t *testing.T) {
re.Equal(uint32(2), keyspaceGroup.ID)
re.Equal(keyspaceGroup.Keyspaces, []uint32{222, 333})
}

func TestSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{"keyspace_a", "keyspace_b"}
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

_, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
defer tsoServerCleanup1()
re.NoError(err)
_, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
defer tsoServerCleanup2()
re.NoError(err)
cmd := pdctlCmd.GetRootCmd()

time.Sleep(2 * time.Second)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

// split keyspace group.
testutil.Eventually(re, func() bool {
args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"}
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
return strings.Contains(string(output), "Success")
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

0 comments on commit 02d1507

Please sign in to comment.