Skip to content

Commit

Permalink
client side code
Browse files Browse the repository at this point in the history
  • Loading branch information
AmoebaProtozoa committed Apr 27, 2022
1 parent 20d42a1 commit 6556954
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 29 deletions.
102 changes: 75 additions & 27 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,14 @@ type Client interface {
// determine the safepoint for multiple services, it does not trigger a GC
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
// UpdateGCSafePointByServiceGroup update GC safe point, the update will only be successful if proposed
// safe point is later than the old one
// returns the new safePoint after the update attempt (may return the old safe point if update rejected)
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64) (uint64, error)
// UpdateServiceSafePointByServiceGroup update service safe point for specific service under given service group
// pass in a ttl less than 0 to remove the target service safe point instead
// will return the min safePoint of the serviceGroup after the update,
// if no service safePoint exists after the given operation, return 0
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (uint64, error)
// GetAllServiceGroupGcSafePoint returns a list containing gc safe point for each service group
GetAllServiceGroupGcSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafepoint, error)

// GC API V2
GetServiceGroup(ctx context.Context) ([]string, error)
GetMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error)
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error)
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error)
GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error)

// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
// NOTICE: This method is the old version of ScatterRegions, you should use the later one as your first choice.
Expand Down Expand Up @@ -1668,19 +1665,71 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
return resp.GetMinSafePoint(), nil
}

func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64) (uint64, error) {
func (c *client) GetServiceGroup(ctx context.Context) ([]string, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetServiceGroup.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetServiceGroupRequest{
Header: c.requestHeader(),
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetServiceGroup(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetServiceGroup.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
}

// have to return a slice of string
returnSlice := make([]string, len(resp.ServiceGroupId))
for _, serviceGroupID := range resp.ServiceGroupId {
returnSlice = append(returnSlice, string(serviceGroupID))
}
return returnSlice, nil
}
func (c *client) GetMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetMinServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMinServiceSafePointByServiceGroupRequest{
Header: c.requestHeader(),
ServiceGroupId: []byte(serviceGroupID),
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetMinServiceSafePointByServiceGroup(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, 0, errors.WithStack(err)
}

return resp.SafePoint, resp.Revision, nil
}
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateGCSafePointByServiceGroupRequest{
Header: c.requestHeader(),
ServiceGroupId: []byte(serviceGroupID),
SafePoint: safePoint,
Revision: revision,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().UpdateGCSafePointByServiceGroup(ctx, req)
Expand All @@ -1689,11 +1738,12 @@ func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGro
if err != nil {
cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
return false, 0, false, errors.WithStack(err)
}
return resp.GetNewSafePoint(), nil
// if requested safepoint is the new safepoint, then update succeeded
return safePoint == resp.NewSafePoint, resp.NewSafePoint, resp.ValidRevision, nil
}
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -1715,35 +1765,33 @@ func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, servi
if err != nil {
cmdFailedDurationUpdateServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return 0, errors.WithStack(err)
return false, 0, 0, 0, errors.WithStack(err)
}

return resp.GetMinSafePoint(), nil
return resp.NewServiceSafePoint == safePoint, resp.GcSafePoint, resp.OldServiceSafePoint, resp.NewServiceSafePoint, nil
}

func (c *client) GetAllServiceGroupGcSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafepoint, error) {
func (c *client) GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGcSafePoint", opentracing.ChildOf(span.Context()))
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGCSafePoint", opentracing.ChildOf(span.Context()))
defer span.Finish()
}

start := time.Now()
defer func() { cmdDurationGetAllServiceGroupGcSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { cmdDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetAllServiceGroupGcSafePointRequest{
req := &pdpb.GetAllServiceGroupGCSafePointRequest{
Header: c.requestHeader(),
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetAllServiceGroupGcSafePoint(ctx, req)
resp, err := c.getClient().GetAllServiceGroupGCSafePoint(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetAllServiceGroupGcSafePoint.Observe(time.Since(start).Seconds())
cmdFailedDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
}

return resp.GetServiceGroupSafePoint(), nil
return resp.ServiceGroupSafePoint, nil
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
Expand Down
8 changes: 6 additions & 2 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ var (
cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator")
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationGetServiceGroup = cmdDuration.WithLabelValues("get_service_group")
cmdDurationGetMinServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
cmdDurationUpdateGCSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdDurationUpdateServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("update_service_safe_point_by_service_group")
cmdDurationGetAllServiceGroupGcSafePoint = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_point")
cmdDurationGetAllServiceGroupGCSafePoint = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_point")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -113,9 +115,11 @@ var (
cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
cmdFailedDurationGetServiceGroup = cmdFailedDuration.WithLabelValues("get_service_group")
cmdFailedDurationGetMinServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
cmdFailedDurationUpdateGCSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdFailedDurationUpdateServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_service_safe_point_by_service_group")
cmdFailedDurationGetAllServiceGroupGcSafePoint = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_point")
cmdFailedDurationGetAllServiceGroupGCSafePoint = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_point")
requestDurationTSO = requestDuration.WithLabelValues("tso")
)

Expand Down

0 comments on commit 6556954

Please sign in to comment.