Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Use etcd lease to manage safe point expiration #4984

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
32c44ee
added storage methods for RawKV GC
AmoebaProtozoa May 12, 2022
b12eb9b
push back on updating kvproto in go.mod
AmoebaProtozoa May 12, 2022
d0b4f3b
linting
AmoebaProtozoa May 12, 2022
a2ba0e7
changed storage path structure
AmoebaProtozoa May 12, 2022
b9bb3e4
update comments
AmoebaProtozoa May 12, 2022
a7e3ece
added ByKeySpace suffix for disambiguity
AmoebaProtozoa May 12, 2022
7ff125e
removed default key spaces
AmoebaProtozoa May 13, 2022
1bdb642
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 13, 2022
1ebb51d
renaming, move delete expired safepoints to a goroutine
AmoebaProtozoa May 16, 2022
312704a
update tests
AmoebaProtozoa May 16, 2022
766b344
lint
AmoebaProtozoa May 16, 2022
de77dfc
added back KeySpaceGCSafePoint
AmoebaProtozoa May 16, 2022
d6eda93
remove expired all at once
AmoebaProtozoa May 16, 2022
cbe1ed0
address comments
AmoebaProtozoa May 16, 2022
9b27bcd
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 16, 2022
88610a0
move sleep to failpoint
AmoebaProtozoa May 16, 2022
995033e
modified failpoint to eliminate sleep
AmoebaProtozoa May 17, 2022
3099939
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 17, 2022
a825d37
log error when failed to remove expired service safe point
AmoebaProtozoa May 18, 2022
8bef6ee
added load revision
AmoebaProtozoa May 18, 2022
89cf4d0
added SaveWithTTL to kvs
AmoebaProtozoa May 18, 2022
b8bc86f
updated storage methods to use lease
AmoebaProtozoa May 18, 2022
1eca8d9
update tests to use ttl
AmoebaProtozoa May 18, 2022
7e3651e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 18, 2022
78f3e14
merge and resolve conflict
AmoebaProtozoa May 27, 2022
176061e
lint
AmoebaProtozoa May 27, 2022
5d994ea
use etcdutil to save with ttl
AmoebaProtozoa May 27, 2022
a36305a
update go mod
AmoebaProtozoa May 27, 2022
28a4e9e
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa May 30, 2022
d8e7ab6
update go mod
AmoebaProtozoa May 31, 2022
6d2f26d
update test/client/go.mod
AmoebaProtozoa May 31, 2022
594edc7
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 7, 2022
3ab7d4f
small cleanup
AmoebaProtozoa Jun 7, 2022
a379eea
avoid modification to general kv interface
AmoebaProtozoa Jun 8, 2022
152fe6c
Merge branch 'master' into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 8, 2022
a8790d5
storage: handle case where ttl is maxInt
AmoebaProtozoa Jun 8, 2022
5269aff
add etcdKVBase comments
AmoebaProtozoa Jun 8, 2022
7c05203
storage: Revision should be int64
AmoebaProtozoa Jun 8, 2022
77d6f26
storage: add revision tests
AmoebaProtozoa Jun 8, 2022
bda31f4
storage: lint
AmoebaProtozoa Jun 8, 2022
9ff8090
storage: change spaceID to uint32
AmoebaProtozoa Jun 14, 2022
ffe8a18
storage: migrate tests to testify
AmoebaProtozoa Jun 21, 2022
ab98f7c
Merge branch 'master' of github.com:tikv/pd into RawKV_GC_API_storageTTL
AmoebaProtozoa Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 72 additions & 60 deletions server/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,65 @@ import (
"math"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/storage/kv"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

const (
spaceIDBase = 16
safePointBase = 16
)

// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeySpaceGCSafePoint struct {
SpaceID string `json:"space_id"`
SpaceID uint32 `json:"space_id"`
SafePoint uint64 `json:"safe_point,omitempty"`
}

// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
type KeySpaceGCSafePointStorage interface {
// Service safe point interfaces.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
SaveServiceSafePoint(spaceID uint32, ssp *ServiceSafePoint, ttl int64) error
LoadServiceSafePoint(spaceID uint32, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID uint32) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID uint32, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
SaveKeySpaceGCSafePoint(spaceID uint32, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID uint32) (uint64, error)
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
// Revision interfaces.
TouchKeySpaceRevision(spaceID uint32) error
LoadKeySpaceRevision(spaceID uint32) (int64, error)
}

