etcdutil: add watch loop #6390

Merged on May 8, 2023 (30 commits)

Changes shown are from 24 of the 30 commits.

Commits:
cac7e85  etcdutil: add watch loop (lhy1024, Apr 26, 2023)
9f3f683  update (lhy1024, Apr 26, 2023)
eab3fb6  update (lhy1024, Apr 27, 2023)
e9bd5f0  fix test (lhy1024, Apr 27, 2023)
072c114  fix (lhy1024, Apr 27, 2023)
6cf0edd  Merge branch 'master' of http://github.com/tikv/pd into watch-loop (lhy1024, Apr 27, 2023)
9cf42de  update (lhy1024, Apr 27, 2023)
5f4ac40  update (lhy1024, Apr 27, 2023)
efa00e1  fix test (lhy1024, May 4, 2023)
06b280e  fix test (lhy1024, May 4, 2023)
e4786b4  Merge branch 'master' into watch-loop (lhy1024, May 5, 2023)
54bb282  Merge github.com:lhy1024/pd into watch-loop (lhy1024, May 5, 2023)
ff495c1  remove unnecessary failpoint (lhy1024, May 5, 2023)
e6e6e72  Merge branch 'watch-loop' of github.com:lhy1024/pd into watch-loop (lhy1024, May 5, 2023)
2cfc882  fix test (lhy1024, May 5, 2023)
7c26ae3  add withLimit by default (lhy1024, May 5, 2023)
acd2420  use loopwatcher in keyspaces (lhy1024, May 5, 2023)
87949f0  use sync.Map as groupUpdateRetryList (lhy1024, May 5, 2023)
edb213a  add post function (lhy1024, May 5, 2023)
54a06c6  address comments (lhy1024, May 5, 2023)
a5ce1a6  fix next key in limit (lhy1024, May 5, 2023)
7068643  add more comments (lhy1024, May 6, 2023)
c8de713  address comments (lhy1024, May 6, 2023)
ea2aad0  Merge branch 'master' of http://github.com/tikv/pd into watch-loop (lhy1024, May 6, 2023)
4a230e6  add more tests (lhy1024, May 6, 2023)
0c6a5cf  address comments (lhy1024, May 6, 2023)
c747311  address comments and fix tests (lhy1024, May 6, 2023)
3f62ccb  Merge branch 'master' into watch-loop (lhy1024, May 6, 2023)
cb5815b  fix other test leak (lhy1024, May 6, 2023)
82efaae  fix test again (lhy1024, May 6, 2023)
pkg/keyspace/tso_keyspace_group.go: 163 changes (40 additions, 123 deletions)
@@ -32,6 +32,7 @@ import (
 	"github.com/tikv/pd/pkg/utils/etcdutil"
 	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/mvcc/mvccpb"
 	"go.uber.org/zap"
 )
@@ -40,10 +41,6 @@ const (
 	allocNodesToKeyspaceGroupsInterval = 1 * time.Second
 	allocNodesTimeout                  = 1 * time.Second
 	allocNodesInterval                 = 10 * time.Millisecond
-	// TODO: move it to etcdutil
-	watchEtcdChangeRetryInterval = 1 * time.Second
-	maxRetryTimes                = 25
-	retryInterval                = 100 * time.Millisecond
 )
 
 const (
@@ -65,18 +62,14 @@ type GroupManager struct {
 	// store is the storage for keyspace group related information.
 	store endpoint.KeyspaceGroupStorage
 
-	client *clientv3.Client
-
-	// tsoServiceKey is the path of TSO service in etcd.
-	tsoServiceKey string
-	// tsoServiceEndKey is the end key of TSO service in etcd.
-	tsoServiceEndKey string
-
 	// nodeBalancer is the balancer for tso nodes.
 	// TODO: add user kind with different balancer
 	// when we ensure where the correspondence between tso node and user kind will be found
 	nodesBalancer balancer.Balancer[string]
 	// serviceRegistryMap stores the mapping from the service registry key to the service address.
 	serviceRegistryMap map[string]string
+	// tsoNodesWatcher is the watcher for the registered tso servers.
+	tsoNodesWatcher *etcdutil.LoopWatcher
 }
 
 // NewKeyspaceGroupManager creates a Manager of keyspace group related data.
@@ -87,7 +80,6 @@ func NewKeyspaceGroupManager(
 	clusterID uint64,
 ) *GroupManager {
 	ctx, cancel := context.WithCancel(ctx)
-	key := discovery.TSOPath(clusterID)
 	groups := make(map[endpoint.UserKind]*indexedHeap)
 	for i := 0; i < int(endpoint.UserKindCount); i++ {
 		groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
@@ -96,20 +88,18 @@ func NewKeyspaceGroupManager(
 		ctx:                ctx,
 		cancel:             cancel,
 		store:              store,
-		client:             client,
-		tsoServiceKey:      key,
-		tsoServiceEndKey:   clientv3.GetPrefixRangeEnd(key) + "/",
 		groups:             groups,
 		nodesBalancer:      balancer.GenByPolicy[string](defaultBalancerPolicy),
 		serviceRegistryMap: make(map[string]string),
 	}
 
 	// If the etcd client is not nil, start the watch loop for the registered tso servers.
 	// The PD(TSO) Client relies on this info to discover tso servers.
-	if m.client != nil {
-		log.Info("start the watch loop for tso service discovery")
-		m.wg.Add(1)
-		go m.startWatchLoop(ctx)
+	if client != nil {
+		m.wg.Add(2)
+		m.initTSONodesWatcher(client, clusterID)
+		go m.tsoNodesWatcher.StartWatchLoop()
+		go m.allocNodesToAllKeyspaceGroups()
 	}
 
 	return m
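The struct fields removed above precomputed the etcd range end for the TSO registry prefix; after this PR the same bounds are derived inside initTSONodesWatcher (in the final hunk below), which computes clientv3.GetPrefixRangeEnd(tsoServiceKey) and then appends "/". As a standalone illustration of the range math (the service path here is hypothetical; the real one comes from discovery.TSOPath):

package main

import (
	"fmt"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// Hypothetical registry path; the PR builds it via discovery.TSOPath(clusterID).
	key := "/ms/1234/tso"
	// GetPrefixRangeEnd increments the last byte of the prefix, so a
	// Get/Watch over [key, end) covers every key that starts with the prefix.
	end := clientv3.GetPrefixRangeEnd(key)
	fmt.Println(end) // prints "/ms/1234/tsp"
}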
@@ -130,12 +120,6 @@ func (m *GroupManager) Bootstrap() error {
 	m.Lock()
 	defer m.Unlock()
 
-	// If the etcd client is not nil, start the watch loop.
-	if m.client != nil {
-		m.wg.Add(1)
-		go m.allocNodesToAllKeyspaceGroups()
-	}
-
 	// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
 	err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
 	if err != nil && err != ErrKeyspaceGroupExists {
@@ -200,109 +184,42 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
 	}
 }
 
-func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
-	defer logutil.LogPanic()
-	defer m.wg.Done()
-	ctx, cancel := context.WithCancel(parentCtx)
-	defer cancel()
-	var (
-		resp     *clientv3.GetResponse
-		revision int64
-		err      error
-	)
-	ticker := time.NewTicker(retryInterval)
-	defer ticker.Stop()
-	for i := 0; i < maxRetryTimes; i++ {
-		resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
-		if err == nil {
-			revision = resp.Header.Revision + 1
-			for _, item := range resp.Kvs {
-				s := &discovery.ServiceRegistryEntry{}
-				if err := json.Unmarshal(item.Value, s); err != nil {
-					log.Warn("failed to unmarshal service registry entry", zap.Error(err))
-					continue
-				}
-				m.nodesBalancer.Put(s.ServiceAddr)
-				m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr
-			}
-			break
-		}
-		log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
-		select {
-		case <-m.ctx.Done():
-			return
-		case <-ticker.C:
-		}
-	}
-	if err != nil || revision == 0 {
-		log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
-	}
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-		nextRevision, err := m.watchServiceAddrs(ctx, revision)
-		if err != nil {
-			log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
-				zap.Int64("next-revision", nextRevision),
-				zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
-				zap.Error(err))
-			revision = nextRevision
-			time.Sleep(watchEtcdChangeRetryInterval)
-		}
-	}
-}
-
-func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (int64, error) {
-	watcher := clientv3.NewWatcher(m.client)
-	defer watcher.Close()
-	for {
-	WatchChan:
-		watchChan := watcher.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision))
-		select {
-		case <-ctx.Done():
-			return revision, nil
-		case wresp := <-watchChan:
-			if wresp.CompactRevision != 0 {
-				log.Warn("required revision has been compacted, the watcher will watch again with the compact revision",
-					zap.Int64("required-revision", revision),
-					zap.Int64("compact-revision", wresp.CompactRevision))
-				revision = wresp.CompactRevision
-				goto WatchChan
-			}
-			if wresp.Err() != nil {
-				log.Error("watch is canceled or closed",
-					zap.Int64("required-revision", revision),
-					zap.Error(wresp.Err()))
-				return revision, wresp.Err()
-			}
-			for _, event := range wresp.Events {
-				switch event.Type {
-				case clientv3.EventTypePut:
-					s := &discovery.ServiceRegistryEntry{}
-					if err := json.Unmarshal(event.Kv.Value, s); err != nil {
-						log.Warn("failed to unmarshal service registry entry",
-							zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err))
-						break
-					}
-					m.nodesBalancer.Put(s.ServiceAddr)
-					m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr
-				case clientv3.EventTypeDelete:
-					key := string(event.Kv.Key)
-					if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
-						delete(m.serviceRegistryMap, key)
-						m.nodesBalancer.Delete(serviceAddr)
-					} else {
-						log.Warn("can't retrieve service addr from service registry map",
-							zap.String("event-kv-key", key))
-					}
-				}
-			}
-			revision = wresp.Header.Revision + 1
-		}
-	}
-}
+func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
+	tsoServiceKey := discovery.TSOPath(clusterID)
+	tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"
+
+	putFn := func(kv *mvccpb.KeyValue) error {
+		s := &discovery.ServiceRegistryEntry{}
+		if err := json.Unmarshal(kv.Value, s); err != nil {
+			log.Warn("failed to unmarshal service registry entry",
+				zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
+			return err
+		}
+		m.nodesBalancer.Put(s.ServiceAddr)
+		m.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
+		return nil
+	}
+
+	deleteFn := func(kv *mvccpb.KeyValue) error {
+		key := string(kv.Key)
+		if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
+			delete(m.serviceRegistryMap, key)
+			m.nodesBalancer.Delete(serviceAddr)
+			return nil
+		}
+		return errors.Errorf("failed to find the service address for key %s", key)
+	}
+
+	m.tsoNodesWatcher = etcdutil.NewLoopWatcher(
+		m.ctx,
+		&m.wg,
+		client,
+		"tso-nodes-watcher",
+		tsoServiceKey,
+		putFn,
+		deleteFn,
+		func() error { return nil },
+		clientv3.WithRange(tsoServiceEndKey),
+	)
+}
 
 // CreateKeyspaceGroups creates keyspace groups.
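All of the revision bookkeeping, compaction recovery, and retry logic that the deleted startWatchLoop/watchServiceAddrs pair implemented by hand now lives behind etcdutil.LoopWatcher, whose implementation is outside this file's diff. As a reading aid, here is a minimal sketch of the load-then-watch pattern the deleted code followed and that LoopWatcher presumably centralizes; names and details are illustrative, not the actual etcdutil code.

package sketch

import (
	"context"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

// watchLoop mirrors the deleted flow: load a snapshot of the prefix, hand
// every key to putFn, then watch from the next revision and dispatch puts
// and deletes until ctx is canceled.
func watchLoop(
	ctx context.Context,
	client *clientv3.Client,
	prefix string,
	putFn, deleteFn func(kv *mvccpb.KeyValue) error,
) error {
	// 1. Initial load, so consumers start from a complete view.
	resp, err := client.Get(ctx, prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		_ = putFn(kv) // the PR's putFn logs and skips malformed entries
	}
	revision := resp.Header.Revision + 1

	// 2. Watch from the revision just after the snapshot; on compaction,
	// re-watch from the compact revision instead of giving up.
	for {
		watchChan := client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
		for wresp := range watchChan {
			if wresp.CompactRevision != 0 {
				revision = wresp.CompactRevision
				break // restart the watch from the compact revision
			}
			if err := wresp.Err(); err != nil {
				return err
			}
			for _, event := range wresp.Events {
				switch event.Type {
				case clientv3.EventTypePut:
					_ = putFn(event.Kv)
				case clientv3.EventTypeDelete:
					_ = deleteFn(event.Kv)
				}
			}
			revision = wresp.Header.Revision + 1
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}

In the diff above, GroupManager supplies putFn and deleteFn callbacks that keep nodesBalancer and serviceRegistryMap in sync, and NewLoopWatcher additionally takes a watcher name for logging, a post-event function (a no-op here, matching the "add post function" commit), and etcd options such as clientv3.WithRange.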