Skip to content

Commit

Permalink
fix: multiple hosts in fx and removing service init (#21)
Browse files Browse the repository at this point in the history
* fix: multiple hosts in fx and removing service init

* fix: adding more components, client service tests

* fix: ci and add test file

* fix: deepsource

Co-authored-by: alok <aloknerurkar@no-reply.com>
  • Loading branch information
aloknerurkar and alok committed Dec 30, 2021
1 parent ea76938 commit 04a4ece
Show file tree
Hide file tree
Showing 9 changed files with 434 additions and 93 deletions.
30 changes: 30 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/opentracing/opentracing-go"
"github.com/plexsysio/dLocker"
store "github.com/plexsysio/gkvstore"
"github.com/plexsysio/go-msuite/modules/auth"
Expand All @@ -20,9 +21,11 @@ import (
"github.com/plexsysio/go-msuite/modules/repo"
"github.com/plexsysio/go-msuite/modules/sharedStorage"
"github.com/plexsysio/taskmanager"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)

// Service is the collection of all the interfaces provided by the msuite instance
type Service interface {
Start(context.Context) error
Stop(context.Context) error
Expand All @@ -34,34 +37,61 @@ type Service interface {
// TM uses taskmanager for async task scheduling within
TM() *taskmanager.TaskManager

// Auth is used to provide authorized access to resources
Auth() (Auth, error)
// P2P encapsulates the libp2p related functionality which can be optionally
// configured and used
P2P() (P2P, error)
// GRPC encapsulates the gRPC client-server functionalities
GRPC() (GRPC, error)
// HTTP provides a multiplexer with middlewares configured. Also it provides
// the gRPC Gateway mux registered on the main mux
HTTP() (HTTP, error)
// Locker provides access to the distributed locker configured if any
Locker() (dLocker.DLocker, error)
// Events service can be used to broadcast/handle events in the form of messages
// using underlying PubSub
Events() (events.Events, error)
// Protocols service provides a simple request-response protocol interface to use
Protocols() (protocols.ProtocolsSvc, error)
// SharedStorage provides access to a distributed CRDT K-V store. Callbacks can
// be registered to get updates about certain keys
SharedStorage(string, sharedStorage.Callback) (store.Store, error)
// Files gives access to the ipfslite.Peer object. This can be used to share
// files across different nodes
Files() (*ipfslite.Peer, error)
// Tracing provides access to the configured tracer
Tracing() (opentracing.Tracer, error)
// Metrics provides access to the prometheus registry. This registry already
// has a bunch of default metrics registered.
Metrics() (*prometheus.Registry, error)
}

// P2P encapsulates the libp2p functionality. These can be used to write more
// advanced protocols/features if required
type P2P interface {
Host() host.Host
Routing() routing.Routing
Discovery() discovery.Discovery
Pubsub() *pubsub.PubSub
}

// Auth provides authorized access to resources using ACLs and JWT tokens
type Auth interface {
JWT() auth.JWTManager
ACL() auth.ACL
}

// GRPC provides the gRPC client-server implementations. Can be used to register services
// or call other services already registered
type GRPC interface {
Server() *grpc.Server
Client(context.Context, string, ...grpc.DialOption) (*grpc.ClientConn, error)
}

// HTTP provides the standard HTTP multiplexer already configured with middlewares. This
// can be used to register handlers etc. Gateway provides GRPC gateway multiplexer which
// can be used to register gRPC-gateways
type HTTP interface {
Mux() *http.ServeMux
Gateway() *runtime.ServeMux
Expand Down
39 changes: 38 additions & 1 deletion modules/grpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"time"

logger "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/plexsysio/go-msuite/modules/config"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
Expand Down Expand Up @@ -41,12 +44,15 @@ func NewP2PClientService(
Addrs: mainHost.Addrs(),
}

log.Debug("Client service dialer %s Localhost %s", localDialer.ID(), mainHost.ID())
localDialer.Peerstore().AddAddrs(hostAddr.ID, hostAddr.Addrs, peerstore.PermanentAddrTTL)

log.Debugf("Client service dialer %s Localhost %s", localDialer.ID(), mainHost.ID())

return &clientImpl{
ds: d,
h: localDialer,
hostAddr: hostAddr,
svcs: services,
}, nil
}

Expand Down Expand Up @@ -177,5 +183,36 @@ func (c *staticClientImpl) Get(
if !ok {
return nil, errors.New("service address not configured")
}

dialer := func(ctx context.Context, addr string) (net.Conn, error) {
var (
conn net.Conn
err error
done = make(chan struct{})
)
go func() {
defer close(done)

if ipAddr := net.ParseIP(addr); ipAddr != nil {
conn, err = net.Dial("tcp", addr)
return
}
if _, e := os.Stat(addr); e == nil {
conn, err = net.Dial("unix", addr)
return
}
err = fmt.Errorf("transport not supported %s", addr)
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-done:
return conn, err
}
}

opts = append(opts, grpc.WithContextDialer(dialer))

return grpc.DialContext(ctx, addr, opts...)
}
166 changes: 166 additions & 0 deletions modules/grpc/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package grpcclient_test

import (
"context"
"net"
"os"
"testing"
"time"

bhost "github.com/libp2p/go-libp2p-blankhost"
"github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
jsonConf "github.com/plexsysio/go-msuite/modules/config/json"
grpcclient "github.com/plexsysio/go-msuite/modules/grpc/client"
"github.com/plexsysio/go-msuite/modules/grpc/p2pgrpc"
"github.com/plexsysio/taskmanager"
"google.golang.org/grpc"
)

func TestStaticAddrs(t *testing.T) {

l1, err := net.Listen("tcp", ":10081")
if err != nil {
t.Fatal(err)
}

l2, err := net.Listen("unix", "/tmp/sock")
if err != nil {
t.Fatal(err)
}

t.Cleanup(func() {
l1.Close()
l2.Close()
_ = os.RemoveAll("/tmp/sock")
})

cfg := jsonConf.DefaultConfig()
cfg.Set("StaticAddresses", map[string]string{
"svc1": "localhost:10081",
"svc2": "/tmp/sock",
})

c := grpcclient.NewStaticClientService(cfg)

// Security credentials must be used, otherwise insecure should be explicitly
// added
_, err = c.Get(context.TODO(), "svc1")
if err == nil {
t.Fatal("succeeded without any options")
}

conn, err := c.Get(context.TODO(), "svc1", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
conn.Close()

conn, err = c.Get(context.TODO(), "svc2", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
conn.Close()
}

type testDiscovery struct {
adv chan string
addr peer.AddrInfo
}

func (t *testDiscovery) Advertise(_ context.Context, ns string, _ ...discovery.Option) (time.Duration, error) {
t.adv <- ns
return time.Second, nil
}

func (t *testDiscovery) FindPeers(_ context.Context, _ string, _ ...discovery.Option) (<-chan peer.AddrInfo, error) {
res := make(chan peer.AddrInfo)
go func() {
res <- t.addr
close(res)
}()
return res, nil
}

func TestP2PClient(t *testing.T) {

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC))

h2Fired, h3Fired := make(chan struct{}), make(chan struct{})

h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC))
h2.SetStreamHandler(p2pgrpc.Protocol, func(s network.Stream) {
close(h2Fired)
})

h3 := bhost.NewBlankHost(swarmt.GenSwarm(t, swarmt.OptDisableQUIC))
h3.SetStreamHandler(p2pgrpc.Protocol, func(s network.Stream) {
close(h3Fired)
})

t.Cleanup(func() {
h1.Close()
h2.Close()
h3.Close()
})

cfg := jsonConf.DefaultConfig()
cfg.Set("Services", []string{"svc1"})

cs, err := grpcclient.NewP2PClientService(
cfg,
&testDiscovery{addr: h3.Peerstore().PeerInfo(h3.ID())}, // discovery provide h3 for svc2
h1, // local dialer
h2, // local host with svc1
)
if err != nil {
t.Fatal(err)
}

conn, err := cs.Get(context.TODO(), "svc1", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}

<-h2Fired
conn.Close()

conn, err = cs.Get(context.TODO(), "svc2", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}

<-h3Fired
conn.Close()
}

