Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize numInbound count #960

Merged
merged 6 commits into from Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions p2p/net/mock/mock_link.go
Expand Up @@ -34,15 +34,15 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
defer l.RUnlock()

parent := process.WithTeardown(func() error { return nil })
c1 := newConn(parent, l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(parent, l.nets[1], l.nets[0], l, network.DirInbound)
c1.rconn = c2
c2.rconn = c1

if dialer == c1.net {
return c1, c2
target := l.nets[0]
if target == dialer {
target = l.nets[1]
}
return c2, c1
dc := newConn(parent, dialer, target, l, network.DirOutbound)
tc := newConn(parent, target, dialer, l, network.DirInbound)
dc.rconn = tc
tc.rconn = dc
return dc, tc
}

func (l *link) Networks() []network.Network {
Expand Down
109 changes: 61 additions & 48 deletions p2p/protocol/identify/obsaddr.go
Expand Up @@ -35,48 +35,44 @@ var observedAddrManagerWorkerChannelSize = 16
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

// observation records an address observation from an "observer" (where every IP
// address is a unique observer).
type observation struct {
seenTime time.Time
connDirection network.Direction
// seenTime is the last time this observation was made.
seenTime time.Time
// inbound indicates whether or not this observation has been made from
// an inbound connection. This remains true even if we an observation
// from a subsequent outbound connection.
inbound bool
}

// ObservedAddr is an entry for an address reported by our peers.
// observedAddr is an entry for an address reported by our peers.
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
// We only use addresses that:
// - have been observed at least 4 times in last 40 minutes. (counter symmetric nats)
// - have been observed at least once recently (10 minutes), because our position in the
// network, or network port mapppings, may have changed.
type ObservedAddr struct {
Addr ma.Multiaddr
SeenBy map[string]observation // peer(observer) address -> observation info
LastSeen time.Time
type observedAddr struct {
addr ma.Multiaddr
seenBy map[string]observation // peer(observer) address -> observation info
lastSeen time.Time
numInbound int
}

func (oa *ObservedAddr) activated() bool {
func (oa *observedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
return len(oa.seenBy) >= ActivationThresh
}

// GroupKey returns the group in which this observation belongs. Currently, an
// observed address's group is just the address with all ports set to 0. This
// means we can advertise the most commonly observed external ports without
// advertising _every_ observed port.
func (oa *ObservedAddr) GroupKey() string {
key := make([]byte, 0, len(oa.Addr.Bytes()))
ma.ForEach(oa.Addr, func(c ma.Component) bool {
func (oa *observedAddr) groupKey() string {
key := make([]byte, 0, len(oa.addr.Bytes()))
ma.ForEach(oa.addr, func(c ma.Component) bool {
switch proto := c.Protocol(); proto.Code {
case ma.P_TCP, ma.P_UDP:
key = append(key, proto.VCode...)
Expand Down Expand Up @@ -107,7 +103,7 @@ type ObservedAddrManager struct {

mu sync.RWMutex
// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
addrs map[string][]*observedAddr
ttl time.Duration
refreshTimer *time.Timer

Expand All @@ -119,7 +115,7 @@ type ObservedAddrManager struct {
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager {
oas := &ObservedAddrManager{
addrs: make(map[string][]*ObservedAddr),
addrs: make(map[string][]*observedAddr),
ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrManagerWorkerChannelSize),
host: host,
Expand Down Expand Up @@ -160,22 +156,22 @@ func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
return nil
}

var allObserved []*ObservedAddr
var allObserved []*observedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multiaddr {
pmap := make(map[string][]*observedAddr)
now := time.Now()

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
if now.Sub(a.lastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
pat := a.groupKey()
pmap[pat] = append(pmap[pat], a)

}
Expand All @@ -191,15 +187,15 @@ func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multi
first := s[i]
second := s[j]

if first.numInbound() > second.numInbound() {
if first.numInbound > second.numInbound {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
return len(first.seenBy) > len(second.seenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
addrs = append(addrs, s[i].addr)
}
}

Expand Down Expand Up @@ -281,14 +277,17 @@ func (oas *ObservedAddrManager) gc() {
filteredAddrs := observedAddrs[:0]
for _, a := range observedAddrs {
// clean up SeenBy set
for k, ob := range a.SeenBy {
for k, ob := range a.seenBy {
if now.Sub(ob.seenTime) > oas.ttl*time.Duration(ActivationThresh) {
delete(a.SeenBy, k)
delete(a.seenBy, k)
if ob.inbound {
a.numInbound--
}
}
}

// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
if now.Sub(a.lastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
}
}
Expand Down Expand Up @@ -382,27 +381,41 @@ func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, obs
observerString := observerGroup(conn.RemoteMultiaddr())
localString := string(conn.LocalMultiaddr().Bytes())
ob := observation{
seenTime: now,
connDirection: conn.Stat().Direction,
seenTime: now,
inbound: conn.Stat().Direction == network.DirInbound,
}

observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
if previousObserved.Addr.Equal(observed) {
observedAddrs[i].SeenBy[observerString] = ob
observedAddrs[i].LastSeen = now
for _, observedAddr := range oas.addrs[localString] {
if observedAddr.addr.Equal(observed) {
// Don't trump an outbound observation with an inbound
// one.
Comment on lines +391 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the opposite? You're saying to mark as inbound if it was, or is currently inbound.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the wording here is just throwing me off (maybe just tired 😃). Maybe something like "Observation is considered inbound if the peer ever gave it to us as an inbound observation"

wasInbound := observedAddr.seenBy[observerString].inbound
isInbound := ob.inbound
ob.inbound = isInbound || wasInbound

if !wasInbound && isInbound {
observedAddr.numInbound++
}

observedAddr.seenBy[observerString] = ob
observedAddr.lastSeen = now
return
}
}

// observed address not seen yet, append it
oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{
Addr: observed,
SeenBy: map[string]observation{
oa := &observedAddr{
addr: observed,
seenBy: map[string]observation{
observerString: ob,
},
LastSeen: now,
})
lastSeen: now,
}
if ob.inbound {
oa.numInbound++
}
oas.addrs[localString] = append(oas.addrs[localString], oa)
}

// observerGroup is a function that determines what part of
Expand Down
38 changes: 38 additions & 0 deletions p2p/protocol/identify/obsaddr_glass_test.go
@@ -0,0 +1,38 @@
package identify

// This test lives in the identify package, not the identify_test package, so it
// can access internal types.

import (
"testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/2345")}
oa2 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
oa3 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/tcp/1231")}
oa4 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
oa5 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531")}
oa6 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531/quic")}
oa7 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1111/quic")}
oa8 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/udp/1111/quic")}

// different ports, same IP => same key
require.Equal(t, oa1.groupKey(), oa2.groupKey())
// different IPs => different key
require.NotEqual(t, oa2.groupKey(), oa3.groupKey())
// same port, different protos => different keys
require.NotEqual(t, oa3.groupKey(), oa4.groupKey())
// same port, same address, different protos => different keys
require.NotEqual(t, oa2.groupKey(), oa4.groupKey())
// udp works as well
require.Equal(t, oa4.groupKey(), oa5.groupKey())
// udp and quic are different
require.NotEqual(t, oa5.groupKey(), oa6.groupKey())
// quic works as well
require.Equal(t, oa6.groupKey(), oa7.groupKey())
require.NotEqual(t, oa7.groupKey(), oa8.groupKey())
}