Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Auto fix gc_worker's service safepoint for upgraded clusters #3371

Merged
91 changes: 78 additions & 13 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/encryption"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/encryptionkm"
"github.com/tikv/pd/server/kv"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -501,10 +503,46 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
return s.Remove(key)
}

func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) {
// fixGCWorkerServiceSafepoint tries to fix gc_worker's special service safepoint if it's missing or incorrect. An issue
// in the older version may have invalid TTL for gc_worker's safepoint, and it also might be missing. gc_worker's
// safepoint may also be missing when the cluster is just bootstrapped. Detect these cases and fix gc_worker's safepoint
// if necessary.
func (s *Storage) fixGCWorkerServiceSafePpoint(allServiceSafePoints []*ServiceSafePoint) (modified bool, err error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (s *Storage) fixGCWorkerServiceSafePpoint(allServiceSafePoints []*ServiceSafePoint) (modified bool, err error) {
func (s *Storage) fixGCWorkerServiceSafePoint(allServiceSafePoints []*ServiceSafePoint) (modified bool, err error) {

if len(allServiceSafePoints) == 0 {
// It's a new cluster, or everything is lost so we have no way to recover it. Initialize gc_worker's service
// safepoint to zero.
_, err = s.initServiceGCSafePointForGCWorker(0)
return true, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the initialization is wrong, we still return true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but I thought err will be not nil here so it doesn't matter 🤔 maybe it doesn't look good. I'll change it.

}

var min uint64 = math.MaxUint64
for _, ssp := range allServiceSafePoints {
if ssp.ServiceID == gcWorkerServiceSafePointID {
if ssp.ExpiredAt != math.MaxInt64 {
// gc_worker's TTL is incorrectly set. Set it to MaxInt64.
log.Info("gc_worker's service safepoint has invalid TTL, fixing it",
zap.Uint64("safepoint", ssp.SafePoint), zap.Int64("expiredAt", ssp.ExpiredAt))
ssp.ExpiredAt = math.MaxInt64
err = s.SaveServiceGCSafePoint(ssp)
return true, err
}
// gc_worker's service safepoint exists normally.
return false, nil
}
if ssp.SafePoint < min {
min = ssp.SafePoint
}
}

// gc_worker is missing.
_, err = s.initServiceGCSafePointForGCWorker(min)
return true, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}

func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: gcWorkerServiceSafePointID,
SafePoint: 0,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
if err := s.SaveServiceGCSafePoint(ssp); err != nil {
Expand All @@ -515,25 +553,52 @@ func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error)

// LoadMinServiceGCSafePoint returns the minimum safepoint across all services
func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service") + "/"
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := s.LoadRange(prefix, prefixEnd, 0)
loadAll := func() ([]string, []*ServiceSafePoint, error) {
prefix := path.Join(gcPath, "safe_point", "service") + "/"
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := s.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, nil, err
}

allServiceSafePoints := make([]*ServiceSafePoint, 0, len(values))
for _, value := range values {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, nil, err
}
allServiceSafePoints = append(allServiceSafePoints, ssp)
}
return keys, allServiceSafePoints, nil
}

keys, allServiceSafePoints, err := loadAll()
if err != nil {
return nil, err
}
if len(keys) == 0 {
// There's no service safepoint. Store an initial value for GC worker.
return s.initServiceGCSafePointForGCWorker()

modified, err := s.fixGCWorkerServiceSafePpoint(allServiceSafePoints)
if err != nil {
return nil, err
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
if modified {
// Reload the safepoints
keys, allServiceSafePoints, err = loadAll()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is just modifying the gcWorkerServiceSafePointID key okay?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's more, it sames that loadAll() will not affect the result of min... I think we can remove this logic and modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The user needs the minimum value among all non-expired safepoints, but I want to fix it with the minimum value including expired-but-not-deleted ones. So the final min value may decrease. It's possible to avoid reloading and do all these things in one loop, but trying to make the code more readable, I choose the less-effective way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if err != nil {
return nil, err
}
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := allServiceSafePoints[i]
if ssp.ExpiredAt < now.Unix() {
s.Remove(key)
err = s.Remove(key)
if err != nil {
log.Warn("failed to remove expired service safepoint",
zap.String("id", key), zap.Uint64("safepoint", ssp.SafePoint), zap.Error(err))
}
continue
}
if ssp.SafePoint < min.SafePoint {
Expand Down
14 changes: 14 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,22 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) {
c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil)
}

// gc_worker's safepoint will be automatically inserted when loading service safepoints.
ssp, err := storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "gc_worker")
c.Assert(ssp.ExpiredAt, Equals, int64(math.MaxInt64))
c.Assert(ssp.SafePoint, Equals, uint64(1))

// Advance gc_worker's safepoint
c.Assert(storage.SaveServiceGCSafePoint(&ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: math.MaxInt64,
SafePoint: 10,
}), IsNil)

ssp, err = storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "2")
c.Assert(ssp.ExpiredAt, Equals, expireAt)
c.Assert(ssp.SafePoint, Equals, uint64(2))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if request.TTL == math.MaxInt64 {
if math.MaxInt64-now.Unix() <= request.TTL {
ssp.ExpiredAt = math.MaxInt64
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"path/filepath"
"sort"
"strconv"
Expand Down Expand Up @@ -911,6 +912,56 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"", 1000, 15)
c.Assert(err, NotNil)

// Put some other safepoints to test fixing gc_worker's safepoint when there exists other safepoints.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 11)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"b", 1000, 12)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", 1000, 13)
c.Assert(err, IsNil)

// Force set invalid ttl to gc_worker
gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker")
{
gcWorkerSsp := &core.ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: -12345,
SafePoint: 10,
}
value, err := json.Marshal(gcWorkerSsp)
c.Assert(err, IsNil)
err = s.srv.GetStorage().Save(gcWorkerKey, string(value))
c.Assert(err, IsNil)
}

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(10))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))

// Force delete gc_worker, then the min service safepoint is 11 of "a".
err = s.srv.GetStorage().Remove(gcWorkerKey)
c.Assert(err, IsNil)
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.SafePoint, Equals, uint64(11))
// After calling LoadMinServiceGCS when "gc_worker"'s service safepoint is missing, "gc_worker"'s service safepoint
// will be newly created.
// Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 14)
c.Assert(err, IsNil)

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(11))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))
}

func (s *testClientSuite) TestScatterRegion(c *C) {
Expand Down