feat: status reporters, making taskmanager mandatory (#16)
* feat: status reporters, making taskmanager mandatory

* fix: minor issues

* fix: lint

* fix: mux test

* fix: deepsource

* fix: increase coverage

Co-authored-by: alok <aloknerurkar@no-reply.com>
aloknerurkar and alok committed Dec 14, 2021
1 parent cb2f85f commit 64cb5e7
Showing 18 changed files with 324 additions and 240 deletions.
61 changes: 15 additions & 46 deletions modules/auth/aclManager_test.go
@@ -3,69 +3,35 @@ package auth_test
import (
"testing"

ds "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
store "github.com/plexsysio/gkvstore"
dsstore "github.com/plexsysio/gkvstore-ipfsds"
"github.com/plexsysio/go-msuite/modules/auth"
"github.com/plexsysio/go-msuite/modules/config"
jsonConf "github.com/plexsysio/go-msuite/modules/config/json"
"github.com/plexsysio/go-msuite/modules/repo/inmem"
)

type testRepo struct {
st store.Store
cfg config.Config
d ds.Batching
}

func newTestRepo() *testRepo {
cfg := jsonConf.DefaultConfig()
bs := syncds.MutexWrap(ds.NewMapDatastore())
st := dsstore.New(bs)
return &testRepo{st: st, cfg: cfg, d: bs}
}

func (t *testRepo) Datastore() ds.Batching {
return t.d
}

func (t *testRepo) Config() config.Config {
return t.cfg
}

func (t *testRepo) Store() store.Store {
return t.st
}

func (t *testRepo) SetConfig(c config.Config) error {
t.cfg = c
return nil
}

func (t *testRepo) Close() error {
t.st.Close()
t.d.Close()
return nil
}

func TestNewAclManager(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

_, err := auth.NewAclManager(r)
_, err = auth.NewAclManager(r)
if err != nil {
t.Fatal("Failed creating new acl manager", err.Error())
}
}

func TestNewAclManagerWithAcls(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

r.Config().Set("ACL", map[string]string{
"dummy": "invalidACL",
})
_, err := auth.NewAclManager(r)
_, err = auth.NewAclManager(r)
if err == nil {
t.Fatal("Expected error while creating new acl manager")
}
@@ -80,7 +46,10 @@ func TestNewAclManagerWithAcls(t *testing.T) {
}

func TestACLLifecycle(t *testing.T) {
r := newTestRepo()
r, err := inmem.CreateOrOpen(jsonConf.DefaultConfig())
if err != nil {
t.Fatal("failed creating repo", err)
}
defer r.Close()

am, err := auth.NewAclManager(r)
72 changes: 18 additions & 54 deletions modules/diag/status/status.go
@@ -4,50 +4,36 @@ import (
"encoding/json"
"net/http"
"sync"

"github.com/plexsysio/taskmanager"
)

type Reporter interface {
Status() interface{}
}

type Manager interface {
Report(string, Status)
AddReporter(string, Reporter)
Status() map[string]interface{}
}

type Status interface {
Delta(Status)
type impl struct {
mp sync.Map
}

type String string

func (s String) Delta(_ Status) {}

type Map map[string]interface{}

func (m Map) Delta(old Status) {
oldMp, ok := old.(Map)
if !ok {
return
}
for k, v := range oldMp {
_, ok := m[k]
if !ok {
m[k] = v
}
}
func New() Manager {
return new(impl)
}

type impl struct {
mp sync.Map
tm *taskmanager.TaskManager
func (m *impl) AddReporter(key string, reporter Reporter) {
m.mp.Store(key, reporter)
}

func New(
tm *taskmanager.TaskManager,
) Manager {
m := &impl{
tm: tm,
}
return m
func (m *impl) Status() map[string]interface{} {
retStatus := make(map[string]interface{})
m.mp.Range(func(k, v interface{}) bool {
retStatus[k.(string)] = v.(Reporter).Status()
return true
})
return retStatus
}

func RegisterHTTP(m Manager, mux *http.ServeMux) {
@@ -62,25 +48,3 @@ func RegisterHTTP(m Manager, mux *http.ServeMux) {
_, _ = w.Write(buf)
})
}

func (m *impl) Report(key string, msg Status) {
val, loaded := m.mp.LoadOrStore(key, msg)
if !loaded {
return
}
oldStatus, ok := val.(Status)
if ok {
msg.Delta(oldStatus)
m.mp.Store(key, msg)
}
}

func (m *impl) Status() map[string]interface{} {
retStatus := make(map[string]interface{})
m.mp.Range(func(k, v interface{}) bool {
retStatus[k.(string)] = v
return true
})
retStatus["Task Manager"] = m.tm.TaskStatus()
return retStatus
}
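
Note on the new API in this file: the taskmanager dependency is dropped, New() takes no arguments, and status is now pulled lazily by polling registered Reporters instead of being pushed via Report/Delta. Below is a minimal usage sketch, not part of the commit. Assumptions: the package is importable as github.com/plexsysio/go-msuite/modules/diag/status per the module layout above, staticReporter is a hypothetical Reporter, and the HTTP path registered by RegisterHTTP is whatever status.go defines.

package main

import (
	"fmt"
	"net/http"

	status "github.com/plexsysio/go-msuite/modules/diag/status"
)

// staticReporter is a hypothetical Reporter that always returns the same value.
type staticReporter string

func (s staticReporter) Status() interface{} { return string(s) }

func main() {
	// New() no longer needs a *taskmanager.TaskManager.
	m := status.New()
	m.AddReporter("auth", staticReporter("running"))

	// Status() ranges over the registered reporters and collects their current values.
	fmt.Println(m.Status()) // map[auth:running]

	// Optionally expose the aggregated status on an HTTP mux.
	mux := http.NewServeMux()
	status.RegisterHTTP(m, mux)
}
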
68 changes: 34 additions & 34 deletions modules/grpc/client/client.go
@@ -4,18 +4,23 @@ import (
"context"
"errors"
"fmt"
"time"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
"google.golang.org/grpc"
"time"
)

var log = logger.Logger("grpc/client")

const discoveryTTL = 15 * time.Minute

var ErrNoPeerForSvc = errors.New("failed to find any usable peer for service")

type ClientSvc interface {
Get(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error)
}
@@ -60,44 +65,29 @@ func (d *discoveryProvider) Name() string {

func (d *discoveryProvider) Execute(ctx context.Context) error {
for {
var (
startTTL time.Duration
err error
)
started := time.Now()
for i, svc := range d.services {
log.Infof("Advertising service: %s", svc)
ttl, e := d.ds.Advertise(ctx, svc, discovery.TTL(time.Minute*15))
if e != nil {
err = fmt.Errorf("error advertising %s: %w", svc, e)
var err error
for _, svc := range d.services {
log.Debugf("Advertising service: %s", svc)
_, err = d.ds.Advertise(ctx, svc, discovery.TTL(discoveryTTL))
if err != nil {
err = fmt.Errorf("error advertising %s: %w", svc, err)
break
}
// Use TTL of first advertisement for wait in the next part
if i == 0 {
startTTL = ttl
}
}
if err != nil {
log.Debug(err.Error())
log.Errorf("error advertising %v", err)
select {
case <-time.After(time.Minute * 2):
continue
case <-ctx.Done():
return nil
}
}
// Time to wait needs to obey TTL of the first service advertised.
// If the operation takes time and we wait for all services to advertise, initial
// services might not get advertised.
ttl := startTTL - time.Since(started)
if ttl <= 0 {
ttl = startTTL
}
wait := 7 * ttl / 8
wait := 7 * discoveryTTL / 8
select {
case <-time.After(wait):
case <-ctx.Done():
log.Info("Stopping advertiser")
log.Info("stopping advertiser")
return nil
}
}
@@ -113,24 +103,34 @@ func (c *clientImpl) Get(
svc string,
opts ...grpc.DialOption,
) (*grpc.ClientConn, error) {
p, err := c.ds.FindPeers(ctx, svc, discovery.Limit(1))

// FindPeers is called without limit opt, so this cancel is required to release
// any resources used by it
cCtx, cCancel := context.WithCancel(ctx)
defer cCancel()

p, err := c.ds.FindPeers(cCtx, svc)
if err != nil {
return nil, err
}
select {
case <-time.After(time.Second * 10):
return nil, errors.New("unable to find peer for service " + svc)
case pAddr, ok := <-p:
if ok {

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case pAddr, more := <-p:
if !more {
return nil, ErrNoPeerForSvc
}
err = c.h.Connect(ctx, pAddr)
if err != nil {
return nil, fmt.Errorf("failed to connect to peer %v", pAddr)
log.Errorf("failed to connect to peer %v err %v", pAddr, err)
continue
}
log.Infof("Connected to peer %v for service %s", pAddr, svc)
log.Debugf("connected to peer %v for service %s", pAddr, svc)
return p2pgrpc.NewP2PDialer(c.h).Dial(ctx, pAddr.ID.String(), opts...)
}
}
return nil, errors.New("invalid address received for peer")
}

func NewStaticClientService(c config.Config) ClientSvc {
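
Note on the reworked Get: it now iterates over discovered peers until one connects, returns ctx.Err() on cancellation, and returns the new ErrNoPeerForSvc sentinel when the peer channel closes without a usable peer. A caller-side sketch, not part of the commit. Assumptions: the package is imported under the alias client from the module path above, the service name "hello" and the grpc.WithInsecure() dial option are illustrative, and the ClientSvc is obtained elsewhere (e.g. from the node).

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"google.golang.org/grpc"

	client "github.com/plexsysio/go-msuite/modules/grpc/client"
)

// dialHello blocks until a peer advertising the service is found and connected,
// the peer channel closes (ErrNoPeerForSvc), or the context expires.
func dialHello(svc client.ClientSvc) (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	conn, err := svc.Get(ctx, "hello", grpc.WithInsecure())
	if errors.Is(err, client.ErrNoPeerForSvc) {
		log.Println("discovery returned no usable peer for the service")
	}
	return conn, err
}
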