func TestP2PAdvertiser(t *testing.T) {
cfg := jsonConf.DefaultConfig()
cfg.Set("Services", []string{"svc1", "svc2"})

tm := taskmanager.New(0, 2, time.Second)
t.Cleanup(func() {
tm.Stop()
})

adv := make(chan string)
d := &testDiscovery{adv: adv}
err := grpcclient.NewP2PClientAdvertiser(cfg, d, tm)
if err != nil {
t.Fatal(err)
}

time.Sleep(100 * time.Millisecond)
s := <-adv
if s != "svc1" {
t.Fatal("incorrect advertisement", s)
}

time.Sleep(100 * time.Millisecond)
s = <-adv
if s != "svc2" {
t.Fatal("incorrect advertisement", s)
}
}
15 changes: 11 additions & 4 deletions modules/node/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ func Transport(c config.Config) fx.Option {
return fx.Options(
fx.Provide(NewMuxedListener),
utils.MaybeProvide(NewTCPListener, c.IsSet("UseTCP")),
utils.MaybeProvide(NewP2PListener, c.IsSet("UseP2PGRPC")),
utils.MaybeProvide(
fx.Annotate(NewP2PListener, fx.ParamTags(`name:"mainHost"`)),
c.IsSet("UseP2P") && c.IsSet("UseP2PGRPC"),
),
utils.MaybeProvide(NewUDSListener, c.IsSet("UseUDS")),
)
}
Expand Down Expand Up @@ -130,12 +133,16 @@ func Client(c config.Config) fx.Option {
utils.MaybeProvide(
fx.Annotate(
grpcclient.NewP2PClientService,
fx.ParamTags(``, ``, `name:localDialer`, ``),
fx.ParamTags(``, ``, `name:"localDialer"`, `name:"mainHost"`),
fx.ResultTags(`name:"p2pClientSvc"`),
),
c.IsSet("UseP2P"),
),
utils.MaybeInvoke(grpcclient.NewP2PClientAdvertiser, c.IsSet("UseP2P")),
utils.MaybeProvide(grpcclient.NewStaticClientService, !c.IsSet("UseP2P") && c.IsSet("UseStaticDiscovery")),
utils.MaybeInvoke(grpcclient.NewP2PClientAdvertiser, c.IsSet("UseP2P") && c.IsSet("UseP2PGRPC")),
utils.MaybeProvide(
fx.Annotate(grpcclient.NewStaticClientService, fx.ResultTags(`name:"staticClientSvc"`)),
c.IsSet("UseStaticDiscovery"),
),
)
}

