Skip to content

Commit

Permalink
updated client and server according to new proto
Browse files Browse the repository at this point in the history
Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com>
  • Loading branch information
AmoebaProtozoa committed Apr 29, 2022
1 parent 7d94eb4 commit 3160a94
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 55 deletions.
55 changes: 32 additions & 23 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,21 @@ type Client interface {
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)

// GC API V2
GetServiceGroup(ctx context.Context) ([]string, error)
// GetAllServiceGroups returns a list containing all service groups that has safe point in pd
GetAllServiceGroups(ctx context.Context) ([]string, error)
// GetMinServiceSafePointByServiceGroup return the minimum of all service safe point of the given group
// it also returns the current revision of the pd storage, with in which the min is valid
// if none is found, it will return 0 as min
GetMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error)
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error)
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error)
GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error)
// UpdateGCSafePointByServiceGroup update the target safe point, along with revision obtained previously
// if failed, caller should retry form GetMinServiceSafePointByServiceGroup
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error)
// UpdateServiceSafePointByServiceGroup update the given service's safe point
// pass in a negative ttl to remove it
// if failed, caller should retry with higher safe point
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error)
// GetAllServiceGroupGCSafePoints returns GC safe point for all service groups
GetAllServiceGroupGCSafePoints(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error)

// ScatterRegion scatters the specified region. Should use it for a batch of regions,
// and the distribution of these regions will be dispersed.
Expand Down Expand Up @@ -1665,23 +1674,23 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
return resp.GetMinSafePoint(), nil
}

