Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Use etcd lease to manage safe point expiration #4984

Closed
Closed
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
32c44ee
added storage methods for RawKV GC
AmoebaProtozoa May 12, 2022
b12eb9b
push back on updating kvproto in go.mod
AmoebaProtozoa May 12, 2022
d0b4f3b
linting
AmoebaProtozoa May 12, 2022
a2ba0e7
changed storage path structure
AmoebaProtozoa May 12, 2022
b9bb3e4
update comments
AmoebaProtozoa May 12, 2022
a7e3ece
added ByKeySpace suffix for disambiguity
AmoebaProtozoa May 12, 2022
7ff125e
removed default key spaces
AmoebaProtozoa May 13, 2022
1bdb642
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 13, 2022
1ebb51d
renaming, move delete expired safepoints to a goroutine
AmoebaProtozoa May 16, 2022
312704a
update tests
AmoebaProtozoa May 16, 2022
766b344
lint
AmoebaProtozoa May 16, 2022
de77dfc
added back KeySpaceGCSafePoint
AmoebaProtozoa May 16, 2022
d6eda93
remove expired all at once
AmoebaProtozoa May 16, 2022
cbe1ed0
address comments
AmoebaProtozoa May 16, 2022
9b27bcd
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 16, 2022
88610a0
move sleep to failpoint
AmoebaProtozoa May 16, 2022
995033e
modified failpoint to eliminate sleep
AmoebaProtozoa May 17, 2022
3099939
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 17, 2022
a825d37
log error when failed to remove expired service safe point
AmoebaProtozoa May 18, 2022
8bef6ee
added load revision
AmoebaProtozoa May 18, 2022
89cf4d0
added SaveWithTTL to kvs
AmoebaProtozoa May 18, 2022
b8bc86f
updated storage methods to use lease
AmoebaProtozoa May 18, 2022
1eca8d9
update tests to use ttl
AmoebaProtozoa May 18, 2022
7e3651e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 18, 2022
78f3e14
merge and resolve conflict
AmoebaProtozoa May 27, 2022
176061e
lint
AmoebaProtozoa May 27, 2022
5d994ea
use etcdutil to save with ttl
AmoebaProtozoa May 27, 2022
a36305a
update go mod
AmoebaProtozoa May 27, 2022
28a4e9e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 30, 2022
d8e7ab6
update go mod
AmoebaProtozoa May 31, 2022
6d2f26d
update test/client/go.mod
AmoebaProtozoa May 31, 2022
594edc7
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 7, 2022
3ab7d4f
small cleanup
AmoebaProtozoa Jun 7, 2022
a379eea
avoid modification to general kv interface
AmoebaProtozoa Jun 8, 2022
152fe6c
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 8, 2022
a8790d5
storage: handle case where ttl is maxInt
AmoebaProtozoa Jun 8, 2022
5269aff
add etcdKVBase comments
AmoebaProtozoa Jun 8, 2022
7c05203
storage: Revision should be int64
AmoebaProtozoa Jun 8, 2022
77d6f26
storage: add revision tests
AmoebaProtozoa Jun 8, 2022
bda31f4
storage: lint
AmoebaProtozoa Jun 8, 2022
9ff8090
storage: change spaceID to uint32
AmoebaProtozoa Jun 14, 2022
ffe8a18
storage: migrate tests to testify
AmoebaProtozoa Jun 21, 2022
ab98f7c
Merge branch 'master' of github.com:tikv/pd into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 9 additions & 45 deletions server/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ import (
"math"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
Expand All @@ -38,9 +33,10 @@ type KeySpaceGCSafePoint struct {
// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
type KeySpaceGCSafePointStorage interface {
// Service safe point interfaces.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
// NOTE: field ServiceSafePoint.ExpiredAt will be ignored, use etcd's lease to manage lifetime instead.
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint, ttl int64) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
Expand All @@ -51,7 +47,7 @@ type KeySpaceGCSafePointStorage interface {
var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint, ttl int64) error {
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
Expand All @@ -60,11 +56,11 @@ func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafe
if err != nil {
return err
}
return se.Save(key, string(value))
return se.SaveWithTTL(key, string(value), ttl)
}

// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
// Return nil if no safepoint exist for given service or just expired.
// Return nil if no safepoint exist for given service.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
value, err := se.Load(key)
Expand All @@ -75,61 +71,29 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, err
}
if ssp.ExpiredAt < time.Now().Unix() {
go func() {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}()
return nil, nil
}
return ssp, nil
}

// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space.
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string) (*ServiceSafePoint, error) {
prefix := KeySpaceServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
_, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
expiredKeys := make([]string, 0)
for i, key := range keys {
for i := range values {
ssp := &ServiceSafePoint{}
if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}

// gather expired keys
if ssp.ExpiredAt < now.Unix() {
expiredKeys = append(expiredKeys, key)
continue
}
if ssp.SafePoint < min.SafePoint {
min = ssp
}
}
// failpoint for immediate removal
failpoint.Inject("removeExpiredKeys", func() {
for _, key := range expiredKeys {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}
expiredKeys = []string{}
})
// remove expired keys asynchronously
go func() {
for _, key := range expiredKeys {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}
}()
if min.SafePoint == math.MaxUint64 {
// no service safe point or all of them are expired.
return nil, nil
Expand Down
42 changes: 38 additions & 4 deletions server/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,24 @@ func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
}

func (kv *etcdKVBase) Load(key string) (string, error) {
value, _, err := kv.LoadRevision(key)
return value, err
}

// LoadRevision gets a value along with revision.
func (kv *etcdKVBase) LoadRevision(key string) (string, int64, error) {
key = path.Join(kv.rootPath, key)

resp, err := etcdutil.EtcdKVGet(kv.client, key)
if err != nil {
return "", err
return "", RevisionUnavailable, err
}
if n := len(resp.Kvs); n == 0 {
return "", nil
return "", RevisionUnavailable, nil
} else if n > 1 {
return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
return "", RevisionUnavailable, errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
}
return string(resp.Kvs[0].Value), nil
return string(resp.Kvs[0].Value), resp.Kvs[0].ModRevision, nil
}

func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
Expand All @@ -85,6 +91,34 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri
return keys, values, nil
}

func (kv *etcdKVBase) SaveWithTTL(key, value string, ttlSeconds int64) error {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(kv.client.Ctx(), requestTimeout)
start := time.Now()
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
resp, err := etcdutil.EtcdKVPutWithTTL(ctx, kv.client, key, value, ttlSeconds)
cancel()

cost := time.Since(start)
if cost > slowRequestTime {
log.Warn("save to etcd with lease runs too slow",
zap.Reflect("response", resp),
zap.Duration("cost", cost),
errs.ZapError(err))
}

if err != nil {
e := errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
log.Error("save to etcd with lease meet error",
zap.String("key", key),
zap.String("value", value),
zap.Int64("ttl-seconds", ttlSeconds),
errs.ZapError(e),
)
return e
}
return nil
}

func (kv *etcdKVBase) Save(key, value string) error {
failpoint.Inject("etcdSaveFailed", func() {
failpoint.Return(errors.New("save failed"))
Expand Down
6 changes: 6 additions & 0 deletions server/storage/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@

package kv

// RevisionUnavailable is the value of unavailable revision,
// when the kv does not exist (etcd_kv), or is not supported (mem_kv & leveldb_kv).
const RevisionUnavailable = -1

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Load(key string) (string, error)
LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
LoadRevision(key string) (string, int64, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revision here is a very specific term that is related to etcd, is there a better way to avoid introducing it in this generic KV interface?

Save(key, value string) error
SaveWithTTL(key, value string, ttlSeconds int64) error
Remove(key string) error
}
11 changes: 11 additions & 0 deletions server/storage/kv/levedb_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func (kv *LevelDBKV) Load(key string) (string, error) {
return string(v), err
}

// LoadRevision gets a value along with revision. The revision is unavailable for `LevelDBKV`.
func (kv *LevelDBKV) LoadRevision(key string) (string, int64, error) {
value, err := kv.Load(key)
return value, RevisionUnavailable, err
}

// LoadRange gets a range of value for a given key range.
func (kv *LevelDBKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) {
iter := kv.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil)
Expand All @@ -72,6 +78,11 @@ func (kv *LevelDBKV) Save(key, value string) error {
return errors.WithStack(kv.Put([]byte(key), []byte(value), nil))
}

// SaveWithTTL not supported on LevelDBKV
func (kv *LevelDBKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("ttl operation not supported on LevelDBKV")
}

// Remove deletes a key-value pair for a given key.
func (kv *LevelDBKV) Remove(key string) error {
return errors.WithStack(kv.Delete([]byte(key), nil))
Expand Down
11 changes: 11 additions & 0 deletions server/storage/kv/mem_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ func (kv *memoryKV) Load(key string) (string, error) {
return item.(memoryKVItem).value, nil
}

// LoadRevision gets a value along with revision. The revision is unavailable for `memoryKV`.
func (kv *memoryKV) LoadRevision(key string) (string, int64, error) {
value, err := kv.Load(key)
return value, RevisionUnavailable, err
}

func (kv *memoryKV) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
failpoint.Inject("withRangeLimit", func(val failpoint.Value) {
rangeLimit, ok := val.(int)
Expand Down Expand Up @@ -80,6 +86,11 @@ func (kv *memoryKV) Save(key, value string) error {
return nil
}

// SaveWithTTL not supported on memoryKV
func (kv *memoryKV) SaveWithTTL(key, value string, ttlSeconds int64) error {
return errors.New("ttl operation not supported on memoryKV")
}

func (kv *memoryKV) Remove(key string) error {
kv.Lock()
defer kv.Unlock()
Expand Down