var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID uint32, ssp *ServiceSafePoint, ttl int64) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
etcdEndpoint, err := se.getEtcdBase()
if err != nil {
return err
}
key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}
return se.Save(key, string(value))
// A MaxInt64 ttl means safe point never expire.
if ttl == math.MaxInt64 {
return etcdEndpoint.Save(key, string(value))
}
return etcdEndpoint.SaveWithTTL(key, string(value), ttl)
}

// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
// Return nil if no safepoint exist for given service or just expired.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
// Return nil if no safepoint exist for given service.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID uint32, serviceID string) (*ServiceSafePoint, error) {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
value, err := se.Load(key)
if err != nil || value == "" {
Expand All @@ -75,61 +87,29 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, err
}
if ssp.ExpiredAt < time.Now().Unix() {
go func() {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}()
return nil, nil
}
return ssp, nil
}

// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space.
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID uint32) (*ServiceSafePoint, error) {
prefix := KeySpaceServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
_, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
expiredKeys := make([]string, 0)
for i, key := range keys {
for i := range values {
ssp := &ServiceSafePoint{}
if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}

// gather expired keys
if ssp.ExpiredAt < now.Unix() {
expiredKeys = append(expiredKeys, key)
continue
}
if ssp.SafePoint < min.SafePoint {
min = ssp
}
}
// failpoint for immediate removal
failpoint.Inject("removeExpiredKeys", func() {
for _, key := range expiredKeys {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}
expiredKeys = []string{}
})
// remove expired keys asynchronously
go func() {
for _, key := range expiredKeys {
if err = se.Remove(key); err != nil {
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
}
}
}()
if min.SafePoint == math.MaxUint64 {
// no service safe point or all of them are expired.
return nil, nil
Expand All @@ -140,25 +120,25 @@ func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time
}

// RemoveServiceSafePoint removes target ServiceSafePoint
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID uint32, serviceID string) error {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
return se.Remove(key)
}

// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID uint32, safePoint uint64) error {
value := strconv.FormatUint(safePoint, safePointBase)
return se.Save(KeySpaceGCSafePointPath(spaceID), value)
}

// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
// Returns 0 if target safepoint not exist.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID uint32) (uint64, error) {
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
if err != nil || value == "" {
return 0, err
}
safePoint, err := strconv.ParseUint(value, 16, 64)
safePoint, err := strconv.ParseUint(value, safePointBase, 64)
if err != nil {
return 0, err
}
Expand All @@ -182,11 +162,15 @@ func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]
continue
}
safePoint := &KeySpaceGCSafePoint{}
spaceID := strings.TrimPrefix(keys[i], prefix)
spaceID = strings.TrimSuffix(spaceID, suffix)
safePoint.SpaceID = spaceID
spaceIDStr := strings.TrimPrefix(keys[i], prefix)
spaceIDStr = strings.TrimSuffix(spaceIDStr, suffix)
spaceID, err := strconv.ParseUint(spaceIDStr, spaceIDBase, 32)
if err != nil {
return nil, err
}
safePoint.SpaceID = uint32(spaceID)
if withGCSafePoint {
value, err := strconv.ParseUint(values[i], 16, 64)
value, err := strconv.ParseUint(values[i], safePointBase, 64)
if err != nil {
return nil, err
}
Expand All @@ -196,3 +180,31 @@ func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]
}
return safePoints, nil
}

// TouchKeySpaceRevision advances revision of the given key space.
// It's used when new service safe point is saved.
func (se *StorageEndpoint) TouchKeySpaceRevision(spaceID uint32) error {
path := KeySpacePath(spaceID)
return se.Save(path, "")
}

// LoadKeySpaceRevision loads the revision of the given key space.
func (se *StorageEndpoint) LoadKeySpaceRevision(spaceID uint32) (int64, error) {
etcdEndpoint, err := se.getEtcdBase()
if err != nil {
return 0, err
}
keySpacePath := KeySpacePath(spaceID)
_, revision, err := etcdEndpoint.LoadRevision(keySpacePath)
return revision, err
}

// getEtcdBase retrieves etcd base from storage endpoint.
// It's used by operations that needs etcd endpoint specifically.
func (se *StorageEndpoint) getEtcdBase() (*kv.EtcdKVBase, error) {
etcdBase, ok := interface{}(se.Base).(*kv.EtcdKVBase)
if !ok {
return nil, errors.New("safepoint storage only supports etcd backend")
}
return etcdBase, nil
}
18 changes: 13 additions & 5 deletions server/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package endpoint
import (
"fmt"
"path"
"strconv"
)

