diff --git a/server/storage/endpoint/gc_key_space.go b/server/storage/endpoint/gc_key_space.go
new file mode 100644
index 00000000000..e23197da90e
--- /dev/null
+++ b/server/storage/endpoint/gc_key_space.go
@@ -0,0 +1,198 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package endpoint
+
+import (
+	"encoding/json"
+	"math"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
+	"go.etcd.io/etcd/clientv3"
+	"go.uber.org/zap"
+)
+
+// KeySpaceGCSafePoint is the gc worker's safe point for a specific key-space.
+type KeySpaceGCSafePoint struct {
+	SpaceID   string `json:"space_id"`
+	SafePoint uint64 `json:"safe_point,omitempty"`
+}
+
+// KeySpaceGCSafePointStorage defines the storage operations on key-spaces' safe points.
+type KeySpaceGCSafePointStorage interface {
+	// Service safe point interfaces.
+	SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
+	LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
+	LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
+	RemoveServiceSafePoint(spaceID, serviceID string) error
+	// GC safe point interfaces.
+	SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
+	LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
+	LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
+}
+
+var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)
+
+// SaveServiceSafePoint saves ssp, JSON-encoded, under the given key-space; ssp.ServiceID must not be empty.
+func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
+	if ssp.ServiceID == "" {
+		return errors.New("service id of service safepoint cannot be empty")
+	}
+	key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
+	value, err := json.Marshal(ssp)
+	if err != nil {
+		return err
+	}
+	return se.Save(key, string(value))
+}
+
+// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
+// Returns nil if no safe point exists for the given service or it has expired; an expired one is removed in the background.
+func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
+	key := KeySpaceServiceSafePointPath(spaceID, serviceID)
+	value, err := se.Load(key)
+	if err != nil || value == "" {
+		return nil, err
+	}
+	ssp := &ServiceSafePoint{}
+	if err := json.Unmarshal([]byte(value), ssp); err != nil {
+		return nil, err
+	}
+	if ssp.ExpiredAt < time.Now().Unix() {
+		go func() { // best-effort cleanup: a failure is only logged
+			if err := se.Remove(key); err != nil { // local err: never write the enclosing function's err from this goroutine
+				log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
+			}
+		}()
+		return nil, nil
+	}
+	return ssp, nil
+}
+
+// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space.
+// Note that gc worker safe points are stored separately.
+// If no service safe point exists for the given key-space or all the service safe points have expired, return nil.
+func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
+	prefix := KeySpaceServiceSafePointPrefix(spaceID)
+	prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
+	keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
+	if err != nil {
+		return nil, err
+	}
+	minSafePoint := &ServiceSafePoint{SafePoint: math.MaxUint64} // sentinel: replaced by the first unexpired safe point below MaxUint64
+	var expiredKeys []string
+	for i, key := range keys {
+		ssp := &ServiceSafePoint{}
+		if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
+			return nil, err
+		}
+
+		// gather expired keys; they are excluded from the minimum
+		if ssp.ExpiredAt < now.Unix() {
+			expiredKeys = append(expiredKeys, key)
+			continue
+		}
+		if ssp.SafePoint < minSafePoint.SafePoint {
+			minSafePoint = ssp
+		}
+	}
+	// failpoint for immediate (synchronous) removal, so the removal is observable right after this call
+	failpoint.Inject("removeExpiredKeys", func() {
+		for _, key := range expiredKeys {
+			if err := se.Remove(key); err != nil {
+				log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
+			}
+		}
+		expiredKeys = nil
+	})
+	// remove expired keys asynchronously; failures are only logged, and err is kept local to the goroutine
+	go func() {
+		for _, key := range expiredKeys {
+			if err := se.Remove(key); err != nil {
+				log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
+			}
+		}
+	}()
+	if minSafePoint.SafePoint == math.MaxUint64 {
+		// no service safe point or all of them are expired.
+		return nil, nil
+	}
+
+	// successfully found a valid min safe point.
+	return minSafePoint, nil
+}
+
+// RemoveServiceSafePoint removes the service safe point of the given service under the given key-space.
+func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
+	key := KeySpaceServiceSafePointPath(spaceID, serviceID)
+	return se.Remove(key)
+}
+
+// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
+func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
+	value := strconv.FormatUint(safePoint, 16) // stored as a base-16 string
+	return se.Save(KeySpaceGCSafePointPath(spaceID), value)
+}
+
+// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
+// Returns 0 if the target safe point does not exist.
+func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
+	value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
+	if err != nil || value == "" {
+		return 0, err
+	}
+	safePoint, err := strconv.ParseUint(value, 16, 64)
+	if err != nil {
+		return 0, err
+	}
+	return safePoint, nil
+}
+
+// LoadAllKeySpaceGCSafePoints returns a slice of KeySpaceGCSafePoint, one per stored gc safe point.
+// If withGCSafePoint is set to false, the returned SafePoint fields are left as 0.
+func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
+	prefix := KeySpaceSafePointPrefix()
+	prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
+	suffix := KeySpaceGCSafePointSuffix()
+	keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
+	if err != nil {
+		return nil, err
+	}
+	safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
+	for i := range keys {
+		// skip non gc safe points. A gc safe point key is exactly {prefix}{space_id}{suffix}; the key of a
+		// service safe point whose service ID is "gc" ends with the suffix too, but what remains after
+		// trimming still contains a "/" ("{space_id}/service"), so such keys are skipped as well.
+		spaceID := strings.TrimSuffix(strings.TrimPrefix(keys[i], prefix), suffix)
+		if !strings.HasSuffix(keys[i], suffix) || strings.Contains(spaceID, "/") {
+			continue
+		}
+		safePoint := &KeySpaceGCSafePoint{SpaceID: spaceID}
+		if withGCSafePoint {
+			value, err := strconv.ParseUint(values[i], 16, 64)
+			if err != nil {
+				return nil, err
+			}
+			safePoint.SafePoint = value
+		}
+		safePoints = append(safePoints, safePoint)
+	}
+	return safePoints, nil
+}
diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go
index 01f40ecc74d..db7032b654b 100644
--- a/server/storage/endpoint/key_path.go
+++ b/server/storage/endpoint/key_path.go
@@ -32,6 +32,8 @@ const (
 	customScheduleConfigPath   = "scheduler_config"
 	gcWorkerServiceSafePointID = "gc_worker"
 	minResolvedTS              = "min_resolved_ts"
+	keySpaceSafePointPrefix    = "key_space/gc_safepoint"
+	keySpaceGCSafePointSuffix  = "gc"
 )
 
 // AppendToRootPath appends the given key to the rootPath.
