Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate the event bus, handle protocol update events, make identify emit deltas #659

Merged
merged 13 commits into from Jun 24, 2019
7 changes: 4 additions & 3 deletions go.mod
Expand Up @@ -9,16 +9,17 @@ require (
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.0.1
github.com/libp2p/go-libp2p-autonat v0.1.0
github.com/libp2p/go-libp2p-blankhost v0.1.1
github.com/libp2p/go-libp2p-blankhost v0.1.2
github.com/libp2p/go-libp2p-circuit v0.1.0
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-core v0.0.4
github.com/libp2p/go-libp2p-discovery v0.1.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.1
github.com/libp2p/go-libp2p-nat v0.0.4
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.1.1
github.com/libp2p/go-libp2p-secio v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.4
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Expand Up @@ -3,8 +3,11 @@ github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETF
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA=
github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg=
github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY=
github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc=
Expand Down Expand Up @@ -85,16 +88,22 @@ github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOS
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-conn-security-multistream v0.1.0 h1:aqGmto+ttL/uJgX0JtQI0tD21CIEy5eYd1Hlp0juHY0=
github.com/libp2p/go-conn-security-multistream v0.1.0/go.mod h1:aw6eD7LOsHEX7+2hJkDxw1MteijaVcI+/eP2/x3J1xc=
github.com/libp2p/go-eventbus v0.0.1 h1:7hEu1mXhGmHUkUIgQYuWUIdpPYUs1h0YXmRh85u+Zs0=
github.com/libp2p/go-eventbus v0.0.1/go.mod h1:ghoLUYgnTL/ZaeVXPdV60iT9AEH43++U/y11FVgMQfs=
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-libp2p-autonat v0.1.0 h1:aCWAu43Ri4nU0ZPO7NyLzUvvfqd0nE3dX0R/ZGYVgOU=
github.com/libp2p/go-libp2p-autonat v0.1.0/go.mod h1:1tLf2yXxiE/oKGtDwPYWTSYG3PtvYlJmg7NeVtPRqH8=
github.com/libp2p/go-libp2p-blankhost v0.1.1 h1:X919sCh+KLqJcNRApj43xCSiQRYqOSI88Fdf55ngf78=
github.com/libp2p/go-libp2p-blankhost v0.1.1/go.mod h1:pf2fvdLJPsC1FsVrNP3DUUvMzUts2dsLLBEpo1vW1ro=
github.com/libp2p/go-libp2p-blankhost v0.1.2 h1:IQqFW63VkZiCsod5SlE2FhAa2WYIiJNSrrwp3upHUWc=
github.com/libp2p/go-libp2p-blankhost v0.1.2/go.mod h1:ibwZioa1iHUMotliJMthahrnDNFY9kvedaxZzHQuKIg=
github.com/libp2p/go-libp2p-circuit v0.1.0 h1:eniLL3Y9aq/sryfyV1IAHj5rlvuyj3b7iz8tSiZpdhY=
github.com/libp2p/go-libp2p-circuit v0.1.0/go.mod h1:Ahq4cY3V9VJcHcn1SBXjr78AbFkZeIRmfunbA7pmFh8=
github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.0.4 h1:wNg7b2tKrZlSNibP+j7A1v8eatjf4+9+YRw0L3DUeFE=
github.com/libp2p/go-libp2p-core v0.0.4/go.mod h1:jyuCQP356gzfCFtRKyvAbNkyeuxb7OlyhWZ3nls5d2I=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0 h1:j+R6cokKcGbnZLf4kcNwpx6mDEUPF3N6SrqMymQhmvs=
Expand All @@ -112,6 +121,8 @@ github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUje
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
github.com/libp2p/go-libp2p-peerstore v0.1.0/go.mod h1:2CeHkQsr8svp4fZ+Oi9ykN1HBb6u0MOvdJ7YIsmcwtY=
github.com/libp2p/go-libp2p-peerstore v0.1.1 h1:AJZF2sPpVo+0aAr3IrRiGVsPjJm1INlUQ9EGClgXJ4M=
github.com/libp2p/go-libp2p-peerstore v0.1.1/go.mod h1:ojEWnwG7JpJLkJ9REWYXQslyu9ZLrPWPEcCdiZzEbSM=
github.com/libp2p/go-libp2p-secio v0.1.0 h1:NNP5KLxuP97sE5Bu3iuwOWyT/dKEGMN5zSLMWdB7GTQ=
github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8=
github.com/libp2p/go-libp2p-swarm v0.1.0 h1:HrFk2p0awrGEgch9JXK/qp/hfjqQfgNxpLWnCiWPg5s=
Expand Down Expand Up @@ -233,6 +244,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f h1:R423Cnkcp5JABoeemiGEPlt9tHXFfw5kvc0yqlxRPWo=
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443 h1:IcSOAf4PyMp3U3XbIEj1/xJ2BjNN2jWv7JoyOsMxXUU=
golang.org/x/crypto v0.0.0-20190618222545-ea8f1a30c443/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTdvMUC/cFajkphs1YLQo=
Expand Down
38 changes: 32 additions & 6 deletions p2p/host/basic/basic_host.go
Expand Up @@ -7,21 +7,24 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/libp2p/go-eventbus"
inat "github.com/libp2p/go-libp2p-nat"

logging "github.com/ipfs/go-log"
goprocess "github.com/jbenet/goprocess"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

inat "github.com/libp2p/go-libp2p-nat"

identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"
ping "github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
Expand Down Expand Up @@ -69,6 +72,7 @@ type BasicHost struct {
natmgr NATManager
maResolver *madns.Resolver
cmgr connmgr.ConnManager
eventbus event.Bus

AddrsFactory AddrsFactory

Expand All @@ -78,14 +82,16 @@ type BasicHost struct {

mx sync.Mutex
lastAddrs []ma.Multiaddr
emitters struct {
evtLocalProtocolsUpdated event.Emitter
}
}

var _ host.Host = (*BasicHost)(nil)

// HostOpts holds options that can be passed to NewHost in order to
// customize construction of the *BasicHost.
type HostOpts struct {

// MultistreamMuxer is essential for the *BasicHost and will use a sensible default value if omitted.
MultistreamMuxer *msmux.MultistreamMuxer

Expand Down Expand Up @@ -125,6 +131,12 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
negtimeout: DefaultNegotiationTimeout,
AddrsFactory: DefaultAddrsFactory,
maResolver: madns.DefaultResolver,
eventbus: eventbus.NewBus(),
}

var err error
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
Expand Down Expand Up @@ -366,6 +378,10 @@ func (h *BasicHost) IDService() *identify.IDService {
return h.ids
}

func (h *BasicHost) EventBus() event.Bus {
return h.eventbus
}

// SetStreamHandler sets the protocol handler on the Host's Mux.
// This is equivalent to:
// host.Mux().SetHandler(proto, handler)
Expand All @@ -377,6 +393,9 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHand
handler(is)
return nil
})
h.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Added: []protocol.ID{pid},
})
}

