Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix keyspace update in tsoSvcDiscovery #6612

Merged
merged 8 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ type serviceModeKeeper struct {
tsoSvcDiscovery ServiceDiscovery
}

func (k *serviceModeKeeper) SetKeyspaceID(keyspaceID uint32) {
k.Lock()
defer k.Unlock()
if k.serviceMode == pdpb.ServiceMode_API_SVC_MODE {
k.tsoSvcDiscovery.SetKeyspaceID(keyspaceID)
}
}

func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -471,9 +479,6 @@ func newClientWithKeyspaceName(
ctx context.Context, keyspaceName string, svrAddrs []string,
security SecurityOption, opts ...ClientOption,
) (Client, error) {
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName))

tlsCfg := &tlsutil.TLSConfig{
CAPath: security.CAPath,
CertPath: security.CertPath,
Expand Down Expand Up @@ -511,7 +516,9 @@ func newClientWithKeyspaceName(
return nil, err
}
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)

c.serviceModeKeeper.SetKeyspaceID(c.keyspaceID)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("[pd] create pd client with endpoints and keyspace",
zap.Strings("pd-address", svrAddrs), zap.String("keyspace-name", keyspaceName), zap.Uint32("keyspace-id", c.keyspaceID))
return c, nil
}

Expand Down Expand Up @@ -593,15 +600,15 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
)
switch newMode {
case pdpb.ServiceMode_PD_SVC_MODE:
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
c.pdSvcDiscovery, &pdTSOStreamBuilderFactory{})
case pdpb.ServiceMode_API_SVC_MODE:
newTSOSvcDiscovery = newTSOServiceDiscovery(
c.ctx, MetaStorageClient(c), c.pdSvcDiscovery,
c.GetClusterID(c.ctx), c.keyspaceID, c.tlsCfg, c.option)
// At this point, the keyspace group isn't known yet. Starts from the default keyspace group,
// and will be updated later.
newTSOCli = newTSOClient(c.ctx, c.option, c.keyspaceID,
newTSOCli = newTSOClient(c.ctx, c.option,
newTSOSvcDiscovery, &tsoTSOStreamBuilderFactory{})
if err := newTSOSvcDiscovery.Init(); err != nil {
log.Error("[pd] failed to initialize tso service discovery. keep the current service mode",
Expand Down
4 changes: 1 addition & 3 deletions client/tso_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type tsoClient struct {
wg sync.WaitGroup
option *option

keyspaceID uint32
svcDiscovery ServiceDiscovery
tsoStreamBuilderFactory
// tsoAllocators defines the mapping {dc-location -> TSO allocator leader URL}
Expand All @@ -94,15 +93,14 @@ type tsoClient struct {

// newTSOClient returns a new TSO client.
func newTSOClient(
ctx context.Context, option *option, keyspaceID uint32,
ctx context.Context, option *option,
svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory,
) *tsoClient {
ctx, cancel := context.WithCancel(ctx)
c := &tsoClient{
ctx: ctx,
cancel: cancel,
option: option,
keyspaceID: keyspaceID,
svcDiscovery: svcDiscovery,
tsoStreamBuilderFactory: factory,
checkTSDeadlineCh: make(chan struct{}),
Expand Down
6 changes: 6 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,12 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
log.Debug("the split source TSO is not greater than the newly split TSO",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical),
)
return nil
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
Expand Down
78 changes: 78 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
handlersutil "github.com/tikv/pd/tests/server/apiv2/handlers"
Expand Down Expand Up @@ -465,3 +467,79 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() {
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}

func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))

// Init api server config but not start.
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
})
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

// Start pd client and wait pd server start.
done := make(chan pd.Client)
go func() {
apiCtx := pd.NewAPIContextV2("keyspace_b") // its keyspace id is 2.
cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, []string{pdAddr}, pd.SecurityOption{})
re.NoError(err)
done <- cli
}()

// Start api server and tso server.
err = tc.RunInitialServers()
re.NoError(err)
defer tc.Destroy()
tsoServer, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
defer tsoServerCleanup1()
re.NoError(err)
_, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
defer tsoServerCleanup2()
re.NoError(err)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(2 * time.Second)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

// First split keyspace group 0 to 1 with keyspace 2.
kgm := leaderServer.GetServer().GetKeyspaceGroupManager()
re.NotNil(kgm)
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 1, []uint32{2})
return err == nil
})

// Set the TSO of the keyspace group 0 to a large value to make test stable.
cli := <-done
defer cli.Close()
physical, logical, err := cli.GetTS(ctx)
physical += time.Hour.Milliseconds()
tsoServer.GetHandler().ResetTS(tsoutil.GenerateTS(&pdpb.Timestamp{
Physical: physical,
Logical: logical,
}), false, true, 0)
re.NoError(err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not do this to make sure the TSO consistency protection mechanism work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sometimes tso of group0 is less than group1, about 10~20 milliseonds, only to make test stable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove it


// Trigger checkTSOSplit to ensure the split is finished.
testutil.Eventually(re, func() bool {
_, _, err = cli.GetTS(ctx)
re.NoError(err)
kg, err := kgm.GetKeyspaceGroupByID(mcsutils.DefaultKeyspaceGroupID)
re.NoError(err)
return !kg.IsSplitting()
})

// Then split keyspace group 0 to 2 with keyspace 1.
testutil.Eventually(re, func() bool {
err = kgm.SplitKeyspaceGroupByID(0, 2, []uint32{1})
return err == nil
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
}
Loading