Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server, endpoint, client: Add PD APIs for RawKV GC Worker #4866

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
161 changes: 161 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/gcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -130,6 +131,22 @@ type Client interface {
UpdateOption(option DynamicOption, value interface{}) error
// Close closes the client.
Close()

// GetGCAllServiceGroups returns a list of all service groups that have a safe point in PD.
GetGCAllServiceGroups(ctx context.Context) ([]string, error)
Copy link
Contributor

@nolouch nolouch May 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to define independently and then nested in here;

// GetGCMinServiceSafePointByServiceGroup returns the minimum of all service safe points of the given group.
// It also returns the current revision of the PD storage, within which the minimum is valid.
// If no service safe point is found, it returns 0 as the minimum.
GetGCMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error)
// UpdateGCSafePointByServiceGroup updates the target safe point, along with the revision obtained previously.
// If it fails, the caller should retry starting from GetGCMinServiceSafePointByServiceGroup.
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error)
// UpdateGCServiceSafePointByServiceGroup updates the given service's safe point.
// Pass in a negative ttl to remove it.
// If it fails, the caller should retry with a higher safe point.
UpdateGCServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error)
// GetGCAllServiceGroupSafePoints returns GC safe point for all service groups
GetGCAllServiceGroupSafePoints(ctx context.Context) ([]*gcpb.ServiceGroupSafePoint, error)
}

// GetStoreOp represents available options when getting stores.
Expand Down Expand Up @@ -1891,3 +1908,147 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem
}()
return globalConfigWatcherCh, err
}

// gcHeader builds the request header attached to every GC request,
// carrying this client's cluster ID.
func (c *client) gcHeader() *gcpb.RequestHeader {
	header := new(gcpb.RequestHeader)
	header.ClusterId = c.clusterID
	return header
}

// gcClient wraps the connection stored in c.clientConns under the current
// leader address in a gcpb.GCClient.
//
// It returns nil when no connection is stored for that address, so callers
// must check the result before invoking any method on it.
func (c *client) gcClient() gcpb.GCClient {
	conn, found := c.clientConns.Load(c.GetLeaderAddr())
	if !found {
		return nil
	}
	return gcpb.NewGCClient(conn.(*grpc.ClientConn))
}

// GetGCAllServiceGroups requests the IDs of all service groups and returns
// them converted to strings.
//
// The request is bounded by c.option.timeout. If ctx carries an opentracing
// span, a child span is started for the duration of the call. The total
// duration is always observed; on failure the failed-duration metric is
// observed as well and a leader check is scheduled.
//
// An error is returned when no GC client is available for the current leader
// address or when the RPC itself fails.
func (c *client) GetGCAllServiceGroups(ctx context.Context) ([]string, error) {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span = opentracing.StartSpan("pdclient.GetGCAllServiceGroups", opentracing.ChildOf(span.Context()))
		defer span.Finish()
	}
	start := time.Now()
	defer func() { cmdDurationGetGCAllServiceGroups.Observe(time.Since(start).Seconds()) }()

	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	// Deferred so the timeout context is released on every return path.
	defer cancel()

	gcCli := c.gcClient()
	if gcCli == nil {
		// gcClient returns nil when no connection is stored for the leader
		// address; calling a method on the nil interface would panic.
		cmdFailedDurationGetGCAllServiceGroups.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return nil, errors.New("[pd] gc client is unavailable: no connection to the leader")
	}

	req := &gcpb.GetAllServiceGroupsRequest{
		Header: c.gcHeader(),
	}
	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
	resp, err := gcCli.GetAllServiceGroups(ctx, req)
	if err != nil {
		cmdFailedDurationGetGCAllServiceGroups.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return nil, errors.WithStack(err)
	}

	returnSlice := make([]string, 0, len(resp.ServiceGroupId))
	for _, serviceGroupID := range resp.ServiceGroupId {
		returnSlice = append(returnSlice, string(serviceGroupID))
	}
	return returnSlice, nil
}

