Skip to content

Commit

Permalink
Merge pull request #960 from libp2p/fix/obs-perf
Browse files Browse the repository at this point in the history
optimize numInbound count
  • Loading branch information
Stebalien committed Jun 4, 2020
2 parents 94e5e1e + 5ae0888 commit 5bfaf4d
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 116 deletions.
16 changes: 8 additions & 8 deletions p2p/net/mock/mock_link.go
Expand Up @@ -34,15 +34,15 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
defer l.RUnlock()

parent := process.WithTeardown(func() error { return nil })
c1 := newConn(parent, l.nets[0], l.nets[1], l, network.DirOutbound)
c2 := newConn(parent, l.nets[1], l.nets[0], l, network.DirInbound)
c1.rconn = c2
c2.rconn = c1

if dialer == c1.net {
return c1, c2
target := l.nets[0]
if target == dialer {
target = l.nets[1]
}
return c2, c1
dc := newConn(parent, dialer, target, l, network.DirOutbound)
tc := newConn(parent, target, dialer, l, network.DirInbound)
dc.rconn = tc
tc.rconn = dc
return dc, tc
}

func (l *link) Networks() []network.Network {
Expand Down
109 changes: 61 additions & 48 deletions p2p/protocol/identify/obsaddr.go
Expand Up @@ -35,48 +35,44 @@ var observedAddrManagerWorkerChannelSize = 16
// we will return for each (IPx/TCP or UDP) group.
var maxObservedAddrsPerIPAndTransport = 2

// observation records an address observation from an "observer" (where every IP
// address is a unique observer).
type observation struct {
seenTime time.Time
connDirection network.Direction
// seenTime is the last time this observation was made.
seenTime time.Time
// inbound indicates whether or not this observation has been made from
// an inbound connection. This remains true even if we an observation
// from a subsequent outbound connection.
inbound bool
}

// ObservedAddr is an entry for an address reported by our peers.
// observedAddr is an entry for an address reported by our peers.
// We only use addresses that:
// - have been observed at least 4 times in last 40 minutes. (counter symmetric nats)
// - have been observed at least once recently (10 minutes), because our position in the
// network, or network port mapppings, may have changed.
type ObservedAddr struct {
Addr ma.Multiaddr
SeenBy map[string]observation // peer(observer) address -> observation info
LastSeen time.Time
type observedAddr struct {
addr ma.Multiaddr
seenBy map[string]observation // peer(observer) address -> observation info
lastSeen time.Time
numInbound int
}

func (oa *ObservedAddr) activated() bool {
func (oa *observedAddr) activated() bool {

// We only activate if other peers observed the same address
// of ours at least 4 times. SeenBy peers are removed by GC if
// they say the address more than ttl*ActivationThresh
return len(oa.SeenBy) >= ActivationThresh
}

func (oa *ObservedAddr) numInbound() int {
count := 0
for obs := range oa.SeenBy {
if oa.SeenBy[obs].connDirection == network.DirInbound {
count++
}
}

return count
return len(oa.seenBy) >= ActivationThresh
}

// GroupKey returns the group in which this observation belongs. Currently, an
// observed address's group is just the address with all ports set to 0. This
// means we can advertise the most commonly observed external ports without
// advertising _every_ observed port.
func (oa *ObservedAddr) GroupKey() string {
key := make([]byte, 0, len(oa.Addr.Bytes()))
ma.ForEach(oa.Addr, func(c ma.Component) bool {
func (oa *observedAddr) groupKey() string {
key := make([]byte, 0, len(oa.addr.Bytes()))
ma.ForEach(oa.addr, func(c ma.Component) bool {
switch proto := c.Protocol(); proto.Code {
case ma.P_TCP, ma.P_UDP:
key = append(key, proto.VCode...)
Expand Down Expand Up @@ -107,7 +103,7 @@ type ObservedAddrManager struct {

mu sync.RWMutex
// local(internal) address -> list of observed(external) addresses
addrs map[string][]*ObservedAddr
addrs map[string][]*observedAddr
ttl time.Duration
refreshTimer *time.Timer

Expand All @@ -119,7 +115,7 @@ type ObservedAddrManager struct {
// peerstore.OwnObservedAddressTTL as the TTL.
func NewObservedAddrManager(ctx context.Context, host host.Host) *ObservedAddrManager {
oas := &ObservedAddrManager{
addrs: make(map[string][]*ObservedAddr),
addrs: make(map[string][]*observedAddr),
ttl: peerstore.OwnObservedAddrTTL,
wch: make(chan newObservation, observedAddrManagerWorkerChannelSize),
host: host,
Expand Down Expand Up @@ -160,22 +156,22 @@ func (oas *ObservedAddrManager) Addrs() []ma.Multiaddr {
return nil
}

var allObserved []*ObservedAddr
var allObserved []*observedAddr
for k := range oas.addrs {
allObserved = append(allObserved, oas.addrs[k]...)
}
return oas.filter(allObserved)
}

func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multiaddr {
pmap := make(map[string][]*ObservedAddr)
func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multiaddr {
pmap := make(map[string][]*observedAddr)
now := time.Now()

for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.LastSeen) <= oas.ttl && a.activated() {
if now.Sub(a.lastSeen) <= oas.ttl && a.activated() {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.GroupKey()
pat := a.groupKey()
pmap[pat] = append(pmap[pat], a)

}
Expand All @@ -191,15 +187,15 @@ func (oas *ObservedAddrManager) filter(observedAddrs []*ObservedAddr) []ma.Multi
first := s[i]
second := s[j]

if first.numInbound() > second.numInbound() {
if first.numInbound > second.numInbound {
return true
}

return len(first.SeenBy) > len(second.SeenBy)
return len(first.seenBy) > len(second.seenBy)
})

for i := 0; i < maxObservedAddrsPerIPAndTransport && i < len(s); i++ {
addrs = append(addrs, s[i].Addr)
addrs = append(addrs, s[i].addr)
}
}

Expand Down Expand Up @@ -281,14 +277,17 @@ func (oas *ObservedAddrManager) gc() {
filteredAddrs := observedAddrs[:0]
for _, a := range observedAddrs {
// clean up SeenBy set
for k, ob := range a.SeenBy {
for k, ob := range a.seenBy {
if now.Sub(ob.seenTime) > oas.ttl*time.Duration(ActivationThresh) {
delete(a.SeenBy, k)
delete(a.seenBy, k)
if ob.inbound {
a.numInbound--
}
}
}

// leave only alive observed addresses
if now.Sub(a.LastSeen) <= oas.ttl {
if now.Sub(a.lastSeen) <= oas.ttl {
filteredAddrs = append(filteredAddrs, a)
}
}
Expand Down Expand Up @@ -382,27 +381,41 @@ func (oas *ObservedAddrManager) recordObservationUnlocked(conn network.Conn, obs
observerString := observerGroup(conn.RemoteMultiaddr())
localString := string(conn.LocalMultiaddr().Bytes())
ob := observation{
seenTime: now,
connDirection: conn.Stat().Direction,
seenTime: now,
inbound: conn.Stat().Direction == network.DirInbound,
}

observedAddrs := oas.addrs[localString]
// check if observed address seen yet, if so, update it
for i, previousObserved := range observedAddrs {
if previousObserved.Addr.Equal(observed) {
observedAddrs[i].SeenBy[observerString] = ob
observedAddrs[i].LastSeen = now
for _, observedAddr := range oas.addrs[localString] {
if observedAddr.addr.Equal(observed) {
// Don't trump an outbound observation with an inbound
// one.
wasInbound := observedAddr.seenBy[observerString].inbound
isInbound := ob.inbound
ob.inbound = isInbound || wasInbound

if !wasInbound && isInbound {
observedAddr.numInbound++
}

observedAddr.seenBy[observerString] = ob
observedAddr.lastSeen = now
return
}
}

// observed address not seen yet, append it
oas.addrs[localString] = append(oas.addrs[localString], &ObservedAddr{
Addr: observed,
SeenBy: map[string]observation{
oa := &observedAddr{
addr: observed,
seenBy: map[string]observation{
observerString: ob,
},
LastSeen: now,
})
lastSeen: now,
}
if ob.inbound {
oa.numInbound++
}
oas.addrs[localString] = append(oas.addrs[localString], oa)
}

// observerGroup is a function that determines what part of
Expand Down
38 changes: 38 additions & 0 deletions p2p/protocol/identify/obsaddr_glass_test.go
@@ -0,0 +1,38 @@
package identify

// This test lives in the identify package, not the identify_test package, so it
// can access internal types.

import (
"testing"

ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)

func TestObservedAddrGroupKey(t *testing.T) {
oa1 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/2345")}
oa2 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/tcp/1231")}
oa3 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/tcp/1231")}
oa4 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1231")}
oa5 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531")}
oa6 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1531/quic")}
oa7 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.4/udp/1111/quic")}
oa8 := &observedAddr{addr: ma.StringCast("/ip4/1.2.3.5/udp/1111/quic")}

// different ports, same IP => same key
require.Equal(t, oa1.groupKey(), oa2.groupKey())
// different IPs => different key
require.NotEqual(t, oa2.groupKey(), oa3.groupKey())
// same port, different protos => different keys
require.NotEqual(t, oa3.groupKey(), oa4.groupKey())
// same port, same address, different protos => different keys
require.NotEqual(t, oa2.groupKey(), oa4.groupKey())
// udp works as well
require.Equal(t, oa4.groupKey(), oa5.groupKey())
// udp and quic are different
require.NotEqual(t, oa5.groupKey(), oa6.groupKey())
// quic works as well
require.Equal(t, oa6.groupKey(), oa7.groupKey())
require.NotEqual(t, oa7.groupKey(), oa8.groupKey())
}

0 comments on commit 5bfaf4d

Please sign in to comment.