Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Add APIs for RawKV GC #4937

Merged
merged 22 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
32c44ee
added storage methods for RawKV GC
AmoebaProtozoa May 12, 2022
b12eb9b
push back on updating kvproto in go.mod
AmoebaProtozoa May 12, 2022
d0b4f3b
linting
AmoebaProtozoa May 12, 2022
a2ba0e7
changed storage path structure
AmoebaProtozoa May 12, 2022
b9bb3e4
update comments
AmoebaProtozoa May 12, 2022
a7e3ece
added ByKeySpace suffix for disambiguity
AmoebaProtozoa May 12, 2022
7ff125e
removed default key spaces
AmoebaProtozoa May 13, 2022
1bdb642
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 13, 2022
1ebb51d
renaming, move delete expired safepoints to a goroutine
AmoebaProtozoa May 16, 2022
312704a
update tests
AmoebaProtozoa May 16, 2022
766b344
lint
AmoebaProtozoa May 16, 2022
de77dfc
added back KeySpaceGCSafePoint
AmoebaProtozoa May 16, 2022
d6eda93
remove expired all at once
AmoebaProtozoa May 16, 2022
cbe1ed0
address comments
AmoebaProtozoa May 16, 2022
9b27bcd
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 16, 2022
88610a0
move sleep to failpoint
AmoebaProtozoa May 16, 2022
995033e
modified failpoint to eliminate sleep
AmoebaProtozoa May 17, 2022
3099939
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 17, 2022
a825d37
log error when failed to remove expired service safe point
AmoebaProtozoa May 18, 2022
c1cb1c0
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 25, 2022
769a40b
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 26, 2022
d8dd130
Merge branch 'master' into RawKV_GC_API_storage
ti-chi-bot May 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions server/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"encoding/json"
"math"
"strings"
"time"

"github.com/pingcap/errors"
"go.etcd.io/etcd/clientv3"
)

// Predefined key spaces. More key spaces would come from "Multi-tenant".
const (
// KeySpaceRawKVDefault is key space ID for RawKV.
KeySpaceRawKVDefault = "default_rawkv"
)

// LoadAllKeySpaces returns a list of all key-space IDs.
// We have only predefined key-spaces by now.
// More key-spaces would come from "Multi-tenant".
func (se *StorageEndpoint) LoadAllKeySpaces() ([]*KeySpaceGCSafePoint, error) {
keySpaces := []*KeySpaceGCSafePoint{
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
{
SpaceID: KeySpaceRawKVDefault,
},
}
return keySpaces, nil
}

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
key := ServiceSafePointPath(spaceID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
}
return se.Save(key, string(value))
}

// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
// Return nil if no safepoint not exist.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
value, err := se.Load(ServiceSafePointPath(spaceID, serviceID))
if err != nil || value == "" {
return nil, err
}
ssp := &ServiceSafePoint{}
if err := json.Unmarshal([]byte(value), ssp); err != nil {
return nil, err
}
return ssp, nil
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
}

// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space.
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
prefix := ServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}

min := &ServiceSafePoint{SafePoint: math.MaxUint64}
for i, key := range keys {
ssp := &ServiceSafePoint{}
if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
return nil, err
}

// remove expired safe points.
if ssp.ExpiredAt < now.Unix() {
err = se.Remove(key)
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
continue
}

if ssp.SafePoint < min.SafePoint {
min = ssp
}
}

if min.SafePoint == math.MaxUint64 {
// no service safe point or all of them are expired.
return nil, nil
}

// successfully found a valid min safe point.
return min, nil
}

// RemoveServiceSafePoint removes GCSafePoint for the given key-space.
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
key := ServiceSafePointPath(spaceID, serviceID)
return se.Remove(key)
}

// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(gcSafePoint *KeySpaceGCSafePoint) error {
safePoint, err := json.Marshal(gcSafePoint)
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
return se.Save(KeySpaceGCSafePointPath(gcSafePoint.SpaceID), string(safePoint))
}

// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
// return nil if safepoint not exist.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (*KeySpaceGCSafePoint, error) {
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
if err != nil || value == "" {
return nil, err
}
gcSafePoint := &KeySpaceGCSafePoint{}
if err := json.Unmarshal([]byte(value), gcSafePoint); err != nil {
return nil, err
}
return gcSafePoint, nil
}

