Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exchange signed routing records in identify #747

Merged
merged 46 commits into from Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
94439e6
wip - exchange signed routing records in identify
yusefnapora Dec 20, 2019
cfd7149
point -peerstore dep at PR branch
yusefnapora Dec 20, 2019
daaef63
fix error handling
yusefnapora Nov 22, 2019
2db11a5
test certified addr exchange during identify
yusefnapora Nov 25, 2019
c999bd0
intellij was supposed to run go fmt for me. i feel betrayed
yusefnapora Nov 25, 2019
b632eba
add option to disable signed addrs for testing
yusefnapora Nov 27, 2019
8948610
more explicit name for option to disable signed addrs
yusefnapora Nov 27, 2019
c97dd74
add routingStateManager
yusefnapora Dec 20, 2019
de97dd1
use event bus to trigger identify/push
yusefnapora Dec 20, 2019
2b0a74e
update to reflect name changes in -core
yusefnapora Jan 7, 2020
c2309af
update to track changes from -core PR
yusefnapora Jan 21, 2020
3f7951d
only generate initial peer record if we have addrs
yusefnapora Jan 21, 2020
71187c5
update PR branch dependencies
yusefnapora Jan 21, 2020
baf71df
add test for addr change event production
yusefnapora Jan 23, 2020
f86b996
rename option (RoutingState -> PeerRecord)
yusefnapora Jan 27, 2020
5fc5ead
change NewPeerRecordManager to not accept Host
yusefnapora Jan 27, 2020
c86f6e3
tests for peerRecordManager behavior
yusefnapora Jan 27, 2020
ad52da5
fix racy test
yusefnapora Jan 27, 2020
63c99a2
include local addrs in peer records by defaul
yusefnapora Jan 27, 2020
ea966e2
track peer record API changes, update PR deps
yusefnapora Feb 4, 2020
64f1353
name changes + mutex around latest peer record
yusefnapora Feb 4, 2020
6ecbfea
doc comments
yusefnapora Feb 4, 2020
1f922f0
remove option to filter local addrs from peer rec
yusefnapora Feb 4, 2020
58fe062
naming
yusefnapora Feb 4, 2020
c019ac1
define new protocol versions for id & id/push
yusefnapora Feb 4, 2020
1da3bd5
update -core dependency
yusefnapora Feb 10, 2020
8d316c3
use struct as map key instead of stringifying
yusefnapora Feb 14, 2020
ba8f960
simplify protoSupportsPeerRecords
yusefnapora Feb 14, 2020
db2625b
add nil check before emitting peer record event
yusefnapora Feb 14, 2020
839c8d2
regen with correct gogoproto package
yusefnapora Mar 6, 2020
aff832d
import grouping
yusefnapora Mar 6, 2020
140feb8
make peerRecordManager stateless
yusefnapora Mar 6, 2020
c775624
Merge origin/master into feat/identify-signed-addrs
yusefnapora Mar 6, 2020
0808676
certified addrs don't replace existing anymore
yusefnapora Mar 6, 2020
65c2cf1
move peerRecordManager from IDService to BasicHost
yusefnapora Mar 6, 2020
4bfae9d
order imports
aarshkshah1992 Mar 26, 2020
a45bb3f
signal addr changes
aarshkshah1992 Mar 26, 2020
6fa7285
Merge branch 'master' into feat/identify-signed-addrs
aarshkshah1992 Mar 26, 2020
4269886
get it working
aarshkshah1992 Mar 26, 2020
ffef2df
skip test local addr filtering
aarshkshah1992 Mar 26, 2020
c351c42
take signed records to completion
aarshkshah1992 Mar 26, 2020
d740574
change ticker time
aarshkshah1992 Mar 27, 2020
4474645
changes as per first round of review
aarshkshah1992 Apr 1, 2020
15310dc
changes as per review
aarshkshah1992 Apr 9, 2020
3b2ae0a
merged master changes
aarshkshah1992 Apr 13, 2020
8c60995
changes as per review
aarshkshah1992 Apr 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
94 changes: 70 additions & 24 deletions p2p/host/basic/basic_host.go
Expand Up @@ -2,27 +2,30 @@ package basichost

