Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: support advertise address #6212

Merged
merged 3 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func NewTSOServiceCommand() *cobra.Command {
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
cmd.Flags().StringP("advertise-backend-endpoints", "", "", "advertise urls for backend endpoints (default '${backend-endpoints}')")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
Expand All @@ -98,7 +100,9 @@ func NewResourceManagerServiceCommand() *cobra.Command {
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "", "url for etcd client")
cmd.Flags().StringP("advertise-backend-endpoints", "", "", "advertise urls for backend endpoints (default '${backend-endpoints}')")
cmd.Flags().StringP("listen-addr", "", "", "listen address for resource management service")
cmd.Flags().StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
Expand Down
16 changes: 11 additions & 5 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,13 @@ const (

// Config is the configuration for the resource manager.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
AdvertiseBackendEndpoints string `toml:"advertise-backend-endpoints" json:"advertise-backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`
Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"` // TODO: remove this after refactoring
EnableGRPCGateway bool `json:"enable-grpc-gateway"` // TODO: use it

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`

Expand Down Expand Up @@ -169,7 +171,9 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseBackendEndpoints, "advertise-backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")

return c.Adjust(meta, false)
}
Expand Down Expand Up @@ -198,7 +202,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
configutil.AdjustString(&c.AdvertiseBackendEndpoints, c.BackendEndpoints)
configutil.AdjustString(&c.ListenAddr, defaultListenAddr)
configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr)

if !configMetaData.IsDefined("enable-grpc-gateway") {
c.EnableGRPCGateway = utils.DefaultEnableGRPCGateway
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (s *Server) startServer() (err error) {
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), "primary", "keyspace group primary election", s.cfg.ListenAddr)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), "primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down
37 changes: 29 additions & 8 deletions pkg/mcs/resource_manager/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"os"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
Expand All @@ -46,13 +46,34 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*S
return s, cleanup, nil
}

// NewTestDefaultConfig creates a new default config for testing.
func NewTestDefaultConfig() (*Config, error) {
cmd := &cobra.Command{
Use: "resource-manager",
Short: "Run the resource manager service",
// GenerateConfig generates a new config with the given options.
func GenerateConfig(c *Config) (*Config, error) {
arguments := []string{
"--listen-addr=" + c.ListenAddr,
"--advertise-listen-addr=" + c.AdvertiseListenAddr,
"--backend-endpoints=" + c.BackendEndpoints,
"--advertise-backend-endpoints=" + c.AdvertiseBackendEndpoints,
}

flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError)
flagSet.BoolP("version", "V", false, "print version information and exit")
flagSet.StringP("config", "", "", "config file")
flagSet.StringP("backend-endpoints", "", "", "url for etcd client")
flagSet.StringP("advertise-backend-endpoints", "", "", "advertise urls for backend endpoints (default '${backend-endpoints}')")
flagSet.StringP("listen-addr", "", "", "listen address for tso service")
flagSet.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
flagSet.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
flagSet.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
flagSet.StringP("key", "", "", "path of file that contains X509 key in PEM format")
err := flagSet.Parse(arguments)
if err != nil {
return nil, err
}
cfg := NewConfig()
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
err = cfg.Parse(flagSet)
if err != nil {
return nil, err
}

return cfg, nil
}
10 changes: 8 additions & 2 deletions pkg/mcs/tso/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ const (

// Config is the configuration for the TSO.
type Config struct {
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
BackendEndpoints string `toml:"backend-endpoints" json:"backend-endpoints"`
AdvertiseBackendEndpoints string `toml:"advertise-backend-endpoints" json:"advertise-backend-endpoints"`
ListenAddr string `toml:"listen-addr" json:"listen-addr"`
AdvertiseListenAddr string `toml:"advertise-listen-addr" json:"advertise-listen-addr"`

Name string `toml:"name" json:"name"`
DataDir string `toml:"data-dir" json:"data-dir"`
Expand Down Expand Up @@ -140,7 +142,9 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
configutil.AdjustCommandlineString(flagSet, &c.Security.CertPath, "cert")
configutil.AdjustCommandlineString(flagSet, &c.Security.KeyPath, "key")
configutil.AdjustCommandlineString(flagSet, &c.BackendEndpoints, "backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseBackendEndpoints, "advertise-backend-endpoints")
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")

return c.Adjust(meta, false)
}
Expand All @@ -166,7 +170,9 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

configutil.AdjustString(&c.BackendEndpoints, defaultBackendEndpoints)
configutil.AdjustString(&c.AdvertiseBackendEndpoints, c.BackendEndpoints)
configutil.AdjustString(&c.ListenAddr, defaultListenAddr)
configutil.AdjustString(&c.AdvertiseListenAddr, c.ListenAddr)

configutil.AdjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap)
configutil.AdjustInt64(&c.LeaderLease, utils.DefaultLeaderLease)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (s *Server) startServer() (err error) {

s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, fmt.Sprintf(tsoSvcDiscoveryPrefixFormat, s.clusterID, mcsutils.DefaultKeyspaceID),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add AdvertiseListenAddr to tsopb.Participant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that tsopb.Participant just need to store AdvertiseListenAddr as ListenAddr for this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. There is no need to change the definition of Participant.

"primary", "keyspace group primary election", s.cfg.ListenAddr)
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.defaultGroupStorage = endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(s.GetClient(), s.defaultGroupRootPath), nil)
s.tsoAllocatorManager = tso.NewAllocatorManager(
Expand Down
46 changes: 33 additions & 13 deletions pkg/mcs/tso/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

"github.com/pingcap/kvproto/pkg/tsopb"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
Expand Down Expand Up @@ -49,21 +49,41 @@ func NewTSOTestServer(ctx context.Context, re *require.Assertions, cfg *Config)
return s, cleanup, nil
}

// NewTSOTestDefaultConfig is only for test to create one pd.
// Because PD client also needs this, so export here.
func NewTSOTestDefaultConfig() (*Config, error) {
cmd := &cobra.Command{
Use: "tso",
Short: "Run the tso service",
}
cfg := NewConfig()
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
}

// MustNewGrpcClient must create a new TSO grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) (*grpc.ClientConn, tsopb.TSOClient) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())
re.NoError(err)
return conn, tsopb.NewTSOClient(conn)
}

// GenerateConfig generates a new config with the given options.
func GenerateConfig(c *Config) (*Config, error) {
arguments := []string{
"--listen-addr=" + c.ListenAddr,
"--advertise-listen-addr=" + c.AdvertiseListenAddr,
"--backend-endpoints=" + c.BackendEndpoints,
"--advertise-backend-endpoints=" + c.AdvertiseBackendEndpoints,
}

flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError)
flagSet.BoolP("version", "V", false, "print version information and exit")
flagSet.StringP("config", "", "", "config file")
flagSet.StringP("backend-endpoints", "", "", "url for etcd client")
flagSet.StringP("advertise-backend-endpoints", "", "", "advertise urls for backend endpoints (default '${backend-endpoints}')")
flagSet.StringP("listen-addr", "", "", "listen address for tso service")
flagSet.StringP("advertise-listen-addr", "", "", "advertise urls for listen address (default '${listen-addr}')")
flagSet.StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
flagSet.StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
flagSet.StringP("key", "", "", "path of file that contains X509 key in PEM format")
err := flagSet.Parse(arguments)
if err != nil {
return nil, err
}
cfg := NewConfig()
err = cfg.Parse(flagSet)
if err != nil {
return nil, err
}

return cfg, nil
}
4 changes: 2 additions & 2 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ func NewParticipant(client *clientv3.Client) *Participant {
}

// InitInfo initializes the member info. The leader key is path.Join(rootPath, leaderName)
func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderName string, purpose string, listenURL string) {
func (m *Participant) InitInfo(name string, id uint64, rootPath string, leaderName string, purpose string, advertiseListenAddr string) {
leader := &tsopb.Participant{
Name: name,
Id: id, // id is unique among all participants
ListenUrls: []string{listenURL},
ListenUrls: []string{advertiseListenAddr},
}

data, err := leader.Marshal()
Expand Down
5 changes: 3 additions & 2 deletions tests/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/mcs"
Expand Down Expand Up @@ -147,9 +148,9 @@ func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server,
re := suite.Require()
switch serviceName {
case utils.TSOServiceName:
return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
case utils.ResourceManagerServiceName:
return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints)
return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
default:
return nil, nil
}
Expand Down
3 changes: 2 additions & 1 deletion tests/mcs/resource_manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/mcs"
)
Expand All @@ -44,7 +45,7 @@ func TestResourceManagerServer(t *testing.T) {
leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

s, cleanup := mcs.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr())
s, cleanup := mcs.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc())
addr := s.GetAddr()
defer cleanup()

Expand Down
23 changes: 10 additions & 13 deletions tests/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
rm "github.com/tikv/pd/pkg/mcs/resource_manager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"

"github.com/stretchr/testify/require"
Expand All @@ -38,11 +37,12 @@ func SetupTSOClient(ctx context.Context, re *require.Assertions, endpoints []str
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints string) (*rm.Server, func()) {
cfg, err := rm.NewTestDefaultConfig()
re.NoError(err)
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = tempurl.Alloc()
cfg.ListenAddr = listenAddrs
cfg, err := rm.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
re.NoError(err)
Expand All @@ -54,15 +54,12 @@ func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Asser
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints string, listenAddrs ...string) (*tso.Server, func()) {
cfg, err := tso.NewTSOTestDefaultConfig()
re.NoError(err)
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
cfg := tso.NewConfig()
cfg.BackendEndpoints = backendEndpoints
if len(listenAddrs) > 0 {
cfg.ListenAddr = listenAddrs[0]
} else {
cfg.ListenAddr = tempurl.Alloc()
}
cfg.ListenAddr = listenAddrs
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := tso.NewTSOTestServer(ctx, re, cfg)
re.NoError(err)
Expand Down
33 changes: 29 additions & 4 deletions tests/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
tsoapi "github.com/tikv/pd/pkg/mcs/tso/server/apis/v1"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/tests"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (suite *tsoServerTestSuite) TestTSOServerStartAndStopNormally() {
}()

re := suite.Require()
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints)
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())

defer cleanup()
testutil.Eventually(re, func() bool {
Expand Down Expand Up @@ -150,7 +151,7 @@ func checkTSOPath(re *require.Assertions, isAPIServiceMode bool) {
re.Equal(1, getEtcdTimestampKeyNum(re, client))
}

_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints)
_, cleanup := mcs.StartSingleTSOTestServer(ctx, re, backendEndpoints, tempurl.Alloc())
defer cleanup()

cli := mcs.SetupTSOClient(ctx, re, []string{backendEndpoints})
Expand Down Expand Up @@ -230,7 +231,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSORelated() {
// Unable to use the tso-related interface without tso server
suite.checkUnavailableTSO()
// can use the tso-related interface with tso server
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints)
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
serverMap := make(map[string]bs.Server)
serverMap[s.GetAddr()] = s
mcs.WaitForPrimaryServing(suite.Require(), serverMap)
Expand All @@ -241,7 +242,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSORelated() {
func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() {
serverMap := make(map[string]bs.Server)
for i := 0; i < 3; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints)
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
serverMap[s.GetAddr()] = s
}
Expand Down Expand Up @@ -319,3 +320,27 @@ func (suite *APIServerForwardTestSuite) checkAvailableTSO() {
err = suite.pdClient.SetExternalTimestamp(suite.ctx, ts+1)
suite.NoError(err)
}

func TestAdvertiseAddr(t *testing.T) {
re := require.New(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
defer cluster.Destroy()
re.NoError(err)

err = cluster.RunInitialServers()
re.NoError(err)

leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

u := tempurl.Alloc()
s, cleanup := mcs.StartSingleTSOTestServer(ctx, re, leader.GetAddr(), u)
defer cleanup()

tsoServerConf := s.GetConfig()
re.Equal(leader.GetAddr(), tsoServerConf.AdvertiseBackendEndpoints)
re.Equal(u, tsoServerConf.AdvertiseListenAddr)
}