Skip to content

Commit

Permalink
feat: protocols service, events tests, bootstrapper, local dialer (#19)
Browse files Browse the repository at this point in the history
* feat: protocols service, events tests, bootstrapper, local dialer

* fix: events tests

* fix: deepsource and events test

Co-authored-by: alok <aloknerurkar@no-reply.com>
  • Loading branch information
aloknerurkar and alok committed Dec 27, 2021
1 parent aa9fb3c commit ca89eeb
Show file tree
Hide file tree
Showing 11 changed files with 546 additions and 9 deletions.
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
store "github.com/plexsysio/gkvstore"
"github.com/plexsysio/go-msuite/modules/auth"
"github.com/plexsysio/go-msuite/modules/events"
"github.com/plexsysio/go-msuite/modules/protocols"
"github.com/plexsysio/go-msuite/modules/repo"
"github.com/plexsysio/go-msuite/modules/sharedStorage"
"github.com/plexsysio/taskmanager"
Expand All @@ -39,6 +40,7 @@ type Service interface {
HTTP() (HTTP, error)
Locker() (dLocker.DLocker, error)
Events() (events.Events, error)
Protocols() (protocols.ProtocolsSvc, error)
SharedStorage(string, sharedStorage.Callback) (store.Store, error)
Files() (*ipfslite.Peer, error)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-pubsub"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/plexsysio/taskmanager"
)

Expand Down
133 changes: 133 additions & 0 deletions modules/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package events_test

import (
"context"
"encoding/json"
"sync"
"testing"
"time"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/plexsysio/go-msuite/modules/events"
"github.com/plexsysio/taskmanager"
)

type testEvent struct {
Msg string
}

func (testEvent) Topic() string {
return "testEvent"
}

func (t *testEvent) Marshal() ([]byte, error) {
return json.Marshal(t)
}

func (t *testEvent) Unmarshal(buf []byte) error {
return json.Unmarshal(buf, t)
}

func TestEvents(t *testing.T) {
_ = logger.SetLogLevel("pubsub", "Debug")

tm1 := taskmanager.New(0, 2, time.Second)
tm2 := taskmanager.New(0, 2, time.Second)

h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}

h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}

psub1, err := pubsub.NewGossipSub(context.TODO(), h1, pubsub.WithFloodPublish(true))
if err != nil {
t.Fatal(err)
}

psub2, err := pubsub.NewGossipSub(context.TODO(), h2, pubsub.WithFloodPublish(true))
if err != nil {
t.Fatal(err)
}

ev1, err := events.NewEventsSvc(psub1, tm1)
if err != nil {
t.Fatal(err)
}

ev2, err := events.NewEventsSvc(psub2, tm2)
if err != nil {
t.Fatal(err)
}

