Skip to content

Commit

Permalink
Add skeleton to initialize tso microservice (#5917)
Browse files Browse the repository at this point in the history
ref #5836, ref #5858

This PR added the skeleton along the E2E path to initialize the TSO microservice.
It mainly contains the following change:

1. Parse the subcommands from the command line arguments and refactor the entrypoints for all-in-one pd service and other independent microservice. Needs to compare with @lhy1024's recent PR #5858
2. Refactor Server/TSO config. 
3. Add the skeleton alone E2E path to initialize the TSO microservice.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
binshi-bing and ti-chi-bot committed Feb 9, 2023
1 parent a3bb320 commit 28a95cd
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 58 deletions.
2 changes: 1 addition & 1 deletion pkg/basic_server/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ type Server interface {

// GetClient returns builtin etcd client.
GetClient() *clientv3.Client
// GetHTTPClient returns builtin etcd client.
// GetHTTPClient returns builtin http client.
GetHTTPClient() *http.Client
}
123 changes: 123 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"flag"
"net/http"
"os"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
)

// If server doesn't implement all methods of basicsvr.Server, this line will result in a clear
// error message like "*Server does not implement basicsvr.Server (missing Method method)"
var _ basicsvr.Server = (*Server)(nil)

// Server is the TSO server, and it implements basicsvr.Server.
// nolint
type Server struct {
ctx context.Context
}

// TODO: Implement the following methods defined in basicsvr.Server

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return ""
}

// Context returns the context of server.
func (s *Server) Context() context.Context {
return s.ctx
}

// Run runs the pd server.
func (s *Server) Run() error {
return nil
}

// Close closes the server.
func (s *Server) Close() {
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return nil
}

// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return nil
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])

if cfg.Version {
printVersionInfo()
exit(0)
}

defer logutil.LogPanic()

switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
exit(0)
default:
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
}

// TODO: Initialize logger

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
}

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
26 changes: 25 additions & 1 deletion pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"flag"
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

Expand All @@ -30,7 +32,11 @@ const (
type Config struct {
flagSet *flag.FlagSet

configFile string
Version bool `json:"-"`

ConfigCheck bool `json:"-"`
configFile string

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
Expand All @@ -46,6 +52,11 @@ type Config struct {
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`

// MaxResetTSGap is the max gap to reset the TSO.
MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`
}

// NewConfig creates a new config.
Expand All @@ -58,3 +69,16 @@ func NewConfig() *Config {

return cfg
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(arguments []string) error {
// Parse first to get config file.
err := c.flagSet.Parse(arguments)
if err != nil {
return errors.WithStack(err)
}

// TODO: Implement the main function body

return nil
}
68 changes: 68 additions & 0 deletions pkg/utils/configutil/configutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configutil

import (
"errors"

"github.com/BurntSushi/toml"
)

// ConfigMetaData is an utility to test if a configuration is defined.
type ConfigMetaData struct {
meta *toml.MetaData
path []string
}

// NewConfigMetadata is the a factory method to create a ConfigMetaData object
func NewConfigMetadata(meta *toml.MetaData) *ConfigMetaData {
return &ConfigMetaData{meta: meta}
}

// IsDefined checks if the given key is defined in the configuration
func (m *ConfigMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

// Child gets the config metadata of the given path
func (m *ConfigMetaData) Child(path ...string) *ConfigMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &ConfigMetaData{
meta: m.meta,
path: newPath,
}
}

// CheckUndecoded checks if the configuration contains undefined items
func (m *ConfigMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}
64 changes: 11 additions & 53 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -459,7 +460,7 @@ func (c *Config) Validate() error {

// Adjust is used to adjust the PD configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := newConfigMetadata(meta)
configMetaData := configutil.NewConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
c.WarningMsgs = append(c.WarningMsgs, err.Error())
}
Expand Down Expand Up @@ -570,50 +571,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
return nil
}

// Utility to test if a configuration is defined.
type configMetaData struct {
meta *toml.MetaData
path []string
}

func newConfigMetadata(meta *toml.MetaData) *configMetaData {
return &configMetaData{meta: meta}
}

func (m *configMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

func (m *configMetaData) Child(path ...string) *configMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &configMetaData{
meta: m.meta,
path: newPath,
}
}

func (m *configMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}

func (c *Config) adjustLog(meta *configMetaData) {
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
}
Expand Down Expand Up @@ -840,7 +798,7 @@ const (
defaultSlowStoreEvictingAffectedStoreRatioThreshold = 0.3
)

func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) error {
if !meta.IsDefined("max-snapshot-count") {
adjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount)
}
Expand Down Expand Up @@ -961,7 +919,7 @@ func (c *ScheduleConfig) GetMaxMergeRegionKeys() uint64 {
return c.MaxMergeRegionSize * 10000
}

func (c *ScheduleConfig) parseDeprecatedFlag(meta *configMetaData, name string, old, new bool) (bool, error) {
func (c *ScheduleConfig) parseDeprecatedFlag(meta *configutil.ConfigMetaData, name string, old, new bool) (bool, error) {
oldName, newName := "disable-"+name, "enable-"+name
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
Expand Down Expand Up @@ -1146,7 +1104,7 @@ func (c *ReplicationConfig) Validate() error {
return nil
}

func (c *ReplicationConfig) adjust(meta *configMetaData) error {
func (c *ReplicationConfig) adjust(meta *configutil.ConfigMetaData) error {
adjustUint64(&c.MaxReplicas, defaultMaxReplicas)
if !meta.IsDefined("enable-placement-rules") {
c.EnablePlacementRules = defaultEnablePlacementRules
Expand Down Expand Up @@ -1186,7 +1144,7 @@ type PDServerConfig struct {
MinResolvedTSPersistenceInterval typeutil.Duration `toml:"min-resolved-ts-persistence-interval" json:"min-resolved-ts-persistence-interval"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error {
adjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap)
if !meta.IsDefined("use-region-storage") {
c.UseRegionStorage = defaultUseRegionStorage
Expand All @@ -1213,7 +1171,7 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
return c.Validate()
}

func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error {
func (c *PDServerConfig) migrateConfigurationFromFile(meta *configutil.ConfigMetaData) error {
oldName, newName := "trace-region-flow", "flow-round-by-digit"
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
Expand Down Expand Up @@ -1431,7 +1389,7 @@ func (c *DashboardConfig) ToTiDBTLSConfig() (*tls.Config, error) {
return nil, nil
}

func (c *DashboardConfig) adjust(meta *configMetaData) {
func (c *DashboardConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-telemetry") {
c.EnableTelemetry = defaultEnableTelemetry
}
Expand All @@ -1451,7 +1409,7 @@ func (c *ReplicationModeConfig) Clone() *ReplicationModeConfig {
return &cfg
}

func (c *ReplicationModeConfig) adjust(meta *configMetaData) {
func (c *ReplicationModeConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("replication-mode") || NormalizeReplicationMode(c.ReplicationMode) == "" {
c.ReplicationMode = "majority"
}
Expand Down Expand Up @@ -1479,7 +1437,7 @@ type DRAutoSyncReplicationConfig struct {
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
}

func (c *DRAutoSyncReplicationConfig) adjust(meta *configMetaData) {
func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("wait-store-timeout") {
c.WaitStoreTimeout = typeutil.NewDuration(defaultDRWaitStoreTimeout)
}
Expand Down

0 comments on commit 28a95cd

Please sign in to comment.