Expand Down
1 change: 1 addition & 0 deletions modules/node/ipfs/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func LocalDialer(
return h.Close()
},
})
log.Debug("created local dialer", h.ID())
return h, nil
}

Expand Down
19 changes: 7 additions & 12 deletions modules/node/ipfs/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,13 @@ import (

var P2PModule = fx.Options(
fx.Provide(Identity),
fx.Provide(Libp2p),
fx.Provide(
fx.Annotate(
LocalDialer,
fx.ResultTags(`name:"localDialer"`),
),
),
fx.Provide(Pubsub),
fx.Provide(fx.Annotate(Libp2p, fx.ResultTags(`name:"mainHost"`, ``, ``))),
fx.Provide(fx.Annotate(LocalDialer, fx.ResultTags(`name:"localDialer"`))),
fx.Provide(fx.Annotate(Pubsub, fx.ParamTags(``, `name:"mainHost"`))),
fx.Provide(NewSvcDiscovery),
fx.Invoke(NewMDNSDiscovery),
fx.Invoke(NewP2PReporter),
fx.Invoke(Bootstrapper),
fx.Invoke(fx.Annotate(NewMDNSDiscovery, fx.ParamTags(``, `name:"mainHost"`))),
fx.Invoke(fx.Annotate(NewP2PReporter, fx.ParamTags(`name:"mainHost"`, ``))),
fx.Invoke(fx.Annotate(Bootstrapper, fx.ParamTags(``, ``, ``, `name:"mainHost"`))),
)

var FilesModule = fx.Provide(NewNode)
var FilesModule = fx.Provide(fx.Annotate(NewNode, fx.ParamTags(``, `name:"mainHost"`, ``, ``)))
Loading

0 comments on commit 04a4ece

Please sign in to comment.