err = h1.Connect(context.TODO(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
if err != nil {
t.Fatal(err)
}

mtx := sync.Mutex{}
count1, count2 := 0, 0

ev1.RegisterHandler(func() events.Event { return new(testEvent) }, func(ev events.Event) {
testEv, ok := ev.(*testEvent)
if !ok {
t.Fatal("invalid event in handler")
}
if testEv.Msg != "hello" && testEv.Msg != "world" {
t.Fatal("incorrect msg in event")
}
mtx.Lock()
count1++
mtx.Unlock()
})

ev2.RegisterHandler(func() events.Event { return new(testEvent) }, func(ev events.Event) {
testEv, ok := ev.(*testEvent)
if !ok {
t.Fatal("invalid event in handler")
}
if testEv.Msg != "hello" && testEv.Msg != "world" {
t.Fatal("incorrect msg in event")
}
mtx.Lock()
count2++
mtx.Unlock()
})

err = ev1.Broadcast(context.TODO(), &testEvent{Msg: "hello"})
if err != nil {
t.Fatal(err)
}

err = ev2.Broadcast(context.TODO(), &testEvent{Msg: "world"})
if err != nil {
t.Fatal(err)
}

started := time.Now()
for {
time.Sleep(time.Second)

mtx.Lock()
if count1 == 2 && count2 == 2 {
mtx.Unlock()
break
}
mtx.Unlock()

if time.Since(started) > 3*time.Second {
t.Fatal("waited 3 secs for events to trigger", count1, count2)
}
}

}
35 changes: 29 additions & 6 deletions modules/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
Expand All @@ -26,14 +27,27 @@ type ClientSvc interface {
}

func NewP2PClientService(
cfg config.Config,
d discovery.Discovery,
localDialer host.Host,
mainHost host.Host,
) (ClientSvc, error) {
csvc := &clientImpl{
ds: d,
h: localDialer,

var services []string
_ = cfg.Get("Services", &services)

hostAddr := peer.AddrInfo{
ID: mainHost.ID(),
Addrs: mainHost.Addrs(),
}
return csvc, nil

log.Debug("Client service dialer %s Localhost %s", localDialer.ID(), mainHost.ID())

return &clientImpl{
ds: d,
h: localDialer,
hostAddr: hostAddr,
}, nil
}

func NewP2PClientAdvertiser(
Expand Down Expand Up @@ -94,8 +108,10 @@ func (d *discoveryProvider) Execute(ctx context.Context) error {
}

type clientImpl struct {
ds discovery.Discovery
h host.Host
ds discovery.Discovery
h host.Host
svcs []string
hostAddr peer.AddrInfo
}

func (c *clientImpl) Get(
Expand All @@ -104,6 +120,13 @@ func (c *clientImpl) Get(
opts ...grpc.DialOption,
) (*grpc.ClientConn, error) {

// Local service, dial to locally running P2P host
for _, v := range c.svcs {
if svc == v {
return p2pgrpc.NewP2PDialer(c.h).Dial(ctx, c.hostAddr.ID.String(), opts...)
}
}

// FindPeers is called without limit opt, so this cancel is required to release
// any resources used by it
cCtx, cCancel := context.WithCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion modules/node/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func Client(c config.Config) fx.Option {
utils.MaybeProvide(
fx.Annotate(
grpcclient.NewP2PClientService,
fx.ParamTags(``, `name:localDialer`),
fx.ParamTags(``, ``, `name:localDialer`, ``),
),
c.IsSet("UseP2P"),
),
Expand Down
67 changes: 66 additions & 1 deletion modules/node/ipfs/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/discovery"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
p2pdiscovery "github.com/libp2p/go-libp2p-discovery"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2ptls "github.com/libp2p/go-libp2p-tls"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/diag/status"
"github.com/plexsysio/taskmanager"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -126,7 +130,7 @@ func NewNode(
}

func Pubsub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
return pubsub.NewGossipSub(ctx, h)
return pubsub.NewGossipSub(ctx, h, pubsub.WithFloodPublish(true))
}

func NewSvcDiscovery(r routing.Routing) discovery.Discovery {
Expand All @@ -150,3 +154,64 @@ func (p *p2pReporter) Status() interface{} {

return stat
}

func parseBootstrapPeers(addrs []string) ([]peer.AddrInfo, error) {
maddrs := make([]multiaddr.Multiaddr, len(addrs))
for i, addr := range addrs {
var err error
maddrs[i], err = multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
}
return peer.AddrInfosFromP2pAddrs(maddrs...)
}

func Bootstrapper(
lc fx.Lifecycle,
cfg config.Config,
tm *taskmanager.TaskManager,
h host.Host,
) error {
var addrs []string
if cfg.Get("BootstrapAddresses", &addrs) {
peers, err := parseBootstrapPeers(addrs)
if err != nil {
return err
}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sched, err := tm.GoFunc("Bootstrapper", func(c context.Context) error {
t := time.NewTicker(15 * time.Second)
for {
select {
case <-c.Done():
return nil
case <-t.C:
for _, p := range peers {
if h.Network().Connectedness(p.ID) != network.Connected {
h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := h.Connect(ctx, p); err != nil {
log.Warn("could not connect to bootstrap address", p)
}
}
}
}
}
})
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-sched:
return nil
}
},
})
}

return nil
}
1 change: 1 addition & 0 deletions modules/node/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var P2PModule = fx.Options(
fx.Provide(NewSvcDiscovery),
fx.Invoke(NewMDNSDiscovery),
fx.Invoke(NewP2PReporter),
fx.Invoke(Bootstrapper),
)

var FilesModule = fx.Provide(NewNode)
10 changes: 10 additions & 0 deletions modules/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
mhttp "github.com/plexsysio/go-msuite/modules/node/http"
"github.com/plexsysio/go-msuite/modules/node/ipfs"
"github.com/plexsysio/go-msuite/modules/node/locker"
"github.com/plexsysio/go-msuite/modules/protocols"
"github.com/plexsysio/go-msuite/modules/repo"
"github.com/plexsysio/go-msuite/modules/repo/fsrepo"
"github.com/plexsysio/go-msuite/modules/repo/inmem"
Expand Down Expand Up @@ -97,6 +98,7 @@ func New(bCfg config.Config) (core.Service, error) {
utils.MaybeOption(grpcsvc.Module(r.Config()), bCfg.IsSet("UseGRPC")),
utils.MaybeOption(mhttp.Module(r.Config()), bCfg.IsSet("UseHTTP")),
utils.MaybeOption(fx.Provide(events.NewEventsSvc), bCfg.IsSet("UseP2P")),
utils.MaybeOption(fx.Provide(protocols.New), bCfg.IsSet("UseP2P")),
utils.MaybeOption(fx.Provide(sharedStorage.NewSharedStoreProvider), bCfg.IsSet("UseP2P")),
utils.MaybeInvoke(status.RegisterHTTP, bCfg.IsSet("UseHTTP")),
fx.Invoke(func(lc fx.Lifecycle, cancel context.CancelFunc) {
Expand Down Expand Up @@ -181,6 +183,7 @@ type deps struct {
St store.Store `optional:"true"`
Jm auth.JWTManager `optional:"true"`
Ev events.Events `optional:"true"`
Pr protocols.ProtocolsSvc `optional:"true"`
Cs grpcclient.ClientSvc `optional:"true"`
ShSt sharedStorage.Provider `optional:"true"`
}
Expand Down Expand Up @@ -285,6 +288,13 @@ func (s *impl) Locker() (dLocker.DLocker, error) {
return s.dp.Lk, nil
}

func (s *impl) Protocols() (protocols.ProtocolsSvc, error) {
if s.dp.Pr == nil {
return nil, errors.New("Protocols svc not configured")
}
return s.dp.Pr, nil
}

func (s *impl) Events() (events.Events, error) {
if s.dp.Ev == nil {
return nil, errors.New("Events not configured")
Expand Down
Loading

0 comments on commit ca89eeb

Please sign in to comment.