func (c *client) GetServiceGroup(ctx context.Context) ([]string, error) {
func (c *client) GetAllServiceGroups(ctx context.Context) ([]string, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetServiceGroup", opentracing.ChildOf(span.Context()))
span = opentracing.StartSpan("pdclient.GetAllServiceGroups", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetServiceGroup.Observe(time.Since(start).Seconds()) }()
defer func() { cmdDurationGetAllServiceGroups.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetServiceGroupRequest{
req := &pdpb.GetAllServiceGroupsRequest{
Header: c.requestHeader(),
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetServiceGroup(ctx, req)
resp, err := c.getClient().GetAllServiceGroups(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetServiceGroup.Observe(time.Since(start).Seconds())
cmdFailedDurationGetAllServiceGroups.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
}
Expand Down Expand Up @@ -1717,7 +1726,7 @@ func (c *client) GetMinServiceSafePointByServiceGroup(ctx context.Context, servi

return resp.SafePoint, resp.Revision, nil
}
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error) {
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -1738,12 +1747,12 @@ func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGro
if err != nil {
cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return false, 0, false, errors.WithStack(err)
return false, 0, errors.WithStack(err)
}
// if requested safepoint is the new safepoint, then update succeeded
return safePoint == resp.NewSafePoint, resp.NewSafePoint, resp.ValidRevision, nil
return resp.Succeeded, resp.NewSafePoint, nil
}
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) {
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand All @@ -1768,30 +1777,30 @@ func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, servi
return false, 0, 0, 0, errors.WithStack(err)
}

return resp.NewServiceSafePoint == safePoint, resp.GcSafePoint, resp.OldServiceSafePoint, resp.NewServiceSafePoint, nil
return resp.Succeeded, resp.GcSafePoint, resp.OldSafePoint, resp.NewSafePoint, nil
}
func (c *client) GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error) {
func (c *client) GetAllServiceGroupGCSafePoints(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGCSafePoint", opentracing.ChildOf(span.Context()))
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGCSafePoints", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds()) }()
defer func() { cmdDurationGetAllServiceGroupGCSafePoints.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetAllServiceGroupGCSafePointRequest{
req := &pdpb.GetAllServiceGroupGCSafePointsRequest{
Header: c.requestHeader(),
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetAllServiceGroupGCSafePoint(ctx, req)
resp, err := c.getClient().GetAllServiceGroupGCSafePoints(ctx, req)
cancel()

if err != nil {
cmdFailedDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds())
cmdFailedDurationGetAllServiceGroupGCSafePoints.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
}

return resp.ServiceGroupSafePoint, nil
return resp.SafePoints, nil
}

func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {
Expand Down
8 changes: 4 additions & 4 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ var (
cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator")
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
cmdDurationGetServiceGroup = cmdDuration.WithLabelValues("get_service_group")
cmdDurationGetAllServiceGroups = cmdDuration.WithLabelValues("get_all_service_groups")
cmdDurationGetMinServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
cmdDurationUpdateGCSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdDurationUpdateServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("update_service_safe_point_by_service_group")
cmdDurationGetAllServiceGroupGCSafePoint = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_point")
cmdDurationGetAllServiceGroupGCSafePoints = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_points")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -115,11 +115,11 @@ var (
cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
cmdFailedDurationGetServiceGroup = cmdFailedDuration.WithLabelValues("get_service_group")
cmdFailedDurationGetAllServiceGroups = cmdFailedDuration.WithLabelValues("get_all_service_groups")
cmdFailedDurationGetMinServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
cmdFailedDurationUpdateGCSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_safe_point_by_service_group")
cmdFailedDurationUpdateServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_service_safe_point_by_service_group")
cmdFailedDurationGetAllServiceGroupGCSafePoint = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_point")
cmdFailedDurationGetAllServiceGroupGCSafePoints = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_points")
requestDurationTSO = requestDuration.WithLabelValues("tso")
)

Expand Down
55 changes: 30 additions & 25 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,28 +1438,28 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb
}

// GetServiceGroup return all service group ids
func (s *GrpcServer) GetServiceGroup(ctx context.Context, request *pdpb.GetServiceGroupRequest) (*pdpb.GetServiceGroupResponse, error) {
func (s *GrpcServer) GetAllServiceGroups(ctx context.Context, request *pdpb.GetAllServiceGroupsRequest) (*pdpb.GetAllServiceGroupsResponse, error) {
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
return pdpb.NewPDClient(client).GetServiceGroup(ctx, request)
return pdpb.NewPDClient(client).GetAllServiceGroups(ctx, request)
}
if rsp, err := s.unaryMiddleware(ctx, request.GetHeader(), fn); err != nil {
return nil, err
} else if rsp != nil {
return rsp.(*pdpb.GetServiceGroupResponse), err
return rsp.(*pdpb.GetAllServiceGroupsResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil
return &pdpb.GetAllServiceGroupsResponse{Header: s.notBootstrappedHeader()}, nil
}

var storage endpoint.GCSafePointStorage = s.storage
serviceGroupList, err := storage.LoadAllServiceGroup()
serviceGroupList, err := storage.LoadAllServiceGroups()
if err != nil {
return nil, err
}

return &pdpb.GetServiceGroupResponse{
return &pdpb.GetAllServiceGroupsResponse{
Header: s.header(),
ServiceGroupId: serviceGroupList,
}, nil
Expand Down Expand Up @@ -1532,9 +1532,9 @@ func (s *GrpcServer) UpdateGCSafePointByServiceGroup(ctx context.Context, reques
currentRevision := rsp.Header.GetRevision()
if currentRevision != request.GetRevision() {
return &pdpb.UpdateGCSafePointByServiceGroupResponse{
Header: s.header(),
NewSafePoint: 0,
ValidRevision: false,
Header: s.header(),
Succeeded: false,
NewSafePoint: 0,
}, nil
}
serviceGroupID := string(request.ServiceGroupId)
Expand Down Expand Up @@ -1569,9 +1569,9 @@ func (s *GrpcServer) UpdateGCSafePointByServiceGroup(ctx context.Context, reques
newSafePoint.SafePoint = oldSafePoint
}
return &pdpb.UpdateGCSafePointByServiceGroupResponse{
Header: s.header(),
NewSafePoint: newSafePoint.SafePoint,
ValidRevision: true,
Header: s.header(),
Succeeded: true,
NewSafePoint: newSafePoint.SafePoint,
}, nil
}

Expand Down Expand Up @@ -1603,7 +1603,8 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
return nil, err
}
return &pdpb.UpdateServiceSafePointByServiceGroupResponse{
Header: s.header(),
Header: s.header(),
Succeeded: true,
}, nil
}

Expand All @@ -1622,6 +1623,8 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
return nil, err
}
var oldServiceSafePoint, gcSafePoint, newServiceSafePoint uint64 = 0, 0, 0
succeeded := false

if sspOld != nil {
oldServiceSafePoint = sspOld.SafePoint
newServiceSafePoint = oldServiceSafePoint // case where update denied
Expand All @@ -1637,6 +1640,7 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
caseInit := oldServiceSafePoint == 0 && request.SafePoint >= gcSafePoint

if caseUpdate || caseInit {
succeeded = true
ssp := &endpoint.ServiceSafePoint{
ServiceID: serviceID,
ExpiredAt: now.Unix() + request.TTL,
Expand All @@ -1658,27 +1662,28 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
}

return &pdpb.UpdateServiceSafePointByServiceGroupResponse{
Header: s.header(),
GcSafePoint: gcSafePoint,
OldServiceSafePoint: oldServiceSafePoint,
NewServiceSafePoint: newServiceSafePoint,
Header: s.header(),
Succeeded: succeeded,
GcSafePoint: gcSafePoint,
OldSafePoint: oldServiceSafePoint,
NewSafePoint: newServiceSafePoint,
}, nil
}

// GetAllServiceGroupGCSafePoint returns all service group's gc safe point
func (s *GrpcServer) GetAllServiceGroupGCSafePoint(ctx context.Context, request *pdpb.GetAllServiceGroupGCSafePointRequest) (*pdpb.GetAllServiceGroupGCSafePointResponse, error) {
// GetAllServiceGroupGCSafePoints returns all service group's gc safe point
func (s *GrpcServer) GetAllServiceGroupGCSafePoints(ctx context.Context, request *pdpb.GetAllServiceGroupGCSafePointsRequest) (*pdpb.GetAllServiceGroupGCSafePointsResponse, error) {
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
return pdpb.NewPDClient(client).GetAllServiceGroupGCSafePoint(ctx, request)
return pdpb.NewPDClient(client).GetAllServiceGroupGCSafePoints(ctx, request)
}
if rsp, err := s.unaryMiddleware(ctx, request.GetHeader(), fn); err != nil {
return nil, err
} else if rsp != nil {
return rsp.(*pdpb.GetAllServiceGroupGCSafePointResponse), err
return rsp.(*pdpb.GetAllServiceGroupGCSafePointsResponse), err
}

rc := s.GetRaftCluster()
if rc == nil {
return &pdpb.GetAllServiceGroupGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil
return &pdpb.GetAllServiceGroupGCSafePointsResponse{Header: s.notBootstrappedHeader()}, nil
}

var storage endpoint.GCSafePointStorage = s.storage
Expand All @@ -1688,9 +1693,9 @@ func (s *GrpcServer) GetAllServiceGroupGCSafePoint(ctx context.Context, request
return nil, err
}

return &pdpb.GetAllServiceGroupGCSafePointResponse{
Header: s.header(),
ServiceGroupSafePoint: safePoints,
return &pdpb.GetAllServiceGroupGCSafePointsResponse{
Header: s.header(),
SafePoints: safePoints,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions server/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type GCSafePointStorage interface {
SaveServiceGCSafePoint(ssp *ServiceSafePoint) error
RemoveServiceGCSafePoint(serviceID string) error

LoadAllServiceGroup() ([][]byte, error)
LoadAllServiceGroups() ([][]byte, error)
LoadMinServiceSafePointByServiceGroup(serviceGroupID string, now time.Time) (*ServiceSafePoint, error)
LoadGCWorkerSafePoint(serviceGroupID string) (*GCSafePoint, error)
SaveGCWorkerSafePoint(gcSafePoint *GCSafePoint) error
Expand Down Expand Up @@ -247,8 +247,8 @@ func (se *StorageEndpoint) LoadMinServiceSafePointByServiceGroup(serviceGroupID
return min, nil
}

// LoadAllServiceGroup returns a list of all service group IDs
func (se *StorageEndpoint) LoadAllServiceGroup() ([][]byte, error) {
// LoadAllServiceGroups returns a list of all service group IDs
func (se *StorageEndpoint) LoadAllServiceGroups() ([][]byte, error) {
prefix := gcSafePointPrefixPath()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, _, err := se.LoadRange(prefix, prefixEnd, 0)
Expand Down

0 comments on commit 3160a94

Please sign in to comment.