// LoadAllKeySpaceGCSafePoints returns slice of key-spaces and their corresponding gc safe points.
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints() ([]*KeySpaceGCSafePoint, error) {
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
prefix := SafePointPrefix()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
suffix := GCSafePointSuffix()
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
for i := range keys {
// skip non gc safe points
if !strings.HasSuffix(keys[i], suffix) {
continue
}
safePoint := &KeySpaceGCSafePoint{}
if err = json.Unmarshal([]byte(values[i]), safePoint); err != nil {
return nil, err
}
safePoints = append(safePoints, safePoint)
}
return safePoints, nil
}
17 changes: 17 additions & 0 deletions server/storage/endpoint/gc_safe_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ type ServiceSafePoint struct {
SafePoint uint64 `json:"safe_point"`
}

// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeySpaceGCSafePoint struct {
SpaceID string `json:"space_id"`
SafePoint uint64 `json:"safe_point"`
}

// GCSafePointStorage defines the storage operations on the GC safe point.
type GCSafePointStorage interface {
LoadGCSafePoint() (uint64, error)
Expand All @@ -42,6 +48,17 @@ type GCSafePointStorage interface {
LoadAllServiceGCSafePoints() ([]*ServiceSafePoint, error)
SaveServiceGCSafePoint(ssp *ServiceSafePoint) error
RemoveServiceGCSafePoint(serviceID string) error

LoadAllKeySpaces() ([]*KeySpaceGCSafePoint, error)
// Service safe point interfaces.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(gcSafePoint *KeySpaceGCSafePoint) error
LoadKeySpaceGCSafePoint(spaceID string) (*KeySpaceGCSafePoint, error)
LoadAllKeySpaceGCSafePoints() ([]*KeySpaceGCSafePoint, error)
}

var _ GCSafePointStorage = (*StorageEndpoint)(nil)
Expand Down
32 changes: 32 additions & 0 deletions server/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
minResolvedTS = "min_resolved_ts"
keySpaceGCPrefix = "key_space/gc_safepoint"
gcSafePointSuffix = "gc"
)

// AppendToRootPath appends the given key to the rootPath.
Expand Down Expand Up @@ -103,3 +105,33 @@ func gcSafePointServicePath(serviceID string) string {
func MinResolvedTSPath() string {
return path.Join(clusterPath, minResolvedTS)
}

// ServiceSafePointPrefix returns the prefix of given service's service safe point.
// Prefix: /key_space/gc_safepoint/{space_id}/service/
func ServiceSafePointPrefix(spaceID string) string {
AmoebaProtozoa marked this conversation as resolved.
Show resolved Hide resolved
return path.Join(keySpaceGCPrefix, spaceID, "service") + "/"
}

// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
// Path: /key_space/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
return path.Join(keySpaceGCPrefix, spaceID, gcSafePointSuffix)
}

// ServiceSafePointPath returns the path of given service's service safe point.
// Path: /key_space/gc_safepoint/{space_id}/service/{service_id}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we know the endpoint types from service_id? such as it's from tidb, raw_kv client, cdc or others?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can tell by service_id alone,
for example, both TiDB and raw_kv's gc_worker may use gc_worker as their service_id

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... Is it different in space_id? like cdc_xxx, tidb_xxx, client_xxxx . I would like to know who use the path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, we use spaceID to differentiate key spaces, things like
default_rawkv, default_txnkv, default_tidb
serviceID is used to identify the specific service under that KeySpace, like
cdc, br, etc.

tidb cdc and rawkv cdc may have the same serviceID but different spaceIDs
rawkv cdc and rawkv br have the same spaceID but different serviceIDs

func ServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(ServiceSafePointPrefix(spaceID), serviceID)
}

// SafePointPrefix returns prefix for all key-spaces' safe points.
// Path: /key_space/gc_safepoint/
func SafePointPrefix() string {
return keySpaceGCPrefix + "/"
}

// GCSafePointSuffix returns the suffix for any gc safepoint.
// Postfix: /gc
func GCSafePointSuffix() string {
return "/" + gcSafePointSuffix
}
Loading