import (
"context"
"errors"
"io"
"net"
"sync"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"

"github.com/libp2p/go-eventbus"
inat "github.com/libp2p/go-libp2p-nat"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

logging "github.com/ipfs/go-log"

"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
Expand Down Expand Up @@ -93,6 +96,9 @@ type BasicHost struct {
}

addrChangeChan chan struct{}

signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -150,10 +156,21 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this should be stateful. Are we doing this to get around the fact that we don't have some form of GetPeerRecord() method as described in #747 (comment)? If so, we should just add that method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien We have a GetPeerRecord(p peer.ID) on the peerstore now.

However, calling that when you get a EvtLocalAddressChanged event could be racy as the address you see on the event could be different fro the signed record you get from the peerstore.

This avoids that race.

return nil, err
}

cab, ok := peerstore.GetCertifiedAddrBook(net.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab

h.signKey = h.Peerstore().PrivKey(h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}

if opts.MultistreamMuxer != nil {
h.mux = opts.MultistreamMuxer
}
Expand Down Expand Up @@ -222,12 +239,12 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}

h, err := NewHost(context.Background(), net, hostopts)
h.Start()
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
panic(err)
}
h.Start()

return h
}
Expand Down Expand Up @@ -336,39 +353,68 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return &evt
}

func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (*record.Envelope, error) {
current := make([]multiaddr.Multiaddr, 0, len(evt.Current))
for _, a := range evt.Current {
current = append(current, a.Address)
}

rec := peer.PeerRecordFromAddrInfo(peer.AddrInfo{h.ID(), current})
return record.Seal(rec, h.signKey)
}

func (h *BasicHost) background() {
defer h.refCount.Done()
var lastAddrs []ma.Multiaddr

emitAddrChange := func(currentAddrs []ma.Multiaddr, lastAddrs []ma.Multiaddr) {
// nothing to do if both are nil..defensive check
if currentAddrs == nil && lastAddrs == nil {
return
}

changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)

if changeEvt == nil {
return
}

// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(changeEvt)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = *sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}

// emit addr change event on the bus
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}

// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

// initialize lastAddrs
lastAddrs := h.Addrs()

for {
curr := h.Addrs()
emitAddrChange(curr, lastAddrs)
lastAddrs = curr

select {
case <-ticker.C:
case <-h.addrChangeChan:
case <-h.ctx.Done():
return
}

// emit an EvtLocalAddressesUpdatedEvent & a Push Identify if our listen addresses have changed.
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(lastAddrs, addrs)
if changeEvt != nil {
lastAddrs = addrs
}

if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
h.ids.Push()
}
}
}