// SetStreamHandlerMatch sets the protocol handler on the Host's Mux
Expand All @@ -388,11 +407,17 @@ func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(string) bool,
handler(is)
return nil
})
h.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Added: []protocol.ID{pid},
})
}

// RemoveStreamHandler returns ..
func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
h.Mux().RemoveHandler(string(pid))
h.emitters.evtLocalProtocolsUpdated.Emit(event.EvtLocalProtocolsUpdated{
Removed: []protocol.ID{pid},
})
}

// NewStream opens a new stream to given peer p, and writes a p2p/protocol
Expand Down Expand Up @@ -721,6 +746,7 @@ func (h *BasicHost) AllAddrs() []ma.Multiaddr {

// Close shuts down the Host's services (network, etc).
func (h *BasicHost) Close() error {
_ = h.emitters.evtLocalProtocolsUpdated.Close()
return h.proc.Close()
}

Expand Down
39 changes: 39 additions & 0 deletions p2p/host/basic/basic_host_test.go
Expand Up @@ -4,10 +4,12 @@ import (
"bytes"
"context"
"io"
"reflect"
"sort"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -69,6 +71,43 @@ func TestHostSimple(t *testing.T) {
}
}

func TestProtocolHandlerEvents(t *testing.T) {
ctx := context.Background()
h := New(swarmt.GenSwarm(t, ctx))
defer h.Close()

ch := make(chan event.EvtLocalProtocolsUpdated, 100)
cancelFunc, err := h.EventBus().Subscribe(ch)
if err != nil {
t.Fatal(err)
}
defer cancelFunc()

assert := func(added, removed []protocol.ID) {
var next event.EvtLocalProtocolsUpdated
select {
case next = <-ch:
break
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
}

if !reflect.DeepEqual(added, next.Added) {
t.Errorf("expected added: %v; received: %v", added, next.Added)
}
if !reflect.DeepEqual(removed, next.Removed) {
t.Errorf("expected removed: %v; received: %v", removed, next.Removed)
}
}

h.SetStreamHandler(protocol.TestingID, func(s network.Stream) {})
assert([]protocol.ID{protocol.TestingID}, nil)
h.SetStreamHandler(protocol.ID("foo"), func(s network.Stream) {})
assert([]protocol.ID{protocol.ID("foo")}, nil)
h.RemoveStreamHandler(protocol.TestingID)
assert(nil, []protocol.ID{protocol.TestingID})
}

func TestHostAddrsFactory(t *testing.T) {
maddr := ma.StringCast("/ip4/1.2.3.4/tcp/1234")
addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
Expand Down
5 changes: 5 additions & 0 deletions p2p/host/routed/routed.go
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -153,6 +154,10 @@ func (rh *RoutedHost) Mux() protocol.Switch {
return rh.host.Mux()
}

func (rh *RoutedHost) EventBus() event.Bus {
return rh.host.EventBus()
}

func (rh *RoutedHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
rh.host.SetStreamHandler(pid, handler)
}
Expand Down