diff --git a/client/client.go b/client/client.go index 07598910af4..a781050565c 100644 --- a/client/client.go +++ b/client/client.go @@ -26,6 +26,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/gcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -130,6 +131,22 @@ type Client interface { UpdateOption(option DynamicOption, value interface{}) error // Close closes the client. Close() + + // GetGCAllServiceGroups returns a list containing all service groups that has safe point in pd + GetGCAllServiceGroups(ctx context.Context) ([]string, error) + // GetGCMinServiceSafePointByServiceGroup returns the minimum of all service safe point of the given group + // It also returns the current revision of the pd storage, with in which the min is valid + // If none is found, it will return 0 as min + GetGCMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error) + // UpdateGCSafePointByServiceGroup update the target safe point, along with revision obtained previously + // If failed, caller should retry from GetGCMinServiceSafePointByServiceGroup + UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error) + // UpdateGCServiceSafePointByServiceGroup update the given service's safe point + // Pass in a negative ttl to remove it + // If failed, caller should retry with higher safe point + UpdateGCServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) + // GetGCAllServiceGroupSafePoints returns GC safe point for all service groups + GetGCAllServiceGroupSafePoints(ctx context.Context) ([]*gcpb.ServiceGroupSafePoint, error) } // 
GetStoreOp represents available options when getting stores. @@ -1891,3 +1908,147 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem }() return globalConfigWatcherCh, err } + +func (c *client) gcHeader() *gcpb.RequestHeader { + return &gcpb.RequestHeader{ + ClusterId: c.clusterID, + } +} + +func (c *client) gcClient() gcpb.GCClient { + if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok { + return gcpb.NewGCClient(cc.(*grpc.ClientConn)) + } + return nil +} + +func (c *client) GetGCAllServiceGroups(ctx context.Context) ([]string, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetGCAllServiceGroups", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetGCAllServiceGroups.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &gcpb.GetAllServiceGroupsRequest{ + Header: c.gcHeader(), + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.gcClient().GetAllServiceGroups(ctx, req) + cancel() + + if err != nil { + cmdFailedDurationGetGCAllServiceGroups.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return nil, errors.WithStack(err) + } + + returnSlice := make([]string, 0, len(resp.ServiceGroupId)) + for _, serviceGroupID := range resp.ServiceGroupId { + returnSlice = append(returnSlice, string(serviceGroupID)) + } + return returnSlice, nil +} + +func (c *client) GetGCMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetGCMinServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { 
cmdDurationGetGCMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &gcpb.GetMinServiceSafePointByServiceGroupRequest{ + Header: c.gcHeader(), + ServiceGroupId: []byte(serviceGroupID), + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.gcClient().GetMinServiceSafePointByServiceGroup(ctx, req) + cancel() + + if err != nil { + cmdFailedDurationGetGCMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return 0, 0, errors.WithStack(err) + } + + return resp.SafePoint, resp.Revision, nil +} + +func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (succeeded bool, newSafePoint uint64, err error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateGCSafePointByServiceGroup", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &gcpb.UpdateGCSafePointByServiceGroupRequest{ + Header: c.gcHeader(), + ServiceGroupId: []byte(serviceGroupID), + SafePoint: safePoint, + Revision: revision, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.gcClient().UpdateGCSafePointByServiceGroup(ctx, req) + cancel() + + if err != nil { + cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return false, 0, errors.WithStack(err) + } + return resp.Succeeded, resp.NewSafePoint, nil +} + +func (c *client) UpdateGCServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (succeeded bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) { + if span := 
opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.UpdateGCServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationUpdateGCServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &gcpb.UpdateServiceSafePointByServiceGroupRequest{ + Header: c.gcHeader(), + ServiceGroupId: []byte(serviceGroupID), + ServiceId: []byte(serviceID), + TTL: ttl, + SafePoint: safePoint, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.gcClient().UpdateServiceSafePointByServiceGroup(ctx, req) + cancel() + + if err != nil { + cmdFailedDurationUpdateGCServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return false, 0, 0, 0, errors.WithStack(err) + } + + return resp.Succeeded, resp.GcSafePoint, resp.OldSafePoint, resp.NewSafePoint, nil +} + +func (c *client) GetGCAllServiceGroupSafePoints(ctx context.Context) ([]*gcpb.ServiceGroupSafePoint, error) { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.GetGCAllServiceGroupSafePoints", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + start := time.Now() + defer func() { cmdDurationGetGCAllServiceGroupSafePoints.Observe(time.Since(start).Seconds()) }() + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &gcpb.GetAllServiceGroupGCSafePointsRequest{ + Header: c.gcHeader(), + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + resp, err := c.gcClient().GetAllServiceGroupGCSafePoints(ctx, req) + cancel() + + if err != nil { + cmdFailedDurationGetGCAllServiceGroupSafePoints.Observe(time.Since(start).Seconds()) + c.ScheduleCheckLeader() + return nil, errors.WithStack(err) + } + + return resp.SafePoints, nil +} diff --git a/client/go.mod b/client/go.mod index 
22ef56aa417..7545e94ea21 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 + github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/prometheus/client_golang v1.11.0 go.uber.org/goleak v1.1.11 diff --git a/client/go.sum b/client/go.sum index becfbccfe12..e4d6245dd7f 100644 --- a/client/go.sum +++ b/client/go.sum @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/client/metrics.go b/client/metrics.go index a085f095406..1e9eb5fe80b 100644 --- 
a/client/metrics.go +++ b/client/metrics.go @@ -100,6 +100,12 @@ var ( cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions") cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions") + cmdDurationGetGCAllServiceGroups = cmdDuration.WithLabelValues("get_gc_all_service_groups") + cmdDurationGetGCMinServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("get_gc_min_service_safe_point_by_service_group") + cmdDurationUpdateGCSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_safe_point_by_service_group") + cmdDurationUpdateGCServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_service_safe_point_by_service_group") + cmdDurationGetGCAllServiceGroupSafePoints = cmdDuration.WithLabelValues("get_gc_all_service_group_safe_points") + cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region") cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso") cmdFailDurationGetAllMembers = cmdFailedDuration.WithLabelValues("get_member_info") @@ -111,6 +117,12 @@ var ( cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point") cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point") requestDurationTSO = requestDuration.WithLabelValues("tso") + + cmdFailedDurationGetGCAllServiceGroups = cmdFailedDuration.WithLabelValues("get_gc_all_service_groups") + cmdFailedDurationGetGCMinServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("get_gc_min_service_safe_point_by_service_group") + cmdFailedDurationUpdateGCSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_safe_point_by_service_group") + cmdFailedDurationUpdateGCServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_service_safe_point_by_service_group") + cmdFailedDurationGetGCAllServiceGroupSafePoints = cmdFailedDuration.WithLabelValues("get_gc_all_service_group_safe_points") ) func init() 
{ diff --git a/go.mod b/go.mod index 8b86d17439a..7a337434c1b 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 + github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20220331105802-5ac69661755c diff --git a/go.sum b/go.sum index 06d674e395b..2e35d8f5901 100644 --- a/go.sum +++ b/go.sum @@ -400,8 +400,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/pkg/testutil/testutil.go 
b/pkg/testutil/testutil.go index c3c917d7b3a..07037d03c1f 100644 --- a/pkg/testutil/testutil.go +++ b/pkg/testutil/testutil.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/gcpb" "github.com/pingcap/kvproto/pkg/pdpb" "google.golang.org/grpc" ) @@ -86,6 +87,14 @@ func MustNewGrpcClient(c *check.C, addr string) pdpb.PDClient { return pdpb.NewPDClient(conn) } +// MustNewGCClient must create a new GC client. +func MustNewGCClient(c *check.C, addr string) gcpb.GCClient { + conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure()) + + c.Assert(err, check.IsNil) + return gcpb.NewGCClient(conn) +} + // CleanServer is used to clean data directory. func CleanServer(dataDir string) { // Clean data directory diff --git a/server/gc_service.go b/server/gc_service.go new file mode 100644 index 00000000000..6b6d50191f7 --- /dev/null +++ b/server/gc_service.go @@ -0,0 +1,341 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/pingcap/kvproto/pkg/gcpb" + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/tsoutil" + "github.com/tikv/pd/server/storage/endpoint" + "github.com/tikv/pd/server/tso" + "go.uber.org/zap" +) + +// GcServer wraps Server to provide garbage collection service. 
+type GcServer struct { + *Server +} + +func (s *GcServer) header() *gcpb.ResponseHeader { + return &gcpb.ResponseHeader{ClusterId: s.clusterID} +} + +func (s *GcServer) errorHeader(err *gcpb.Error) *gcpb.ResponseHeader { + return &gcpb.ResponseHeader{ + ClusterId: s.clusterID, + Error: err, + } +} + +func (s *GcServer) notBootstrappedHeader() *gcpb.ResponseHeader { + return s.errorHeader(&gcpb.Error{ + Type: gcpb.ErrorType_NOT_BOOTSTRAPPED, + Message: "cluster is not bootstrapped", + }) +} + +func (s *GcServer) revisionMismatchHeader(requestRevision, currentRevision int64) *gcpb.ResponseHeader { + return s.errorHeader(&gcpb.Error{ + Type: gcpb.ErrorType_REVISION_MISMATCH, + Message: fmt.Sprintf("revision mismatch, requested revision %v but current revision %v", requestRevision, currentRevision), + }) +} + +func (s *GcServer) safePointRollbackHeader(requestSafePoint, requiredSafePoint uint64) *gcpb.ResponseHeader { + return s.errorHeader(&gcpb.Error{ + Type: gcpb.ErrorType_SAFEPOINT_ROLLBACK, + Message: fmt.Sprintf("safe point rollback, requested safe point %v is less than required safe point %v", requestSafePoint, requiredSafePoint), + }) +} + +// GetAllServiceGroups return all service group IDs. 
+func (s *GcServer) GetAllServiceGroups(ctx context.Context, request *gcpb.GetAllServiceGroupsRequest) (*gcpb.GetAllServiceGroupsResponse, error) { + rc := s.GetRaftCluster() + if rc == nil { + return &gcpb.GetAllServiceGroupsResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.GCSafePointStorage = s.storage + serviceGroupList, err := storage.LoadAllServiceGroups() + if err != nil { + return nil, err + } + + serviceGroupIDs := make([][]byte, 0, len(serviceGroupList)) + for _, sgid := range serviceGroupList { + serviceGroupIDs = append(serviceGroupIDs, []byte(sgid)) + } + + return &gcpb.GetAllServiceGroupsResponse{ + Header: s.header(), + ServiceGroupId: serviceGroupIDs, + }, nil +} + +// getServiceRevisionByServiceGroup return etcd ModRevision of given service group. +// It's used to detect new service safe point between `GetMinServiceSafePointByServiceGroup` & `UpdateGCSafePointByServiceGroup`. +// Return `kv.RevisionUnavailable` if the service group is not existed. +func (s *GcServer) getServiceRevisionByServiceGroup(serviceGroupID string) (int64, error) { + servicePath := endpoint.GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID) + _, revision, err := s.storage.LoadRevision(servicePath) + return revision, err +} + +// touchServiceRevisionByServiceGroup advances revision of service group path. +// It's used when new service safe point is saved. +func (s *GcServer) touchServiceRevisionByServiceGroup(serviceGroupID string) error { + servicePath := endpoint.GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID) + return s.storage.Save(servicePath, "") +} + +func (s *GcServer) getNow() (time.Time, error) { + nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1) + if err != nil { + return time.Time{}, err + } + now, _ := tsoutil.ParseTimestamp(nowTSO) + return now, err +} + +// GetMinServiceSafePointByServiceGroup returns given service group's min service safe point. 
+func (s *GcServer) GetMinServiceSafePointByServiceGroup(ctx context.Context, request *gcpb.GetMinServiceSafePointByServiceGroupRequest) (*gcpb.GetMinServiceSafePointByServiceGroupResponse, error) { + // Lock to ensure that there is no other change between `min` and `currentRevision`. + // Also note that `storage.LoadMinServiceSafePointByServiceGroup` is not thread-safe. + s.gcServiceGroupLock.Lock() + defer s.gcServiceGroupLock.Unlock() + + rc := s.GetRaftCluster() + if rc == nil { + return &gcpb.GetMinServiceSafePointByServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.GCSafePointStorage = s.storage + serviceGroupID := string(request.ServiceGroupId) + + now, err := s.getNow() + if err != nil { + return nil, err + } + + min, err := storage.LoadMinServiceSafePointByServiceGroup(serviceGroupID, now) + if err != nil { + return nil, err + } + var returnSafePoint uint64 + if min != nil { + returnSafePoint = min.SafePoint + } + + currentRevision, err := s.getServiceRevisionByServiceGroup(serviceGroupID) + if err != nil { + return nil, err + } + + return &gcpb.GetMinServiceSafePointByServiceGroupResponse{ + Header: s.header(), + SafePoint: returnSafePoint, + Revision: currentRevision, + }, nil +} + +// UpdateGCSafePointByServiceGroup used by gc_worker to update their gc safe points. +func (s *GcServer) UpdateGCSafePointByServiceGroup(ctx context.Context, request *gcpb.UpdateGCSafePointByServiceGroupRequest) (*gcpb.UpdateGCSafePointByServiceGroupResponse, error) { + s.gcServiceGroupLock.Lock() + defer s.gcServiceGroupLock.Unlock() + + rc := s.GetRaftCluster() + if rc == nil { + return &gcpb.UpdateGCSafePointByServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.GCSafePointStorage = s.storage + serviceGroupID := string(request.ServiceGroupId) + + // check if revision changed since last min calculation. 
+ currentRevision, err := s.getServiceRevisionByServiceGroup(serviceGroupID) + if err != nil { + return nil, err + } + requestRevision := request.GetRevision() + if currentRevision != requestRevision { + return &gcpb.UpdateGCSafePointByServiceGroupResponse{ + Header: s.revisionMismatchHeader(requestRevision, currentRevision), + Succeeded: false, + NewSafePoint: 0, + }, nil + } + + newSafePoint := &endpoint.ServiceGroupGCSafePoint{ + ServiceGroupID: serviceGroupID, + SafePoint: request.SafePoint, + } + prev, err := storage.LoadGCSafePointByServiceGroup(serviceGroupID) + if err != nil { + return nil, err + } + // if no previous safepoint, treat it as 0. + var oldSafePoint uint64 = 0 + if prev != nil { + oldSafePoint = prev.SafePoint + } + + response := &gcpb.UpdateGCSafePointByServiceGroupResponse{} + + // fail to store due to safe point rollback. + if newSafePoint.SafePoint < oldSafePoint { + log.Warn("trying to update gc_worker safe point", + zap.String("service-group-id", serviceGroupID), + zap.Uint64("old-safe-point", request.SafePoint), + zap.Uint64("new-safe-point", newSafePoint.SafePoint)) + response.Header = s.safePointRollbackHeader(newSafePoint.SafePoint, oldSafePoint) + response.Succeeded = false + response.NewSafePoint = oldSafePoint + return response, nil + } + + // save the safe point to storage. + if err := storage.SaveGCSafePointByServiceGroup(newSafePoint); err != nil { + return nil, err + } + response.Header = s.header() + response.Succeeded = true + response.NewSafePoint = newSafePoint.SafePoint + log.Info("updated gc_worker safe point", + zap.String("service-group-id", serviceGroupID), + zap.Uint64("safe-point", newSafePoint.SafePoint), + zap.Uint64("old-safe-point", oldSafePoint)) + return response, nil +} + +// UpdateServiceSafePointByServiceGroup for services like CDC/BR/Lightning to update gc safe points in PD. 
+func (s *GcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, request *gcpb.UpdateServiceSafePointByServiceGroupRequest) (*gcpb.UpdateServiceSafePointByServiceGroupResponse, error) { + s.gcServiceGroupLock.Lock() + defer s.gcServiceGroupLock.Unlock() + + rc := s.GetRaftCluster() + if rc == nil { + return &gcpb.UpdateServiceSafePointByServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.GCSafePointStorage = s.storage + serviceGroupID := string(request.ServiceGroupId) + serviceID := string(request.ServiceId) + // a less than 0 ttl means to remove the safe point, immediately return after the deletion request. + if request.TTL <= 0 { + if err := storage.RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID); err != nil { + return nil, err + } + return &gcpb.UpdateServiceSafePointByServiceGroupResponse{ + Header: s.header(), + Succeeded: true, + }, nil + } + + now, err := s.getNow() + if err != nil { + return nil, err + } + + sspOld, err := storage.LoadServiceSafePointByServiceGroup(serviceGroupID, serviceID) + if err != nil { + return nil, err + } + gcsp, err := storage.LoadGCSafePointByServiceGroup(serviceGroupID) + if err != nil { + return nil, err + } + + response := &gcpb.UpdateServiceSafePointByServiceGroupResponse{} + // safePointLowerBound is the minimum request.SafePoint for update request to succeed. + // It is oldServiceSafePoint if oldServiceSafePoint exists, else gcSafePoint if it exists. + // Otherwise it's set to 0, indicate all safePoint accepted. + var safePointLowerBound uint64 = 0 + if gcsp != nil { + safePointLowerBound = gcsp.SafePoint + response.GcSafePoint = gcsp.SafePoint + } + if sspOld != nil { + safePointLowerBound = sspOld.SafePoint + response.OldSafePoint = sspOld.SafePoint + } + + // request.SafePoint smaller than safePointLowerBound, we have a safePointRollBack. 
+ if request.SafePoint < safePointLowerBound { + response.Header = s.safePointRollbackHeader(request.SafePoint, safePointLowerBound) + response.Succeeded = false + return response, nil + } + + response.Succeeded = true + response.NewSafePoint = request.SafePoint + ssp := &endpoint.ServiceSafePoint{ + ServiceID: serviceID, + ExpiredAt: now.Unix() + request.TTL, + SafePoint: request.SafePoint, + } + // Handles overflow. + if math.MaxInt64-now.Unix() <= request.TTL { + ssp.ExpiredAt = math.MaxInt64 + } + + if sspOld == nil { + // Touch service revision to advance revision, for indicating that a new service safe point is added. + // Should be invoked before `SaveServiceSafePointByServiceGroup`, to avoid touch fail after new service safe point is saved. + if err := s.touchServiceRevisionByServiceGroup(serviceGroupID); err != nil { + return nil, err + } + } + + if err := storage.SaveServiceSafePointByServiceGroup(serviceGroupID, ssp); err != nil { + return nil, err + } + log.Info("update service safe point by service group", + zap.String("service-group-id", serviceGroupID), + zap.String("service-id", ssp.ServiceID), + zap.Int64("expire-at", ssp.ExpiredAt), + zap.Uint64("safepoint", ssp.SafePoint)) + return response, nil +} + +// GetAllServiceGroupGCSafePoints returns all service group's gc safe point. 
+func (s *GcServer) GetAllServiceGroupGCSafePoints(ctx context.Context, request *gcpb.GetAllServiceGroupGCSafePointsRequest) (*gcpb.GetAllServiceGroupGCSafePointsResponse, error) { + rc := s.GetRaftCluster() + if rc == nil { + return &gcpb.GetAllServiceGroupGCSafePointsResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.GCSafePointStorage = s.storage + gcSafePoints, err := storage.LoadAllServiceGroupGCSafePoints() + if err != nil { + return nil, err + } + + safePoints := make([]*gcpb.ServiceGroupSafePoint, 0, len(gcSafePoints)) + for _, sp := range gcSafePoints { + safePoints = append(safePoints, &gcpb.ServiceGroupSafePoint{ + ServiceGroupId: []byte(sp.ServiceGroupID), + SafePoint: sp.SafePoint, + }) + } + return &gcpb.GetAllServiceGroupGCSafePointsResponse{ + Header: s.header(), + SafePoints: safePoints, + }, nil +} diff --git a/server/server.go b/server/server.go index 3c08ac9a514..ec2b5dc7539 100644 --- a/server/server.go +++ b/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/kvproto/pkg/gcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -146,8 +147,10 @@ type Server struct { // serviceSafePointLock is a lock for UpdateServiceGCSafePoint serviceSafePointLock syncutil.Mutex + // Lock for GC service group interfaces + gcServiceGroupLock syncutil.Mutex - // hot region history info storeage + // hot region history info storage hotRegionStorage *storage.HotRegionStorage // Store as map[string]*grpc.ClientConn clientConns sync.Map @@ -271,6 +274,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, serviceBuilders ...Ha } etcdCfg.ServiceRegister = func(gs *grpc.Server) { pdpb.RegisterPDServer(gs, &GrpcServer{Server: s}) + gcpb.RegisterGCServer(gs, &GcServer{Server: s}) diagnosticspb.RegisterDiagnosticsServer(gs, s) } s.etcdCfg = etcdCfg diff --git 
a/server/storage/endpoint/gc_safe_point.go b/server/storage/endpoint/gc_safe_point.go index e213eca4ed5..ffbfe9cdcd5 100644 --- a/server/storage/endpoint/gc_safe_point.go +++ b/server/storage/endpoint/gc_safe_point.go @@ -34,6 +34,12 @@ type ServiceSafePoint struct { SafePoint uint64 `json:"safe_point"` } +// ServiceGroupGCSafePoint is gcWorker's safepoint for specific service group +type ServiceGroupGCSafePoint struct { + ServiceGroupID string `json:"service_group_id"` + SafePoint uint64 `json:"safe_point"` +} + // GCSafePointStorage defines the storage operations on the GC safe point. type GCSafePointStorage interface { LoadGCSafePoint() (uint64, error) @@ -42,6 +48,17 @@ type GCSafePointStorage interface { LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error) SaveServiceGCSafePoint(ssp *ServiceSafePoint) error RemoveServiceGCSafePoint(serviceID string) error + + LoadAllServiceGroups() ([]string, error) + // Service safe point interfaces. + SaveServiceSafePointByServiceGroup(serviceGroupID string, ssp *ServiceSafePoint) error + LoadServiceSafePointByServiceGroup(serviceGroupID, serviceID string) (*ServiceSafePoint, error) + LoadMinServiceSafePointByServiceGroup(serviceGroupID string, now time.Time) (*ServiceSafePoint, error) + RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID string) error + // GC safe point interfaces. + SaveGCSafePointByServiceGroup(gcSafePoint *ServiceGroupGCSafePoint) error + LoadGCSafePointByServiceGroup(serviceGroupID string) (*ServiceGroupGCSafePoint, error) + LoadAllServiceGroupGCSafePoints() ([]*ServiceGroupGCSafePoint, error) } var _ GCSafePointStorage = (*StorageEndpoint)(nil) diff --git a/server/storage/endpoint/gc_service_group.go b/server/storage/endpoint/gc_service_group.go new file mode 100644 index 00000000000..57547c16224 --- /dev/null +++ b/server/storage/endpoint/gc_service_group.go @@ -0,0 +1,154 @@ +// Copyright 2022 TiKV Project Authors. 
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package endpoint
+
+import (
+	"encoding/json"
+	"math"
+	"time"
+
+	"github.com/pingcap/errors"
+	"go.etcd.io/etcd/clientv3"
+)
+
+// Predefined service groups. More service groups would come from "Multi-tenant".
+const (
+	// ServiceGroupRawKVDefault is service group ID for RawKV.
+	ServiceGroupRawKVDefault = "default_rawkv"
+)
+
+// LoadAllServiceGroups returns a list of all service group IDs.
+// We only have predefined service groups for now.
+// More service groups would come from "Multi-tenant".
+func (se *StorageEndpoint) LoadAllServiceGroups() ([]string, error) {
+	serviceGroupIDs := []string{
+		ServiceGroupRawKVDefault,
+	}
+
+	return serviceGroupIDs, nil
+}
+
+// SaveServiceSafePointByServiceGroup saves service safe point under given service group.
+func (se *StorageEndpoint) SaveServiceSafePointByServiceGroup(serviceGroupID string, ssp *ServiceSafePoint) error {
+	if ssp.ServiceID == "" {
+		return errors.New("service id of service safepoint cannot be empty")
+	}
+	key := GCServiceSafePointPathByServiceGroup(serviceGroupID, ssp.ServiceID)
+	value, err := json.Marshal(ssp)
+	if err != nil {
+		return err
+	}
+	return se.Save(key, string(value))
+}
+
+// LoadServiceSafePointByServiceGroup reads ServiceSafePoint for the given service group and service name.
+// Return nil if the safe point does not exist.
+func (se *StorageEndpoint) LoadServiceSafePointByServiceGroup(serviceGroupID, serviceID string) (*ServiceSafePoint, error) {
+	value, err := se.Load(GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID))
+	if err != nil || value == "" {
+		return nil, err
+	}
+	ssp := &ServiceSafePoint{}
+	if err := json.Unmarshal([]byte(value), ssp); err != nil {
+		return nil, err
+	}
+	return ssp, nil
+}
+
+// LoadMinServiceSafePointByServiceGroup returns the minimum safepoint for the given service group.
+// Note that the gc_worker safe point is stored separately.
+// If no service safe point exists for the given service group, or all of its service safe points have expired, return nil.
+func (se *StorageEndpoint) LoadMinServiceSafePointByServiceGroup(serviceGroupID string, now time.Time) (*ServiceSafePoint, error) {
+	prefix := GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID)
+	prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
+	keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
+	if err != nil {
+		return nil, err
+	}
+
+	min := &ServiceSafePoint{SafePoint: math.MaxUint64}
+	for i, key := range keys {
+		ssp := &ServiceSafePoint{}
+		if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
+			return nil, err
+		}
+
+		// remove expired safe points.
+		if ssp.ExpiredAt < now.Unix() {
+			se.Remove(key)
+			continue
+		}
+
+		if ssp.SafePoint < min.SafePoint {
+			min = ssp
+		}
+	}
+
+	if min.SafePoint == math.MaxUint64 {
+		// no service safe point or all of them are expired.
+		return nil, nil
+	}
+
+	// successfully found a valid min safe point.
+	return min, nil
+}
+
+// RemoveServiceSafePointByServiceGroup removes a service safe point.
+func (se *StorageEndpoint) RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID string) error {
+	key := GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID)
+	return se.Remove(key)
+}
+
+// SaveGCSafePointByServiceGroup saves GCSafePoint under given service group.
+func (se *StorageEndpoint) SaveGCSafePointByServiceGroup(gcSafePoint *ServiceGroupGCSafePoint) error { + safePoint, err := json.Marshal(gcSafePoint) + if err != nil { + return err + } + return se.Save(gcSafePointPathByServiceGroup(gcSafePoint.ServiceGroupID), string(safePoint)) +} + +// LoadGCSafePointByServiceGroup reads GCSafePoint for the given service group. +// Returns nil if the safe point does not exist. +func (se *StorageEndpoint) LoadGCSafePointByServiceGroup(serviceGroupID string) (*ServiceGroupGCSafePoint, error) { + value, err := se.Load(gcSafePointPathByServiceGroup(serviceGroupID)) + if err != nil || value == "" { + return nil, err + } + gcSafePoint := &ServiceGroupGCSafePoint{} + if err := json.Unmarshal([]byte(value), gcSafePoint); err != nil { + return nil, err + } + return gcSafePoint, nil +} + +// LoadAllServiceGroupGCSafePoints returns the GC safe points of all service groups. +func (se *StorageEndpoint) LoadAllServiceGroupGCSafePoints() ([]*ServiceGroupGCSafePoint, error) { + prefix := gcServiceGroupGCSafePointPrefixPath() + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + _, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return nil, err + } + safePoints := make([]*ServiceGroupGCSafePoint, 0, len(values)) + for _, value := range values { + gcSafePoint := &ServiceGroupGCSafePoint{} + if err := json.Unmarshal([]byte(value), gcSafePoint); err != nil { + return nil, err + } + safePoints = append(safePoints, gcSafePoint) + } + return safePoints, nil +} diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 1f5e05601cf..d0b5edd144a 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -31,6 +31,9 @@ const ( customScheduleConfigPath = "scheduler_config" gcWorkerServiceSafePointID = "gc_worker" minResolvedTS = "min_resolved_ts" + + gcServiceGroupGCSafePointPath = "gc_servicegroup/gc_safepoint" + gcServiceGroupServiceSafePointPath = 
"gc_servicegroup/service_safepoint" ) // AppendToRootPath appends the given key to the rootPath. @@ -99,6 +102,30 @@ func gcSafePointServicePath(serviceID string) string { return path.Join(gcSafePointPath(), "service", serviceID) } +// gcSafePointPathByServiceGroup returns the path of the gc safe point of the specified service group. +// Path: /gc_servicegroup/gc_safepoint/$service_group_id +func gcSafePointPathByServiceGroup(serviceGroupID string) string { + return path.Join(gcServiceGroupGCSafePointPath, serviceGroupID) +} + +// GCServiceSafePointPrefixPathByServiceGroup returns the prefix path of the service safe point of the specified service group. +// Path: /gc_servicegroup/service_safepoint/$service_group_id +func GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID string) string { + return path.Join(gcServiceGroupServiceSafePointPath, serviceGroupID) + "/" +} + +// GCServiceSafePointPathByServiceGroup returns the path of a service's safe point of the specified service group. +// Path: /gc_servicegroup/service_safepoint/$service_group_id/$service_id +func GCServiceSafePointPathByServiceGroup(serviceGroupID, serviceID string) string { + return path.Join(GCServiceSafePointPrefixPathByServiceGroup(serviceGroupID), serviceID) +} + +// gcServiceGroupGCSafePointPrefixPath returns the prefix path of gc safe point for all service groups. 
+// Path: /gc_servicegroup/gc_safepoint/ +func gcServiceGroupGCSafePointPrefixPath() string { + return gcServiceGroupGCSafePointPath + "/" +} + // MinResolvedTSPath returns the min resolved ts path func MinResolvedTSPath() string { return path.Join(clusterPath, minResolvedTS) } diff --git a/server/storage/kv/etcd_kv.go b/server/storage/kv/etcd_kv.go index e30b5f7b462..1b2eda439af 100644 --- a/server/storage/kv/etcd_kv.go +++ b/server/storage/kv/etcd_kv.go @@ -48,18 +48,23 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { } func (kv *etcdKVBase) Load(key string) (string, error) { + value, _, err := kv.LoadRevision(key) + return value, err +} + +func (kv *etcdKVBase) LoadRevision(key string) (string, int64, error) { key = path.Join(kv.rootPath, key) resp, err := etcdutil.EtcdKVGet(kv.client, key) if err != nil { - return "", err + return "", RevisionUnavailable, err } if n := len(resp.Kvs); n == 0 { - return "", nil + return "", RevisionUnavailable, nil } else if n > 1 { - return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs) + return "", RevisionUnavailable, errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs) } - return string(resp.Kvs[0].Value), nil + return string(resp.Kvs[0].Value), resp.Kvs[0].ModRevision, nil } func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { diff --git a/server/storage/kv/kv.go b/server/storage/kv/kv.go index 2f1fa06e144..60ca030596b 100644 --- a/server/storage/kv/kv.go +++ b/server/storage/kv/kv.go @@ -14,9 +14,14 @@ package kv +// RevisionUnavailable is the value of an unavailable revision, +// used when the key does not exist (etcd_kv), or revisions are not supported (mem_kv & leveldb_kv) +const RevisionUnavailable = -1 + // Base is an abstract interface for load/save pd cluster data. 
type Base interface { Load(key string) (string, error) + LoadRevision(key string) (string, int64, error) LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) Save(key, value string) error Remove(key string) error diff --git a/server/storage/kv/kv_test.go b/server/storage/kv/kv_test.go index 51c90e8a1d1..a3657732162 100644 --- a/server/storage/kv/kv_test.go +++ b/server/storage/kv/kv_test.go @@ -52,7 +52,7 @@ func (s *testKVSuite) TestEtcd(c *C) { rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) kv := NewEtcdKVBase(client, rootPath) - s.testReadWrite(c, kv) + s.testReadWrite(c, kv, true) s.testRange(c, kv) } @@ -63,30 +63,69 @@ func (s *testKVSuite) TestLevelDB(c *C) { kv, err := NewLevelDBKV(dir) c.Assert(err, IsNil) - s.testReadWrite(c, kv) + s.testReadWrite(c, kv, false) s.testRange(c, kv) } func (s *testKVSuite) TestMemKV(c *C) { kv := NewMemoryKV() - s.testReadWrite(c, kv) + s.testReadWrite(c, kv, false) s.testRange(c, kv) } -func (s *testKVSuite) testReadWrite(c *C, kv Base) { +func (s *testKVSuite) testReadWrite(c *C, kv Base, isEtcd bool) { + Rev := int64(-1) + nextRevision := func() { + if isEtcd { + if Rev == -1 { + Rev = 1 + } + Rev += 1 + } + } + v, err := kv.Load("key") c.Assert(err, IsNil) c.Assert(v, Equals, "") + + v, revision, err := kv.LoadRevision("key") + c.Assert(err, IsNil) + c.Assert(revision, Equals, int64(-1)) + c.Assert(v, Equals, "") + err = kv.Save("key", "value") c.Assert(err, IsNil) + nextRevision() + v, err = kv.Load("key") c.Assert(err, IsNil) c.Assert(v, Equals, "value") + + v, revision, err = kv.LoadRevision("key") + c.Assert(err, IsNil) + c.Assert(v, Equals, "value") + c.Assert(revision, Equals, Rev) + + err = kv.Save("key", "value1") + c.Assert(err, IsNil) + nextRevision() + v, revision, err = kv.LoadRevision("key") + c.Assert(err, IsNil) + c.Assert(v, Equals, "value1") + c.Assert(revision, Equals, Rev) + err = kv.Remove("key") c.Assert(err, IsNil) + v, err = kv.Load("key") c.Assert(err, 
IsNil) c.Assert(v, Equals, "") + + v, revision, err = kv.LoadRevision("key") + c.Assert(err, IsNil) + c.Assert(revision, Equals, int64(-1)) + c.Assert(v, Equals, "") + err = kv.Remove("key") c.Assert(err, IsNil) } diff --git a/server/storage/kv/levedb_kv.go b/server/storage/kv/levedb_kv.go index 7f134709bd1..38c086e71ae 100644 --- a/server/storage/kv/levedb_kv.go +++ b/server/storage/kv/levedb_kv.go @@ -49,6 +49,12 @@ func (kv *LevelDBKV) Load(key string) (string, error) { return string(v), err } +// LoadRevision gets a value along with revision. The revision is unavailable for `LevelDBKV`. +func (kv *LevelDBKV) LoadRevision(key string) (string, int64, error) { + value, err := kv.Load(key) + return value, RevisionUnavailable, err +} + // LoadRange gets a range of value for a given key range. func (kv *LevelDBKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) { iter := kv.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil) diff --git a/server/storage/kv/mem_kv.go b/server/storage/kv/mem_kv.go index b74cab84b11..826db5819b1 100644 --- a/server/storage/kv/mem_kv.go +++ b/server/storage/kv/mem_kv.go @@ -51,6 +51,11 @@ func (kv *memoryKV) Load(key string) (string, error) { return item.(memoryKVItem).value, nil } +func (kv *memoryKV) LoadRevision(key string) (string, int64, error) { + value, err := kv.Load(key) + return value, RevisionUnavailable, err +} + func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) { failpoint.Inject("withRangeLimit", func(val failpoint.Value) { rangeLimit, ok := val.(int) diff --git a/server/storage/storage_gc_test.go b/server/storage/storage_gc_test.go new file mode 100644 index 00000000000..916bbb387e7 --- /dev/null +++ b/server/storage/storage_gc_test.go @@ -0,0 +1,197 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "encoding/json" + "math" + "time" + + . "github.com/pingcap/check" + "github.com/tikv/pd/server/storage/endpoint" +) + +var _ = Suite(&testStorageFopGCSuite{}) + +type testStorageFopGCSuite struct { +} + +func testGCSafePoints() []*endpoint.ServiceGroupGCSafePoint { + return []*endpoint.ServiceGroupGCSafePoint{ + { + ServiceGroupID: "testServiceGroup1", + SafePoint: 0, + }, + { + ServiceGroupID: "testServiceGroup2", + SafePoint: 1, + }, + { + ServiceGroupID: "testServiceGroup3", + SafePoint: 4396, + }, + { + ServiceGroupID: "testServiceGroup4", + SafePoint: 23333333333, + }, + { + ServiceGroupID: "testServiceGroup5", + SafePoint: math.MaxUint64, + }, + } +} + +func (s *testStorageFopGCSuite) TestLoadGCWorkerSafePoint(c *C) { + storage := NewStorageWithMemoryBackend() + testData := testGCSafePoints() + r, e := storage.LoadGCSafePointByServiceGroup("testServiceGroup") + c.Assert(r, IsNil) + c.Assert(e, IsNil) + for _, safePoint := range testData { + err := storage.SaveGCSafePointByServiceGroup(safePoint) + c.Assert(err, IsNil) + loaded, err := storage.LoadGCSafePointByServiceGroup(safePoint.ServiceGroupID) + c.Assert(err, IsNil) + c.Assert(safePoint, DeepEquals, loaded) + } +} + +func (s *testStorageFopGCSuite) TestLoadAllServiceGroupGCSafePoints(c *C) { + storage := NewStorageWithMemoryBackend() + testData := testGCSafePoints() + for _, safePoint := range testData { + err := storage.SaveGCSafePointByServiceGroup(safePoint) + c.Assert(err, IsNil) + } + safePoints, err := storage.LoadAllServiceGroupGCSafePoints() + 
c.Assert(err, IsNil) + for i, safePoint := range testData { + c.Assert(safePoints[i], DeepEquals, safePoint) + } +} + +func (s *testStorageFopGCSuite) TestLoadAllServiceGroup(c *C) { + storage := NewStorageWithMemoryBackend() + serviceGroups, err := storage.LoadAllServiceGroups() + c.Assert(err, IsNil) + c.Assert(serviceGroups, DeepEquals, []string{endpoint.ServiceGroupRawKVDefault}) +} + +func (s *testStorageFopGCSuite) TestLoadServiceSafePointByServiceGroup(c *C) { + storage := NewStorageWithMemoryBackend() + expireAt := time.Now().Add(100 * time.Second).Unix() + serviceSafePoints := []*endpoint.ServiceSafePoint{ + {ServiceID: "1", ExpiredAt: expireAt, SafePoint: 1}, + {ServiceID: "2", ExpiredAt: expireAt, SafePoint: 2}, + {ServiceID: "3", ExpiredAt: expireAt, SafePoint: 3}, + } + serviceGroups := []string{ + "serviceGroup1", + "serviceGroup2", + "serviceGroup3", + } + + for _, serviceGroup := range serviceGroups { + for _, serviceSafePoint := range serviceSafePoints { + c.Assert(storage.SaveServiceSafePointByServiceGroup(serviceGroup, serviceSafePoint), IsNil) + } + } + for _, serviceGroup := range serviceGroups { + for _, serviceSafePoint := range serviceSafePoints { + key := endpoint.GCServiceSafePointPathByServiceGroup(serviceGroup, serviceSafePoint.ServiceID) + value, err := storage.Load(key) + c.Assert(err, IsNil) + ssp := &endpoint.ServiceSafePoint{} + c.Assert(json.Unmarshal([]byte(value), ssp), IsNil) + c.Assert(ssp, DeepEquals, serviceSafePoint) + } + } +} + +func (s *testStorageFopGCSuite) TestRemoveServiceSafePointByServiceGroup(c *C) { + storage := NewStorageWithMemoryBackend() + expireAt := time.Now().Add(100 * time.Second).Unix() + + serviceSafePoints := []*endpoint.ServiceSafePoint{ + {ServiceID: "1", ExpiredAt: expireAt, SafePoint: 1}, + {ServiceID: "2", ExpiredAt: expireAt, SafePoint: 2}, + {ServiceID: "3", ExpiredAt: expireAt, SafePoint: 3}, + } + serviceGroups := []string{ + "serviceGroup1", + "serviceGroup2", + "serviceGroup3", + } + // save 
service safe points + for _, serviceGroup := range serviceGroups { + for _, serviceSafePoint := range serviceSafePoints { + c.Assert(storage.SaveServiceSafePointByServiceGroup(serviceGroup, serviceSafePoint), IsNil) + } + } + + // remove service safe points + for _, serviceGroup := range serviceGroups { + for _, serviceSafePoint := range serviceSafePoints { + c.Assert(storage.RemoveServiceSafePointByServiceGroup(serviceGroup, serviceSafePoint.ServiceID), IsNil) + } + } + + // check that service safe points are empty + for _, serviceGroup := range serviceGroups { + for _, serviceSafePoint := range serviceSafePoints { + safepoint, err := storage.LoadServiceSafePointByServiceGroup(serviceGroup, serviceSafePoint.ServiceID) + c.Assert(err, IsNil) + c.Assert(safepoint, IsNil) + } + } +} + +func (s *testStorageFopGCSuite) TestLoadMinServiceSafePointByServiceGroup(c *C) { + storage := NewStorageWithMemoryBackend() + currentTime := time.Now() + expireAt1 := currentTime.Add(100 * time.Second).Unix() + expireAt2 := currentTime.Add(200 * time.Second).Unix() + expireAt3 := currentTime.Add(300 * time.Second).Unix() + + serviceSafePoints := []*endpoint.ServiceSafePoint{ + {ServiceID: "1", ExpiredAt: expireAt1, SafePoint: 100}, + {ServiceID: "2", ExpiredAt: expireAt2, SafePoint: 200}, + {ServiceID: "3", ExpiredAt: expireAt3, SafePoint: 300}, + } + + for _, serviceSafePoint := range serviceSafePoints { + c.Assert(storage.SaveServiceSafePointByServiceGroup("testServiceGroup1", serviceSafePoint), IsNil) + } + minSafePoint, err := storage.LoadMinServiceSafePointByServiceGroup("testServiceGroup1", currentTime) + c.Assert(err, IsNil) + c.Assert(minSafePoint, DeepEquals, serviceSafePoints[0]) + + // this should remove safePoint with ServiceID 1 due to expiration + // and find the safePoint with ServiceID 2 + minSafePoint2, err := storage.LoadMinServiceSafePointByServiceGroup("testServiceGroup1", currentTime.Add(150*time.Second)) + c.Assert(err, IsNil) + c.Assert(minSafePoint2, 
DeepEquals, serviceSafePoints[1]) + + // verify that one with ServiceID 1 has been removed + ssp, err := storage.LoadServiceSafePointByServiceGroup("testServiceGroup1", "1") + c.Assert(err, IsNil) + c.Assert(ssp, IsNil) + + // this should remove all service safe points + // and return nil + ssp, err = storage.LoadMinServiceSafePointByServiceGroup("testServiceGroup1", currentTime.Add(500*time.Second)) + c.Assert(err, IsNil) + c.Assert(ssp, IsNil) +} diff --git a/tests/client/go.mod b/tests/client/go.mod index 1fc23575b2a..dc3a0512eb0 100644 --- a/tests/client/go.mod +++ b/tests/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 + github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 diff --git a/tests/client/go.sum b/tests/client/go.sum index 1e4ba31add1..049da2ca0a4 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -408,8 +408,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e 
h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/tests/cluster.go b/tests/cluster.go index 2061668f393..326eec7b2a6 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -64,6 +64,7 @@ type TestServer struct { sync.RWMutex server *server.Server grpcServer *server.GrpcServer + gcServer *server.GcServer state int32 } @@ -91,6 +92,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error) return &TestServer{ server: svr, grpcServer: &server.GrpcServer{Server: svr}, + gcServer: &server.GcServer{Server: svr}, state: Initial, }, nil } @@ -358,6 +360,13 @@ func (s *TestServer) GetStoreRegions(storeID uint64) []*core.RegionInfo { return s.server.GetRaftCluster().GetStoreRegions(storeID) } +// GetGCService returns the gc service. +func (s *TestServer) GetGCService() *server.GcServer { + s.RLock() + defer s.RUnlock() + return s.gcServer +} + // BootstrapCluster is used to bootstrap the cluster. func (s *TestServer) BootstrapCluster() error { bootstrapReq := &pdpb.BootstrapRequest{ diff --git a/tests/server/gc/gc_test.go b/tests/server/gc/gc_test.go new file mode 100644 index 00000000000..40ab8133367 --- /dev/null +++ b/tests/server/gc/gc_test.go @@ -0,0 +1,363 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gc_test + +import ( + "context" + "fmt" + "math" + "sync" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/gcpb" + "github.com/tikv/pd/pkg/testutil" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/storage/endpoint" + "github.com/tikv/pd/tests" + "go.uber.org/goleak" +) + +func Test(t *testing.T) { + TestingT(t) +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, testutil.LeakOptions...) +} + +var _ = Suite(&testGCSuite{}) + +func newRequestHeader(clusterID uint64) *gcpb.RequestHeader { + return &gcpb.RequestHeader{ + ClusterId: clusterID, + } +} + +type testGCSuite struct { + ctx context.Context + cancel context.CancelFunc +} + +func (s *testGCSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + server.EnableZap = true +} + +func (s *testGCSuite) TearDownSuite(c *C) { + s.cancel() +} + +func (s *testGCSuite) mustNewGCService(c *C) (addr string, cluster *tests.TestCluster, clusterID uint64) { + var err error + cluster, err = tests.NewTestCluster(s.ctx, 1) + c.Assert(err, IsNil) + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + leader := cluster.GetServer(cluster.GetLeader()) + c.Assert(leader.BootstrapCluster(), IsNil) + + clusterID = leader.GetClusterID() + addr = leader.GetAddr() + return +} + +type testClient struct { + cli gcpb.GCClient + clusterID uint64 + c *C + ctx context.Context +} + +func (c *testClient) mustGetAllServiceGroups() [][]byte { + req := &gcpb.GetAllServiceGroupsRequest{ + Header: 
newRequestHeader(c.clusterID), + } + resp, err := c.cli.GetAllServiceGroups(c.ctx, req) + c.c.Assert(err, IsNil) + return resp.ServiceGroupId +} + +func (c *testClient) mustUpdateServiceSafePoint(serviceGroupID []byte, serviceID []byte, ttl int64, safepoint uint64) *gcpb.UpdateServiceSafePointByServiceGroupResponse { + req := &gcpb.UpdateServiceSafePointByServiceGroupRequest{ + Header: newRequestHeader(c.clusterID), + ServiceGroupId: serviceGroupID, + ServiceId: serviceID, + TTL: ttl, + SafePoint: safepoint, + } + resp, err := c.cli.UpdateServiceSafePointByServiceGroup(c.ctx, req) + c.c.Assert(err, IsNil) + return resp +} + +func (c *testClient) mustGetMinServiceSafePoint(serviceGroupID []byte) (safepoint uint64, revision int64) { + req := &gcpb.GetMinServiceSafePointByServiceGroupRequest{ + Header: newRequestHeader(c.clusterID), + ServiceGroupId: serviceGroupID, + } + resp, err := c.cli.GetMinServiceSafePointByServiceGroup(c.ctx, req) + c.c.Assert(err, IsNil) + return resp.GetSafePoint(), resp.GetRevision() +} + +func (c *testClient) mustUpdateGCSafePoint(serviceGroupID []byte, safepoint uint64, revision int64) *gcpb.UpdateGCSafePointByServiceGroupResponse { + req := &gcpb.UpdateGCSafePointByServiceGroupRequest{ + Header: newRequestHeader(c.clusterID), + ServiceGroupId: serviceGroupID, + SafePoint: safepoint, + Revision: revision, + } + resp, err := c.cli.UpdateGCSafePointByServiceGroup(c.ctx, req) + c.c.Assert(err, IsNil) + return resp +} + +func (c *testClient) mustGetAllGCSafePoint() []*gcpb.ServiceGroupSafePoint { + req := &gcpb.GetAllServiceGroupGCSafePointsRequest{ + Header: newRequestHeader(c.clusterID), + } + resp, err := c.cli.GetAllServiceGroupGCSafePoints(c.ctx, req) + c.c.Assert(err, IsNil) + return resp.GetSafePoints() +} + +func (s *testGCSuite) TestGCService(c *C) { + addr, cluster, clusterID := s.mustNewGCService(c) + defer cluster.Destroy() + + client := testClient{ + cli: testutil.MustNewGCClient(c, addr), + clusterID: clusterID, + c: c, + ctx: 
s.ctx, + } + + serviceGroupRawKV := []byte(endpoint.ServiceGroupRawKVDefault) + serviceGroupTxnKV := []byte("default_txnkv") + serviceID1 := []byte("svc1") + serviceID2 := []byte("svc2") + + c.Assert(client.mustGetAllServiceGroups(), DeepEquals, [][]byte{serviceGroupRawKV}) + + // Update service safe point + { + resp := client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID1, math.MaxInt64, 100) + expected := &gcpb.UpdateServiceSafePointByServiceGroupResponse{ + Header: resp.GetHeader(), + Succeeded: true, + GcSafePoint: 0, + OldSafePoint: 0, + NewSafePoint: 100, + } + c.Assert(resp, DeepEquals, expected) + + // Safe point roll back + resp = client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID1, math.MaxInt64, 99) + c.Assert(resp.GetHeader().GetError().GetType(), Equals, gcpb.ErrorType_SAFEPOINT_ROLLBACK) + c.Assert(resp.GetSucceeded(), IsFalse) + } + // now: svc1: 100 + + // Update GC safe point with revision mismatch + { + safepoint, revision := client.mustGetMinServiceSafePoint(serviceGroupRawKV) + c.Assert(safepoint, Equals, uint64(100)) + // c.Assert(revision, Equals, ?): Revision value is not stable. Don't check it. 
+ + // Add a new service safe point + respSvc := client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID2, math.MaxInt64, 50) + expected := &gcpb.UpdateServiceSafePointByServiceGroupResponse{ + Header: respSvc.GetHeader(), + Succeeded: true, + GcSafePoint: 0, + OldSafePoint: 0, + NewSafePoint: 50, + } + c.Assert(respSvc, DeepEquals, expected) + + // Revision mismatch + respUpdate := client.mustUpdateGCSafePoint(serviceGroupRawKV, 100, revision) + c.Assert(respUpdate.Succeeded, IsFalse) + c.Assert(respUpdate.GetHeader().GetError().GetType(), Equals, gcpb.ErrorType_REVISION_MISMATCH) + } + // now: svc1: 100, svc2: 50 + + // Retry update GC safe point + { + safepoint, revision := client.mustGetMinServiceSafePoint(serviceGroupRawKV) + c.Assert(safepoint, Equals, uint64(50)) + + respSvc := client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID2, math.MaxInt64, 80) + expected := &gcpb.UpdateServiceSafePointByServiceGroupResponse{ + Header: respSvc.GetHeader(), + Succeeded: true, + GcSafePoint: 0, + OldSafePoint: 50, + NewSafePoint: 80, + } + c.Assert(respSvc, DeepEquals, expected) + + respUpdate := client.mustUpdateGCSafePoint(serviceGroupRawKV, 50, revision) + c.Assert(respUpdate.Succeeded, IsTrue) + c.Assert(respUpdate.GetNewSafePoint(), Equals, uint64(50)) + + // GC safe point roll back + respUpdate = client.mustUpdateGCSafePoint(serviceGroupRawKV, 49, revision) + c.Assert(respUpdate.Succeeded, IsFalse) + c.Assert(respUpdate.GetHeader().GetError().GetType(), Equals, gcpb.ErrorType_SAFEPOINT_ROLLBACK) + } + // now: svc1: 100, svc2: 80, gc: 50 + + // Remove svc2 + { + respSvc := client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID2, 0, 0) + expected := &gcpb.UpdateServiceSafePointByServiceGroupResponse{ + Header: respSvc.GetHeader(), + Succeeded: true, + } + c.Assert(respSvc, DeepEquals, expected) + + safepoint, _ := client.mustGetMinServiceSafePoint(serviceGroupRawKV) + c.Assert(safepoint, Equals, uint64(100)) + } + // now: svc1: 100, gc: 50 + 
+ // Add svc2 with safe point roll back + { + respSvc := client.mustUpdateServiceSafePoint(serviceGroupRawKV, serviceID2, math.MaxInt64, 49) + c.Assert(respSvc.Succeeded, IsFalse) + c.Assert(respSvc.GetHeader().GetError().GetType(), Equals, gcpb.ErrorType_SAFEPOINT_ROLLBACK) + } + + // Another service group with no service safe point + { + safepoint, revision := client.mustGetMinServiceSafePoint(serviceGroupTxnKV) + c.Assert(safepoint, Equals, uint64(0)) + c.Assert(revision, Equals, int64(-1)) + + respUpdate := client.mustUpdateGCSafePoint(serviceGroupTxnKV, 100, -1) + c.Assert(respUpdate.Succeeded, IsTrue) + c.Assert(respUpdate.GetNewSafePoint(), Equals, uint64(100)) + } + + // Get all service group GC safe points + { + safepoints := client.mustGetAllGCSafePoint() + expected := []*gcpb.ServiceGroupSafePoint{ + {ServiceGroupId: serviceGroupRawKV, SafePoint: 50}, + {ServiceGroupId: serviceGroupTxnKV, SafePoint: 100}, + } + c.Assert(safepoints, DeepEquals, expected) + } +} + +func (s *testGCSuite) TestConcurrency(c *C) { + count := 500 + concurrency := 10 + + addr, cluster, clusterID := s.mustNewGCService(c) + defer cluster.Destroy() + + newClient := func() testClient { + return testClient{ + cli: testutil.MustNewGCClient(c, addr), + clusterID: clusterID, + c: c, + ctx: s.ctx, + } + } + + serviceGroupID := []byte(endpoint.ServiceGroupRawKVDefault) + closeCh := make(chan struct{}) + + { // Initialize GC safe point to make sure that tikvThread will get a valid safe point. 
+ client := newClient() + client.mustUpdateGCSafePoint(serviceGroupID, 0, -1) + } + + gcWorkerThread := func() { + client := newClient() + for { + safepoint, revision := client.mustGetMinServiceSafePoint(serviceGroupID) + if safepoint == 0 { + continue + } + + client.mustUpdateGCSafePoint(serviceGroupID, safepoint, revision) + + select { + case <-closeCh: + return + default: + } + } + } + + svcThread := func(svcName string) { + client := newClient() + for i := 1; i <= count; i++ { + client.mustUpdateServiceSafePoint(serviceGroupID, []byte(svcName), math.MaxInt64, uint64(i*10)) + } + } + + tikvThread := func() { + client := newClient() + for { + safepoints := client.mustGetAllGCSafePoint() + c.Assert(len(safepoints), Equals, 1) + gcSafePoint := safepoints[0].SafePoint + + svcSafePoint, _ := client.mustGetMinServiceSafePoint(serviceGroupID) + c.Assert(gcSafePoint <= svcSafePoint, IsTrue) + + select { + case <-closeCh: + return + default: + } + } + } + + wgSvc := sync.WaitGroup{} + wgGc := sync.WaitGroup{} + + for i := 0; i < concurrency; i++ { + i := i + wgSvc.Add(1) + go func() { + defer wgSvc.Done() + svcThread(fmt.Sprintf("svc_%v", i)) + }() + } + + wgGc.Add(1) + go func() { + defer wgGc.Done() + gcWorkerThread() + }() + + wgGc.Add(1) + go func() { + defer wgGc.Done() + tikvThread() + }() + + wgSvc.Wait() + close(closeCh) + wgGc.Wait() +} diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 1e0eaf8b258..4a6f5f8c38d 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -108,8 +108,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= 
-github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748 h1:i4MBe1zGq9/r3BH6rTRunizi4T59fpNk8hvBCrB5UAY= -github.com/pingcap/kvproto v0.0.0-20220330070404-8c4cd3f93748/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e h1:iquj/SVNullS8+llCooL3Pk2DWQPW/HDDpF1EHwsnq0= +github.com/pingcap/kvproto v0.0.0-20220506032820-55094d91343e/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=