Skip to content

Commit

Permalink
Merge bitcoin#21528: [p2p] Reduce addr blackholes
Browse files Browse the repository at this point in the history
  • Loading branch information
fanquake authored and jagdeep sidhu committed Aug 4, 2021
1 parent d397d1a commit 9045bd0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 24 deletions.
82 changes: 63 additions & 19 deletions src/net_processing.cpp
Expand Up @@ -254,9 +254,31 @@ struct Peer {

/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send;
/** Probabilistic filter of addresses that this peer already knows.
* Used to avoid relaying addresses to this peer more than once. */
const std::unique_ptr<CRollingBloomFilter> m_addr_known;
/** Probabilistic filter to track recent addr messages relayed with this
* peer. Used to avoid relaying redundant addresses to this peer.
*
* We initialize this filter for outbound peers (other than
* block-relay-only connections) or when an inbound peer sends us an
* address related message (ADDR, ADDRV2, GETADDR).
*
* Presence of this filter must correlate with m_addr_relay_enabled.
**/
std::unique_ptr<CRollingBloomFilter> m_addr_known;
/** Whether we are participating in address relay with this connection.
*
* We set this bool to true for outbound peers (other than
* block-relay-only connections), or when an inbound peer sends us an
* address related message (ADDR, ADDRV2, GETADDR).
*
* We use this bool to decide whether a peer is eligible for gossiping
* addr messages. This avoids relaying to peers that are unlikely to
* forward them, effectively blackholing self announcements. Reasons
* peers might support addr relay on the link include that they connected
* to us as a block-relay-only peer or they are a light client.
*
* This field must correlate with whether m_addr_known has been
* initialized.*/
std::atomic_bool m_addr_relay_enabled{false};
/** Whether a getaddr request to this peer is outstanding. */
bool m_getaddr_sent{false};
/** Guards address sending timers. */
Expand Down Expand Up @@ -290,9 +312,8 @@ struct Peer {
// SYSCOIN
/** This peer's a masternode connection */
std::atomic<bool> m_masternode_connection{false};
explicit Peer(NodeId id, bool addr_relay)
explicit Peer(NodeId id)
: m_id(id)
, m_addr_known{addr_relay ? std::make_unique<CRollingBloomFilter>(5000, 0.001) : nullptr}
{}
};

Expand Down Expand Up @@ -658,6 +679,14 @@ class PeerManagerImpl final : public PeerManager
* @param[in] vRecv The raw message received
*/
void ProcessGetCFCheckPt(CNode& peer, CDataStream& vRecv);

/** Checks if address relay is permitted with peer. If needed, initializes
* the m_addr_known bloom filter and sets m_addr_relay_enabled to true.
*
* @return True if address relay is enabled with peer
* False if address relay is disallowed
*/
bool SetupAddressRelay(CNode& node, Peer& peer);
};
} // namespace

Expand Down Expand Up @@ -780,11 +809,6 @@ static CNodeState *State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) {
return &it->second;
}

static bool RelayAddrsWithPeer(const Peer& peer)
{
return peer.m_addr_known != nullptr;
}

/**
* Whether the peer supports the address. For example, a peer that does not
* implement BIP155 cannot receive Tor v3 addresses because it requires
Expand Down Expand Up @@ -1199,9 +1223,7 @@ void PeerManagerImpl::InitializeNode(CNode *pnode)
assert(m_txrequest.Count(nodeid) == 0);
}
{
// Addr relay is disabled for outbound block-relay-only peers to
// prevent adversaries from inferring these links from addr traffic.
PeerRef peer = std::make_shared<Peer>(nodeid, /* addr_relay = */ !pnode->IsBlockOnlyConn());
PeerRef peer = std::make_shared<Peer>(nodeid);
LOCK(m_peer_mutex);
m_peer_map.emplace_hint(m_peer_map.end(), nodeid, std::move(peer));
}
Expand Down Expand Up @@ -1340,6 +1362,7 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
stats.m_ping_wait = ping_wait;
stats.m_addr_processed = peer->m_addr_processed.load();
stats.m_addr_rate_limited = peer->m_addr_rate_limited.load();
stats.m_addr_relay_enabled = peer->m_addr_relay_enabled.load();

return true;
}
Expand Down Expand Up @@ -1802,7 +1825,7 @@ void PeerManagerImpl::RelayAddress(NodeId originator,
LOCK(m_peer_mutex);

for (auto& [id, peer] : m_peer_map) {
if (RelayAddrsWithPeer(*peer) && id != originator && IsAddrCompatible(*peer, addr)) {
if (peer->m_addr_relay_enabled && id != originator && IsAddrCompatible(*peer, addr)) {
uint64_t hashKey = CSipHasher(hasher).Write(id).Finalize();
for (unsigned int i = 0; i < nRelayNodes; i++) {
if (hashKey > best[i].first) {
Expand Down Expand Up @@ -2843,7 +2866,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
UpdatePreferredDownload(pfrom, State(pfrom.GetId()));
}

if (!pfrom.IsInboundConn() && !pfrom.IsBlockOnlyConn()) {
// Self advertisement & GETADDR logic
if (!pfrom.IsInboundConn() && SetupAddressRelay(pfrom, *peer)) {
// For outbound peers, we try to relay our address (so that other
// nodes can try to find us more quickly, as we have no guarantee
// that an outbound peer is even aware of how to reach us) and do a
Expand All @@ -2852,8 +2876,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// empty and no one will know who we are, so these mechanisms are
// important to help us connect to the network.
//
// We skip this for block-relay-only peers to avoid potentially leaking
// information about our block-relay-only connections via address relay.
// We skip this for block-relay-only peers. We want to avoid
// potentially leaking addr information and we do not want to
// indicate to the peer that we will participate in addr relay.
if (fListen && !m_chainman.ActiveChainstate().IsInitialBlockDownload())
{
CAddress addr = GetLocalAddress(&pfrom.addr, pfrom.GetLocalServices());
Expand Down Expand Up @@ -3074,10 +3099,11 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,

s >> vAddr;

if (!RelayAddrsWithPeer(*peer)) {
if (!SetupAddressRelay(pfrom, *peer)) {
LogPrint(BCLog::NET, "ignoring %s message from %s peer=%d\n", msg_type, pfrom.ConnectionTypeAsString(), pfrom.GetId());
return;
}

if (vAddr.size() > MAX_ADDR_TO_SEND)
{
Misbehaving(pfrom.GetId(), 20, strprintf("%s message size = %u", msg_type, vAddr.size()));
Expand Down Expand Up @@ -4048,6 +4074,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
return;
}

SetupAddressRelay(pfrom, *peer);

// Only send one GetAddr response per connection to reduce resource waste
// and discourage addr stamping of INV announcements.
if (peer->m_getaddr_recvd) {
Expand Down Expand Up @@ -4698,7 +4726,7 @@ void PeerManagerImpl::MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::mic
void PeerManagerImpl::MaybeSendAddr(CNode& node, Peer& peer, std::chrono::microseconds current_time)
{
// Nothing to do for non-address-relay peers
if (!RelayAddrsWithPeer(peer)) return;
if (!peer.m_addr_relay_enabled) return;

LOCK(peer.m_addr_send_times_mutex);
// Periodically advertise our local address to the peer.
Expand Down Expand Up @@ -4826,6 +4854,22 @@ class CompareInvMempoolOrder
};
}

bool PeerManagerImpl::SetupAddressRelay(CNode& node, Peer& peer)
{
// We don't participate in addr relay with outbound block-relay-only
// connections to prevent providing adversaries with the additional
// information of addr traffic to infer the link.
if (node.IsBlockOnlyConn()) return false;

if (!peer.m_addr_relay_enabled.exchange(true)) {
// First addr message we have received from the peer, initialize
// m_addr_known
peer.m_addr_known = std::make_unique<CRollingBloomFilter>(5000, 0.001);
}

return true;
}

bool PeerManagerImpl::SendMessages(CNode* pto)
{
PeerRef peer = GetPeerRef(pto->GetId());
Expand Down
1 change: 1 addition & 0 deletions src/net_processing.h
Expand Up @@ -31,6 +31,7 @@ struct CNodeStateStats {
std::vector<int> vHeightInFlight;
uint64_t m_addr_processed = 0;
uint64_t m_addr_rate_limited = 0;
bool m_addr_relay_enabled{false};
};

class PeerManager : public CValidationInterface, public NetEventsInterface
Expand Down
2 changes: 2 additions & 0 deletions src/rpc/net.cpp
Expand Up @@ -118,6 +118,7 @@ static RPCHelpMan getpeerinfo()
{RPCResult::Type::STR, "addr", "(host:port) The IP address and port of the peer"},
{RPCResult::Type::STR, "addrbind", "(ip:port) Bind address of the connection to the peer"},
{RPCResult::Type::STR, "addrlocal", "(ip:port) Local address as reported by the peer"},
{RPCResult::Type::BOOL, "addr_relay_enabled", "Whether we participate in address relay with this peer"},
{RPCResult::Type::STR, "network", "Network (" + Join(GetNetworkNames(/* append_unroutable */ true), ", ") + ")"},
{RPCResult::Type::NUM, "mapped_as", "The AS in the BGP route to the peer used for diversifying\n"
"peer selection (only available if the asmap config flag is set)"},
Expand Down Expand Up @@ -210,6 +211,7 @@ static RPCHelpMan getpeerinfo()
if (!(stats.addrLocal.empty())) {
obj.pushKV("addrlocal", stats.addrLocal);
}
obj.pushKV("addr_relay_enabled", statestats.m_addr_relay_enabled);
obj.pushKV("network", GetNetworkName(stats.m_network));
if (stats.m_mapped_as != 0) {
obj.pushKV("mapped_as", uint64_t(stats.m_mapped_as));
Expand Down
82 changes: 77 additions & 5 deletions test/functional/p2p_addr_relay.py
Expand Up @@ -11,14 +11,15 @@
NODE_NETWORK,
NODE_WITNESS,
msg_addr,
msg_getaddr
msg_getaddr,
msg_verack
)
from test_framework.p2p import (
P2PInterface,
p2p_lock,
)
from test_framework.test_framework import SyscoinTestFramework
from test_framework.util import assert_equal
from test_framework.util import assert_equal, assert_greater_than
import random
import time

Expand All @@ -27,10 +28,12 @@ class AddrReceiver(P2PInterface):
num_ipv4_received = 0
test_addr_contents = False
_tokens = 1
send_getaddr = True

def __init__(self, test_addr_contents=False):
def __init__(self, test_addr_contents=False, send_getaddr=True):
super().__init__()
self.test_addr_contents = test_addr_contents
self.send_getaddr = send_getaddr

def on_addr(self, message):
for addr in message.addrs:
Expand Down Expand Up @@ -60,6 +63,11 @@ def increment_tokens(self, n):
def addr_received(self):
return self.num_ipv4_received != 0

def on_version(self, message):
self.send_message(msg_verack())
if (self.send_getaddr):
self.send_message(msg_getaddr())

def getaddr_received(self):
return self.message_count['getaddr'] > 0

Expand All @@ -75,6 +83,10 @@ def set_test_params(self):
def run_test(self):
self.oversized_addr_test()
self.relay_tests()
self.inbound_blackhole_tests()

# This test populates the addrman, which can impact the node's behavior
# in subsequent tests
self.getaddr_tests()
self.blocksonly_mode_tests()
self.rate_limit_tests()
Expand Down Expand Up @@ -156,7 +168,7 @@ def relay_tests(self):
self.nodes[0].disconnect_p2ps()

self.log.info('Check relay of addresses received from outbound peers')
inbound_peer = self.nodes[0].add_p2p_connection(AddrReceiver(test_addr_contents=True))
inbound_peer = self.nodes[0].add_p2p_connection(AddrReceiver(test_addr_contents=True, send_getaddr=False))
full_outbound_peer = self.nodes[0].add_outbound_p2p_connection(AddrReceiver(), p2p_idx=0, connection_type="outbound-full-relay")
msg = self.setup_addr_msg(2)
self.send_addr_msg(full_outbound_peer, msg, [inbound_peer])
Expand All @@ -167,6 +179,9 @@ def relay_tests(self):
# of the outbound peer which is often sent before the GETADDR response.
assert_equal(inbound_peer.num_ipv4_received, 0)

# Send an empty ADDR message to intialize address relay on this connection.
inbound_peer.send_and_ping(msg_addr())

self.log.info('Check that subsequent addr messages sent from an outbound peer are relayed')
msg2 = self.setup_addr_msg(2)
self.send_addr_msg(full_outbound_peer, msg2, [inbound_peer])
Expand All @@ -184,7 +199,64 @@ def relay_tests(self):

self.nodes[0].disconnect_p2ps()

def sum_addr_messages(self, msgs_dict):
return sum(bytes_received for (msg, bytes_received) in msgs_dict.items() if msg in ['addr', 'addrv2', 'getaddr'])

def inbound_blackhole_tests(self):
self.log.info('Check that we only relay addresses to inbound peers who have previously sent us addr related messages')

addr_source = self.nodes[0].add_p2p_connection(P2PInterface())
receiver_peer = self.nodes[0].add_p2p_connection(AddrReceiver())
blackhole_peer = self.nodes[0].add_p2p_connection(AddrReceiver(send_getaddr=False))
initial_addrs_received = receiver_peer.num_ipv4_received

peerinfo = self.nodes[0].getpeerinfo()
assert_equal(peerinfo[0]['addr_relay_enabled'], True) # addr_source
assert_equal(peerinfo[1]['addr_relay_enabled'], True) # receiver_peer
assert_equal(peerinfo[2]['addr_relay_enabled'], False) # blackhole_peer

# addr_source sends 2 addresses to node0
msg = self.setup_addr_msg(2)
addr_source.send_and_ping(msg)
self.mocktime += 30 * 60
self.nodes[0].setmocktime(self.mocktime)
receiver_peer.sync_with_ping()
blackhole_peer.sync_with_ping()

peerinfo = self.nodes[0].getpeerinfo()

# Confirm node received addr-related messages from receiver peer
assert_greater_than(self.sum_addr_messages(peerinfo[1]['bytesrecv_per_msg']), 0)
# And that peer received addresses
assert_equal(receiver_peer.num_ipv4_received - initial_addrs_received, 2)

# Confirm node has not received addr-related messages from blackhole peer
assert_equal(self.sum_addr_messages(peerinfo[2]['bytesrecv_per_msg']), 0)
# And that peer did not receive addresses
assert_equal(blackhole_peer.num_ipv4_received, 0)

self.log.info("After blackhole peer sends addr message, it becomes eligible for addr gossip")
blackhole_peer.send_and_ping(msg_addr())

# Confirm node has now received addr-related messages from blackhole peer
assert_greater_than(self.sum_addr_messages(peerinfo[1]['bytesrecv_per_msg']), 0)
assert_equal(self.nodes[0].getpeerinfo()[2]['addr_relay_enabled'], True)

msg = self.setup_addr_msg(2)
self.send_addr_msg(addr_source, msg, [receiver_peer, blackhole_peer])

# And that peer received addresses
assert_equal(blackhole_peer.num_ipv4_received, 2)

self.nodes[0].disconnect_p2ps()

def getaddr_tests(self):
# In the previous tests, the node answered GETADDR requests with an
# empty addrman. Due to GETADDR response caching (see
# CConnman::GetAddresses), the node would continue to provide 0 addrs
# in response until enough time has passed or the node is restarted.
self.restart_node(0)

self.log.info('Test getaddr behavior')
self.log.info('Check that we send a getaddr message upon connecting to an outbound-full-relay peer')
full_outbound_peer = self.nodes[0].add_outbound_p2p_connection(AddrReceiver(), p2p_idx=0, connection_type="outbound-full-relay")
Expand All @@ -197,7 +269,7 @@ def getaddr_tests(self):
assert_equal(block_relay_peer.getaddr_received(), False)

self.log.info('Check that we answer getaddr messages only from inbound peers')
inbound_peer = self.nodes[0].add_p2p_connection(AddrReceiver())
inbound_peer = self.nodes[0].add_p2p_connection(AddrReceiver(send_getaddr=False))
inbound_peer.sync_with_ping()

# Add some addresses to addrman
Expand Down
1 change: 1 addition & 0 deletions test/functional/test_framework/p2p.py
Expand Up @@ -466,6 +466,7 @@ def on_version(self, message):
self.send_message(msg_sendaddrv2())
self.send_message(msg_verack())
self.nServices = message.nServices
self.send_message(msg_getaddr())

# Connection helper methods
def wait_until(self, test_function_in, *, timeout=60, sleep=0.05, check_connected=True):
Expand Down

0 comments on commit 9045bd0

Please sign in to comment.