Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exchange signed routing records in identify #747

Merged
merged 46 commits into from Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
94439e6
wip - exchange signed routing records in identify
yusefnapora Dec 20, 2019
cfd7149
point -peerstore dep at PR branch
yusefnapora Dec 20, 2019
daaef63
fix error handling
yusefnapora Nov 22, 2019
2db11a5
test certified addr exchange during identify
yusefnapora Nov 25, 2019
c999bd0
intellij was supposed to run go fmt for me. i feel betrayed
yusefnapora Nov 25, 2019
b632eba
add option to disable signed addrs for testing
yusefnapora Nov 27, 2019
8948610
more explicit name for option to disable signed addrs
yusefnapora Nov 27, 2019
c97dd74
add routingStateManager
yusefnapora Dec 20, 2019
de97dd1
use event bus to trigger identify/push
yusefnapora Dec 20, 2019
2b0a74e
update to reflect name changes in -core
yusefnapora Jan 7, 2020
c2309af
update to track changes from -core PR
yusefnapora Jan 21, 2020
3f7951d
only generate initial peer record if we have addrs
yusefnapora Jan 21, 2020
71187c5
update PR branch dependencies
yusefnapora Jan 21, 2020
baf71df
add test for addr change event production
yusefnapora Jan 23, 2020
f86b996
rename option (RoutingState -> PeerRecord)
yusefnapora Jan 27, 2020
5fc5ead
change NewPeerRecordManager to not accept Host
yusefnapora Jan 27, 2020
c86f6e3
tests for peerRecordManager behavior
yusefnapora Jan 27, 2020
ad52da5
fix racy test
yusefnapora Jan 27, 2020
63c99a2
include local addrs in peer records by defaul
yusefnapora Jan 27, 2020
ea966e2
track peer record API changes, update PR deps
yusefnapora Feb 4, 2020
64f1353
name changes + mutex around latest peer record
yusefnapora Feb 4, 2020
6ecbfea
doc comments
yusefnapora Feb 4, 2020
1f922f0
remove option to filter local addrs from peer rec
yusefnapora Feb 4, 2020
58fe062
naming
yusefnapora Feb 4, 2020
c019ac1
define new protocol versions for id & id/push
yusefnapora Feb 4, 2020
1da3bd5
update -core dependency
yusefnapora Feb 10, 2020
8d316c3
use struct as map key instead of stringifying
yusefnapora Feb 14, 2020
ba8f960
simplify protoSupportsPeerRecords
yusefnapora Feb 14, 2020
db2625b
add nil check before emitting peer record event
yusefnapora Feb 14, 2020
839c8d2
regen with correct gogoproto package
yusefnapora Mar 6, 2020
aff832d
import grouping
yusefnapora Mar 6, 2020
140feb8
make peerRecordManager stateless
yusefnapora Mar 6, 2020
c775624
Merge origin/master into feat/identify-signed-addrs
yusefnapora Mar 6, 2020
0808676
certified addrs don't replace existing anymore
yusefnapora Mar 6, 2020
65c2cf1
move peerRecordManager from IDService to BasicHost
yusefnapora Mar 6, 2020
4bfae9d
order imports
aarshkshah1992 Mar 26, 2020
a45bb3f
signal addr changes
aarshkshah1992 Mar 26, 2020
6fa7285
Merge branch 'master' into feat/identify-signed-addrs
aarshkshah1992 Mar 26, 2020
4269886
get it working
aarshkshah1992 Mar 26, 2020
ffef2df
skip test local addr filtering
aarshkshah1992 Mar 26, 2020
c351c42
take signed records to completion
aarshkshah1992 Mar 26, 2020
d740574
change ticker time
aarshkshah1992 Mar 27, 2020
4474645
changes as per first round of review
aarshkshah1992 Apr 1, 2020
15310dc
changes as per review
aarshkshah1992 Apr 9, 2020
3b2ae0a
merged master changes
aarshkshah1992 Apr 13, 2020
8c60995
changes as per review
aarshkshah1992 Apr 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 6 additions & 9 deletions go.mod
Expand Up @@ -5,21 +5,21 @@ require (
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/ipfs/go-log v1.0.3
github.com/jbenet/go-cienv v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-eventbus v0.1.1-0.20200326052355-c30c9409b9a4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this bump?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we do not. This was the remanent of a previous eventbus change I made that we do not need anymore.

This has been fixed.

github.com/libp2p/go-libp2p-autonat v0.2.1
github.com/libp2p/go-libp2p-blankhost v0.1.4
github.com/libp2p/go-libp2p-circuit v0.1.4
github.com/libp2p/go-libp2p-core v0.4.0
github.com/libp2p/go-libp2p-core v0.5.1-0.20200326115709-8173af76d179
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.2.2
github.com/libp2p/go-libp2p-nat v0.0.5
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.1
github.com/libp2p/go-libp2p-secio v0.2.1
github.com/libp2p/go-libp2p-swarm v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.1
Expand All @@ -29,16 +29,13 @@ require (
github.com/libp2p/go-stream-muxer-multistream v0.2.0
github.com/libp2p/go-tcp-transport v0.1.1
github.com/libp2p/go-ws-transport v0.2.0
github.com/mailru/easyjson v0.7.1 // indirect
github.com/miekg/dns v1.1.12 // indirect
github.com/miekg/dns v1.1.28 // indirect
github.com/multiformats/go-multiaddr v0.2.1
github.com/multiformats/go-multiaddr-dns v0.2.0
github.com/multiformats/go-multiaddr-net v0.1.3
github.com/multiformats/go-multistream v0.1.1
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 // indirect
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527 // indirect
golang.org/x/tools v0.0.0-20200306143135-a0897bacddcb // indirect
)

go 1.12
63 changes: 37 additions & 26 deletions go.sum

Large diffs are not rendered by default.

151 changes: 100 additions & 51 deletions p2p/host/basic/basic_host.go
Expand Up @@ -2,28 +2,31 @@ package basichost

import (
"context"
"errors"
"io"
"net"
"time"

"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/record"

"github.com/libp2p/go-eventbus"
inat "github.com/libp2p/go-libp2p-nat"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"

"github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr"
madns "github.com/multiformats/go-multiaddr-dns"
manet "github.com/multiformats/go-multiaddr-net"
Expand Down Expand Up @@ -76,7 +79,6 @@ type BasicHost struct {
maResolver *madns.Resolver
cmgr connmgr.ConnManager
eventbus event.Bus
peerrecmgr *peerRecordManager

AddrsFactory AddrsFactory

Expand All @@ -90,6 +92,9 @@ type BasicHost struct {
}

addrChangeChan chan struct{}

signKey crypto.PrivKey
caBook peerstore.CertifiedAddrBook
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -143,18 +148,19 @@ func NewHost(ctx context.Context, net network.Network, opts *HostOpts) (*BasicHo
if h.emitters.evtLocalProtocolsUpdated, err = h.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil, err
}
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}); err != nil {
if h.emitters.evtLocalAddrsUpdated, err = h.eventbus.Emitter(&event.EvtLocalAddressesUpdated{}, eventbus.Stateful); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this should be stateful. Are we doing this to get around the fact that we don't have some form of GetPeerRecord() method as described in #747 (comment)? If so, we should just add that method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien We have a GetPeerRecord(p peer.ID) on the peerstore now.

However, calling that when you get a EvtLocalAddressChanged event could be racy as the address you see on the event could be different fro the signed record you get from the peerstore.

This avoids that race.

return nil, err
}

hostKey := h.Peerstore().PrivKey(h.ID())
if hostKey == nil {
log.Warn("unable to access host key. peer record support disabled.")
} else {
h.peerrecmgr, err = NewPeerRecordManager(ctx, h.EventBus(), hostKey, h.Addrs())
if err != nil {
log.Errorf("error creating peer record manager. peer record support disabled. err: %s", err)
}
cab, ok := peerstore.GetCertifiedAddrBook(net.Peerstore())
if !ok {
return nil, errors.New("peerstore should also be a certified address book")
}
h.caBook = cab

h.signKey = h.Peerstore().PrivKey(h.ID())
if h.signKey == nil {
return nil, errors.New("unable to access host key")
}

h.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
Expand Down Expand Up @@ -241,12 +247,12 @@ func New(net network.Network, opts ...interface{}) *BasicHost {
}

h, err := NewHost(context.Background(), net, hostopts)
h.Start()
if err != nil {
// this cannot happen with legacy options
// plus we want to keep the (deprecated) legacy interface unchanged
panic(err)
}
h.Start()

return h
}
Expand Down Expand Up @@ -312,44 +318,13 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
go handle(protoID, s)
}

