Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: protocols service, events tests, bootstrapper, local dialer #19

Merged
merged 3 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
store "github.com/plexsysio/gkvstore"
"github.com/plexsysio/go-msuite/modules/auth"
"github.com/plexsysio/go-msuite/modules/events"
"github.com/plexsysio/go-msuite/modules/protocols"
"github.com/plexsysio/go-msuite/modules/repo"
"github.com/plexsysio/go-msuite/modules/sharedStorage"
"github.com/plexsysio/taskmanager"
Expand All @@ -39,6 +40,7 @@ type Service interface {
HTTP() (HTTP, error)
Locker() (dLocker.DLocker, error)
Events() (events.Events, error)
Protocols() (protocols.ProtocolsSvc, error)
SharedStorage(string, sharedStorage.Callback) (store.Store, error)
Files() (*ipfslite.Peer, error)
}
Expand Down
2 changes: 1 addition & 1 deletion modules/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"sync"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-pubsub"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/plexsysio/taskmanager"
)

Expand Down
133 changes: 133 additions & 0 deletions modules/events/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package events_test

import (
"context"
"encoding/json"
"sync"
"testing"
"time"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/plexsysio/go-msuite/modules/events"
"github.com/plexsysio/taskmanager"
)

type testEvent struct {
Msg string
}

func (testEvent) Topic() string {
return "testEvent"
}

func (t *testEvent) Marshal() ([]byte, error) {
return json.Marshal(t)
}

func (t *testEvent) Unmarshal(buf []byte) error {
return json.Unmarshal(buf, t)
}

func TestEvents(t *testing.T) {
_ = logger.SetLogLevel("pubsub", "Debug")

tm1 := taskmanager.New(0, 2, time.Second)
tm2 := taskmanager.New(0, 2, time.Second)

h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}

h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
if err != nil {
t.Fatal(err)
}

psub1, err := pubsub.NewGossipSub(context.TODO(), h1, pubsub.WithFloodPublish(true))
if err != nil {
t.Fatal(err)
}

psub2, err := pubsub.NewGossipSub(context.TODO(), h2, pubsub.WithFloodPublish(true))
if err != nil {
t.Fatal(err)
}

ev1, err := events.NewEventsSvc(psub1, tm1)
if err != nil {
t.Fatal(err)
}

ev2, err := events.NewEventsSvc(psub2, tm2)
if err != nil {
t.Fatal(err)
}

err = h1.Connect(context.TODO(), peer.AddrInfo{
ID: h2.ID(),
Addrs: h2.Addrs(),
})
if err != nil {
t.Fatal(err)
}

mtx := sync.Mutex{}
count1, count2 := 0, 0

ev1.RegisterHandler(func() events.Event { return new(testEvent) }, func(ev events.Event) {
testEv, ok := ev.(*testEvent)
if !ok {
t.Fatal("invalid event in handler")
}
if testEv.Msg != "hello" && testEv.Msg != "world" {
t.Fatal("incorrect msg in event")
}
mtx.Lock()
count1++
mtx.Unlock()
})

ev2.RegisterHandler(func() events.Event { return new(testEvent) }, func(ev events.Event) {
testEv, ok := ev.(*testEvent)
if !ok {
t.Fatal("invalid event in handler")
}
if testEv.Msg != "hello" && testEv.Msg != "world" {
t.Fatal("incorrect msg in event")
}
mtx.Lock()
count2++
mtx.Unlock()
})

err = ev1.Broadcast(context.TODO(), &testEvent{Msg: "hello"})
if err != nil {
t.Fatal(err)
}

err = ev2.Broadcast(context.TODO(), &testEvent{Msg: "world"})
if err != nil {
t.Fatal(err)
}

started := time.Now()
for {
time.Sleep(time.Second)

mtx.Lock()
if count1 == 2 && count2 == 2 {
mtx.Unlock()
break
}
mtx.Unlock()

if time.Since(started) > 3*time.Second {
t.Fatal("waited 3 secs for events to trigger", count1, count2)
}
}

}
35 changes: 29 additions & 6 deletions modules/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
Expand All @@ -26,14 +27,27 @@ type ClientSvc interface {
}

