Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: Auto fix gc_worker's service safepoint for upgraded clusters #3371

Merged
38 changes: 31 additions & 7 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,10 @@ func (s *Storage) RemoveServiceGCSafePoint(serviceID string) error {
return s.Remove(key)
}

func (s *Storage) initServiceGCSafePointForGCWorker() (*ServiceSafePoint, error) {
func (s *Storage) initServiceGCSafePointForGCWorker(initialValue uint64) (*ServiceSafePoint, error) {
ssp := &ServiceSafePoint{
ServiceID: gcWorkerServiceSafePointID,
SafePoint: 0,
SafePoint: initialValue,
ExpiredAt: math.MaxInt64,
}
if err := s.SaveServiceGCSafePoint(ssp); err != nil {
Expand All @@ -523,25 +523,49 @@ func (s *Storage) LoadMinServiceGCSafePoint(now time.Time) (*ServiceSafePoint, e
}
if len(keys) == 0 {
// There's no service safepoint. Store an initial value for GC worker.
return s.initServiceGCSafePointForGCWorker()
return s.initServiceGCSafePointForGCWorker(0)
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
var min uint64 = math.MaxUint64
hasGCWorker := false
validMin := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}
if ssp.ServiceID == gcWorkerServiceSafePointID {
hasGCWorker = true
// If gc_worker's expire time is incorrectly set, fix it.
if ssp.ExpiredAt != math.MaxInt64 {
ssp.ExpiredAt = math.MaxInt64
err = s.SaveServiceGCSafePoint(ssp)
if err != nil {
return nil, errors.Trace(err)
}
}
}

if ssp.SafePoint < min {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ssp is expired, we may get a wrong min here. I suggest deleting min and keeping only validMin.

min = ssp.SafePoint
}

if ssp.ExpiredAt < now.Unix() {
s.Remove(key)
continue
}
if ssp.SafePoint < min.SafePoint {
min = ssp
if ssp.SafePoint < validMin.SafePoint {
validMin = ssp
}
}

return min, nil
if !hasGCWorker {
// If some service safepoints exist but gc_worker's is missing, initialize it with the minimum value among all
// safepoints (including expired ones).
return s.initServiceGCSafePointForGCWorker(min)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return s.initServiceGCSafePointForGCWorker(min)
return s.initServiceGCSafePointForGCWorker(validMin.SafePoint)

}

return validMin, nil
}

// GetAllServiceGCSafePoints returns all services GC safepoints
Expand Down
14 changes: 14 additions & 0 deletions server/core/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,22 @@ func (s *testKVSuite) TestLoadMinServiceGCSafePoint(c *C) {
c.Assert(storage.SaveServiceGCSafePoint(ssp), IsNil)
}

// gc_worker's safepoint will be automatically inserted when loading service safepoints.
ssp, err := storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "gc_worker")
c.Assert(ssp.ExpiredAt, Equals, int64(math.MaxInt64))
c.Assert(ssp.SafePoint, Equals, uint64(1))

// Advance gc_worker's safepoint
c.Assert(storage.SaveServiceGCSafePoint(&ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: math.MaxInt64,
SafePoint: 10,
}), IsNil)

ssp, err = storage.LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(ssp.ServiceID, Equals, "2")
c.Assert(ssp.ExpiredAt, Equals, expireAt)
c.Assert(ssp.SafePoint, Equals, uint64(2))
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (s *Server) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.Upd
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if request.TTL == math.MaxInt64 {
if math.MaxInt64-now.Unix() <= request.TTL {
ssp.ExpiredAt = math.MaxInt64
}
if err := s.storage.SaveServiceGCSafePoint(ssp); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"path"
"path/filepath"
"sort"
"strconv"
Expand Down Expand Up @@ -911,6 +912,56 @@ func (s *testClientSuite) TestUpdateServiceGCSafePoint(c *C) {
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"", 1000, 15)
c.Assert(err, NotNil)

// Put some other safepoints to test fixing gc_worker's safepoint when other safepoints exist.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 11)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"b", 1000, 12)
c.Assert(err, IsNil)
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"c", 1000, 13)
c.Assert(err, IsNil)

// Forcibly set an invalid TTL on gc_worker.
gcWorkerKey := path.Join("gc", "safe_point", "service", "gc_worker")
{
gcWorkerSsp := &core.ServiceSafePoint{
ServiceID: "gc_worker",
ExpiredAt: -12345,
SafePoint: 10,
}
value, err := json.Marshal(gcWorkerSsp)
c.Assert(err, IsNil)
err = s.srv.GetStorage().Save(gcWorkerKey, string(value))
c.Assert(err, IsNil)
}

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(10))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))

// Force delete gc_worker, then the min service safepoint is 11 of "a".
err = s.srv.GetStorage().Remove(gcWorkerKey)
c.Assert(err, IsNil)
minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.SafePoint, Equals, uint64(11))
// After calling LoadMinServiceGCSafePoint while "gc_worker"'s service safepoint is missing, "gc_worker"'s service
// safepoint will be newly created.
// Increase "a" so that "gc_worker" is the only minimum that will be returned by LoadMinServiceGCSafePoint.
_, err = s.client.UpdateServiceGCSafePoint(context.Background(),
"a", 1000, 14)
c.Assert(err, IsNil)

minSsp, err = s.srv.GetStorage().LoadMinServiceGCSafePoint(time.Now())
c.Assert(err, IsNil)
c.Assert(minSsp.ServiceID, Equals, "gc_worker")
c.Assert(minSsp.SafePoint, Equals, uint64(11))
c.Assert(minSsp.ExpiredAt, Equals, int64(math.MaxInt64))
}

func (s *testClientSuite) TestScatterRegion(c *C) {
Expand Down