Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skeleton to initialize tso microservice #5917

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/basic_server/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ type Server interface {

// GetClient returns builtin etcd client.
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
GetClient() *clientv3.Client
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
// GetHTTPClient returns builtin etcd client.
// GetHTTPClient returns builtin http client.
GetHTTPClient() *http.Client
}
123 changes: 123 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"flag"
"net/http"
"os"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
basicsvr "github.com/tikv/pd/pkg/basic_server"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
)

// If server doesn't implement all methods of basicsvr.Server, this line will result in a clear
// error message like "*Server does not implement basicsvr.Server (missing Method method)"
var _ basicsvr.Server = (*Server)(nil)

// Server is the TSO server, and it implements basicsvr.Server.
// nolint
type Server struct {
ctx context.Context
}

// TODO: Implement the following methods defined in basicsvr.Server

// Name returns the unique etcd Name for this server in etcd cluster.
func (s *Server) Name() string {
return ""
}

// Context returns the context of server.
func (s *Server) Context() context.Context {
return s.ctx
}

// Run runs the pd server.
func (s *Server) Run() error {
return nil
}

// Close closes the server.
func (s *Server) Close() {
}

// GetClient returns builtin etcd client.
func (s *Server) GetClient() *clientv3.Client {
return nil
}

// GetHTTPClient returns builtin http client.
func (s *Server) GetHTTPClient() *http.Client {
return nil
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, basicsvr.Server) {
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])

if cfg.Version {
printVersionInfo()
exit(0)
}

defer logutil.LogPanic()

switch errors.Cause(err) {
case nil:
case flag.ErrHelp:
exit(0)
default:
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
}

// TODO: Initialize logger

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
}

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
}

func exit(code int) {
log.Sync()
os.Exit(code)
}
26 changes: 25 additions & 1 deletion pkg/tso/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"flag"
"time"

"github.com/pingcap/errors"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

Expand All @@ -30,7 +32,11 @@ const (
type Config struct {
flagSet *flag.FlagSet

configFile string
Version bool `json:"-"`

ConfigCheck bool `json:"-"`
configFile string

// EnableLocalTSO is used to enable the Local TSO Allocator feature,
// which allows the PD server to generate Local TSO for certain DC-level transactions.
// To make this feature meaningful, user has to set the "zone" label for the PD server
Expand All @@ -46,6 +52,11 @@ type Config struct {
// This config is only valid in 1ms to 10s. If it's configured too long or too short, it will
// be automatically clamped to the range.
TSOUpdatePhysicalInterval typeutil.Duration `toml:"tso-update-physical-interval" json:"tso-update-physical-interval"`

// MaxResetTSGap is the max gap to reset the TSO.
MaxResetTSGap typeutil.Duration `toml:"max-gap-reset-ts" json:"max-gap-reset-ts"`

Metric metricutil.MetricConfig `toml:"metric" json:"metric"`
}

// NewConfig creates a new config.
Expand All @@ -58,3 +69,16 @@ func NewConfig() *Config {

return cfg
}

// Parse parses flag definitions from the argument list.
func (c *Config) Parse(arguments []string) error {
// Parse first to get config file.
err := c.flagSet.Parse(arguments)
if err != nil {
return errors.WithStack(err)
}

// TODO: Implement the main function body

return nil
}
68 changes: 68 additions & 0 deletions pkg/utils/configutil/configutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configutil

import (
"errors"

"github.com/BurntSushi/toml"
)

// ConfigMetaData is an utility to test if a configuration is defined.
type ConfigMetaData struct {
meta *toml.MetaData
path []string
}

// NewConfigMetadata is the a factory method to create a ConfigMetaData object
func NewConfigMetadata(meta *toml.MetaData) *ConfigMetaData {
return &ConfigMetaData{meta: meta}
}

// IsDefined checks if the given key is defined in the configuration
func (m *ConfigMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

// Child gets the config metadata of the given path
func (m *ConfigMetaData) Child(path ...string) *ConfigMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &ConfigMetaData{
meta: m.meta,
path: newPath,
}
}

// CheckUndecoded checks if the configuration contains undefined items
func (m *ConfigMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}
64 changes: 11 additions & 53 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/utils/configutil"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand Down Expand Up @@ -459,7 +460,7 @@ func (c *Config) Validate() error {

// Adjust is used to adjust the PD configurations.
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := newConfigMetadata(meta)
configMetaData := configutil.NewConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
c.WarningMsgs = append(c.WarningMsgs, err.Error())
}
Expand Down Expand Up @@ -570,50 +571,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
return nil
}

// Utility to test if a configuration is defined.
type configMetaData struct {
meta *toml.MetaData
path []string
}

