Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tso grpc service include both server side and client side in a basic manner #5986

Merged
merged 9 commits into from
Feb 16, 2023
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type Client interface {
KeyspaceClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// TSOClient is the client of TSO service
TSOClient
// Close closes the client.
Close()
}
Expand Down
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,5 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto v0.0.0-20230215190639-990822e37e27
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/binshi-bing/kvproto v0.0.0-20230215190639-990822e37e27 h1:Ios3/N3yaGVX3zToEozYy0caatC8CRXG43CdrO21M10=
github.com/binshi-bing/kvproto v0.0.0-20230215190639-990822e37e27/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -79,8 +81,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125 h1:ZiCJcEzmmF5xNgt8GIXekd3WQXI/22kzYQnrHi3Fc/4=
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
102 changes: 102 additions & 0 deletions client/tso_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/tsopb"
)

// TSOClient manages resource group info and token request.
type TSOClient interface {
// GetTSWithinKeyspace gets a timestamp within the given keyspace from the TSO service
GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (int64, int64, error)
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
}

// GetTSWithinKeyspace gets a timestamp within the given keyspace from the TSO service
// TODO: Refactor and share the TSO streaming framework in the PD client. The implementation
// here is in a basic manner and only for testing and integration purpose -- no batching,
// no async, no pooling, no forwarding, no retry and no deliberate error handling.
func (c *client) GetTSWithinKeyspace(ctx context.Context, keyspaceID uint32) (physical int64, logical int64, err error) {
tsoClient, err := c.getTSOClient(keyspaceID)
if err != nil {
return 0, 0, err
}
done := make(chan struct{})
cctx, cancel := context.WithCancel(ctx)
go checkStream(cctx, cancel, done)
stream, err := tsoClient.Tso(cctx)
done <- struct{}{}
if err != nil {
return 0, 0, err
}
defer cancel()

start := time.Now()
req := &tsopb.TsoRequest{
Header: c.getTSORequestHeader(keyspaceID),
Count: 1,
DcLocation: globalDCLocation,
}

if err := stream.Send(req); err != nil {
err = errors.WithStack(err)
return 0, 0, err
}
resp, err := stream.Recv()
if err != nil {
err = errors.WithStack(err)
return 0, 0, err
}
requestDurationTSO.Observe(time.Since(start).Seconds())
tsoBatchSize.Observe(1.00)

if resp.GetCount() != 1 {
err = errors.WithStack(errTSOLength)
return 0, 0, err
}

// No need to adjust logical part, because the batch size is 1.
physical, logical, _ = resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
return physical, logical, nil
}

func (c *client) getTSORequestHeader(keyspaceID uint32) *tsopb.RequestHeader {
return &tsopb.RequestHeader{
ClusterId: c.clusterID,
KeyspaceId: keyspaceID,
}
}

// getTSOClient gets the TSO client of the TSO instance who is in charge of
// the primary replica of the keyspace (group).
// TODO: implement TSO service discovery client side logic
func (c *client) getTSOClient(keyspaceID uint32) (tsopb.TSOClient, error) {
return nil, nil
}

func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
select {
case <-done:
return
case <-time.After(3 * time.Second):
cancel()
case <-streamCtx.Done():
}
<-done
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
Expand Down Expand Up @@ -192,3 +192,5 @@ replace google.golang.org/grpc v1.51.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/binshi-bing/kvproto v0.0.0-20230215190639-990822e37e27
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved