Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exchange signed routing records in identify #747

Merged
merged 46 commits into from Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
94439e6
wip - exchange signed routing records in identify
yusefnapora Dec 20, 2019
cfd7149
point -peerstore dep at PR branch
yusefnapora Dec 20, 2019
daaef63
fix error handling
yusefnapora Nov 22, 2019
2db11a5
test certified addr exchange during identify
yusefnapora Nov 25, 2019
c999bd0
intellij was supposed to run go fmt for me. i feel betrayed
yusefnapora Nov 25, 2019
b632eba
add option to disable signed addrs for testing
yusefnapora Nov 27, 2019
8948610
more explicit name for option to disable signed addrs
yusefnapora Nov 27, 2019
c97dd74
add routingStateManager
yusefnapora Dec 20, 2019
de97dd1
use event bus to trigger identify/push
yusefnapora Dec 20, 2019
2b0a74e
update to reflect name changes in -core
yusefnapora Jan 7, 2020
c2309af
update to track changes from -core PR
yusefnapora Jan 21, 2020
3f7951d
only generate initial peer record if we have addrs
yusefnapora Jan 21, 2020
71187c5
update PR branch dependencies
yusefnapora Jan 21, 2020
baf71df
add test for addr change event production
yusefnapora Jan 23, 2020
f86b996
rename option (RoutingState -> PeerRecord)
yusefnapora Jan 27, 2020
5fc5ead
change NewPeerRecordManager to not accept Host
yusefnapora Jan 27, 2020
c86f6e3
tests for peerRecordManager behavior
yusefnapora Jan 27, 2020
ad52da5
fix racy test
yusefnapora Jan 27, 2020
63c99a2
include local addrs in peer records by defaul
yusefnapora Jan 27, 2020
ea966e2
track peer record API changes, update PR deps
yusefnapora Feb 4, 2020
64f1353
name changes + mutex around latest peer record
yusefnapora Feb 4, 2020
6ecbfea
doc comments
yusefnapora Feb 4, 2020
1f922f0
remove option to filter local addrs from peer rec
yusefnapora Feb 4, 2020
58fe062
naming
yusefnapora Feb 4, 2020
c019ac1
define new protocol versions for id & id/push
yusefnapora Feb 4, 2020
1da3bd5
update -core dependency
yusefnapora Feb 10, 2020
8d316c3
use struct as map key instead of stringifying
yusefnapora Feb 14, 2020
ba8f960
simplify protoSupportsPeerRecords
yusefnapora Feb 14, 2020
db2625b
add nil check before emitting peer record event
yusefnapora Feb 14, 2020
839c8d2
regen with correct gogoproto package
yusefnapora Mar 6, 2020
aff832d
import grouping
yusefnapora Mar 6, 2020
140feb8
make peerRecordManager stateless
yusefnapora Mar 6, 2020
c775624
Merge origin/master into feat/identify-signed-addrs
yusefnapora Mar 6, 2020
0808676
certified addrs don't replace existing anymore
yusefnapora Mar 6, 2020
65c2cf1
move peerRecordManager from IDService to BasicHost
yusefnapora Mar 6, 2020
4bfae9d
order imports
aarshkshah1992 Mar 26, 2020
a45bb3f
signal addr changes
aarshkshah1992 Mar 26, 2020
6fa7285
Merge branch 'master' into feat/identify-signed-addrs
aarshkshah1992 Mar 26, 2020
4269886
get it working
aarshkshah1992 Mar 26, 2020
ffef2df
skip test local addr filtering
aarshkshah1992 Mar 26, 2020
c351c42
take signed records to completion
aarshkshah1992 Mar 26, 2020
d740574
change ticker time
aarshkshah1992 Mar 27, 2020
4474645
changes as per first round of review
aarshkshah1992 Apr 1, 2020
15310dc
changes as per review
aarshkshah1992 Apr 9, 2020
3b2ae0a
merged master changes
aarshkshah1992 Apr 13, 2020
8c60995
changes as per review
aarshkshah1992 Apr 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 8 additions & 5 deletions go.mod
Expand Up @@ -5,21 +5,21 @@ require (
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p-autonat v0.1.1
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-circuit v0.1.4
github.com/libp2p/go-libp2p-core v0.3.1
github.com/libp2p/go-libp2p-core v0.4.0
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.1
github.com/libp2p/go-libp2p-nat v0.0.5
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-peerstore v0.2.0
github.com/libp2p/go-libp2p-secio v0.2.1
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.1
Expand All @@ -29,13 +29,16 @@ require (
github.com/libp2p/go-stream-muxer-multistream v0.2.0
github.com/libp2p/go-tcp-transport v0.1.1
github.com/libp2p/go-ws-transport v0.2.0
github.com/mailru/easyjson v0.7.1 // indirect
github.com/miekg/dns v1.1.12 // indirect
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.2
github.com/multiformats/go-multistream v0.1.1
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69 // indirect
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 // indirect
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
golang.org/x/tools v0.0.0-20200306143135-a0897bacddcb // indirect
)

go 1.12
151 changes: 61 additions & 90 deletions go.sum

Large diffs are not rendered by default.

74 changes: 52 additions & 22 deletions p2p/host/basic/basic_host.go
Expand Up @@ -77,6 +77,7 @@ type BasicHost struct {
maResolver *madns.Resolver
cmgr connmgr.ConnManager
eventbus event.Bus
peerrecmgr *peerRecordManager

AddrsFactory AddrsFactory

Expand All @@ -88,6 +89,7 @@ type BasicHost struct {
lastAddrs []ma.Multiaddr
emitters struct {
evtLocalProtocolsUpdated event.Emitter
evtLocalAddrsUpdated event.Emitter
}
}

Expand Down Expand Up @@ -141,6 +143,19 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
return nil, err
}

hostKey := h.Peerstore().PrivKey(h.ID())
if hostKey == nil {
log.Warn("unable to access host key. peer record support disabled.")
} else {
h.peerrecmgr, err = NewPeerRecordManager(ctx, h.EventBus(), hostKey, h.Addrs())
if err != nil {
log.Errorf("error creating peer record manager. peer record support disabled. err: %s", err)
}
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
if h.natmgr != nil {
Expand All @@ -150,6 +165,7 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
h.cmgr.Close()
}
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
return h.Network().Close()
})

Expand Down Expand Up @@ -295,21 +311,23 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}

// PushIdentify pushes an identify update through the identify push protocol
// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent if so.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) PushIdentify() {
push := false

func (h *BasicHost) CheckForAddressChanges() {
h.mx.Lock()
addrs := h.Addrs()
if !sameAddrs(addrs, h.lastAddrs) {
push = true
changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs)
if changeEvt != nil {
h.lastAddrs = addrs
}
h.mx.Unlock()

if push {
h.ids.Push()
if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}
}

Expand All @@ -329,32 +347,44 @@ func (h *BasicHost) background(p goprocess.Process) {
for {
select {
case <-ticker.C:
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved
h.PushIdentify()
h.CheckForAddressChanges()

case <-p.Closing():
return
}
}
}

func sameAddrs(a, b []ma.Multiaddr) bool {
if len(a) != len(b) {
return false
}
func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddressesUpdated {
prevmap := make(map[string]ma.Multiaddr, len(prev))
evt := event.EvtLocalAddressesUpdated{Diffs: true}
addrsAdded := false

bmap := make(map[string]struct{}, len(b))
for _, addr := range b {
bmap[string(addr.Bytes())] = struct{}{}
for _, addr := range prev {
prevmap[string(addr.Bytes())] = addr
}

for _, addr := range a {
_, ok := bmap[string(addr.Bytes())]
if !ok {
return false
for _, addr := range current {
_, ok := prevmap[string(addr.Bytes())]
updated := event.UpdatedAddress{Address: addr}
if ok {
updated.Action = event.Maintained
} else {
updated.Action = event.Added
addrsAdded = true
}
evt.Current = append(evt.Current, updated)
delete(prevmap, string(addr.Bytes()))
}
for _, addr := range prevmap {
updated := event.UpdatedAddress{Action: event.Removed, Address: addr}
evt.Removed = append(evt.Removed, updated)
}

if !addrsAdded && len(evt.Removed) == 0 {
return nil
}

return true
return &evt
}

// ID returns the (local) peer.ID associated with this Host
Expand Down
175 changes: 167 additions & 8 deletions p2p/host/basic/basic_host_test.go
Expand Up @@ -3,6 +3,7 @@ package basichost
import (
"bytes"
"context"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"io"
"reflect"
"sort"
Expand Down Expand Up @@ -90,16 +91,35 @@ func TestProtocolHandlerEvents(t *testing.T) {
}
defer sub.Close()

assert := func(added, removed []protocol.ID) {
var next event.EvtLocalProtocolsUpdated
select {
case evt := <-sub.Out():
next = evt.(event.EvtLocalProtocolsUpdated)
break
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
// the identify service adds new protocol handlers shortly after the host
// starts. this helps us filter those events out, since they're unrelated
// to the test.
isIdentify := func(evt event.EvtLocalProtocolsUpdated) bool {
for _, p := range evt.Added {
if p == identify.ID || p == identify.IDPush {
return true
}
}
return false
}

nextEvent := func() event.EvtLocalProtocolsUpdated {
for {
select {
case evt := <-sub.Out():
next := evt.(event.EvtLocalProtocolsUpdated)
if isIdentify(next) {
continue
}
return next
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fatal must be called from the main goroutine: https://golang.org/pkg/testing/#T

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is being executed on the main goroutine, not on a separate one.

}
}
}

assert := func(added, removed []protocol.ID) {
next := nextEvent()
if !reflect.DeepEqual(added, next.Added) {
t.Errorf("expected added: %v; received: %v", added, next.Added)
}
Expand Down Expand Up @@ -507,10 +527,149 @@ func TestAddrResolutionRecursive(t *testing.T) {
}
}

func TestHostAddrChangeDetection(t *testing.T) {
yusefnapora marked this conversation as resolved.
Show resolved Hide resolved
// This test uses the address factory to provide several
// sets of listen addresses for the host. It advances through
// the sets by changing the currentAddrSet index var below.
addrSets := [][]ma.Multiaddr{
{},
{ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
{ma.StringCast("/ip4/1.2.3.4/tcp/1234"), ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
{ma.StringCast("/ip4/2.3.4.5/tcp/1234"), ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
}

// The events we expect the host to emit when CheckForAddressChanges is called
// and the changes between addr sets are detected
expectedEvents := []event.EvtLocalAddressesUpdated{
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
Removed: []event.UpdatedAddress{},
},
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Maintained, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
{Action: event.Added, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
},
Removed: []event.UpdatedAddress{},
},
{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/3.4.5.6/tcp/4321")},
{Action: event.Maintained, Address: ma.StringCast("/ip4/2.3.4.5/tcp/1234")},
},
Removed: []event.UpdatedAddress{
{Action: event.Removed, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
},
}

currentAddrSet := 0
addrsFactory := func(addrs []ma.Multiaddr) []ma.Multiaddr {
return addrSets[currentAddrSet]
}

ctx := context.Background()
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(addrsFactory))
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{}, eventbus.BufSize(10))
if err != nil {
t.Error(err)
}
defer sub.Close()

// host should start with no addrs (addrSet 0)
addrs := h.Addrs()
if len(addrs) != 0 {
t.Fatalf("expected 0 addrs, got %d", len(addrs))
}

// Advance between addrSets
for i := 1; i < len(addrSets); i++ {
currentAddrSet = i
h.CheckForAddressChanges() // forces the host to check for changes now, instead of waiting for background update
}

// drain events from the subscription
var receivedEvents []event.EvtLocalAddressesUpdated
readEvents:
for {
select {
case evt, more := <-sub.Out():
if !more {
break readEvents
}
receivedEvents = append(receivedEvents, evt.(event.EvtLocalAddressesUpdated))
if len(receivedEvents) == len(expectedEvents) {
break readEvents
}
case <-ctx.Done():
break readEvents
case <-time.After(1 * time.Second):
break readEvents
}
}

// assert that we received the events we expected
if len(receivedEvents) != len(expectedEvents) {
t.Errorf("expected to receive %d addr change events, got %d", len(expectedEvents), len(receivedEvents))
}
for i, expected := range expectedEvents {
actual := receivedEvents[i]
if !updatedAddrEventsEqual(expected, actual) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, actual)
}
}
}

type sortedMultiaddrs []ma.Multiaddr

func (sma sortedMultiaddrs) Len() int { return len(sma) }
func (sma sortedMultiaddrs) Swap(i, j int) { sma[i], sma[j] = sma[j], sma[i] }
func (sma sortedMultiaddrs) Less(i, j int) bool {
return bytes.Compare(sma[i].Bytes(), sma[j].Bytes()) == 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn’t this be == -1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely got rid of this code as we don't really need it.

}

// updatedAddrsEqual is a helper to check whether two lists of
// event.UpdatedAddress have the same contents, ignoring ordering.
func updatedAddrsEqual(a, b []event.UpdatedAddress) bool {
if len(a) != len(b) {
return false
}

// We can't use an UpdatedAddress directly as a map key, since
// Multiaddr is an interface, and go won't know how to compare
// for equality. So we convert to this little struct, which
// stores the multiaddr as a string.
type ua struct {
action event.AddrAction
addrStr string
}
aSet := make(map[ua]struct{})
for _, addr := range a {
k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())}
aSet[k] = struct{}{}
}
for _, addr := range b {
k := ua{action: addr.Action, addrStr: string(addr.Address.Bytes())}
_, ok := aSet[k]
if !ok {
return false
}
}
return true
}

// updatedAddrEventsEqual is a helper to check whether two
// event.EvtLocalAddressesUpdated are equal, ignoring the ordering of
// addresses in the inner lists.
func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
return a.Diffs == b.Diffs &&
updatedAddrsEqual(a.Current, b.Current) &&
updatedAddrsEqual(a.Removed, b.Removed)
}