@@ -104,3 +106,33 @@ func gcSafePointServicePath(serviceID string) string {
 func MinResolvedTSPath() string {
 	return path.Join(clusterPath, minResolvedTS)
 }
+
+// KeySpaceServiceSafePointPrefix returns the prefix of the given key-space's service safe points.
+// Prefix: key_space/gc_safepoint/{space_id}/service/
+func KeySpaceServiceSafePointPrefix(spaceID string) string {
+	return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/"
+}
+
+// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
+// Path: key_space/gc_safepoint/{space_id}/gc
+func KeySpaceGCSafePointPath(spaceID string) string {
+	return path.Join(keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix)
+}
+
+// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
+// Path: key_space/gc_safepoint/{space_id}/service/{service_id}
+func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
+	return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
+}
+
+// KeySpaceSafePointPrefix returns prefix for all key-spaces' safe points.
+// Prefix: key_space/gc_safepoint/
+func KeySpaceSafePointPrefix() string {
+	return keySpaceSafePointPrefix + "/"
+}
+
+// KeySpaceGCSafePointSuffix returns the suffix for any gc safepoint.
+// Suffix: /gc
+func KeySpaceGCSafePointSuffix() string {
+	return "/" + keySpaceGCSafePointSuffix
+}
diff --git a/server/storage/storage.go b/server/storage/storage.go
index 3c0a959ca7a..af65dfe4a7b 100644
--- a/server/storage/storage.go
+++ b/server/storage/storage.go
@@ -39,6 +39,7 @@ type Storage interface {
 	endpoint.ReplicationStatusStorage
 	endpoint.GCSafePointStorage
 	endpoint.MinResolvedTSStorage
+	endpoint.KeySpaceGCSafePointStorage
 }
 
 // NewStorageWithMemoryBackend creates a new storage with memory backend.