// GetGCMinServiceSafePointByServiceGroup requests the minimum service safe
// point of serviceGroupID and returns it together with the revision reported
// in the response.
//
// The request is bounded by c.option.timeout. If ctx carries an opentracing
// span, a child span is started for the duration of the call. The total
// duration is always observed; on failure the failed-duration metric is
// observed as well and a leader check is scheduled.
//
// An error is returned when no GC client is available for the current leader
// address or when the RPC itself fails; safePoint and revision are 0 then.
func (c *client) GetGCMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error) {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span = opentracing.StartSpan("pdclient.GetGCMinServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
		defer span.Finish()
	}
	start := time.Now()
	defer func() { cmdDurationGetGCMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()

	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	// Deferred so the timeout context is released on every return path.
	defer cancel()

	gcCli := c.gcClient()
	if gcCli == nil {
		// gcClient returns nil when no connection is stored for the leader
		// address; calling a method on the nil interface would panic.
		cmdFailedDurationGetGCMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return 0, 0, errors.New("[pd] gc client is unavailable: no connection to the leader")
	}

	req := &gcpb.GetMinServiceSafePointByServiceGroupRequest{
		Header:         c.gcHeader(),
		ServiceGroupId: []byte(serviceGroupID),
	}
	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
	resp, err := gcCli.GetMinServiceSafePointByServiceGroup(ctx, req)
	if err != nil {
		cmdFailedDurationGetGCMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return 0, 0, errors.WithStack(err)
	}

	return resp.SafePoint, resp.Revision, nil
}

// UpdateGCSafePointByServiceGroup sends safePoint and revision for
// serviceGroupID and returns the Succeeded flag and NewSafePoint from the
// response.
//
// The request is bounded by c.option.timeout. If ctx carries an opentracing
// span, a child span is started for the duration of the call. The total
// duration is always observed; on failure the failed-duration metric is
// observed as well and a leader check is scheduled.
//
// An error is returned when no GC client is available for the current leader
// address or when the RPC itself fails; succeeded is false and newSafePoint
// is 0 then.
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error) {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span = opentracing.StartSpan("pdclient.UpdateGCSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
		defer span.Finish()
	}
	start := time.Now()
	defer func() { cmdDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()

	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	// Deferred so the timeout context is released on every return path.
	defer cancel()

	gcCli := c.gcClient()
	if gcCli == nil {
		// gcClient returns nil when no connection is stored for the leader
		// address; calling a method on the nil interface would panic.
		cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return false, 0, errors.New("[pd] gc client is unavailable: no connection to the leader")
	}

	req := &gcpb.UpdateGCSafePointByServiceGroupRequest{
		Header:         c.gcHeader(),
		ServiceGroupId: []byte(serviceGroupID),
		SafePoint:      safePoint,
		Revision:       revision,
	}
	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
	resp, err := gcCli.UpdateGCSafePointByServiceGroup(ctx, req)
	if err != nil {
		cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return false, 0, errors.WithStack(err)
	}
	return resp.Succeeded, resp.NewSafePoint, nil
}

// UpdateGCServiceSafePointByServiceGroup sends serviceID's safe point and ttl
// for serviceGroupID and returns the Succeeded flag plus the GcSafePoint,
// OldSafePoint and NewSafePoint fields from the response.
//
// The request is bounded by c.option.timeout. If ctx carries an opentracing
// span, a child span is started for the duration of the call. The total
// duration is always observed; on failure the failed-duration metric is
// observed as well and a leader check is scheduled.
//
// An error is returned when no GC client is available for the current leader
// address or when the RPC itself fails; all other results are zero then.
func (c *client) UpdateGCServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span = opentracing.StartSpan("pdclient.UpdateGCServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
		defer span.Finish()
	}
	start := time.Now()
	defer func() { cmdDurationUpdateGCServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()

	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	// Deferred so the timeout context is released on every return path.
	defer cancel()

	gcCli := c.gcClient()
	if gcCli == nil {
		// gcClient returns nil when no connection is stored for the leader
		// address; calling a method on the nil interface would panic.
		cmdFailedDurationUpdateGCServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return false, 0, 0, 0, errors.New("[pd] gc client is unavailable: no connection to the leader")
	}

	req := &gcpb.UpdateServiceSafePointByServiceGroupRequest{
		Header:         c.gcHeader(),
		ServiceGroupId: []byte(serviceGroupID),
		ServiceId:      []byte(serviceID),
		TTL:            ttl,
		SafePoint:      safePoint,
	}
	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
	resp, err := gcCli.UpdateServiceSafePointByServiceGroup(ctx, req)
	if err != nil {
		cmdFailedDurationUpdateGCServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return false, 0, 0, 0, errors.WithStack(err)
	}

	return resp.Succeeded, resp.GcSafePoint, resp.OldSafePoint, resp.NewSafePoint, nil
}

// GetGCAllServiceGroupSafePoints requests the GC safe points of all service
// groups and returns the SafePoints field of the response unchanged.
//
// The request is bounded by c.option.timeout. If ctx carries an opentracing
// span, a child span is started for the duration of the call. The total
// duration is always observed; on failure the failed-duration metric is
// observed as well and a leader check is scheduled.
//
// An error is returned when no GC client is available for the current leader
// address or when the RPC itself fails.
func (c *client) GetGCAllServiceGroupSafePoints(ctx context.Context) ([]*gcpb.ServiceGroupSafePoint, error) {
	if span := opentracing.SpanFromContext(ctx); span != nil {
		span = opentracing.StartSpan("pdclient.GetGCAllServiceGroupSafePoints", opentracing.ChildOf(span.Context()))
		defer span.Finish()
	}
	start := time.Now()
	defer func() { cmdDurationGetGCAllServiceGroupSafePoints.Observe(time.Since(start).Seconds()) }()

	ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
	// Deferred so the timeout context is released on every return path.
	defer cancel()

	gcCli := c.gcClient()
	if gcCli == nil {
		// gcClient returns nil when no connection is stored for the leader
		// address; calling a method on the nil interface would panic.
		cmdFailedDurationGetGCAllServiceGroupSafePoints.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return nil, errors.New("[pd] gc client is unavailable: no connection to the leader")
	}

	req := &gcpb.GetAllServiceGroupGCSafePointsRequest{
		Header: c.gcHeader(),
	}
	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
	resp, err := gcCli.GetAllServiceGroupGCSafePoints(ctx, req)
	if err != nil {
		cmdFailedDurationGetGCAllServiceGroupSafePoints.Observe(time.Since(start).Seconds())
		c.ScheduleCheckLeader()
		return nil, errors.WithStack(err)
	}

	return resp.SafePoints, nil
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/prometheus/client_golang v1.11.0
go.uber.org/goleak v1.1.11
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
12 changes: 12 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ var (
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")

cmdDurationGetGCAllServiceGroups = cmdDuration.WithLabelValues("get_gc_all_service_groups")
cmdDurationGetGCMinServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("get_gc_min_service_safe_point_by_service_group")
cmdDurationUpdateGCSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdDurationUpdateGCServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_service_safe_point_by_service_group")
cmdDurationGetGCAllServiceGroupSafePoints = cmdDuration.WithLabelValues("get_gc_all_service_group_safe_points")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
cmdFailDurationGetAllMembers = cmdFailedDuration.WithLabelValues("get_member_info")
Expand All @@ -111,6 +117,12 @@ var (
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
requestDurationTSO = requestDuration.WithLabelValues("tso")

cmdFailedDurationGetGCAllServiceGroups = cmdFailedDuration.WithLabelValues("get_gc_all_service_groups")
cmdFailedDurationGetGCMinServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("get_gc_min_service_safe_point_by_service_group")
cmdFailedDurationUpdateGCSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdFailedDurationUpdateGCServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_service_safe_point_by_service_group")
cmdFailedDurationGetGCAllServiceGroupSafePoints = cmdFailedDuration.WithLabelValues("get_gc_all_service_group_safe_points")
)

func init() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20220331105802-5ac69661755c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY=
github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0=
github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down
9 changes: 9 additions & 0 deletions pkg/testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/gcpb"
"github.com/pingcap/kvproto/pkg/pdpb"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -86,6 +87,14 @@ func MustNewGrpcClient(c *check.C, addr string) pdpb.PDClient {
return pdpb.NewPDClient(conn)
}

// MustNewGCClient dials addr (with any leading "http://" removed) over an
// insecure gRPC connection and returns a GC client bound to it. The dial
// error is asserted to be nil through c.
func MustNewGCClient(c *check.C, addr string) gcpb.GCClient {
	target := strings.TrimPrefix(addr, "http://")
	conn, err := grpc.Dial(target, grpc.WithInsecure())
	c.Assert(err, check.IsNil)

	return gcpb.NewGCClient(conn)
}

// CleanServer is used to clean data directory.
func CleanServer(dataDir string) {
// Clean data directory
Expand Down