// CheckForAddressChanges determines whether our listen addresses have recently
// changed and emits an EvtLocalAddressesUpdatedEvent if so.
// SignalAddressChange signals to the host that it needs to determine whether our listen addresses have recently
// changed.
// Warning: this interface is unstable and may disappear in the future.
func (h *BasicHost) CheckForAddressChanges() {
h.mx.Lock()
addrs := h.Addrs()
changeEvt := makeUpdatedAddrEvent(h.lastAddrs, addrs)
if changeEvt != nil {
h.lastAddrs = addrs
}

if changeEvt != nil {
err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt)
if err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}

return &evt
}

func (h *BasicHost) background(p goprocess.Process) {
// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

// initialize lastAddrs
lastAddrs := h.Addrs()

for {
select {
case <-ticker.C:
h.CheckForAddressChanges()

case <-p.Closing():
return
}
func (h *BasicHost) SignalAddressChange() {
select {
case h.addrChangeChan <- struct{}{}:
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine because the channel is buffered. If the event loop is firing, we will queue the next trigger. If the channel is full, the next iteration of the event loop will pick up the update we were intending to communicate.

}
}

Expand Down Expand Up @@ -385,6 +360,80 @@ func makeUpdatedAddrEvent(prev, current []ma.Multiaddr) *event.EvtLocalAddresses
return &evt
}

func (h *BasicHost) makeSignedPeerRecord(evt *event.EvtLocalAddressesUpdated) (*record.Envelope, error) {
current := make([]multiaddr.Multiaddr, len(evt.Current))
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
for _, a := range evt.Current {
current = append(current, a.Address)
}

addrs := make([]multiaddr.Multiaddr, 0, len(current))
for _, a := range current {
if a == nil {
continue
}
addrs = append(addrs, a)
}
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

rec := peer.NewPeerRecord()
rec.PeerID = h.network.LocalPeer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

host.ID()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

rec.Addrs = addrs
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
return record.Seal(rec, h.signKey)
}

func (h *BasicHost) background(p goprocess.Process) {
var lastAddrs []ma.Multiaddr

emitAddrChange := func(currentAddrs []ma.Multiaddr) {
changeEvt := makeUpdatedAddrEvent(lastAddrs, currentAddrs)
if changeEvt != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Negate the condition to short-circuit instead, and outdent the block.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// add signed peer record to the event
sr, err := h.makeSignedPeerRecord(changeEvt)
if err != nil {
log.Errorf("error creating a signed peer record from the set of current addresses, err=%s", err)
return
}
changeEvt.SignedPeerRecord = *sr

// persist the signed record to the peerstore
if _, err := h.caBook.ConsumePeerRecord(sr, peerstore.PermanentAddrTTL); err != nil {
log.Errorf("failed to persist signed peer record in peer store, err=%s", err)
return
}

// emit addr change event on the bus
if err := h.emitters.evtLocalAddrsUpdated.Emit(*changeEvt); err != nil {
log.Warnf("error emitting event for updated addrs: %s", err)
}
}
}

// immediately emit the first address change event if we have a listen address
if h.Addrs() != nil {
emitAddrChange(h.Addrs())
}
// init lastAddrs
lastAddrs = h.Addrs()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racy. Capture the addresses and use them when emitting the event and setting lastAddrs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@raulk Even though this code is executed in a single dedicated go-routine, have refactored this to make it easier to reason about.


// periodically schedules an IdentifyPush to update our peers for changes
// in our address set (if needed)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
case <-h.addrChangeChan:
case <-p.Closing():
return
}

// emit an EvtLocalAddressesUpdatedEvent event if our listen addresses have changed.
emitAddrChange(h.Addrs())
// update last seen addrs
lastAddrs = h.Addrs()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Racy. Capture, emit and set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}

// ID returns the (local) peer.ID associated with this Host
func (h *BasicHost) ID() peer.ID {
return h.Network().LocalPeer()
Expand Down