func newConfigMetadata(meta *toml.MetaData) *configMetaData {
return &configMetaData{meta: meta}
}

func (m *configMetaData) IsDefined(key string) bool {
if m.meta == nil {
return false
}
keys := append([]string(nil), m.path...)
keys = append(keys, key)
return m.meta.IsDefined(keys...)
}

func (m *configMetaData) Child(path ...string) *configMetaData {
newPath := append([]string(nil), m.path...)
newPath = append(newPath, path...)
return &configMetaData{
meta: m.meta,
path: newPath,
}
}

func (m *configMetaData) CheckUndecoded() error {
if m.meta == nil {
return nil
}
undecoded := m.meta.Undecoded()
if len(undecoded) == 0 {
return nil
}
errInfo := "Config contains undefined item: "
for _, key := range undecoded {
errInfo += key.String() + ", "
}
return errors.New(errInfo[:len(errInfo)-2])
}

func (c *Config) adjustLog(meta *configMetaData) {
func (c *Config) adjustLog(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("disable-error-verbose") {
c.Log.DisableErrorVerbose = defaultDisableErrorVerbose
}
Expand Down Expand Up @@ -840,7 +798,7 @@ const (
defaultSlowStoreEvictingAffectedStoreRatioThreshold = 0.3
)

func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool) error {
if !meta.IsDefined("max-snapshot-count") {
adjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount)
}
Expand Down Expand Up @@ -961,7 +919,7 @@ func (c *ScheduleConfig) GetMaxMergeRegionKeys() uint64 {
return c.MaxMergeRegionSize * 10000
}

func (c *ScheduleConfig) parseDeprecatedFlag(meta *configMetaData, name string, old, new bool) (bool, error) {
func (c *ScheduleConfig) parseDeprecatedFlag(meta *configutil.ConfigMetaData, name string, old, new bool) (bool, error) {
oldName, newName := "disable-"+name, "enable-"+name
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
Expand Down Expand Up @@ -1146,7 +1104,7 @@ func (c *ReplicationConfig) Validate() error {
return nil
}

func (c *ReplicationConfig) adjust(meta *configMetaData) error {
func (c *ReplicationConfig) adjust(meta *configutil.ConfigMetaData) error {
adjustUint64(&c.MaxReplicas, defaultMaxReplicas)
if !meta.IsDefined("enable-placement-rules") {
c.EnablePlacementRules = defaultEnablePlacementRules
Expand Down Expand Up @@ -1186,7 +1144,7 @@ type PDServerConfig struct {
MinResolvedTSPersistenceInterval typeutil.Duration `toml:"min-resolved-ts-persistence-interval" json:"min-resolved-ts-persistence-interval"`
}

func (c *PDServerConfig) adjust(meta *configMetaData) error {
func (c *PDServerConfig) adjust(meta *configutil.ConfigMetaData) error {
adjustDuration(&c.MaxResetTSGap, defaultMaxResetTSGap)
if !meta.IsDefined("use-region-storage") {
c.UseRegionStorage = defaultUseRegionStorage
Expand All @@ -1213,7 +1171,7 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error {
return c.Validate()
}

func (c *PDServerConfig) migrateConfigurationFromFile(meta *configMetaData) error {
func (c *PDServerConfig) migrateConfigurationFromFile(meta *configutil.ConfigMetaData) error {
oldName, newName := "trace-region-flow", "flow-round-by-digit"
defineOld, defineNew := meta.IsDefined(oldName), meta.IsDefined(newName)
switch {
Expand Down Expand Up @@ -1431,7 +1389,7 @@ func (c *DashboardConfig) ToTiDBTLSConfig() (*tls.Config, error) {
return nil, nil
}

func (c *DashboardConfig) adjust(meta *configMetaData) {
func (c *DashboardConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("enable-telemetry") {
c.EnableTelemetry = defaultEnableTelemetry
}
Expand All @@ -1451,7 +1409,7 @@ func (c *ReplicationModeConfig) Clone() *ReplicationModeConfig {
return &cfg
}

func (c *ReplicationModeConfig) adjust(meta *configMetaData) {
func (c *ReplicationModeConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("replication-mode") || NormalizeReplicationMode(c.ReplicationMode) == "" {
c.ReplicationMode = "majority"
}
Expand Down Expand Up @@ -1479,7 +1437,7 @@ type DRAutoSyncReplicationConfig struct {
PauseRegionSplit bool `toml:"pause-region-split" json:"pause-region-split,string"`
}

func (c *DRAutoSyncReplicationConfig) adjust(meta *configMetaData) {
func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
if !meta.IsDefined("wait-store-timeout") {
c.WaitStoreTimeout = typeutil.NewDuration(defaultDRWaitStoreTimeout)
}
Expand Down