Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Auto fix gc_worker's service safepoint for upgraded clusters #3371

Merged
36 changes: 32 additions & 4 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/encryptionkm"
Expand Down Expand Up @@ -501,10 +502,10 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
return s.Remove(key)
}

func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) {
func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: gcWorkerServiceSafePointID,
SafePoint: 0,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
if err := s.SaveServiceGCSafePoint(ssp); err != nil {
Expand All @@ -522,16 +523,31 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
return nil, err
}
if len(keys) == 0 {
// There's no service safepoint. Store an initial value for GC worker.
return s.initServiceGCSafePointForGCWorker()
// There's no service safepoint. It may be a new cluster, or upgraded from an older version where all service
// safepoints are missing. For the second case, we have no way to recover it. Store an initial value 0 for
// gc_worker.
return s.initServiceGCSafePointForGCWorker(0)
}

hasGCWorker := false
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ServiceID == gcWorkerServiceSafePointID {
hasGCWorker = true
// If gc_worker's expire time is incorrectly set, fix it.
if ssp.ExpiredAt != math.MaxInt64 {
ssp.ExpiredAt = math.MaxInt64
err = s.SaveServiceGCSafePoint(ssp)
if err != nil {
return nil, errors.Trace(err)
}
}
}

if ssp.ExpiredAt < now.Unix() {
s.Remove(key)
continue
Expand All @@ -541,6 +557,18 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
}
}

if min.SafePoint == math.MaxUint64 {
// There's no valid safepoints and we have no way to recover it. Just set gc_worker to 0.
log.Info("there are no valid service safepoints. init gc_worker's service safepoint to 0")
return s.initServiceGCSafePointForGCWorker(0)
}

if !hasGCWorker {
// If there exists some service safepoints but gc_worker is missing, init it with the min value among all
// safepoints (including expired ones)
return s.initServiceGCSafePointForGCWorker(min.SafePoint)
}

return min, nil
}

Expand Down
13 changes: 13 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,21 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) {
c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil)
}

// gc_worker's safepoint will be automatically inserted when loading service safepoints. Here the returned
// safepoint can be either of "gc_worker" or "2".
ssp, err := storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.SafePoint, Equals, uint64(2))

// Advance gc_worker's safepoint
c.Assert(storage.SaveServiceGCSafePoint(&ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: math.MaxInt64,
SafePoint: 10,
}), IsNil)

ssp, err = storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "2")
c.Assert(ssp.ExpiredAt, Equals, expireAt)
c.Assert(ssp.SafePoint, Equals, uint64(2))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if request.TTL == math.MaxInt64 {
if math.MaxInt64-now.Unix() <= request.TTL {
ssp.ExpiredAt = math.MaxInt64
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"path/filepath"
"sort"
"strconv"
Expand Down Expand Up @@ -918,6 +919,56 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"", 1000, 15)
c.Assert(err, NotNil)

// Put some other safepoints to test fixing gc_worker's safepoint when there exists other safepoints.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 11)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"b", 1000, 12)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", 1000, 13)
c.Assert(err, IsNil)

// Force set invalid ttl to gc_worker
gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker")
{
gcWorkerSsp := &core.ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: -12345,
SafePoint: 10,
}
value, err := json.Marshal(gcWorkerSsp)
c.Assert(err, IsNil)
err = s.srv.GetStorage().Save(gcWorkerKey, string(value))
c.Assert(err, IsNil)
}

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(10))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))

// Force delete gc_worker, then the min service safepoint is 11 of "a".
err = s.srv.GetStorage().Remove(gcWorkerKey)
c.Assert(err, IsNil)
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.SafePoint, Equals, uint64(11))
// After calling LoadMinServiceGCS when "gc_worker"'s service safepoint is missing, "gc_worker"'s service safepoint
// will be newly created.
// Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 14)
c.Assert(err, IsNil)

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(11))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))
}

func (s *testClientSuite) TestScatterRegion(c *C) {
Expand Down