Skip to content

Commit b01bfa3

Browse files
fanquakeFrank-GER
authored and
Frank-GER
committed
Merge bitcoin#28165: net: transport abstraction
8a3b6f3 refactor: make Transport::ReceivedBytes just return success/fail (Pieter Wuille) bb4aab9 net: move message conversion to wire bytes from PushMessage to SocketSendData (Pieter Wuille) a1a1060 net: measure send buffer fullness based on memory usage (Pieter Wuille) 009ff8d fuzz: add bidirectional fragmented transport test (Pieter Wuille) fb2c5ed net: make V1Transport implicitly use current chainparams (Pieter Wuille) 0de48fe net: abstract sending side of transport serialization further (Pieter Wuille) 649a83c refactor: rename Transport class receive functions (Pieter Wuille) 27f9ba2 net: add V1Transport lock protecting receive state (Pieter Wuille) 93594e4 refactor: merge transport serializer and deserializer into Transport class (Pieter Wuille) Pull request description: This PR furthers the P2P message serialization/deserialization abstraction introduced in bitcoin#16202 and bitcoin#16562, in preparation for introducing the BIP324 v2 transport (making this part of bitcoin#27634). However, nothing in this PR is BIP324-specific, and it contains a number of independently useful improvements. The overall idea is to have a single object in every `CNode` (called `m_transport`) that is responsible for converting sent messages to wire bytes, and for converting received wire bytes back to messages, while having as little as possible knowledge about this conversion process in higher-level net code. To accomplish that, there is an abstract `Transport` class with (currently) a single `V1Transport` implementation. Structurally, the above is accomplished by: * Merging the `TransportDeserializer` and `TransportSerializer` classes into a single `Transport` class, which encompasses both the sending and receiving side. For `V1Transport` these two sides are entirely separate, but this assumption doesn't hold for the BIP324 transport where e.g. the sending encryption key depends on the DH key negotiation data received from the other side. Merging the two means a future `V2Transport` can handle all this interaction without callers needing to be aware. * Removing the assumption that each message is sent using a computed header followed by (unmodified) data bytes. To achieve that, the sending side of `Transport` mirrors what the receiver side does: callers can set a message to be sent, then ask what bytes must be sent out, and then allowing them to transition to the next message. * Adding internal locks to protect the sending and receiving state of the `V1Transport` implementation. I believe these aren't strictly needed (opinions welcome) as there is no real way to use `Transport` objects in a multi-threaded fashion without some form of external synchronization (e.g. "get next bytes to send" isn't meaningful to call from multiple threads at the same time without mechanism to control the order they'll actually get sent). Still, I feel it's cleaner to make the object responsible for its own consistency (as we definitely do not want the entire object to be under a single external GUARDED_BY, as that'd prevent simultaneous sending and receiving). * Moving the conversion of messages to bytes on the sending side from `PushMessage` to `SocketSendData`, which is needed to deal with the fact that a transport may not immediately be able to send messages. This PR is not a refactor, though some commits are. Among the semantic changes are: * Changing the send buffer pushback mechanism to trigger based on the memory usage of the buffer rather than the amount of bytes to be sent. This is both closer to the desired behavior, and makes the buffering independent from transport details (which is why it's included here). * When optimistic send is not applicable, the V1 message checksum calculation now runs in the net thread rather than the message handling thread. I believe that's generally an improvement, as the message handling thread is far more computationally bottlenecked already. * The checksum calculation now runs under the `CNode::cs_vSend` lock, which does mean no two checksum calculations for messages sent to the same node can run in parallel, even if running in separate threads. Despite that limitation, having the checksum for non-optimistic sends moved in the net thread is still an improvement, I believe. * Statistics for per-message-type sent bytes are now updated when the bytes are actually handed to the OS rather than in `PushMessage`. This is because the actual serialized sizes aren't known until they've gone through the transport object. A fuzz test of the entire `V1Transport` is included. More elaborate rationale for each of the changes can be found in the commit messages. ACKs for top commit: theStack: re-ACK 8a3b6f3 vasild: ACK 8a3b6f3 dergoegge: Code review ACK 8a3b6f3 Tree-SHA512: 26e9a6df47f1dd3e3f3edb4874edf365728e5a8bbc9d0d4d71fb6000cb2dfde5574902c47ffcf825af6743922f2ff9d31a5a38942a196f4ca6669122e15e42e4
1 parent 1811837 commit b01bfa3

8 files changed

+578
-138
lines changed