const (
Expand Down Expand Up @@ -107,21 +108,28 @@ func MinResolvedTSPath() string {
return path.Join(clusterPath, minResolvedTS)
}

// KeySpacePath returns path to given key space
// Path: /key_space/gc_safepoint/{space_id}
func KeySpacePath(spaceID uint32) string {
spaceIDStr := strconv.FormatUint(uint64(spaceID), spaceIDBase)
return path.Join(keySpaceSafePointPrefix, spaceIDStr)
}

// KeySpaceServiceSafePointPrefix returns the prefix of given service's service safe point.
// Prefix: /key_space/gc_safepoint/{space_id}/service/
func KeySpaceServiceSafePointPrefix(spaceID string) string {
return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/"
func KeySpaceServiceSafePointPrefix(spaceID uint32) string {
return path.Join(KeySpacePath(spaceID), "service") + "/"
}

// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
// Path: /key_space/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
return path.Join(keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix)
func KeySpaceGCSafePointPath(spaceID uint32) string {
return path.Join(KeySpacePath(spaceID), keySpaceGCSafePointSuffix)
}

// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
// Path: /key_space/gc_safepoint/{space_id}/service/{service_id}
func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
func KeySpaceServiceSafePointPath(spaceID uint32, serviceID string) string {
return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
}

Expand Down
65 changes: 54 additions & 11 deletions server/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,49 @@ import (
const (
requestTimeout = 10 * time.Second
slowRequestTime = 1 * time.Second
// RevisionUnavailable is the value of unavailable revision,
// when the kv does not exist.
RevisionUnavailable int64 = -1
)

type etcdKVBase struct {
// EtcdKVBase is a kv store using etcd.
type EtcdKVBase struct {
client *clientv3.Client
rootPath string
}

// NewEtcdKVBase creates a new etcd kv.
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
return &etcdKVBase{
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *EtcdKVBase {
return &EtcdKVBase{
client: client,
rootPath: rootPath,
}
}

func (kv *etcdKVBase) Load(key string) (string, error) {
// Load gets a value for a given key.
func (kv *EtcdKVBase) Load(key string) (string, error) {
value, _, err := kv.LoadRevision(key)
return value, err
}

// LoadRevision gets a value along with revision.
func (kv *EtcdKVBase) LoadRevision(key string) (string, int64, error) {
key = path.Join(kv.rootPath, key)

resp, err := etcdutil.EtcdKVGet(kv.client, key)
if err != nil {
return "", err
return "", RevisionUnavailable, err
}
if n := len(resp.Kvs); n == 0 {
return "", nil
return "", RevisionUnavailable, nil
} else if n > 1 {
return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
return "", RevisionUnavailable, errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
}
return string(resp.Kvs[0].Value), nil
return string(resp.Kvs[0].Value), resp.Kvs[0].ModRevision, nil
}

func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
// LoadRange gets a range of value for a given key range.
func (kv *EtcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
// Note: reason to use `strings.Join` instead of `path.Join` is that the latter will
// removes suffix '/' of the joined string.
// As a result, when we try to scan from "foo/", it ends up scanning from "/pd/foo"
Expand All @@ -85,7 +97,37 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri
return keys, values, nil
}

func (kv *etcdKVBase) Save(key, value string) error {
// SaveWithTTL stores a key-value pair that expires after ttlSeconds seconds.
func (kv *EtcdKVBase) SaveWithTTL(key, value string, ttlSeconds int64) error {
key = path.Join(kv.rootPath, key)
start := time.Now()
ctx, cancel := context.WithTimeout(kv.client.Ctx(), requestTimeout)
resp, err := etcdutil.EtcdKVPutWithTTL(ctx, kv.client, key, value, ttlSeconds)
cancel()

cost := time.Since(start)
if cost > slowRequestTime {
log.Warn("save to etcd with lease runs too slow",
zap.Reflect("response", resp),
zap.Duration("cost", cost),
errs.ZapError(err))
}

if err != nil {
e := errs.ErrEtcdKVPut.Wrap(err).GenWithStackByCause()
log.Error("save to etcd with lease meet error",
zap.String("key", key),
zap.String("value", value),
zap.Int64("ttl-seconds", ttlSeconds),
errs.ZapError(e),
)
return e
}
return nil
}

// Save stores a key-value pair.
func (kv *EtcdKVBase) Save(key, value string) error {
failpoint.Inject("etcdSaveFailed", func() {
failpoint.Return(errors.New("save failed"))
})
Expand All @@ -103,7 +145,8 @@ func (kv *etcdKVBase) Save(key, value string) error {
return nil
}

func (kv *etcdKVBase) Remove(key string) error {
// Remove deletes a key-value pair for a given key.
func (kv *EtcdKVBase) Remove(key string) error {
key = path.Join(kv.rootPath, key)

txn := NewSlowLogTxn(kv.client)
Expand Down
Loading