etcdutil: add watch loop (tikv#6390)
close tikv#6391

Signed-off-by: lhy1024 <admin@liudos.us>
lhy1024 authored and rleungx committed Aug 2, 2023
1 parent fd43eff commit cc3e3af
Showing 9 changed files with 809 additions and 599 deletions.
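For context, the sketch below shows how the new etcdutil.LoopWatcher introduced by this commit is typically wired up. It is not part of the change itself: the argument order and callback signatures are inferred from the initTSONodesWatcher call site in the diff further down, and the watcher name, key prefix, and the "postEventFn" label are placeholder assumptions.

package example

import (
    "context"
    "sync"

    "github.com/tikv/pd/pkg/utils/etcdutil"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/mvcc/mvccpb"
)

// startExampleWatcher wires a LoopWatcher over an example key prefix.
// "example-watcher" and "/example/prefix/" are placeholders, not keys used by PD.
func startExampleWatcher(ctx context.Context, wg *sync.WaitGroup, client *clientv3.Client) *etcdutil.LoopWatcher {
    // putFn is invoked for keys loaded at startup and for each subsequent PUT event.
    putFn := func(kv *mvccpb.KeyValue) error {
        // update in-memory state from kv.Key / kv.Value here
        return nil
    }
    // deleteFn is invoked for each DELETE event.
    deleteFn := func(kv *mvccpb.KeyValue) error {
        // drop the in-memory state for kv.Key here
        return nil
    }
    watcher := etcdutil.NewLoopWatcher(
        ctx, wg, client,
        "example-watcher",   // name used for logging
        "/example/prefix/",  // key (or start of range) to watch
        putFn,
        deleteFn,
        func() error { return nil }, // assumed post-event hook; a no-op here, as in the diff below
        clientv3.WithPrefix(),       // extra etcd options, e.g. prefix/range watch
    )
    // The loop takes over the initial load, watch retries, and compaction restarts
    // that callers previously implemented by hand (see the removed startWatchLoop
    // and watchServiceAddrs in the diff below).
    wg.Add(1)
    go watcher.StartWatchLoop()
    return watcher
}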
167 changes: 42 additions & 125 deletions pkg/keyspace/tso_keyspace_group.go
@@ -32,6 +32,7 @@ import (
    "github.com/tikv/pd/pkg/utils/etcdutil"
    "github.com/tikv/pd/pkg/utils/logutil"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/mvcc/mvccpb"
    "go.uber.org/zap"
)

@@ -40,10 +41,6 @@ const (
    allocNodesToKeyspaceGroupsInterval = 1 * time.Second
    allocNodesTimeout = 1 * time.Second
    allocNodesInterval = 10 * time.Millisecond
    // TODO: move it to etcdutil
    watchEtcdChangeRetryInterval = 1 * time.Second
    maxRetryTimes = 25
    retryInterval = 100 * time.Millisecond
)

const (
@@ -65,18 +62,14 @@ type GroupManager struct {
    // store is the storage for keyspace group related information.
    store endpoint.KeyspaceGroupStorage

    client *clientv3.Client

    // tsoServiceKey is the path of TSO service in etcd.
    tsoServiceKey string
    // tsoServiceEndKey is the end key of TSO service in etcd.
    tsoServiceEndKey string

    // TODO: add user kind with different balancer
    // when we ensure where the correspondence between tso node and user kind will be found
    // nodeBalancer is the balancer for tso nodes.
    // TODO: add user kind with different balancer when we ensure where the correspondence between tso node and user kind will be found
    nodesBalancer balancer.Balancer[string]
    // serviceRegistryMap stores the mapping from the service registry key to the service address.
    // Note: it is only used in tsoNodesWatcher.
    serviceRegistryMap map[string]string
    // tsoNodesWatcher is the watcher for the registered tso servers.
    tsoNodesWatcher *etcdutil.LoopWatcher
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
@@ -87,7 +80,6 @@ func NewKeyspaceGroupManager(
    clusterID uint64,
) *GroupManager {
    ctx, cancel := context.WithCancel(ctx)
    key := discovery.TSOPath(clusterID)
    groups := make(map[endpoint.UserKind]*indexedHeap)
    for i := 0; i < int(endpoint.UserKindCount); i++ {
        groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
@@ -96,20 +88,18 @@
        ctx:                ctx,
        cancel:             cancel,
        store:              store,
        client:             client,
        tsoServiceKey:      key,
        tsoServiceEndKey:   clientv3.GetPrefixRangeEnd(key) + "/",
        groups:             groups,
        nodesBalancer:      balancer.GenByPolicy[string](defaultBalancerPolicy),
        serviceRegistryMap: make(map[string]string),
    }

    // If the etcd client is not nil, start the watch loop for the registered tso servers.
    // The PD(TSO) Client relies on this info to discover tso servers.
    if m.client != nil {
        log.Info("start the watch loop for tso service discovery")
        m.wg.Add(1)
        go m.startWatchLoop(ctx)
    }
    if client != nil {
        m.initTSONodesWatcher(client, clusterID)
        m.wg.Add(2)
        go m.tsoNodesWatcher.StartWatchLoop()
        go m.allocNodesToAllKeyspaceGroups()
    }

    return m
@@ -130,12 +120,6 @@ func (m *GroupManager) Bootstrap() error {
    m.Lock()
    defer m.Unlock()

    // If the etcd client is not nil, start the watch loop.
    if m.client != nil {
        m.wg.Add(1)
        go m.allocNodesToAllKeyspaceGroups()
    }

    // Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
    err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
    if err != nil && err != ErrKeyspaceGroupExists {
@@ -200,109 +184,42 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
}
}

func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
    defer logutil.LogPanic()
    defer m.wg.Done()
    ctx, cancel := context.WithCancel(parentCtx)
    defer cancel()
    var (
        resp     *clientv3.GetResponse
        revision int64
        err      error
    )
    ticker := time.NewTicker(retryInterval)
    defer ticker.Stop()
    for i := 0; i < maxRetryTimes; i++ {
        resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
        if err == nil {
            revision = resp.Header.Revision + 1
            for _, item := range resp.Kvs {
                s := &discovery.ServiceRegistryEntry{}
                if err := json.Unmarshal(item.Value, s); err != nil {
                    log.Warn("failed to unmarshal service registry entry", zap.Error(err))
                    continue
                }
                m.nodesBalancer.Put(s.ServiceAddr)
                m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr
            }
            break
        }
        log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
        select {
        case <-m.ctx.Done():
            return
        case <-ticker.C:
        }
    }
    if err != nil || revision == 0 {
        log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
    }
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        nextRevision, err := m.watchServiceAddrs(ctx, revision)
        if err != nil {
            log.Error("watcher canceled unexpectedly and a new watcher will start after a while",
                zap.Int64("next-revision", nextRevision),
                zap.Time("retry-at", time.Now().Add(watchEtcdChangeRetryInterval)),
                zap.Error(err))
            revision = nextRevision
            time.Sleep(watchEtcdChangeRetryInterval)
        }
    }
}

func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (int64, error) {
    watcher := clientv3.NewWatcher(m.client)
    defer watcher.Close()
    for {
    WatchChan:
        watchChan := watcher.Watch(ctx, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey), clientv3.WithRev(revision))
        select {
        case <-ctx.Done():
            return revision, nil
        case wresp := <-watchChan:
            if wresp.CompactRevision != 0 {
                log.Warn("required revision has been compacted, the watcher will watch again with the compact revision",
                    zap.Int64("required-revision", revision),
                    zap.Int64("compact-revision", wresp.CompactRevision))
                revision = wresp.CompactRevision
                goto WatchChan
            }
            if wresp.Err() != nil {
                log.Error("watch is canceled or closed",
                    zap.Int64("required-revision", revision),
                    zap.Error(wresp.Err()))
                return revision, wresp.Err()
            }
            for _, event := range wresp.Events {
                switch event.Type {
                case clientv3.EventTypePut:
                    s := &discovery.ServiceRegistryEntry{}
                    if err := json.Unmarshal(event.Kv.Value, s); err != nil {
                        log.Warn("failed to unmarshal service registry entry",
                            zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err))
                        break
                    }
                    m.nodesBalancer.Put(s.ServiceAddr)
                    m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr
                case clientv3.EventTypeDelete:
                    key := string(event.Kv.Key)
                    if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
                        delete(m.serviceRegistryMap, key)
                        m.nodesBalancer.Delete(serviceAddr)
                    } else {
                        log.Warn("can't retrieve service addr from service registry map",
                            zap.String("event-kv-key", key))
                    }
                }
            }
            revision = wresp.Header.Revision + 1
        }
    }
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client, clusterID uint64) {
    tsoServiceKey := discovery.TSOPath(clusterID)
    tsoServiceEndKey := clientv3.GetPrefixRangeEnd(tsoServiceKey) + "/"

    putFn := func(kv *mvccpb.KeyValue) error {
        s := &discovery.ServiceRegistryEntry{}
        if err := json.Unmarshal(kv.Value, s); err != nil {
            log.Warn("failed to unmarshal service registry entry",
                zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
            return err
        }
        m.nodesBalancer.Put(s.ServiceAddr)
        m.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr
        return nil
    }
    deleteFn := func(kv *mvccpb.KeyValue) error {
        key := string(kv.Key)
        if serviceAddr, ok := m.serviceRegistryMap[key]; ok {
            delete(m.serviceRegistryMap, key)
            m.nodesBalancer.Delete(serviceAddr)
            return nil
        }
        return errors.Errorf("failed to find the service address for key %s", key)
    }

    m.tsoNodesWatcher = etcdutil.NewLoopWatcher(
        m.ctx,
        &m.wg,
        client,
        "tso-nodes-watcher",
        tsoServiceKey,
        putFn,
        deleteFn,
        func() error { return nil },
        clientv3.WithRange(tsoServiceEndKey),
    )
}

// CreateKeyspaceGroups creates keyspace groups.