Skip to content

Commit

Permalink
store: RegisterService, UnregisterService
Browse files Browse the repository at this point in the history
  • Loading branch information
jschwinger233 authored and CMGS committed Aug 13, 2020
1 parent da4e6ef commit 4868f94
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
3 changes: 2 additions & 1 deletion store/etcdv3/mercury.go
Expand Up @@ -22,7 +22,8 @@ import (
)

const (
podInfoKey = "/pod/info/%s" // /pod/info/{podname}
podInfoKey = "/pod/info/%s" // /pod/info/{podname}
serviceStatusPrefix = "/services/"

nodeInfoKey = "/node/%s" // /node/{nodename}
nodePodKey = "/node/%s:pod/%s" // /node/{podname}:pod/{nodename}
Expand Down
104 changes: 104 additions & 0 deletions store/etcdv3/service.go
@@ -0,0 +1,104 @@
package etcdv3

import (
"context"
"strings"
"time"

log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/v3/clientv3"
"go.etcd.io/etcd/v3/mvcc/mvccpb"
)

type endpoints map[string]struct{}

func (e *endpoints) Add(endpoint string) (changed bool) {
if _, ok := (*e)[endpoint]; !ok {
(*e)[endpoint] = struct{}{}
changed = true
}
return
}

func (e *endpoints) Remove(endpoint string) (changed bool) {
if _, ok := (*e)[endpoint]; ok {
delete(*e, endpoint)
changed = true
}
return
}

func (e endpoints) ToSlice() (eps []string) {
for ep := range e {
eps = append(eps, ep)
}
return
}

func (m *Mercury) ServiceStatusStream(ctx context.Context) (chan []string, error) {
ch := make(chan []string)
go func() {
defer close(ch)
log.Info("[ServiceStatusStream] start watching service status")
resp, err := m.Get(ctx, serviceStatusPrefix, clientv3.WithPrefix())
if err != nil {
log.Errorf("[ServiceStatusStream] failed to get current services: %v", err)
return
}
eps := endpoints{}
for _, ev := range resp.Kvs {
eps.Add(parseServiceKey(ev.Key))
}
ch <- eps.ToSlice()

for resp := range m.watch(ctx, serviceStatusPrefix, clientv3.WithPrefix()) {
if resp.Err() != nil {
if !resp.Canceled {
log.Errorf("[ServiceStatusStream] watch failed %v", resp.Err())
}
return
}

changed := false
for _, ev := range resp.Events {
endpoint := parseServiceKey(ev.Kv.Key)
c := false
switch ev.Type {
case mvccpb.PUT:
c = eps.Add(endpoint)
case mvccpb.DELETE:
c = eps.Remove(endpoint)
}
if c {
changed = true
}
}
if changed {
ch <- eps.ToSlice()
}
}
}()
return ch, nil
}

func (m *Mercury) RegisterService(ctx context.Context, serviceAddress string, expire time.Duration) error {
key := serviceStatusPrefix + serviceAddress
lease, err := m.cliv3.Grant(ctx, int64(expire/time.Second))
if err != nil {
return err
}

_, err = m.Put(ctx, key, "", clientv3.WithLease(lease.ID))
return err
}

func (m *Mercury) UnregisterService(ctx context.Context, serviceAddress string) error {
key := serviceStatusPrefix + serviceAddress
_, err := m.Delete(ctx, key)
return err
}

func parseServiceKey(key []byte) (endpoint string) {
parts := strings.Split(string(key), "/")
return parts[len(parts)-1]
}
5 changes: 5 additions & 0 deletions store/store.go
Expand Up @@ -17,6 +17,11 @@ const (

//Store store eru data
type Store interface {
// service
ServiceStatusStream(context.Context) (chan []string, error)
RegisterService(context.Context, string, time.Duration) error
UnregisterService(context.Context, string) error

// pod
AddPod(ctx context.Context, name, desc string) (*types.Pod, error)
GetPod(ctx context.Context, podname string) (*types.Pod, error)
Expand Down

0 comments on commit 4868f94

Please sign in to comment.