Skip to content

Commit

Permalink
Support use-tso-server-proxy option in pd-tso-bench.
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 26, 2023
1 parent ef11b5f commit 60b959f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
14 changes: 14 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,15 @@ func WithForwardingOption(enableForwarding bool) ClientOption {
}
}

// WithTSOServerProxyOption configures the client to use TSO server proxy,
// i.e., the client will send TSO requests to the API leader (the TSO server
// proxy) which will forward the requests to the TSO servers.
func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption {
return func(c *client) {
c.option.useTSOServerProxy = useTSOServerProxy

Check warning on line 252 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L251-L252

Added lines #L251 - L252 were not covered by tests
}
}

// WithMaxErrorRetry configures the client max retry times when connect meets error.
func WithMaxErrorRetry(count int) ClientOption {
return func(c *client) {
Expand Down Expand Up @@ -589,6 +598,11 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
c.Lock()
defer c.Unlock()

if c.option.useTSOServerProxy && newMode == pdpb.ServiceMode_API_SVC_MODE {
// If we are using TSO server proxy, we always use PD_SVC_MODE.
newMode = pdpb.ServiceMode_PD_SVC_MODE

Check warning on line 603 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L603

Added line #L603 was not covered by tests
}

if newMode == c.serviceMode {
return
}
Expand Down
13 changes: 7 additions & 6 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ const (
// It provides the ability to change some PD client's options online from the outside.
type option struct {
// Static options.
gRPCDialOptions []grpc.DialOption
timeout time.Duration
maxRetryTimes int
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
gRPCDialOptions []grpc.DialOption
timeout time.Duration
maxRetryTimes int
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
useTSOServerProxy bool

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
57 changes: 33 additions & 24 deletions tools/pd-tso-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,25 @@ import (
)

var (
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection")
faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate")
keyspace = flag.Uint("keyspace", 0, "the id of the keyspac to access")
wg sync.WaitGroup
pdAddrs = flag.String("pd", "127.0.0.1:2379", "pd address")
clientNumber = flag.Int("client", 1, "the number of pd clients involved in each benchmark")
concurrency = flag.Int("c", 1000, "concurrency")
count = flag.Int("count", 1, "the count number that the test will run")
duration = flag.Duration("duration", 60*time.Second, "how many seconds the test will last")
dcLocation = flag.String("dc", "global", "which dc-location this bench will request")
verbose = flag.Bool("v", false, "output statistics info every interval and output metrics info at the end")
interval = flag.Duration("interval", time.Second, "interval to output the statistics")
caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs")
certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format")
keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format")
useTSOServerProxy = flag.Bool("use-tso-server-proxy", false, "whether send tso requests to tso server proxy instead of tso service directly")
maxBatchWaitInterval = flag.Duration("batch-interval", 0, "the max batch wait interval")
enableTSOFollowerProxy = flag.Bool("enable-tso-follower-proxy", false, "whether enable the TSO Follower Proxy")
enableFaultInjection = flag.Bool("enable-fault-injection", false, "whether enable fault injection")
faultInjectionRate = flag.Float64("fault-injection-rate", 0.01, "the failure rate [0.0001, 1]. 0.01 means 1% failure rate")
maxTSOSendIntervalMilliseconds = flag.Int("max-send-interval-ms", 0, "max tso send interval in milliseconds, 60s by default")
keyspace = flag.Uint("keyspace", 0, "the id of the keyspac to access")
wg sync.WaitGroup
)

var promServer *httptest.Server
Expand Down Expand Up @@ -344,10 +346,9 @@ func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh
reqCtx, cancel := context.WithCancel(ctx)
defer cancel()
var (
err error
maxRetryTime int = 50
maxTSOSendIntervalSeconds int = 3
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
err error
maxRetryTime int = 50
sleepIntervalOnFailure time.Duration = 100 * time.Millisecond
)
pdCli := pdClients[clientIdx]

Expand All @@ -370,8 +371,11 @@ func reqWorker(ctx context.Context, pdClients []pd.Client, clientIdx int, durCh

i := 0
for ; i < maxRetryTime; i++ {
if shouldInjectFault() {
time.Sleep(time.Duration(rand.Intn(maxTSOSendIntervalSeconds)) * time.Second)
if *maxTSOSendIntervalMilliseconds > 0 {
select {
case <-reqCtx.Done():
case <-time.After(time.Duration(rand.Intn(*maxTSOSendIntervalMilliseconds)) * time.Millisecond):
}
}
_, _, err = pdCli.GetLocalTS(reqCtx, *dcLocation)
if errors.Cause(err) == context.Canceled {
Expand Down Expand Up @@ -402,11 +406,16 @@ func createPDClient(ctx context.Context) (pd.Client, error) {
err error
)

opts := make([]pd.ClientOption, 0)
if *useTSOServerProxy {
opts = append(opts, pd.WithTSOServerProxyOption(true))
}

pdCli, err = pd.NewClientWithKeyspace(ctx, uint32(*keyspace), []string{*pdAddrs}, pd.SecurityOption{
CAPath: *caPath,
CertPath: *certPath,
KeyPath: *keyPath,
})
}, opts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 60b959f

Please sign in to comment.