src/init.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ void SetupServerArgs(ArgsManager& argsman)
582582
argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
583583
argsman.AddArg("-maxconnections=<n>", strprintf("Maintain at most <n> connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
584584
argsman.AddArg("-maxreceivebuffer=<n>", strprintf("Maximum per-connection receive buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
585-
argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
585+
argsman.AddArg("-maxsendbuffer=<n>", strprintf("Maximum per-connection memory usage for the send buffer, <n>*1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
586586
argsman.AddArg("-maxtimeadjustment", strprintf("Maximum allowed median peer time offset adjustment. Local perspective of time may be influenced by outbound peers forward or backward by this amount (default: %u seconds).", DEFAULT_MAX_TIME_ADJUSTMENT), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
587587
argsman.AddArg("-maxuploadtarget=<n>", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);
588588
argsman.AddArg("-onion=<ip:port>", "Use separate SOCKS5 proxy to reach peers via Tor onion services, set -noonion to disable (default: -proxy)", ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION);

src/net.cpp

+146-52
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <crypto/sha256.h>
2020
#include <i2p.h>
2121
#include <logging.h>
22+
#include <memusage.h>
2223
#include <net_permissions.h>
2324
#include <netaddress.h>
2425
#include <netbase.h>
@@ -123,6 +124,14 @@ std::map<CNetAddr, LocalServiceInfo> mapLocalHost GUARDED_BY(g_maplocalhost_mute
123124
static bool vfLimited[NET_MAX] GUARDED_BY(g_maplocalhost_mutex) = {};
124125
std::string strSubVersion;
125126

127+
size_t CSerializedNetMsg::GetMemoryUsage() const noexcept
128+
{
129+
// Don't count the dynamic memory used for the m_type string, by assuming it fits in the
130+
// "small string" optimization area (which stores data inside the object itself, up to some
131+
// size; 15 bytes in modern libstdc++).
132+
return sizeof(*this) + memusage::DynamicUsage(data);
133+
}
134+
126135
void CConnman::AddAddrFetch(const std::string& strDest)
127136
{
128137
LOCK(m_addr_fetches_mutex);
@@ -747,16 +756,15 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
747756
nRecvBytes += msg_bytes.size();
748757
while (msg_bytes.size() > 0) {
749758
// absorb network data
750-
int handled = m_deserializer->Read(msg_bytes);
751-
if (handled < 0) {
752-
// Serious header problem, disconnect from the peer.
759+
if (!m_transport->ReceivedBytes(msg_bytes)) {
760+
// Serious transport problem, disconnect from the peer.
753761
return false;
754762
}
755763

756-
if (m_deserializer->Complete()) {
764+
if (m_transport->ReceivedMessageComplete()) {
757765
// decompose a transport agnostic CNetMessage from the deserializer
758766
bool reject_message{false};
759-
CNetMessage msg = m_deserializer->GetMessage(time, reject_message);
767+
CNetMessage msg = m_transport->GetReceivedMessage(time, reject_message);
760768
if (reject_message) {
761769
// Message deserialization failed. Drop the message but don't disconnect the peer.
762770
// store the size of the corrupt message
@@ -783,8 +791,18 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
783791
return true;
784792
}
785793

786-
int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
794+
V1Transport::V1Transport(const NodeId node_id, int nTypeIn, int nVersionIn) noexcept :
795+
m_node_id(node_id), hdrbuf(nTypeIn, nVersionIn), vRecv(nTypeIn, nVersionIn)
787796
{
797+
assert(std::size(Params().MessageStart()) == std::size(m_magic_bytes));
798+
std::copy(std::begin(Params().MessageStart()), std::end(Params().MessageStart()), m_magic_bytes);
799+
LOCK(m_recv_mutex);
800+
Reset();
801+
}
802+
803+
int V1Transport::readHeader(Span<const uint8_t> msg_bytes)
804+
{
805+
AssertLockHeld(m_recv_mutex);
788806
// copy data to temporary parsing buffer
789807
unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
790808
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
@@ -806,7 +824,7 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
806824
}
807825

808826
// Check start string, network magic
809-
if (memcmp(hdr.pchMessageStart, m_chain_params.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
827+
if (memcmp(hdr.pchMessageStart, m_magic_bytes, CMessageHeader::MESSAGE_START_SIZE) != 0) {
810828
LogPrint(BCLog::NET, "Header error: Wrong MessageStart %s received, peer=%d\n", HexStr(hdr.pchMessageStart), m_node_id);
811829
return -1;
812830
}
@@ -822,8 +840,9 @@ int V1TransportDeserializer::readHeader(Span<const uint8_t> msg_bytes)
822840
return nCopy;
823841
}
824842

825-
int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
843+
int V1Transport::readData(Span<const uint8_t> msg_bytes)
826844
{
845+
AssertLockHeld(m_recv_mutex);
827846
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
828847
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
829848

@@ -839,19 +858,22 @@ int V1TransportDeserializer::readData(Span<const uint8_t> msg_bytes)
839858
return nCopy;
840859
}
841860

842-
const uint256& V1TransportDeserializer::GetMessageHash() const
861+
const uint256& V1Transport::GetMessageHash() const
843862
{
844-
assert(Complete());
863+
AssertLockHeld(m_recv_mutex);
864+
assert(CompleteInternal());
845865
if (data_hash.IsNull())
846866
hasher.Finalize(data_hash);
847867
return data_hash;
848868
}
849869

850-
CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds time, bool& reject_message)
870+
CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time, bool& reject_message)
851871
{
872+
AssertLockNotHeld(m_recv_mutex);
852873
// Initialize out parameter
853874
reject_message = false;
854875
// decompose a single CNetMessage from the TransportDeserializer
876+
LOCK(m_recv_mutex);
855877
CNetMessage msg(std::move(vRecv));
856878

857879
// store message type string, time, and sizes
@@ -884,53 +906,122 @@ CNetMessage V1TransportDeserializer::GetMessage(const std::chrono::microseconds
884906
return msg;
885907
}
886908

887-
void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
909+
bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
888910
{
911+
AssertLockNotHeld(m_send_mutex);
912+
// Determine whether a new message can be set.
913+
LOCK(m_send_mutex);
914+
if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
915+
889916
// create dbl-sha256 checksum
890917
uint256 hash = Hash(msg.data);
891918

892919
// create header
893-
CMessageHeader hdr(Params().MessageStart(), msg.m_type.c_str(), msg.data.size());
920+
CMessageHeader hdr(m_magic_bytes, msg.m_type.c_str(), msg.data.size());
894921
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
895922

896923
// serialize header
897-
header.reserve(CMessageHeader::HEADER_SIZE);
898-
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
924+
m_header_to_send.clear();
925+
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
926+
927+
// update state
928+
m_message_to_send = std::move(msg);
929+
m_sending_header = true;
930+
m_bytes_sent = 0;
931+
return true;
932+
}
933+
934+
Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
935+
{
936+
AssertLockNotHeld(m_send_mutex);
937+
LOCK(m_send_mutex);
938+
if (m_sending_header) {
939+
return {Span{m_header_to_send}.subspan(m_bytes_sent),
940+
// We have more to send after the header if the message has payload.
941+
!m_message_to_send.data.empty(),
942+
m_message_to_send.m_type
943+
};
944+
} else {
945+
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
946+
// We never have more to send after this message's payload.
947+
false,
948+
m_message_to_send.m_type
949+
};
950+
}
951+
}
952+
953+
void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
954+
{
955+
AssertLockNotHeld(m_send_mutex);
956+
LOCK(m_send_mutex);
957+
m_bytes_sent += bytes_sent;
958+
if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
959+
// We're done sending a message's header. Switch to sending its data bytes.
960+
m_sending_header = false;
961+
m_bytes_sent = 0;
962+
} else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
963+
// We're done sending a message's data. Wipe the data vector to reduce memory consumption.
964+
m_message_to_send.data.clear();
965+
m_message_to_send.data.shrink_to_fit();
966+
m_bytes_sent = 0;
967+
}
968+
}
969+
970+
size_t V1Transport::GetSendMemoryUsage() const noexcept
971+
{
972+
AssertLockNotHeld(m_send_mutex);
973+
LOCK(m_send_mutex);
974+
// Don't count sending-side fields besides m_message_to_send, as they're all small and bounded.
975+
return m_message_to_send.GetMemoryUsage();
899976
}
900977

901978
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
902979
{
903980
auto it = node.vSendMsg.begin();
904981
size_t nSentSize = 0;
905-
906-
while (it != node.vSendMsg.end()) {
907-
const auto& data = *it;
908-
assert(data.size() > node.nSendOffset);
982+
bool data_left{false}; //!< second return value (whether unsent data remains)
983+
984+
while (true) {
985+
if (it != node.vSendMsg.end()) {
986+
// If possible, move one message from the send queue to the transport. This fails when
987+
// there is an existing message still being sent.
988+
size_t memusage = it->GetMemoryUsage();
989+
if (node.m_transport->SetMessageToSend(*it)) {
990+
// Update memory usage of send buffer (as *it will be deleted).
991+
node.m_send_memusage -= memusage;
992+
++it;
993+
}
994+
}
995+
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
996+
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
909997
int nBytes = 0;
910-
{
998+
if (!data.empty()) {
911999
LOCK(node.m_sock_mutex);
1000+
// There is no socket in case we've already disconnected, or in test cases without
1001+
// real connections. In these cases, we bail out immediately and just leave things
1002+
// in the send queue and transport.
9121003
if (!node.m_sock) {
9131004
break;
9141005
}
9151006
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
9161007
#ifdef MSG_MORE
917-
if (it + 1 != node.vSendMsg.end()) {
1008+
// We have more to send if either the transport itself has more, or if we have more
1009+
// messages to send.
1010+
if (more || it != node.vSendMsg.end()) {
9181011
flags |= MSG_MORE;
9191012
}
9201013
#endif
921-
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
1014+
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
9221015
}
9231016
if (nBytes > 0) {
9241017
node.m_last_send = GetTime<std::chrono::seconds>();
9251018
node.nSendBytes += nBytes;
926-
node.nSendOffset += nBytes;
1019+
// Notify transport that bytes have been processed.
1020+
node.m_transport->MarkBytesSent(nBytes);
1021+
// Update statistics per message type.
1022+
node.AccountForSentBytes(msg_type, nBytes);
9271023
nSentSize += nBytes;
928-
if (node.nSendOffset == data.size()) {
929-
node.nSendOffset = 0;
930-
node.nSendSize -= data.size();
931-
node.fPauseSend = node.nSendSize > nSendBufferMaxSize;
932-
it++;
933-
} else {
1024+
if ((size_t)nBytes != data.size()) {
9341025
// could not send full message; stop sending more
9351026
break;
9361027
}
@@ -943,17 +1034,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
9431034
node.CloseSocketDisconnect();
9441035
}
9451036
}
946-
// couldn't send anything at all
9471037
break;
9481038
}
9491039
}
9501040

1041+
node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;
1042+
9511043
if (it == node.vSendMsg.end()) {
952-
assert(node.nSendOffset == 0);
953-
assert(node.nSendSize == 0);
1044+
assert(node.m_send_memusage == 0);
9541045
}
9551046
node.vSendMsg.erase(node.vSendMsg.begin(), it);
956-
return {nSentSize, !node.vSendMsg.empty()};
1047+
return {nSentSize, data_left};
9571048
}
9581049

9591050
/** Try to find a connection to evict when the node is full.
@@ -1338,7 +1429,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
13381429

13391430
for (CNode* pnode : nodes) {
13401431
bool select_recv = !pnode->fPauseRecv;
1341-
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
1432+
bool select_send;
1433+
{
1434+
LOCK(pnode->cs_vSend);
1435+
// Sending is possible if either there are bytes to send right now, or if there will be
1436+
// once a potential message from vSendMsg is handed to the transport.
1437+
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
1438+
select_send = !to_send.empty() || !pnode->vSendMsg.empty();
1439+
}
13421440
if (!select_recv && !select_send) continue;
13431441

13441442
LOCK(pnode->m_sock_mutex);
@@ -3373,8 +3471,7 @@ CNode::CNode(NodeId idIn,
33733471
ConnectionType conn_type_in,
33743472
bool inbound_onion,
33753473
CNodeOptions&& node_opts)
3376-
: m_deserializer{std::make_unique<V1TransportDeserializer>(V1TransportDeserializer(Params(), idIn, SER_NETWORK, INIT_PROTO_VERSION))},
3377-
m_serializer{std::make_unique<V1TransportSerializer>(V1TransportSerializer())},
3474+
: m_transport{std::make_unique<V1Transport>(idIn, SER_NETWORK, INIT_PROTO_VERSION)},
33783475
m_permission_flags{node_opts.permission_flags},
33793476
m_sock{sock},
33803477
m_connected{GetTime<std::chrono::seconds>()},
@@ -3464,27 +3561,24 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
34643561
msg.data.data()
34653562
);
34663563

3467-
// make sure we use the appropriate network transport format
3468-
std::vector<unsigned char> serializedHeader;
3469-
pnode->m_serializer->prepareForTransport(msg, serializedHeader);
3470-
size_t nTotalSize = nMessageSize + serializedHeader.size();
3471-
34723564
size_t nBytesSent = 0;
34733565
{
34743566
LOCK(pnode->cs_vSend);
3475-
bool optimisticSend(pnode->vSendMsg.empty());
3567+
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
3568+
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};
34763569

3477-
//log total amount of bytes per message type
3478-
pnode->AccountForSentBytes(msg.m_type, nTotalSize);
3479-
pnode->nSendSize += nTotalSize;
3570+
// Update memory usage of send buffer.
3571+
pnode->m_send_memusage += msg.GetMemoryUsage();
3572+
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
3573+
// Move message to vSendMsg queue.
3574+
pnode->vSendMsg.push_back(std::move(msg));
34803575

3481-
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
3482-
pnode->vSendMsg.push_back(std::move(serializedHeader));
3483-
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
3484-
3485-
// If write queue empty, attempt "optimistic write"
3486-
bool data_left;
3487-
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
3576+
// If there was nothing to send before, attempt "optimistic write":
3577+
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
3578+
// doing a send, try sending from the calling thread if the queue was empty before.
3579+
if (queue_was_empty) {
3580+
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
3581+
}
34883582
}
34893583
if (nBytesSent) RecordBytesSent(nBytesSent);
34903584
}

0 commit comments

Comments
 (0)