diff --git a/store/etcdv3/mercury.go b/store/etcdv3/mercury.go index 33e962484..c3f985e14 100644 --- a/store/etcdv3/mercury.go +++ b/store/etcdv3/mercury.go @@ -22,7 +22,8 @@ import ( ) const ( - podInfoKey = "/pod/info/%s" // /pod/info/{podname} + podInfoKey = "/pod/info/%s" // /pod/info/{podname} + serviceStatusPrefix = "/services/" nodeInfoKey = "/node/%s" // /node/{nodename} nodePodKey = "/node/%s:pod/%s" // /node/{podname}:pod/{nodename} diff --git a/store/etcdv3/service.go b/store/etcdv3/service.go new file mode 100644 index 000000000..0d499a83d --- /dev/null +++ b/store/etcdv3/service.go @@ -0,0 +1,104 @@ +package etcdv3 + +import ( + "context" + "strings" + "time" + + log "github.com/sirupsen/logrus" + "go.etcd.io/etcd/v3/clientv3" + "go.etcd.io/etcd/v3/mvcc/mvccpb" +) + +type endpoints map[string]struct{} + +func (e *endpoints) Add(endpoint string) (changed bool) { + if _, ok := (*e)[endpoint]; !ok { + (*e)[endpoint] = struct{}{} + changed = true + } + return +} + +func (e *endpoints) Remove(endpoint string) (changed bool) { + if _, ok := (*e)[endpoint]; ok { + delete(*e, endpoint) + changed = true + } + return +} + +func (e endpoints) ToSlice() (eps []string) { + for ep := range e { + eps = append(eps, ep) + } + return +} + +func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error) { + ch := make(chan []string) + go func() { + defer close(ch) + log.Info("[ServiceStatusStream] start watching service status") + resp, err := m.Get(ctx, serviceStatusPrefix, clientv3.WithPrefix()) + if err != nil { + log.Errorf("[ServiceStatusStream] failed to get current services: %v", err) + return + } + eps := endpoints{} + for _, ev := range resp.Kvs { + eps.Add(parseServiceKey(ev.Key)) + } + ch <- eps.ToSlice() + + for resp := range m.watch(ctx, serviceStatusPrefix, clientv3.WithPrefix()) { + if resp.Err() != nil { + if !resp.Canceled { + log.Errorf("[ServiceStatusStream] watch failed %v", resp.Err()) + } + return + } + + changed := false + for _, ev := range resp.Events { + endpoint := parseServiceKey(ev.Kv.Key) + c := false + switch ev.Type { + case mvccpb.PUT: + c = eps.Add(endpoint) + case mvccpb.DELETE: + c = eps.Remove(endpoint) + } + if c { + changed = true + } + } + if changed { + ch <- eps.ToSlice() + } + } + }() + return ch, nil +} + +func (m *Mercury) RegisterService(ctx context.Context, serviceAddress string, expire time.Duration) error { + key := serviceStatusPrefix + serviceAddress + lease, err := m.cliv3.Grant(ctx, int64(expire/time.Second)) + if err != nil { + return err + } + + _, err = m.Put(ctx, key, "", clientv3.WithLease(lease.ID)) + return err +} + +func (m *Mercury) UnregisterService(ctx context.Context, serviceAddress string) error { + key := serviceStatusPrefix + serviceAddress + _, err := m.Delete(ctx, key) + return err +} + +func parseServiceKey(key []byte) (endpoint string) { + parts := strings.Split(string(key), "/") + return parts[len(parts)-1] +} diff --git a/store/store.go b/store/store.go index 21ef4593e..651d94560 100644 --- a/store/store.go +++ b/store/store.go @@ -17,6 +17,11 @@ const ( //Store store eru data type Store interface { + // service + ServiceStatusStream(context.Context) (chan []string, error) + RegisterService(context.Context, string, time.Duration) error + UnregisterService(context.Context, string) error + // pod AddPod(ctx context.Context, name, desc string) (*types.Pod, error) GetPod(ctx context.Context, podname string) (*types.Pod, error)