func NewP2PClientService(
cfg config.Config,
d discovery.Discovery,
localDialer host.Host,
mainHost host.Host,
) (ClientSvc, error) {
csvc := &clientImpl{
ds: d,
h: localDialer,

var services []string
_ = cfg.Get("Services", &services)

hostAddr := peer.AddrInfo{
ID: mainHost.ID(),
Addrs: mainHost.Addrs(),
}
return csvc, nil

log.Debug("Client service dialer %s Localhost %s", localDialer.ID(), mainHost.ID())

return &clientImpl{
ds: d,
h: localDialer,
hostAddr: hostAddr,
}, nil
}

func NewP2PClientAdvertiser(
Expand Down Expand Up @@ -94,8 +108,10 @@ func (d *discoveryProvider) Execute(ctx context.Context) error {
}

type clientImpl struct {
ds discovery.Discovery
h host.Host
ds discovery.Discovery
h host.Host
svcs []string
hostAddr peer.AddrInfo
}

func (c *clientImpl) Get(
Expand All @@ -104,6 +120,13 @@ func (c *clientImpl) Get(
opts ...grpc.DialOption,
) (*grpc.ClientConn, error) {

// Local service, dial to locally running P2P host
for _, v := range c.svcs {
if svc == v {
return p2pgrpc.NewP2PDialer(c.h).Dial(ctx, c.hostAddr.ID.String(), opts...)
}
}

// FindPeers is called without limit opt, so this cancel is required to release
// any resources used by it
cCtx, cCancel := context.WithCancel(ctx)
Expand Down
2 changes: 1 addition & 1 deletion modules/node/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func Client(c config.Config) fx.Option {
utils.MaybeProvide(
fx.Annotate(
grpcclient.NewP2PClientService,
fx.ParamTags(``, `name:localDialer`),
fx.ParamTags(``, ``, `name:localDialer`, ``),
),
c.IsSet("UseP2P"),
),
Expand Down
67 changes: 66 additions & 1 deletion modules/node/ipfs/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ import (
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/discovery"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
p2pdiscovery "github.com/libp2p/go-libp2p-discovery"
pubsub "github.com/libp2p/go-libp2p-pubsub"
libp2ptls "github.com/libp2p/go-libp2p-tls"
multiaddr "github.com/multiformats/go-multiaddr"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/diag/status"
"github.com/plexsysio/taskmanager"
"go.uber.org/fx"
)

Expand Down Expand Up @@ -126,7 +130,7 @@ func NewNode(
}

func Pubsub(ctx context.Context, h host.Host) (*pubsub.PubSub, error) {
return pubsub.NewGossipSub(ctx, h)
return pubsub.NewGossipSub(ctx, h, pubsub.WithFloodPublish(true))
}

func NewSvcDiscovery(r routing.Routing) discovery.Discovery {
Expand All @@ -150,3 +154,64 @@ func (p *p2pReporter) Status() interface{} {

return stat
}

func parseBootstrapPeers(addrs []string) ([]peer.AddrInfo, error) {
maddrs := make([]multiaddr.Multiaddr, len(addrs))
for i, addr := range addrs {
var err error
maddrs[i], err = multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
}
return peer.AddrInfosFromP2pAddrs(maddrs...)
}

func Bootstrapper(
lc fx.Lifecycle,
cfg config.Config,
tm *taskmanager.TaskManager,
h host.Host,
) error {
var addrs []string
if cfg.Get("BootstrapAddresses", &addrs) {
peers, err := parseBootstrapPeers(addrs)
if err != nil {
return err
}

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
sched, err := tm.GoFunc("Bootstrapper", func(c context.Context) error {
t := time.NewTicker(15 * time.Second)
for {
select {
case <-c.Done():
return nil
case <-t.C:
for _, p := range peers {
if h.Network().Connectedness(p.ID) != network.Connected {
h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := h.Connect(ctx, p); err != nil {
log.Warn("could not connect to bootstrap address", p)
}
}
}
}
}
})
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-sched:
return nil
}
},
})
}

return nil
}
1 change: 1 addition & 0 deletions modules/node/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var P2PModule = fx.Options(
fx.Provide(NewSvcDiscovery),
fx.Invoke(NewMDNSDiscovery),
fx.Invoke(NewP2PReporter),
fx.Invoke(Bootstrapper),
)

var FilesModule = fx.Provide(NewNode)
10 changes: 10 additions & 0 deletions modules/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
mhttp "github.com/plexsysio/go-msuite/modules/node/http"
"github.com/plexsysio/go-msuite/modules/node/ipfs"
"github.com/plexsysio/go-msuite/modules/node/locker"
"github.com/plexsysio/go-msuite/modules/protocols"
"github.com/plexsysio/go-msuite/modules/repo"
"github.com/plexsysio/go-msuite/modules/repo/fsrepo"
"github.com/plexsysio/go-msuite/modules/repo/inmem"
Expand Down Expand Up @@ -97,6 +98,7 @@ func New(bCfg config.Config) (core.Service, error) {
utils.MaybeOption(grpcsvc.Module(r.Config()), bCfg.IsSet("UseGRPC")),
utils.MaybeOption(mhttp.Module(r.Config()), bCfg.IsSet("UseHTTP")),
utils.MaybeOption(fx.Provide(events.NewEventsSvc), bCfg.IsSet("UseP2P")),
utils.MaybeOption(fx.Provide(protocols.New), bCfg.IsSet("UseP2P")),
utils.MaybeOption(fx.Provide(sharedStorage.NewSharedStoreProvider), bCfg.IsSet("UseP2P")),
utils.MaybeInvoke(status.RegisterHTTP, bCfg.IsSet("UseHTTP")),
fx.Invoke(func(lc fx.Lifecycle, cancel context.CancelFunc) {
Expand Down Expand Up @@ -181,6 +183,7 @@ type deps struct {
St store.Store `optional:"true"`
Jm auth.JWTManager `optional:"true"`
Ev events.Events `optional:"true"`
Pr protocols.ProtocolsSvc `optional:"true"`
Cs grpcclient.ClientSvc `optional:"true"`
ShSt sharedStorage.Provider `optional:"true"`
}
Expand Down Expand Up @@ -285,6 +288,13 @@ func (s *impl) Locker() (dLocker.DLocker, error) {
return s.dp.Lk, nil
}

func (s *impl) Protocols() (protocols.ProtocolsSvc, error) {
if s.dp.Pr == nil {
return nil, errors.New("Protocols svc not configured")
}
return s.dp.Pr, nil
}

func (s *impl) Events() (events.Events, error) {
if s.dp.Ev == nil {
return nil, errors.New("Events not configured")
Expand Down
Loading