Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: status reporters, making taskmanager mandatory #16

Merged
merged 6 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 15 additions & 46 deletions modules/auth/aclManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,69 +3,35 @@ package auth_test
import (
"testing"

ds "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
store "github.com/plexsysio/gkvstore"
dsstore "github.com/plexsysio/gkvstore-ipfsds"
"github.com/plexsysio/go-msuite/modules/auth"
"github.com/plexsysio/go-msuite/modules/config"
jsonConf "github.com/plexsysio/go-msuite/modules/config/json"
"github.com/plexsysio/go-msuite/modules/repo/inmem"
)

type testRepo struct {
st store.Store
cfg config.Config
d ds.Batching
}

func newTestRepo() *testRepo {
cfg := jsonConf.DefaultConfig()
bs := syncds.MutexWrap(ds.NewMapDatastore())
st := dsstore.New(bs)
return &testRepo{st: st, cfg: cfg, d: bs}
}

func (t *testRepo) Datastore() ds.Batching {
return t.d
}

func (t *testRepo) Config() config.Config {
return t.cfg
}

func (t *testRepo) Store() store.Store {
return t.st
}

func (t *testRepo) SetConfig(c config.Config) error {
t.cfg = c
return nil
}

func (t *testRepo) Close() error {
t.st.Close()
t.d.Close()
return nil
}

func TestNewAclManager(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

_, err := auth.NewAclManager(r)
_, err = auth.NewAclManager(r)
if err != nil {
t.Fatal("Failed creating new acl manager", err.Error())
}
}

func TestNewAclManagerWithAcls(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

r.Config().Set("ACL", map[string]string{
"dummy": "invalidACL",
})
_, err := auth.NewAclManager(r)
_, err = auth.NewAclManager(r)
if err == nil {
t.Fatal("Expected error while creating new acl manager")
}
Expand All @@ -80,7 +46,10 @@ func TestNewAclManagerWithAcls(t *testing.T) {
}

func TestACLLifecycle(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

am, err := auth.NewAclManager(r)
Expand Down
72 changes: 18 additions & 54 deletions modules/diag/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,36 @@ import (
"encoding/json"
"net/http"
"sync"

"github.com/plexsysio/taskmanager"
)

type Reporter interface {
Status() interface{}
}

type Manager interface {
Report(string, Status)
AddReporter(string, Reporter)
Status() map[string]interface{}
}

type Status interface {
Delta(Status)
type impl struct {
mp sync.Map
}

type String string

func (s String) Delta(_ Status) {}

type Map map[string]interface{}

func (m Map) Delta(old Status) {
oldMp, ok := old.(Map)
if !ok {
return
}
for k, v := range oldMp {
_, ok := m[k]
if !ok {
m[k] = v
}
}
func New() Manager {
return new(impl)
}

type impl struct {
mp sync.Map
tm *taskmanager.TaskManager
func (m *impl) AddReporter(key string, reporter Reporter) {
m.mp.Store(key, reporter)
}

func New(
tm *taskmanager.TaskManager,
) Manager {
m := &impl{
tm: tm,
}
return m
func (m *impl) Status() map[string]interface{} {
retStatus := make(map[string]interface{})
m.mp.Range(func(k, v interface{}) bool {
retStatus[k.(string)] = v.(Reporter).Status()
return true
})
return retStatus
}

func RegisterHTTP(m Manager, mux *http.ServeMux) {
Expand All @@ -62,25 +48,3 @@ func RegisterHTTP(m Manager, mux *http.ServeMux) {
_, _ = w.Write(buf)
})
}

func (m *impl) Report(key string, msg Status) {
val, loaded := m.mp.LoadOrStore(key, msg)
if !loaded {
return
}
oldStatus, ok := val.(Status)
if ok {
msg.Delta(oldStatus)
m.mp.Store(key, msg)
}
}

func (m *impl) Status() map[string]interface{} {
retStatus := make(map[string]interface{})
m.mp.Range(func(k, v interface{}) bool {
retStatus[k.(string)] = v
return true
})
retStatus["Task Manager"] = m.tm.TaskStatus()
return retStatus
}
68 changes: 34 additions & 34 deletions modules/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@ import (
"context"
"errors"
"fmt"
"time"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
"google.golang.org/grpc"
"time"
)

var log = logger.Logger("grpc/client")

const discoveryTTL = 15 * time.Minute

var ErrNoPeerForSvc = errors.New("failed to find any usable peer for service")

type ClientSvc interface {
Get(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error)
}
Expand Down Expand Up @@ -60,44 +65,29 @@ func (d *discoveryProvider) Name() string {

func (d *discoveryProvider) Execute(ctx context.Context) error {
for {
var (
startTTL time.Duration
err error
)
started := time.Now()
for i, svc := range d.services {
log.Infof("Advertising service: %s", svc)
ttl, e := d.ds.Advertise(ctx, svc, discovery.TTL(time.Minute*15))
if e != nil {
err = fmt.Errorf("error advertising %s: %w", svc, e)
var err error
for _, svc := range d.services {
log.Debugf("Advertising service: %s", svc)
_, err = d.ds.Advertise(ctx, svc, discovery.TTL(discoveryTTL))
if err != nil {
err = fmt.Errorf("error advertising %s: %w", svc, err)
break
}
// Use TTL of first advertisement for wait in the next part
if i == 0 {
startTTL = ttl
}
}
if err != nil {
log.Debug(err.Error())
log.Errorf("error advertising %v", err)
select {
case <-time.After(time.Minute * 2):
continue
case <-ctx.Done():
return nil
}
}
// Time to wait needs to obey TTL of the first service advertised.
// If the operation takes time and we wait for all services to advertise, initial
// services might not get advertised.
ttl := startTTL - time.Since(started)
if ttl <= 0 {
ttl = startTTL
}
wait := 7 * ttl / 8
wait := 7 * discoveryTTL / 8
select {
case <-time.After(wait):
case <-ctx.Done():
log.Info("Stopping advertiser")
log.Info("stopping advertiser")
return nil
}
}
Expand All @@ -113,24 +103,34 @@ func (c *clientImpl) Get(
svc string,
opts ...grpc.DialOption,
) (*grpc.ClientConn, error) {
p, err := c.ds.FindPeers(ctx, svc, discovery.Limit(1))

// FindPeers is called without limit opt, so this cancel is required to release
// any resources used by it
cCtx, cCancel := context.WithCancel(ctx)
defer cCancel()

p, err := c.ds.FindPeers(cCtx, svc)
if err != nil {
return nil, err
}
select {
case <-time.After(time.Second * 10):
return nil, errors.New("unable to find peer for service " + svc)
case pAddr, ok := <-p:
if ok {

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case pAddr, more := <-p:
if !more {
return nil, ErrNoPeerForSvc
}
err = c.h.Connect(ctx, pAddr)
if err != nil {
return nil, fmt.Errorf("failed to connect to peer %v", pAddr)
log.Errorf("failed to connect to peer %v err %v", pAddr, err)
continue
}
log.Infof("Connected to peer %v for service %s", pAddr, svc)
log.Debugf("connected to peer %v for service %s", pAddr, svc)
return p2pgrpc.NewP2PDialer(c.h).Dial(ctx, pAddr.ID.String(), opts...)
}
}
return nil, errors.New("invalid address received for peer")
}

func NewStaticClientService(c config.Config) ClientSvc {
Expand Down
Loading