Skip to content

Commit

Permalink
client: fix keyspace update in tsoSvcDiscovery (#6612)
Browse files Browse the repository at this point in the history
close #6611

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Jun 16, 2023
1 parent a0beaef commit 2570c70
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 10 deletions.
21 changes: 15 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ type serviceModeKeeper struct {
tsoSvcDiscovery ServiceDiscovery
}

func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) {
k.Lock()
defer k.Unlock()
if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID)
}
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -471,9 +479,6 @@ func newClientWithKeyspaceName(
ctx context.Context, keyspaceName string, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand Down Expand Up @@ -510,8 +515,12 @@ func newClientWithKeyspaceName(
if err := c.initRetry(c.loadKeyspaceMeta, keyspaceName); err != nil {
return nil, err
}
// We call "c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)" after service mode already switching to API mode
// and tso service discovery already initialized, so here we need to set the tso_service_discovery's keyspace id too.
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)

c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID)
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID))
return c, nil
}

Expand Down Expand Up @@ -593,15 +602,15 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
Expand Down
4 changes: 1 addition & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type tsoClient struct {
wg sync.WaitGroup
option *option

keyspaceID uint32
svcDiscovery ServiceDiscovery
tsoStreamBuilderFactory
// tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL}
Expand All @@ -94,15 +93,14 @@ type tsoClient struct {

// newTSOClient returns a new TSO client.
func newTSOClient(
ctx context.Context, option *option, keyspaceID uint32,
ctx context.Context, option *option,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
keyspaceID: keyspaceID,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
checkTSDeadlineCh: make(chan struct{}),
Expand Down
3 changes: 2 additions & 1 deletion pkg/balancer/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func (r *RoundRobin[T]) Next() (t T) {
func (r *RoundRobin[T]) GetAll() []T {
r.RLock()
defer r.RUnlock()
return r.nodes
// return a copy to avoid data race
return append(r.nodes[:0:0], r.nodes...)
}

// Put puts one into balancer.
Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
log.Debug("the split source TSO is not greater than the newly split TSO",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical),
)
return nil
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
Expand Down
104 changes: 104 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -465,3 +466,106 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
var clients sync.Map
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_b", cli)
}()
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_a") // its keyspace id is 1.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
clients.Store("keyspace_a", cli)
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)

// Wait pd clients are ready.
testutil.Eventually(re, func() bool {
count := 0
clients.Range(func(key, value interface{}) bool {
count++
return true
})
return count == 2
})
clientA, ok := clients.Load("keyspace_a")
re.True(ok)
clientB, ok := clients.Load("keyspace_b")
re.True(ok)

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientB.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientB.(pd.Client).Close()

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = clientA.(pd.Client).GetTS(ctx)
re.NoError(err)
kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
return !kg.IsSplitting()
})
clientA.(pd.Client).Close()

// Check the keyspace group 0 is split to 1 and 2.
kg0 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 1)
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 2)
re.Equal([]uint32{0}, kg0.Keyspaces)
re.Equal([]uint32{2}, kg1.Keyspaces)
re.Equal([]uint32{1}, kg2.Keyspaces)
re.False(kg0.IsSplitting())
re.False(kg1.IsSplitting())
re.False(kg2.IsSplitting())

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

0 comments on commit 2570c70

Please sign in to comment.