Skip to content

Commit

Permalink
Merge pull request #2535 from libp2p/merge-gostream
Browse files Browse the repository at this point in the history
move libp2p/go-libp2p-gostream to p2p/net/gostream
  • Loading branch information
marten-seemann committed Aug 29, 2023
2 parents b438d18 + 8e0d111 commit b427242
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 0 deletions.
14 changes: 14 additions & 0 deletions p2p/net/gostream/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package gostream

import "github.com/libp2p/go-libp2p/core/peer"

// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id peer.ID }

// Network returns the name of the network that this address belongs to
// (libp2p).
func (a *addr) Network() string { return Network }

// String returns the peer ID of this address in string form
// (B58-encoded).
func (a *addr) String() string { return a.id.String() }
43 changes: 43 additions & 0 deletions p2p/net/gostream/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)

// conn is an implementation of net.Conn which wraps
// libp2p streams.
type conn struct {
network.Stream
}

// newConn creates a conn given a libp2p stream
func newConn(s network.Stream) net.Conn {
return &conn{s}
}

// LocalAddr returns the local network address.
func (c *conn) LocalAddr() net.Addr {
return &addr{c.Stream.Conn().LocalPeer()}
}

// RemoteAddr returns the remote network address.
func (c *conn) RemoteAddr() net.Addr {
return &addr{c.Stream.Conn().RemotePeer()}
}

// Dial opens a stream to the destination address
// (which should parseable to a peer ID) using the given
// host and returns it as a standard net.Conn.
func Dial(ctx context.Context, h host.Host, pid peer.ID, tag protocol.ID) (net.Conn, error) {
s, err := h.NewStream(ctx, pid, tag)
if err != nil {
return nil, err
}
return newConn(s), nil
}
19 changes: 19 additions & 0 deletions p2p/net/gostream/gostream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package gostream allows to replace the standard net stack in Go
// with [LibP2P](https://github.com/libp2p/libp2p) streams.
//
// Given a libp2p.Host, gostream provides Dial() and Listen() methods which
// return implementations of net.Conn and net.Listener.
//
// Instead of the regular "host:port" addressing, `gostream` uses a Peer ID,
// and rather than a raw TCP connection, gostream will use libp2p's net.Stream.
// This means your connections will take advantage of LibP2P's multi-routes,
// NAT transversal and stream multiplexing.
//
// Note that LibP2P hosts cannot dial to themselves, so there is no possibility
// of using the same Host as server and as client.
package gostream

// Network is the "net.Addr.Network()" name returned by
// addresses used by gostream connections. In turn, the "net.Addr.String()" will
// be a peer ID.
var Network = "libp2p"
141 changes: 141 additions & 0 deletions p2p/net/gostream/gostream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package gostream

import (
"bufio"
"context"
"io"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/multiformats/go-multiaddr"
)

// newHost illustrates how to build a libp2p host with secio using
// a randomly generated key-pair
func newHost(t *testing.T, listen multiaddr.Multiaddr) host.Host {
h, err := libp2p.New(
libp2p.ListenAddrs(listen),
)
if err != nil {
t.Fatal(err)
}
return h
}

func TestServerClient(t *testing.T) {
m1, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10000")
m2, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/10001")
srvHost := newHost(t, m1)
clientHost := newHost(t, m2)
defer srvHost.Close()
defer clientHost.Close()

srvHost.Peerstore().AddAddrs(clientHost.ID(), clientHost.Addrs(), peerstore.PermanentAddrTTL)
clientHost.Peerstore().AddAddrs(srvHost.ID(), srvHost.Addrs(), peerstore.PermanentAddrTTL)

var tag protocol.ID = "/testitytest"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

done := make(chan struct{})
go func() {
defer close(done)
listener, err := Listen(srvHost, tag)
if err != nil {
t.Error(err)
return
}
defer listener.Close()

if listener.Addr().String() != srvHost.ID().Pretty() {
t.Error("bad listener address")
return
}

servConn, err := listener.Accept()
if err != nil {
t.Error(err)
return
}
defer servConn.Close()

reader := bufio.NewReader(servConn)
for {
msg, err := reader.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
t.Error(err)
return
}
if msg != "is libp2p awesome?\n" {
t.Errorf("Bad incoming message: %s", msg)
return
}

_, err = servConn.Write([]byte("yes it is\n"))
if err != nil {
t.Error(err)
return
}
}
}()

clientConn, err := Dial(ctx, clientHost, srvHost.ID(), tag)
if err != nil {
t.Fatal(err)
}

if clientConn.LocalAddr().String() != clientHost.ID().Pretty() {
t.Fatal("Bad LocalAddr")
}

if clientConn.RemoteAddr().String() != srvHost.ID().Pretty() {
t.Fatal("Bad RemoteAddr")
}

if clientConn.LocalAddr().Network() != Network {
t.Fatal("Bad Network()")
}

err = clientConn.SetDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetReadDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

err = clientConn.SetWriteDeadline(time.Now().Add(time.Second))
if err != nil {
t.Fatal(err)
}

_, err = clientConn.Write([]byte("is libp2p awesome?\n"))
if err != nil {
t.Fatal(err)
}

reader := bufio.NewReader(clientConn)
resp, err := reader.ReadString('\n')
if err != nil {
t.Fatal(err)
}

if string(resp) != "yes it is\n" {
t.Errorf("Bad response: %s", resp)
}

err = clientConn.Close()
if err != nil {
t.Fatal(err)
}
<-done
}
71 changes: 71 additions & 0 deletions p2p/net/gostream/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package gostream

import (
"context"
"net"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)

// listener is an implementation of net.Listener which handles
// http-tagged streams from a libp2p connection.
// A listener can be built with Listen()
type listener struct {
host host.Host
ctx context.Context
tag protocol.ID
cancel func()
streamCh chan network.Stream
}

// Accept returns the next a connection to this listener.
// It blocks if there are no connections. Under the hood,
// connections are libp2p streams.
func (l *listener) Accept() (net.Conn, error) {
select {
case s := <-l.streamCh:
return newConn(s), nil
case <-l.ctx.Done():
return nil, l.ctx.Err()
}
}

// Close terminates this listener. It will no longer handle any
// incoming streams
func (l *listener) Close() error {
l.cancel()
l.host.RemoveStreamHandler(l.tag)
return nil
}

// Addr returns the address for this listener, which is its libp2p Peer ID.
func (l *listener) Addr() net.Addr {
return &addr{l.host.ID()}
}

// Listen provides a standard net.Listener ready to accept "connections".
// Under the hood, these connections are libp2p streams tagged with the
// given protocol.ID.
func Listen(h host.Host, tag protocol.ID) (net.Listener, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &listener{
host: h,
ctx: ctx,
cancel: cancel,
tag: tag,
streamCh: make(chan network.Stream),
}

h.SetStreamHandler(tag, func(s network.Stream) {
select {
case l.streamCh <- s:
case <-ctx.Done():
s.Reset()
}
})

return l, nil
}

0 comments on commit b427242

Please sign in to comment.