diff --git a/server/storage/endpoint/gc_key_space.go b/server/storage/endpoint/gc_key_space.go index e23197da90e..fac27728972 100644 --- a/server/storage/endpoint/gc_key_space.go +++ b/server/storage/endpoint/gc_key_space.go @@ -19,53 +19,65 @@ import ( "math" "strconv" "strings" - "time" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/server/storage/kv" "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" +) + +const ( + spaceIDBase = 16 + safePointBase = 16 ) // KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space type KeySpaceGCSafePoint struct { - SpaceID string `json:"space_id"` + SpaceID uint32 `json:"space_id"` SafePoint uint64 `json:"safe_point,omitempty"` } // KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points type KeySpaceGCSafePointStorage interface { // Service safe point interfaces. - SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error - LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) - LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) - RemoveServiceSafePoint(spaceID, serviceID string) error + SaveServiceSafePoint(spaceID uint32, ssp *ServiceSafePoint, ttl int64) error + LoadServiceSafePoint(spaceID uint32, serviceID string) (*ServiceSafePoint, error) + LoadMinServiceSafePoint(spaceID uint32) (*ServiceSafePoint, error) + RemoveServiceSafePoint(spaceID uint32, serviceID string) error // GC safe point interfaces. - SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error - LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) + SaveKeySpaceGCSafePoint(spaceID uint32, safePoint uint64) error + LoadKeySpaceGCSafePoint(spaceID uint32) (uint64, error) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) + // Revision interfaces. + TouchKeySpaceRevision(spaceID uint32) error + LoadKeySpaceRevision(spaceID uint32) (int64, error) } var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil) // SaveServiceSafePoint saves service safe point under given key-space. -func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error { +func (se *StorageEndpoint) SaveServiceSafePoint(spaceID uint32, ssp *ServiceSafePoint, ttl int64) error { if ssp.ServiceID == "" { return errors.New("service id of service safepoint cannot be empty") } + etcdEndpoint, err := se.getEtcdBase() + if err != nil { + return err + } key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID) value, err := json.Marshal(ssp) if err != nil { return err } - return se.Save(key, string(value)) + // A MaxInt64 ttl means safe point never expire. + if ttl == math.MaxInt64 { + return etcdEndpoint.Save(key, string(value)) + } + return etcdEndpoint.SaveWithTTL(key, string(value), ttl) } // LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name. -// Return nil if no safepoint exist for given service or just expired. -func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) { +// Return nil if no safepoint exist for given service. +func (se *StorageEndpoint) LoadServiceSafePoint(spaceID uint32, serviceID string) (*ServiceSafePoint, error) { key := KeySpaceServiceSafePointPath(spaceID, serviceID) value, err := se.Load(key) if err != nil || value == "" { @@ -75,61 +87,29 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser if err := json.Unmarshal([]byte(value), ssp); err != nil { return nil, err } - if ssp.ExpiredAt < time.Now().Unix() { - go func() { - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - }() - return nil, nil - } return ssp, nil } // LoadMinServiceSafePoint returns the minimum safepoint for the given key-space. // Note that gc worker safe point are store separately. // If no service safe point exist for the given key-space or all the service safe points just expired, return nil. -func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) { +func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID uint32) (*ServiceSafePoint, error) { prefix := KeySpaceServiceSafePointPrefix(spaceID) prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) + _, values, err := se.LoadRange(prefix, prefixEnd, 0) if err != nil { return nil, err } min := &ServiceSafePoint{SafePoint: math.MaxUint64} - expiredKeys := make([]string, 0) - for i, key := range keys { + for i := range values { ssp := &ServiceSafePoint{} if err = json.Unmarshal([]byte(values[i]), ssp); err != nil { return nil, err } - - // gather expired keys - if ssp.ExpiredAt < now.Unix() { - expiredKeys = append(expiredKeys, key) - continue - } if ssp.SafePoint < min.SafePoint { min = ssp } } - // failpoint for immediate removal - failpoint.Inject("removeExpiredKeys", func() { - for _, key := range expiredKeys { - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - } - expiredKeys = []string{} - }) - // remove expired keys asynchronously - go func() { - for _, key := range expiredKeys { - if err = se.Remove(key); err != nil { - log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err)) - } - } - }() if min.SafePoint == math.MaxUint64 { // no service safe point or all of them are expired. return nil, nil @@ -140,25 +120,25 @@ func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time } // RemoveServiceSafePoint removes target ServiceSafePoint -func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error { +func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID uint32, serviceID string) error { key := KeySpaceServiceSafePointPath(spaceID, serviceID) return se.Remove(key) } // SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space. -func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error { - value := strconv.FormatUint(safePoint, 16) +func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID uint32, safePoint uint64) error { + value := strconv.FormatUint(safePoint, safePointBase) return se.Save(KeySpaceGCSafePointPath(spaceID), value) } // LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space. // Returns 0 if target safepoint not exist. -func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) { +func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID uint32) (uint64, error) { value, err := se.Load(KeySpaceGCSafePointPath(spaceID)) if err != nil || value == "" { return 0, err } - safePoint, err := strconv.ParseUint(value, 16, 64) + safePoint, err := strconv.ParseUint(value, safePointBase, 64) if err != nil { return 0, err } @@ -182,11 +162,15 @@ func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([] continue } safePoint := &KeySpaceGCSafePoint{} - spaceID := strings.TrimPrefix(keys[i], prefix) - spaceID = strings.TrimSuffix(spaceID, suffix) - safePoint.SpaceID = spaceID + spaceIDStr := strings.TrimPrefix(keys[i], prefix) + spaceIDStr = strings.TrimSuffix(spaceIDStr, suffix) + spaceID, err := strconv.ParseUint(spaceIDStr, spaceIDBase, 32) + if err != nil { + return nil, err + } + safePoint.SpaceID = uint32(spaceID) if withGCSafePoint { - value, err := strconv.ParseUint(values[i], 16, 64) + value, err := strconv.ParseUint(values[i], safePointBase, 64) if err != nil { return nil, err } @@ -196,3 +180,31 @@ func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([] } return safePoints, nil } + +// TouchKeySpaceRevision advances revision of the given key space. +// It's used when new service safe point is saved. +func (se *StorageEndpoint) TouchKeySpaceRevision(spaceID uint32) error { + path := KeySpacePath(spaceID) + return se.Save(path, "") +} + +// LoadKeySpaceRevision loads the revision of the given key space. +func (se *StorageEndpoint) LoadKeySpaceRevision(spaceID uint32) (int64, error) { + etcdEndpoint, err := se.getEtcdBase() + if err != nil { + return 0, err + } + keySpacePath := KeySpacePath(spaceID) + _, revision, err := etcdEndpoint.LoadRevision(keySpacePath) + return revision, err +} + +// getEtcdBase retrieves etcd base from storage endpoint. +// It's used by operations that needs etcd endpoint specifically. +func (se *StorageEndpoint) getEtcdBase() (*kv.EtcdKVBase, error) { + etcdBase, ok := interface{}(se.Base).(*kv.EtcdKVBase) + if !ok { + return nil, errors.New("safepoint storage only supports etcd backend") + } + return etcdBase, nil +} diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index db7032b654b..0676e437709 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -17,6 +17,7 @@ package endpoint import ( "fmt" "path" + "strconv" ) const ( @@ -107,21 +108,28 @@ func MinResolvedTSPath() string { return path.Join(clusterPath, minResolvedTS) } +// KeySpacePath returns path to given key space +// Path: /key_space/gc_safepoint/{space_id} +func KeySpacePath(spaceID uint32) string { + spaceIDStr := strconv.FormatUint(uint64(spaceID), spaceIDBase) + return path.Join(keySpaceSafePointPrefix, spaceIDStr) +} + // KeySpaceServiceSafePointPrefix returns the prefix of given service's service safe point. // Prefix: /key_space/gc_safepoint/{space_id}/service/ -func KeySpaceServiceSafePointPrefix(spaceID string) string { - return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/" +func KeySpaceServiceSafePointPrefix(spaceID uint32) string { + return path.Join(KeySpacePath(spaceID), "service") + "/" } // KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space. // Path: /key_space/gc_safepoint/{space_id}/gc -func KeySpaceGCSafePointPath(spaceID string) string { - return path.Join(keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix) +func KeySpaceGCSafePointPath(spaceID uint32) string { + return path.Join(KeySpacePath(spaceID), keySpaceGCSafePointSuffix) } // KeySpaceServiceSafePointPath returns the path of given service's service safe point. // Path: /key_space/gc_safepoint/{space_id}/service/{service_id} -func KeySpaceServiceSafePointPath(spaceID, serviceID string) string { +func KeySpaceServiceSafePointPath(spaceID uint32, serviceID string) string { return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID) } diff --git a/server/storage/kv/etcd_kv.go b/server/storage/kv/etcd_kv.go index e30b5f7b462..fa5072be53c 100644 --- a/server/storage/kv/etcd_kv.go +++ b/server/storage/kv/etcd_kv.go @@ -32,37 +32,49 @@ import ( const ( requestTimeout = 10 * time.Second slowRequestTime = 1 * time.Second + // RevisionUnavailable is the value of unavailable revision, + // when the kv does not exist. + RevisionUnavailable int64 = -1 ) -type etcdKVBase struct { +// EtcdKVBase is a kv store using etcd. +type EtcdKVBase struct { client *clientv3.Client rootPath string } // NewEtcdKVBase creates a new etcd kv. -func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { - return &etcdKVBase{ +func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase { + return &EtcdKVBase{ client: client, rootPath: rootPath, } } -func (kv *etcdKVBase) Load(key string) (string, error) { +// Load gets a value for a given key. +func (kv *EtcdKVBase) Load(key string) (string, error) { + value, _, err := kv.LoadRevision(key) + return value, err +} + +// LoadRevision gets a value along with revision. +func (kv *EtcdKVBase) LoadRevision(key string) (string, int64, error) { key = path.Join(kv.rootPath, key) resp, err := etcdutil.EtcdKVGet(kv.client, key) if err != nil { - return "", err + return "", RevisionUnavailable, err } if n := len(resp.Kvs); n == 0 { - return "", nil + return "", RevisionUnavailable, nil } else if n > 1 { - return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs) + return "", RevisionUnavailable, errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs) } - return string(resp.Kvs[0].Value), nil + return string(resp.Kvs[0].Value), resp.Kvs[0].ModRevision, nil } -func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { +// LoadRange gets a range of value for a given key range. +func (kv *EtcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { // Note: reason to use `strings.Join` instead of `path.Join` is that the latter will // removes suffix '/' of the joined string. // As a result, when we try to scan from "foo/", it ends up scanning from "/pd/foo" @@ -85,7 +97,37 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri return keys, values, nil } -func (kv *etcdKVBase) Save(key, value string) error { +// SaveWithTTL stores a key-value pair that expires after ttlSeconds seconds. +func (kv *EtcdKVBase) SaveWithTTL(key, value string, ttlSeconds int64) error { + key = path.Join(kv.rootPath, key) + start := time.Now() + ctx, cancel := context.WithTimeout(kv.client.Ctx(), requestTimeout) + resp, err := etcdutil.EtcdKVPutWithTTL(ctx, kv.client, key, value, ttlSeconds) + cancel() + + cost := time.Since(start) + if cost > slowRequestTime { + log.Warn("save to etcd with lease runs too slow", + zap.Reflect("response", resp), + zap.Duration("cost", cost), + errs.ZapError(err)) + } + + if err != nil { + e := errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause() + log.Error("save to etcd with lease meet error", + zap.String("key", key), + zap.String("value", value), + zap.Int64("ttl-seconds", ttlSeconds), + errs.ZapError(e), + ) + return e + } + return nil +} + +// Save stores a key-value pair. +func (kv *EtcdKVBase) Save(key, value string) error { failpoint.Inject("etcdSaveFailed", func() { failpoint.Return(errors.New("save failed")) }) @@ -103,7 +145,8 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } -func (kv *etcdKVBase) Remove(key string) error { +// Remove deletes a key-value pair for a given key. +func (kv *EtcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) diff --git a/server/storage/storage_gc_test.go b/server/storage/storage_gc_test.go index 371dc5759f9..21b4d19c116 100644 --- a/server/storage/storage_gc_test.go +++ b/server/storage/storage_gc_test.go @@ -15,204 +15,289 @@ package storage import ( + "fmt" "math" + "net/url" + "os" + "path" + "sort" + "strconv" "testing" "time" - "github.com/pingcap/failpoint" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/tempurl" "github.com/tikv/pd/server/storage/endpoint" + "github.com/tikv/pd/server/storage/kv" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" ) -func testGCSafePoints() ([]string, []uint64) { - spaceIDs := []string{ - "keySpace1", - "keySpace2", - "keySpace3", - "keySpace4", - "keySpace5", - } - safePoints := []uint64{ - 0, - 1, - 4396, - 23333333333, - math.MaxUint64, - } - return spaceIDs, safePoints +func TestStorageGCTestSuite(t *testing.T) { + suite.Run(t, new(StorageGCTestSuite)) } -func testServiceSafePoints() ([]string, []*endpoint.ServiceSafePoint) { - spaceIDs := []string{ - "keySpace1", - "keySpace1", - "keySpace1", - "keySpace2", - "keySpace2", - "keySpace2", - "keySpace3", - "keySpace3", - "keySpace3", - } - expireAt := time.Now().Add(100 * time.Second).Unix() - serviceSafePoints := []*endpoint.ServiceSafePoint{ - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - {ServiceID: "service1", ExpiredAt: expireAt, SafePoint: 1}, - {ServiceID: "service2", ExpiredAt: expireAt, SafePoint: 2}, - {ServiceID: "service3", ExpiredAt: expireAt, SafePoint: 3}, - } - return spaceIDs, serviceSafePoints +type StorageGCTestSuite struct { + suite.Suite + cfg *embed.Config + etcd *embed.Etcd + storage Storage } -func TestSaveLoadServiceSafePoint(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() - testSpaceID, testSafePoints := testServiceSafePoints() +func (suite *StorageGCTestSuite) SetupTest() { + var err error + suite.cfg = newTestSingleConfig() + suite.etcd, err = embed.StartEtcd(suite.cfg) + suite.Require().Nil(err) + ep := suite.cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + suite.Require().Nil(err) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + suite.storage = NewStorageWithEtcdBackend(client, rootPath) +} + +func (suite *StorageGCTestSuite) TearDownTest() { + if suite.etcd != nil { + suite.etcd.Close() + } + suite.Require().NoError(os.RemoveAll(suite.cfg.Dir)) +} + +func (suite *StorageGCTestSuite) TestSaveLoadServiceSafePoint() { + storage := suite.storage + testSpaceID, testSafePoints, testTTLs := testServiceSafePoints() for i := range testSpaceID { - re.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i])) + suite.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i], testTTLs[i])) } for i := range testSpaceID { loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID) - re.NoError(err) - re.Equal(testSafePoints[i], loadedSafePoint) + suite.Nil(err) + suite.Equal(testSafePoints[i], loadedSafePoint) } } -func TestLoadMinServiceSafePoint(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() - currentTime := time.Now() - expireAt1 := currentTime.Add(100 * time.Second).Unix() - expireAt2 := currentTime.Add(200 * time.Second).Unix() - expireAt3 := currentTime.Add(300 * time.Second).Unix() - +func (suite *StorageGCTestSuite) TestLoadMinServiceSafePoint() { + storage := suite.storage + testTTLs := []int64{2, 6} serviceSafePoints := []*endpoint.ServiceSafePoint{ - {ServiceID: "0", ExpiredAt: expireAt1, SafePoint: 100}, - {ServiceID: "1", ExpiredAt: expireAt2, SafePoint: 200}, - {ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 300}, + {ServiceID: "0", SafePoint: 100}, + {ServiceID: "1", SafePoint: 200}, } - - testKeySpace := "test" - for _, serviceSafePoint := range serviceSafePoints { - re.NoError(storage.SaveServiceSafePoint(testKeySpace, serviceSafePoint)) + testKeySpace := uint32(100) + for i := range serviceSafePoints { + suite.NoError(storage.SaveServiceSafePoint(testKeySpace, serviceSafePoints[i], testTTLs[i])) } - // enabling failpoint to make expired key removal immediately observable - re.NoError(failpoint.Enable("github.com/tikv/pd/server/storage/endpoint/removeExpiredKeys", "return(true)")) - minSafePoint, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime) - re.NoError(err) - re.Equal(serviceSafePoints[0], minSafePoint) + minSafePoint, err := storage.LoadMinServiceSafePoint(testKeySpace) + suite.Nil(err) + suite.Equal(serviceSafePoints[0], minSafePoint) + + time.Sleep(4 * time.Second) // the safePoint with ServiceID 0 should be removed due to expiration - minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(150*time.Second)) - re.NoError(err) - re.Equal(serviceSafePoints[1], minSafePoint2) + // now min should be safePoint with ServiceID 1 + minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeySpace) + suite.Nil(err) + suite.Equal(serviceSafePoints[1], minSafePoint2) // verify that service safe point with ServiceID 0 has been removed ssp, err := storage.LoadServiceSafePoint(testKeySpace, "0") - re.NoError(err) - re.Nil(ssp) + suite.Nil(err) + suite.Nil(ssp) + time.Sleep(4 * time.Second) // all remaining service safePoints should be removed due to expiration - ssp, err = storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(500*time.Second)) - re.NoError(err) - re.Nil(ssp) - re.NoError(failpoint.Disable("github.com/tikv/pd/server/storage/endpoint/removeExpiredKeys")) + ssp, err = storage.LoadMinServiceSafePoint(testKeySpace) + suite.Nil(err) + suite.Nil(ssp) } -func TestRemoveServiceSafePoint(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() - testSpaceID, testSafePoints := testServiceSafePoints() +func (suite *StorageGCTestSuite) TestRemoveServiceSafePoint() { + storage := suite.storage + testSpaceID, testSafePoints, testTTLs := testServiceSafePoints() // save service safe points for i := range testSpaceID { - re.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i])) + suite.NoError(storage.SaveServiceSafePoint(testSpaceID[i], testSafePoints[i], testTTLs[i])) } // remove saved service safe points for i := range testSpaceID { - re.NoError(storage.RemoveServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID)) + suite.NoError(storage.RemoveServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID)) } // check that service safe points are empty for i := range testSpaceID { loadedSafePoint, err := storage.LoadServiceSafePoint(testSpaceID[i], testSafePoints[i].ServiceID) - re.NoError(err) - re.Nil(loadedSafePoint) + suite.Nil(err) + suite.Nil(loadedSafePoint) } } -func TestSaveLoadGCSafePoint(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() +func (suite *StorageGCTestSuite) TestSaveLoadGCSafePoint() { + storage := suite.storage testSpaceIDs, testSafePoints := testGCSafePoints() for i := range testSpaceIDs { testSpaceID := testSpaceIDs[i] testSafePoint := testSafePoints[i] - err := storage.SaveKeySpaceGCSafePoint(testSpaceID, testSafePoint) - re.NoError(err) + suite.NoError(storage.SaveKeySpaceGCSafePoint(testSpaceID, testSafePoint)) loaded, err := storage.LoadKeySpaceGCSafePoint(testSpaceID) - re.NoError(err) - re.Equal(testSafePoint, loaded) + suite.Nil(err) + suite.Equal(testSafePoint, loaded) } } -func TestLoadAllKeySpaceGCSafePoints(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() +func (suite *StorageGCTestSuite) TestLoadAllKeySpaceGCSafePoints() { + storage := suite.storage testSpaceIDs, testSafePoints := testGCSafePoints() for i := range testSpaceIDs { - err := storage.SaveKeySpaceGCSafePoint(testSpaceIDs[i], testSafePoints[i]) - re.NoError(err) + suite.NoError(storage.SaveKeySpaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])) } loadedSafePoints, err := storage.LoadAllKeySpaceGCSafePoints(true) - re.NoError(err) + suite.Nil(err) + sort.Slice(loadedSafePoints, func(a, b int) bool { + return loadedSafePoints[a].SpaceID < loadedSafePoints[b].SpaceID + }) for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) + suite.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) + suite.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) } // saving some service safe points. - spaceIDs, safePoints := testServiceSafePoints() + spaceIDs, safePoints, TTLs := testServiceSafePoints() for i := range spaceIDs { - re.NoError(storage.SaveServiceSafePoint(spaceIDs[i], safePoints[i])) + suite.NoError(storage.SaveServiceSafePoint(spaceIDs[i], safePoints[i], TTLs[i])) } // verify that service safe points do not interfere with gc safe points. loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(true) - re.NoError(err) + suite.Nil(err) + sort.Slice(loadedSafePoints, func(a, b int) bool { + return loadedSafePoints[a].SpaceID < loadedSafePoints[b].SpaceID + }) for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) + suite.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) + suite.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint) } // verify that when withGCSafePoint set to false, returned safePoints is 0 loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(false) - re.NoError(err) + suite.Nil(err) + sort.Slice(loadedSafePoints, func(a, b int) bool { + return loadedSafePoints[a].SpaceID < loadedSafePoints[b].SpaceID + }) for i := range loadedSafePoints { - re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) - re.Equal(uint64(0), loadedSafePoints[i].SafePoint) + suite.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID) + suite.Equal(uint64(0), loadedSafePoints[i].SafePoint) } } +func (suite *StorageGCTestSuite) TestRevision() { + storage := suite.storage + keySpace1 := uint32(100) + keySpace2 := uint32(200) + // Touching key space 200 should not change revision of key space 100 + suite.NoError(storage.TouchKeySpaceRevision(keySpace1)) + oldRevision, err := storage.LoadKeySpaceRevision(keySpace1) + suite.Nil(err) + suite.NoError(storage.TouchKeySpaceRevision(keySpace2)) + newRevision, err := storage.LoadKeySpaceRevision(keySpace1) + suite.Nil(err) + suite.Equal(oldRevision, newRevision) -func TestLoadEmpty(t *testing.T) { - re := require.New(t) - storage := NewStorageWithMemoryBackend() + // Touching the same key space should change revision + suite.NoError(storage.TouchKeySpaceRevision(keySpace1)) + newRevision, err = storage.LoadKeySpaceRevision(keySpace1) + suite.Nil(err) + suite.NotEqual(oldRevision, newRevision) +} +func (suite *StorageGCTestSuite) TestLoadEmpty() { + storage := suite.storage + testKeySpace := uint32(100) // loading non-existing GC safepoint should return 0 - gcSafePoint, err := storage.LoadKeySpaceGCSafePoint("testKeySpace") - re.NoError(err) - re.Equal(uint64(0), gcSafePoint) + gcSafePoint, err := storage.LoadKeySpaceGCSafePoint(testKeySpace) + suite.Nil(err) + suite.Equal(uint64(0), gcSafePoint) // loading non-existing service safepoint should return nil - serviceSafePoint, err := storage.LoadServiceSafePoint("testKeySpace", "testService") - re.NoError(err) - re.Nil(serviceSafePoint) + serviceSafePoint, err := storage.LoadServiceSafePoint(testKeySpace, "testService") + suite.Nil(err) + suite.Nil(serviceSafePoint) // loading empty key spaces should return empty slices safePoints, err := storage.LoadAllKeySpaceGCSafePoints(true) - re.NoError(err) - re.Len(safePoints, 0) + suite.Nil(err) + suite.Empty(safePoints) + + // Loading untouched key spaces should return unavailable revision + revision, err := storage.LoadKeySpaceRevision(testKeySpace) + suite.Nil(err) + suite.Equal(kv.RevisionUnavailable, revision) +} + +func newTestSingleConfig() *embed.Config { + cfg := embed.NewConfig() + cfg.Name = "test_etcd" + cfg.Dir, _ = os.MkdirTemp("/tmp", "test_etcd") + cfg.WalDir = "" + cfg.Logger = "zap" + cfg.LogOutputs = []string{"stdout"} + + pu, _ := url.Parse(tempurl.Alloc()) + cfg.LPUrls = []url.URL{*pu} + cfg.APUrls = cfg.LPUrls + cu, _ := url.Parse(tempurl.Alloc()) + cfg.LCUrls = []url.URL{*cu} + cfg.ACUrls = cfg.LCUrls + + cfg.StrictReconfigCheck = false + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) + cfg.ClusterState = embed.ClusterStateFlagNew + return cfg +} + +func testGCSafePoints() ([]uint32, []uint64) { + spaceIDs := []uint32{ + 100, + 200, + 300, + 400, + 500, + } + safePoints := []uint64{ + 0, + 1, + 4396, + 23333333333, + math.MaxUint64, + } + return spaceIDs, safePoints +} + +func testServiceSafePoints() ([]uint32, []*endpoint.ServiceSafePoint, []int64) { + spaceIDs := []uint32{ + 100, + 100, + 100, + 200, + 200, + 200, + 300, + 300, + 300, + } + serviceSafePoints := []*endpoint.ServiceSafePoint{ + {ServiceID: "service1", SafePoint: 1}, + {ServiceID: "service2", SafePoint: 2}, + {ServiceID: "service3", SafePoint: 3}, + {ServiceID: "service1", SafePoint: 1}, + {ServiceID: "service2", SafePoint: 2}, + {ServiceID: "service3", SafePoint: 3}, + {ServiceID: "service1", SafePoint: 1}, + {ServiceID: "service2", SafePoint: 2}, + {ServiceID: "service3", SafePoint: 3}, + } + testTTls := make([]int64, 9) + for i := range testTTls { + testTTls[i] = 10 + } + return spaceIDs, serviceSafePoints, testTTls }