Expand Down
131 changes: 102 additions & 29 deletions p2p/host/basic/basic_host_test.go
Expand Up @@ -5,21 +5,24 @@ import (
"context"
"io"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"
"github.com/libp2p/go-libp2p-core/test"

"github.com/libp2p/go-eventbus"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"

ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -102,16 +105,35 @@ func TestProtocolHandlerEvents(t *testing.T) {
}
defer sub.Close()

assert := func(added, removed []protocol.ID) {
var next event.EvtLocalProtocolsUpdated
select {
case evt := <-sub.Out():
next = evt.(event.EvtLocalProtocolsUpdated)
break
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
// the identify service adds new protocol handlers shortly after the host
// starts. this helps us filter those events out, since they're unrelated
// to the test.
isIdentify := func(evt event.EvtLocalProtocolsUpdated) bool {
for _, p := range evt.Added {
if p == identify.ID || p == identify.IDPush {
return true
}
}
return false
}

nextEvent := func() event.EvtLocalProtocolsUpdated {
for {
select {
case evt := <-sub.Out():
next := evt.(event.EvtLocalProtocolsUpdated)
if isIdentify(next) {
continue
}
return next
case <-time.After(5 * time.Second):
t.Fatal("event not received in 5 seconds")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fatal must be called from the main goroutine: https://golang.org/pkg/testing/#T

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is being executed on the main goroutine, not on a separate one.

}
}
}

assert := func(added, removed []protocol.ID) {
next := nextEvent()
if !reflect.DeepEqual(added, next.Added) {
t.Errorf("expected added: %v; received: %v", added, next.Added)
}
Expand Down Expand Up @@ -443,11 +465,10 @@ func TestAddrResolution(t *testing.T) {
_ = h.Connect(tctx, *pi)

addrs := h.Peerstore().Addrs(pi.ID)
sort.Sort(sortedMultiaddrs(addrs))

if len(addrs) != 2 || !addrs[0].Equal(addr1) || !addrs[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs)
}
require.Len(t, addrs, 2)
require.Contains(t, addrs, addr1)
require.Contains(t, addrs, addr2)
}

func TestAddrResolutionRecursive(t *testing.T) {
Expand Down Expand Up @@ -498,11 +519,9 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi1)

addrs1 := h.Peerstore().Addrs(pi1.ID)
sort.Sort(sortedMultiaddrs(addrs1))

if len(addrs1) != 2 || !addrs1[0].Equal(addr1) || !addrs1[1].Equal(addr2) {
t.Fatalf("expected [%s %s], got %+v", addr1, addr2, addrs1)
}
require.Len(t, addrs1, 2)
require.Contains(t, addrs1, addr1)
require.Contains(t, addrs1, addr2)

pi2, err := peer.AddrInfoFromP2pAddr(p2paddr2)
if err != nil {
Expand All @@ -512,11 +531,49 @@ func TestAddrResolutionRecursive(t *testing.T) {
_ = h.Connect(tctx, *pi2)

addrs2 := h.Peerstore().Addrs(pi2.ID)
sort.Sort(sortedMultiaddrs(addrs2))
require.Len(t, addrs2, 1)
require.Contains(t, addrs2, addr1)
}

func TestAddrChangeImmediatelyIfAddressNonEmpty(t *testing.T) {
ctx := context.Background()
taddrs := []ma.Multiaddr{ma.StringCast("/ip4/1.2.3.4/tcp/1234")}

if len(addrs2) != 1 || !addrs2[0].Equal(addr1) {
t.Fatalf("expected [%s], got %+v", addr1, addrs2)
h := New(swarmt.GenSwarm(t, ctx), AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
return taddrs
}))
defer h.Close()

sub, err := h.EventBus().Subscribe(&event.EvtLocalAddressesUpdated{})
if err != nil {
t.Error(err)
}
defer sub.Close()
// wait for the host background thread to start
time.Sleep(1 * time.Second)

expected := event.EvtLocalAddressesUpdated{
Diffs: true,
Current: []event.UpdatedAddress{
{Action: event.Added, Address: ma.StringCast("/ip4/1.2.3.4/tcp/1234")},
},
Removed: []event.UpdatedAddress{}}

// assert we get expected event
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expected, evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expected, evt)
}

// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, taddrs, rc.Addrs)

// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, taddrs, rc.Addrs)
}

func TestHostAddrChangeDetection(t *testing.T) {
yusefnapora marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -594,9 +651,18 @@ func TestHostAddrChangeDetection(t *testing.T) {
h.SignalAddressChange()
evt := waitForAddrChangeEvent(ctx, sub, t)
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i], evt)
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt)
}

// assert it's on the signed record
rc := peerRecordFromEnvelope(t, evt.SignedPeerRecord)
require.Equal(t, addrSets[i], rc.Addrs)

// assert it's in the peerstore
ev := h.Peerstore().(peerstore.CertifiedAddrBook).GetPeerRecord(h.ID())
require.NotNil(t, ev)
rc = peerRecordFromEnvelope(t, *ev)
require.Equal(t, addrSets[i], rc.Addrs)
}
}

Expand Down Expand Up @@ -655,10 +721,17 @@ func updatedAddrEventsEqual(a, b event.EvtLocalAddressesUpdated) bool {
updatedAddrsEqual(a.Removed, b.Removed)
}

type sortedMultiaddrs []ma.Multiaddr

func (sma sortedMultiaddrs) Len() int { return len(sma) }
func (sma sortedMultiaddrs) Swap(i, j int) { sma[i], sma[j] = sma[j], sma[i] }
func (sma sortedMultiaddrs) Less(i, j int) bool {
return bytes.Compare(sma[i].Bytes(), sma[j].Bytes()) == 1
func peerRecordFromEnvelope(t *testing.T, ev record.Envelope) *peer.PeerRecord {
t.Helper()
rec, err := ev.Record()
if err != nil {
t.Fatalf("error getting PeerRecord from event: %v", err)
return nil
}
peerRec, ok := rec.(*peer.PeerRecord)
if !ok {
t.Fatalf("wrong type for peer record")
return nil
}
return peerRec
}