Skip to content

Commit

Permalink
Merge branch 'master' into diag
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed Jul 10, 2023
2 parents bca5165 + b2a9768 commit eb48895
Show file tree
Hide file tree
Showing 17 changed files with 244 additions and 207 deletions.
4 changes: 4 additions & 0 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func NewTSOServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}

Expand All @@ -104,6 +106,8 @@ func NewResourceManagerServiceCommand() *cobra.Command {
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
cmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
cmd.Flags().StringP("log-file", "", "", "log file path")
return cmd
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/mcs/discovery/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ package discovery
import (
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils"
)

const (
registryPrefix = "/ms"
registryKey = "registry"
registryKey = "registry"
)

// RegistryPath returns the full path to store microservice addresses.
func RegistryPath(clusterID, serviceName, serviceAddr string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey, serviceAddr}, "/")
}

// ServicePath returns the path to store microservice addresses.
func ServicePath(clusterID, serviceName string) string {
return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/")
return strings.Join([]string{utils.MicroserviceRootPath, clusterID, serviceName, registryKey}, "/")
}

// TSOPath returns the path to store TSO addresses.
Expand Down
7 changes: 4 additions & 3 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type Config struct {

Security configutil.SecurityConfig `toml:"security" json:"security"`

// WarningMsgs contains all warnings during parsing.
WarningMsgs []string

// LeaderLease defines the time within which a Resource Manager primary/leader must
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers
// can campaign the primary/leader again. Etcd only supports seconds TTL, so here is
Expand Down Expand Up @@ -196,11 +199,9 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
// Adjust is used to adjust the resource manager configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := configutil.NewConfigMetadata(meta)
warningMsgs := make([]string, 0)
if err := configMetaData.CheckUndecoded(); err != nil {
warningMsgs = append(warningMsgs, err.Error())
c.WarningMsgs = append(c.WarningMsgs, err.Error())
}
configutil.PrintConfigCheckMsg(os.Stdout, warningMsgs)

if c.Name == "" {
hostname, err := os.Hostname()
Expand Down
31 changes: 19 additions & 12 deletions pkg/mcs/resourcemanager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import (
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/kvproto/pkg/diagnosticspb"
"github.com/pingcap/log"
"github.com/pingcap/sysutil"
"github.com/soheilhy/cmux"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
Expand All @@ -52,8 +55,11 @@ import (

// Server is the resource manager server, and it implements bs.Server.
type Server struct {
diagnosticspb.DiagnosticsServer
// Server state. 0 is not running, 1 is running.
isRunning int64
// Server start timestamp
startTimestamp int64

ctx context.Context
serverLoopCtx context.Context
Expand Down Expand Up @@ -366,10 +372,10 @@ func (s *Server) startServer() (err error) {
uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
resourceManagerPrimaryPrefix := fmt.Sprintf("/ms/%d/resource_manager", s.clusterID)
resourceManagerPrimaryPrefix := endpoint.ResourceManagerSvcRootPath(s.clusterID)
s.participant = member.NewParticipant(s.etcdClient)
s.participant.InitInfo(uniqueName, uniqueID, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)),
"primary", "keyspace group primary election", s.cfg.AdvertiseListenAddr)
utils.KeyspaceGroupsPrimaryKey, "keyspace group primary election", s.cfg.AdvertiseListenAddr)

s.service = &Service{
ctx: s.ctx,
Expand Down Expand Up @@ -418,13 +424,15 @@ func (s *Server) startServer() (err error) {
return nil
}

// NewServer creates a new resource manager server.
func NewServer(ctx context.Context, cfg *Config) *Server {
return &Server{
name: cfg.Name,
ctx: ctx,
cfg: cfg,
// CreateServer creates the Server
func CreateServer(ctx context.Context, cfg *Config) *Server {
svr := &Server{
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
startTimestamp: time.Now().Unix(),
cfg: cfg,
ctx: ctx,
}
return svr
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
Expand Down Expand Up @@ -458,15 +466,14 @@ func CreateServerWrapper(cmd *cobra.Command, args []string) {
// Flushing any buffered log entries
defer log.Sync()

versioninfo.Log("resource manager")
log.Info("resource manager config", zap.Reflect("config", cfg))
versioninfo.Log("Resource Manager")
log.Info("Resource Manager config", zap.Reflect("config", cfg))

grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

ctx, cancel := context.WithCancel(context.Background())
svr := NewServer(ctx, cfg)
svr := CreateServer(ctx, cfg)

sc := make(chan os.Signal, 1)
signal.Notify(sc,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*S
// Flushing any buffered log entries
defer log.Sync()

s := NewServer(ctx, cfg)
s := CreateServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}
Expand Down
12 changes: 3 additions & 9 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net/url"
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -63,12 +63,6 @@ import (
)

const (
// pdRootPath is the old path for storing the tso related root path.
pdRootPath = "/pd"
msServiceRootPath = "/ms"
// tsoSvcRootPathFormat defines the root path for all etcd paths used for different purposes.
// format: "/ms/{cluster_id}/tso".
tsoSvcRootPathFormat = msServiceRootPath + "/%d/" + mcsutils.TSOServiceName
// maxRetryTimesWaitAPIService is the max retry times for initializing the cluster ID.
maxRetryTimesWaitAPIService = 360
// retryIntervalWaitAPIService is the interval to retry.
Expand Down Expand Up @@ -535,8 +529,8 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
legacySvcRootPath := endpoint.LegacyRootPath(s.clusterID)
tsoSvcRootPath := endpoint.TSOSvcRootPath(s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr,
Expand Down
6 changes: 4 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ const (
// We also reserved 0 for the keyspace group for the same purpose.
DefaultKeyspaceGroupID = uint32(0)

// MicroserviceKey is the key of microservice.
MicroserviceKey = "ms"
// MicroserviceRootPath is the root path of microservice in etcd.
MicroserviceRootPath = "/ms"
// APIServiceName is the name of api server.
APIServiceName = "api"
// TSOServiceName is the name of tso server.
Expand All @@ -59,6 +59,8 @@ const (
ResourceManagerServiceName = "resource_manager"
// KeyspaceGroupsKey is the path component of keyspace groups.
KeyspaceGroupsKey = "keyspace_groups"
// KeyspaceGroupsPrimaryKey is the path component of primary for keyspace groups.
KeyspaceGroupsPrimaryKey = "primary"

// MaxKeyspaceGroupCount is the max count of keyspace groups. keyspace group in tso
// is the sharding unit, i.e., by the definition here, the max count of the shards
Expand Down
80 changes: 72 additions & 8 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

const (
pdRootPath = "/pd"
clusterPath = "raft"
configPath = "config"
serviceMiddlewarePath = "service_middleware"
Expand Down Expand Up @@ -58,8 +59,9 @@ const (
// TimestampKey is the key of timestamp oracle used for the suffix.
TimestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupsMembershipKey = "membership"
keyspaceGroupsElectionKey = "election"

// we use uint64 to represent ID, the max length of uint64 is 20.
keyLen = 20
Expand Down Expand Up @@ -230,13 +232,13 @@ func EncodeKeyspaceID(spaceID uint32) string {
// KeyspaceGroupIDPrefix returns the prefix of keyspace group id.
// Path: tso/keyspace_groups/membership
func KeyspaceGroupIDPrefix() string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey)
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey)
}

// KeyspaceGroupIDPath returns the path to keyspace id from the given name.
// Path: tso/keyspace_groups/membership/{id}
func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id))
}

// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id.
Expand All @@ -245,6 +247,54 @@ func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp {
return regexp.MustCompile(pattern)
}

// ResourceManagerSvcRootPath returns the root path of resource manager service.
// Path: /ms/{cluster_id}/resource_manager
func ResourceManagerSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.ResourceManagerServiceName)
}

// TSOSvcRootPath returns the root path of tso service.
// Path: /ms/{cluster_id}/tso
func TSOSvcRootPath(clusterID uint64) string {
return svcRootPath(clusterID, utils.TSOServiceName)
}

func svcRootPath(clusterID uint64, svcName string) string {
c := strconv.FormatUint(clusterID, 10)
return path.Join(utils.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// Path: /pd/{cluster_id}
func LegacyRootPath(clusterID uint64) string {
return path.Join(pdRootPath, strconv.FormatUint(clusterID, 10))
}

// KeyspaceGroupPrimaryPath returns the path of keyspace group primary.
// default keyspace group: "/ms/{cluster_id}/tso/00000/primary".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary".
func KeyspaceGroupPrimaryPath(rootPath string, keyspaceGroupID uint32) string {
electionPath := KeyspaceGroupsElectionPath(rootPath, keyspaceGroupID)
return path.Join(electionPath, utils.KeyspaceGroupsPrimaryKey)
}

// KeyspaceGroupsElectionPath returns the path of keyspace groups election.
// default keyspace group: "/ms/{cluster_id}/tso/00000".
// non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}".
func KeyspaceGroupsElectionPath(rootPath string, keyspaceGroupID uint32) string {
if keyspaceGroupID == utils.DefaultKeyspaceGroupID {
return path.Join(rootPath, "00000")
}
return path.Join(rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, fmt.Sprintf("%05d", keyspaceGroupID))
}

// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id.
func GetCompiledNonDefaultIDRegexp(clusterID uint64) *regexp.Regexp {
rootPath := TSOSvcRootPath(clusterID)
pattern := strings.Join([]string{rootPath, utils.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, utils.KeyspaceGroupsPrimaryKey + `$`}, "/")
return regexp.MustCompile(pattern)
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand All @@ -264,19 +314,33 @@ func buildPath(withSuffix bool, str ...string) string {
return sb.String()
}

// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// KeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func GetKeyspaceGroupTSPath(groupID uint32) string {
func KeyspaceGroupTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func GetTimestampPath(tsPath string) string {
// TimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func TimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
}

// FullTimestampPath returns the full timestamp path.
// 1. for the default keyspace group:
// /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// /ms/{cluster_id}/tso/{group}/gta/timestamp
func FullTimestampPath(clusterID uint64, groupID uint32) string {
rootPath := TSOSvcRootPath(clusterID)
tsPath := TimestampPath(KeyspaceGroupTSPath(groupID))
if groupID == utils.DefaultKeyspaceGroupID {
rootPath = LegacyRootPath(clusterID)
}
return path.Join(rootPath, tsPath)
}
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID),
tsPath: endpoint.KeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down

0 comments on commit eb48895

Please sign in to comment.