Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiaddrs instead of UNIX socket paths for daemon sockets #41

Merged
merged 16 commits into from
Dec 27, 2018
16 changes: 9 additions & 7 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ developed in, nor whether a native libp2p implementation exists in that
language. Running *multiple instances* of the daemon is also possible, and
specially useful for testing purposes.

When establishing connections, the daemon handles transport selection, security
negotiation, and protocol and stream multiplexing. Streams are mapped 1:1 to
local endpoints (e.g. unix sockets, shared-memory). Writes and reads to/from
those endpoints are converted to writes and reads to/from the stream, allowing
any application to interact with a libp2p network through simple, local IO.
When establishing connections, the daemon handles transport selection,
security negotiation, and protocol and stream multiplexing. Streams
are mapped 1:1 to socket connections. Writes and reads to/from those
sockets are converted to writes and reads to/from the stream, allowing
any application to interact with a libp2p network through simple,
well-defined IO.

The daemon exposes a control endpoint for management, supporting basic
operations such as peer connection/disconnection, stream opening/closing, etc.
Expand Down Expand Up @@ -55,6 +56,7 @@ please open a [Github issue](https://github.com/libp2p/go-libp2p-daemon/issues).
- ✅ Daemon identity: auto-generated, and persisted.
- 🚧 Subsystem: DHT interactions.
- 🚧 Subsystem: Pubsub interactions.
- 🚧 Support multiaddr protocols instead of exclusively unix sockets.
- Subsystem: Circuit relay support.
- Subsystem: Peerstore operations.
- Connection notifications.
Expand All @@ -68,12 +70,12 @@ These are the medium-term priorities for us. If you feel something is missing,
please open a [Github issue](https://github.com/libp2p/go-libp2p-daemon/issues).

- Multi-tenancy, one application = one identity = one peer ID.
- app <> daemon isolation; trust-less scenario; programs should not be able to
- app <> daemon isolation; trust-less scenario; programs should not be able to
interfere or spy on streams owned by others.
- Shared-memory local transport between apps and the daemon: potentially more
efficient than unix sockets.
- Extracting local transports as go-libp2p transports.
- Allowing "blessed" applications to act on behalf of the daemon.
- Global services implemented in the user space.
- Plugins: services providing features back to the daemon, for use by other
- Plugins: services providing features back to the daemon, for use by other
tenants.
9 changes: 6 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,18 @@ func (d *Daemon) doStreamHandler(req *pb.Request) *pb.Response {
d.mx.Lock()
defer d.mx.Unlock()

path := *req.StreamHandler.Path
maddr, err := ma.NewMultiaddrBytes(req.StreamHandler.Addr)
if err != nil {
return errorResponse(err)
}
for _, sp := range req.StreamHandler.Proto {
p := proto.ID(sp)
_, ok := d.handlers[p]
if !ok {
d.host.SetStreamHandler(p, d.handleStream)
}
log.Debugf("set stream handler: %s -> %s", sp, path)
d.handlers[p] = path
log.Debugf("set stream handler: %s -> %s", sp, maddr.String())
d.handlers[p] = maddr
}

return okResponse()
Expand Down
14 changes: 7 additions & 7 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package p2pd
import (
"context"
"fmt"
"net"
"sync"

logging "github.com/ipfs/go-log"
Expand All @@ -16,30 +15,31 @@ import (
ps "github.com/libp2p/go-libp2p-pubsub"
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

var log = logging.Logger("p2pd")

type Daemon struct {
ctx context.Context
host host.Host
listener net.Listener
listener manet.Listener

dht *dht.IpfsDHT
pubsub *ps.PubSub

mx sync.Mutex
// stream handlers: map of protocol.ID to unix socket path
handlers map[proto.ID]string
// stream handlers: map of protocol.ID to multi-address
handlers map[proto.ID]ma.Multiaddr
}

func NewDaemon(ctx context.Context, path string, opts ...libp2p.Option) (*Daemon, error) {
func NewDaemon(ctx context.Context, maddr ma.Multiaddr, opts ...libp2p.Option) (*Daemon, error) {
h, err := libp2p.New(ctx, opts...)
if err != nil {
return nil, err
}

l, err := net.Listen("unix", path)
l, err := manet.Listen(maddr)
if err != nil {
h.Close()
return nil, err
Expand All @@ -49,7 +49,7 @@ func NewDaemon(ctx context.Context, path string, opts ...libp2p.Option) (*Daemon
ctx: ctx,
host: h,
listener: l,
handlers: make(map[proto.ID]string),
handlers: make(map[proto.ID]ma.Multiaddr),
}

go d.listen()
Expand Down
24 changes: 12 additions & 12 deletions p2pclient/p2pclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package p2pclient

import (
"errors"
"net"
"sync"

ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-daemon/pb"
peer "github.com/libp2p/go-libp2p-peer"
multiaddr "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

var log = logging.Logger("p2pclient")
Expand All @@ -19,22 +19,22 @@ const MessageSizeMax = 1 << 22 // 4 MB

// Client is the struct that manages a connection to a libp2p daemon.
type Client struct {
controlPath string
listenPath string
listener net.Listener
controlMaddr multiaddr.Multiaddr
listenMaddr multiaddr.Multiaddr
listener manet.Listener

mhandlers sync.Mutex
handlers map[string]StreamHandlerFunc
}

// NewClient creates a new libp2p daemon client, connecting to a daemon
// listening on a unix socket at controlPath, and establishing an inbound socket
// at listenPath.
func NewClient(controlPath, listenPath string) (*Client, error) {
// listening on a multi-addr at controlMaddr, and establishing an inbound
// listening multi-address at listenMaddr
func NewClient(controlMaddr, listenMaddr multiaddr.Multiaddr) (*Client, error) {
client := &Client{
controlPath: controlPath,
listenPath: listenPath,
handlers: make(map[string]StreamHandlerFunc),
controlMaddr: controlMaddr,
listenMaddr: listenMaddr,
handlers: make(map[string]StreamHandlerFunc),
}

if err := client.listen(); err != nil {
Expand All @@ -44,8 +44,8 @@ func NewClient(controlPath, listenPath string) (*Client, error) {
return client, nil
}

func (c *Client) newControlConn() (net.Conn, error) {
return net.Dial("unix", c.controlPath)
func (c *Client) newControlConn() (manet.Conn, error) {
return manet.Dial(c.controlMaddr)
}

// Identify queries the daemon for its peer ID and listen addresses.
Expand Down
23 changes: 11 additions & 12 deletions p2pclient/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"fmt"
"io"
"net"
"os"

ggio "github.com/gogo/protobuf/io"
"github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/proto"
pb "github.com/libp2p/go-libp2p-daemon/pb"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// StreamInfo wraps the protobuf structure with friendlier types.
Expand Down Expand Up @@ -53,17 +53,17 @@ func (c *byteReaderConn) ReadByte() (byte, error) {
}

func readMsgBytesSafe(r *byteReaderConn) (*bytes.Buffer, error) {
len, err := binary.ReadUvarint(r)
length, err := binary.ReadUvarint(r)
mvid marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
out := &bytes.Buffer{}
n, err := io.CopyN(out, r, int64(len))
n, err := io.CopyN(out, r, int64(length))
if err != nil {
return nil, err
}
if n != int64(len) {
return nil, fmt.Errorf("read incorrect number of bytes in header: expected %d, got %d", len, n)
if n != int64(length) {
return nil, fmt.Errorf("read incorrect number of bytes in header: expected %d, got %d", length, n)
}
return out, nil
}
Expand Down Expand Up @@ -122,11 +122,10 @@ func (c *Client) NewStream(peer peer.ID, protos []string) (*StreamInfo, io.ReadW
return info, control, nil
}

// Close stops the listener socket.
// Close stops the listener address.
func (c *Client) Close() error {
if c.listener != nil {
err := c.listener.Close()
os.Remove(c.listenPath)
return err
}
return nil
Expand All @@ -136,7 +135,7 @@ func (c *Client) streamDispatcher() {
for {
rawconn, err := c.listener.Accept()
if err != nil {
log.Errorf("accepting incoming connection: %s", err)
log.Warningf("accepting incoming connection: %s", err)
return
}
conn := &byteReaderConn{rawconn}
Expand Down Expand Up @@ -168,7 +167,7 @@ func (c *Client) streamDispatcher() {
}

func (c *Client) listen() error {
l, err := net.Listen("unix", c.listenPath)
l, err := manet.Listen(c.listenMaddr)
if err != nil {
return err
}
Expand All @@ -183,7 +182,7 @@ func (c *Client) listen() error {
// on a given protocol.
type StreamHandlerFunc func(*StreamInfo, io.ReadWriteCloser)

// NewStreamHandler establishes an inbound unix socket and starts a listener.
// NewStreamHandler establishes an inbound multi-address and starts a listener.
// All inbound connections to the listener are delegated to the provided
// handler.
func (c *Client) NewStreamHandler(protos []string, handler StreamHandlerFunc) error {
Expand All @@ -199,7 +198,7 @@ func (c *Client) NewStreamHandler(protos []string, handler StreamHandlerFunc) er
req := &pb.Request{
Type: pb.Request_STREAM_HANDLER.Enum(),
StreamHandler: &pb.StreamHandlerRequest{
Path: &c.listenPath,
Addr: c.listenMaddr.Bytes(),
Proto: protos,
},
}
Expand Down
12 changes: 9 additions & 3 deletions p2pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
ps "github.com/libp2p/go-libp2p-pubsub"
quic "github.com/libp2p/go-libp2p-quic-transport"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
multiaddr "github.com/multiformats/go-multiaddr"
)

func main() {
identify.ClientVersion = "p2pd/0.1"

sock := flag.String("sock", "/tmp/p2pd.sock", "daemon control socket path")
mvid marked this conversation as resolved.
Show resolved Hide resolved
maddrString := flag.String("listen", "/unix/tmp/p2pd.sock", "daemon control listen multiaddr")
quiet := flag.Bool("q", false, "be quiet")
id := flag.String("id", "", "peer identity; private key file")
bootstrap := flag.Bool("b", false, "connects to bootstrap peers and bootstraps the dht if enabled")
Expand All @@ -41,6 +42,11 @@ func main() {

var opts []libp2p.Option

maddr, err := multiaddr.NewMultiaddr(*maddrString)
if err != nil {
log.Fatal(err)
}

if *id != "" {
key, err := p2pd.ReadIdentity(*id)
if err != nil {
Expand Down Expand Up @@ -71,7 +77,7 @@ func main() {
opts = append(opts, libp2p.NATPortMap())
}

d, err := p2pd.NewDaemon(context.Background(), *sock, opts...)
d, err := p2pd.NewDaemon(context.Background(), maddr, opts...)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -110,7 +116,7 @@ func main() {
}

if !*quiet {
fmt.Printf("Control socket: %s\n", *sock)
fmt.Printf("Control socket: %s\n", maddr.String())
fmt.Printf("Peer ID: %s\n", d.ID().Pretty())
fmt.Printf("Peer Addrs:\n")
for _, addr := range d.Addrs() {
Expand Down
26 changes: 15 additions & 11 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,37 @@
"gxDependencies": [
{
"author": "whyrusleeping",
"hash": "QmRBaUEQEeFWywfrZJ64QgsmvcqgLSK3VbvGMR2NM2Edpf",
"hash": "QmYxivS34F2M2n44WQQnRHGAKS8aoRUxwGpi9wk4Cdn4Jf",
"name": "go-libp2p",
"version": "6.0.28"
"version": "6.0.30"
},
{
"hash": "QmXbPygnUKAPMwseE5U3hQA7Thn59GVm7pQrhkFV63umT8",
"hash": "QmNoNExMdWrYSPZDiJJTVmxSh6uKLN26xYVzbLzBLedRcv",
"name": "go-libp2p-kad-dht",
"version": "4.4.17"
"version": "4.4.19"
},
{
"author": "whyrusleeping",
"hash": "QmWJBngogUPF87Xz788NotnwfYS1B5oanKew82zuMwUkQu",
"hash": "QmYB44VSn76PMvefjvcKxdhnHtZxB36zrToCSh6u4H9U7M",
"name": "go-libp2p-connmgr",
"version": "0.3.29"
"version": "0.3.30"
},
{
"author": "marten-seemann",
"hash": "QmSvK3DvgynMo45orM88RQowdupvgdxs3fDyahQsKkmcUP",
"hash": "QmR1g19UeP13BrVPCeEJm6R1J1E5yCdueiKpQJfPdnWC9z",
"name": "go-libp2p-quic-transport",
"version": "0.2.14"
"version": "0.2.15"
},
{
"author": "whyrusleeping",
"hash": "QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q",
"hash": "QmVRxA4J3UPQpw74dLrQ6NJkfysCA1H4GU28gVpXQt9zMU",
"name": "go-libp2p-pubsub",
"version": "0.11.9"
"version": "0.11.10"
},
{
"hash": "QmZcLBXKaFe8ND5YHPkJRAwmhJGrVsi1JqDZNyJ4nRK5Mj",
"name": "go-multiaddr-net",
vyzo marked this conversation as resolved.
Show resolved Hide resolved
"version": "1.7.1"
}
],
"gxVersion": "0.12.1",
Expand All @@ -42,4 +47,3 @@
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "0.0.4"
}

Loading