diff --git a/server/storage/storage_gc_test.go b/server/storage/storage_gc_test.go
new file mode 100644
index 00000000000..eb0e51c79d0
--- /dev/null
+++ b/server/storage/storage_gc_test.go
@@ -0,0 +1,234 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+	"math"
+	"time"
+
+	. "github.com/pingcap/check"
+	"github.com/pingcap/failpoint"
+	"github.com/tikv/pd/server/storage/endpoint"
+)
+
+var _ = Suite(&testStorageGCSuite{})
+
+// testStorageGCSuite groups the tests of the key-space safe point storage.
+type testStorageGCSuite struct {
+}
+
+// testGCSafePoints returns key-space IDs and, at the same index, the GC safe
+// point to save for each of them (covering 0 and math.MaxUint64).
+func testGCSafePoints() ([]string, []uint64) {
+	spaceIDs := []string{
+		"keySpace1",
+		"keySpace2",
+		"keySpace3",
+		"keySpace4",
+		"keySpace5",
+	}
+	safePoints := []uint64{
+		0,
+		1,
+		4396,
+		23333333333,
+		math.MaxUint64,
+	}
+	return spaceIDs, safePoints
+}
+
+// testServiceSafePoints returns key-space IDs and, at the same index, the
+// service safe point to save under that key-space; all expire 100s from now.
+func testServiceSafePoints() ([]string, []*endpoint.ServiceSafePoint) {
+	spaceIDs := []string{
+		"keySpace1",
+		"keySpace1",
+		"keySpace1",
+		"keySpace2",
+		"keySpace2",
+		"keySpace2",
+		"keySpace3",
+		"keySpace3",
+		"keySpace3",
+	}
+	expireAt := time.Now().Add(100 * time.Second).Unix()
+	serviceSafePoints := []*endpoint.ServiceSafePoint{
+		{ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1},
+		{ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2},
+		{ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3},
+		{ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1},
+		{ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2},
+		{ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3},
+		{ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1},
+		{ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2},
+		{ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3},
+	}
+	return spaceIDs, serviceSafePoints
+}
+
+// TestSaveLoadServiceSafePoint checks that saved service safe points are loaded back unchanged.
+func (s *testStorageGCSuite) TestSaveLoadServiceSafePoint(c *C) {
+	storage := NewStorageWithMemoryBackend()
+	testSpaceID, testSafePoints := testServiceSafePoints()
+	for i := range testSpaceID {
+		c.Assert(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i]), IsNil)
+	}
+	for i := range testSpaceID {
+		loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID)
+		c.Assert(err, IsNil)
+		c.Assert(loadedSafePoint, DeepEquals, testSafePoints[i])
+	}
+}
+
+// TestLoadMinServiceSafePoint checks the minimum selection and the removal of expired service safe points.
+func (s *testStorageGCSuite) TestLoadMinServiceSafePoint(c *C) {
+	storage := NewStorageWithMemoryBackend()
+	currentTime := time.Now()
+	expireAt1 := currentTime.Add(100 * time.Second).Unix()
+	expireAt2 := currentTime.Add(200 * time.Second).Unix()
+	expireAt3 := currentTime.Add(300 * time.Second).Unix()
+
+	serviceSafePoints := []*endpoint.ServiceSafePoint{
+		{ServiceID: "0", ExpiredAt: expireAt1, SafePoint: 100},
+		{ServiceID: "1", ExpiredAt: expireAt2, SafePoint: 200},
+		{ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 300},
+	}
+
+	testKeySpace := "test"
+	for _, serviceSafePoint := range serviceSafePoints {
+		c.Assert(storage.SaveServiceSafePoint(testKeySpace, serviceSafePoint), IsNil)
+	}
+	// enabling failpoint to make expired key removal immediately observable
+	c.Assert(failpoint.Enable("github.com/tikv/pd/server/storage/endpoint/removeExpiredKeys", "return(true)"), IsNil)
+	// disable it in a defer so that a failed assertion below cannot leak the failpoint into other tests
+	defer func() {
+		c.Assert(failpoint.Disable("github.com/tikv/pd/server/storage/endpoint/removeExpiredKeys"), IsNil)
+	}()
+	minSafePoint, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime)
+	c.Assert(err, IsNil)
+	c.Assert(minSafePoint, DeepEquals, serviceSafePoints[0])
+
+	// the safePoint with ServiceID 0 should be removed due to expiration
+	minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(150*time.Second))
+	c.Assert(err, IsNil)
+	c.Assert(minSafePoint2, DeepEquals, serviceSafePoints[1])
+
+	// verify that service safe point with ServiceID 0 has been removed
+	ssp, err := storage.LoadServiceSafePoint(testKeySpace, "0")
+	c.Assert(err, IsNil)
+	c.Assert(ssp, IsNil)
+
+	// all remaining service safePoints should be removed due to expiration
+	ssp, err = storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(500*time.Second))
+	c.Assert(err, IsNil)
+	c.Assert(ssp, IsNil)
+}
+
+// TestRemoveServiceSafePoint checks that removed service safe points can no longer be loaded.
+func (s *testStorageGCSuite) TestRemoveServiceSafePoint(c *C) {
+	storage := NewStorageWithMemoryBackend()
+	testSpaceID, testSafePoints := testServiceSafePoints()
+	// save service safe points
+	for i := range testSpaceID {
+		c.Assert(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i]), IsNil)
+	}
+	// remove saved service safe points
+	for i := range testSpaceID {
+		c.Assert(storage.RemoveServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID), IsNil)
+	}
+	// check that service safe points are empty
+	for i := range testSpaceID {
+		loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID)
+		c.Assert(err, IsNil)
+		c.Assert(loadedSafePoint, IsNil)
+	}
+}
+
+// TestSaveLoadGCSafePoint checks that a saved GC safe point is loaded back with the same value.
+func (s *testStorageGCSuite) TestSaveLoadGCSafePoint(c *C) {
+	storage := NewStorageWithMemoryBackend()
+	testSpaceIDs, testSafePoints := testGCSafePoints()
+	for i := range testSpaceIDs {
+		testSpaceID := testSpaceIDs[i]
+		testSafePoint := testSafePoints[i]
+		err := storage.SaveKeySpaceGCSafePoint(testSpaceID, testSafePoint)
+		c.Assert(err, IsNil)
+		loaded, err := storage.LoadKeySpaceGCSafePoint(testSpaceID)
+		c.Assert(err, IsNil)
+		c.Assert(loaded, Equals, testSafePoint)
+	}
+}
+
+// TestLoadAllKeySpaceGCSafePoints checks listing GC safe points, with and without their values.
+func (s *testStorageGCSuite) TestLoadAllKeySpaceGCSafePoints(c *C) {
+	storage := NewStorageWithMemoryBackend()
+	testSpaceIDs, testSafePoints := testGCSafePoints()
+	for i := range testSpaceIDs {
+		err := storage.SaveKeySpaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])
+		c.Assert(err, IsNil)
+	}
+	loadedSafePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
+	c.Assert(err, IsNil)
+	// assert the length first: the loops over loadedSafePoints would pass vacuously on an empty result
+	c.Assert(loadedSafePoints, HasLen, len(testSpaceIDs))
+	for i := range loadedSafePoints {
+		c.Assert(loadedSafePoints[i].SpaceID, Equals, testSpaceIDs[i])
+		c.Assert(loadedSafePoints[i].SafePoint, Equals, testSafePoints[i])
+	}
+
+	// saving some service safe points.
+	spaceIDs, safePoints := testServiceSafePoints()
+	for i := range spaceIDs {
+		c.Assert(storage.SaveServiceSafePoint(spaceIDs[i], safePoints[i]), IsNil)
+	}
+
+	// verify that service safe points do not interfere with gc safe points.
+	loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(true)
+	c.Assert(err, IsNil)
+	c.Assert(loadedSafePoints, HasLen, len(testSpaceIDs))
+	for i := range loadedSafePoints {
+		c.Assert(loadedSafePoints[i].SpaceID, Equals, testSpaceIDs[i])
+		c.Assert(loadedSafePoints[i].SafePoint, Equals, testSafePoints[i])
+	}
+
+	// verify that when withGCSafePoint set to false, returned safePoints is 0
+	loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(false)
+	c.Assert(err, IsNil)
+	c.Assert(loadedSafePoints, HasLen, len(testSpaceIDs))
+	for i := range loadedSafePoints {
+		c.Assert(loadedSafePoints[i].SpaceID, Equals, testSpaceIDs[i])
+		c.Assert(loadedSafePoints[i].SafePoint, Equals, uint64(0))
+	}
+}
+
+// TestLoadEmpty checks the results of loading from a storage that holds no safe points.
+func (s *testStorageGCSuite) TestLoadEmpty(c *C) {
+	storage := NewStorageWithMemoryBackend()
+
+	// loading non-existing GC safepoint should return 0
+	gcSafePoint, err := storage.LoadKeySpaceGCSafePoint("testKeySpace")
+	c.Assert(err, IsNil)
+	c.Assert(gcSafePoint, Equals, uint64(0))
+
+	// loading non-existing service safepoint should return nil
+	serviceSafePoint, err := storage.LoadServiceSafePoint("testKeySpace", "testService")
+	c.Assert(err, IsNil)
+	c.Assert(serviceSafePoint, IsNil)
+
+	// loading empty key spaces should return empty slices
+	safePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
+	c.Assert(err, IsNil)
+	c.Assert(safePoints, HasLen, 0)
+}