Skip to content

Commit 0de48fe

Browse files
committed
net: abstract sending side of transport serialization further
This makes the sending side of P2P transports mirror the receiver side: caller provides message (consisting of type and payload) to be sent, and then asks what bytes must be sent. Once the message has been fully sent, a new message can be provided. This removes the assumption that P2P serialization of messages follows a strict structure of header (a function of type and payload), followed by (unmodified) payload, and instead lets transports decide the structure themselves. It also removes the assumption that a message must always be sent at once, or that no bytes are even sent on the wire when there is no message. This opens the door for supporting traffic shaping mechanisms in the future.
1 parent 649a83c commit 0de48fe

File tree

6 files changed

+163
-38
lines changed

6 files changed

+163
-38
lines changed

src/net.cpp

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -824,8 +824,13 @@ CNetMessage V1Transport::GetReceivedMessage(const std::chrono::microseconds time
824824
return msg;
825825
}
826826

827-
void V1Transport::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const
827+
bool V1Transport::SetMessageToSend(CSerializedNetMsg& msg) noexcept
828828
{
829+
AssertLockNotHeld(m_send_mutex);
830+
// Determine whether a new message can be set.
831+
LOCK(m_send_mutex);
832+
if (m_sending_header || m_bytes_sent < m_message_to_send.data.size()) return false;
833+
829834
// create dbl-sha256 checksum
830835
uint256 hash = Hash(msg.data);
831836

@@ -834,8 +839,50 @@ void V1Transport::prepareForTransport(CSerializedNetMsg& msg, std::vector<unsign
834839
memcpy(hdr.pchChecksum, hash.begin(), CMessageHeader::CHECKSUM_SIZE);
835840

836841
// serialize header
837-
header.reserve(CMessageHeader::HEADER_SIZE);
838-
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
842+
m_header_to_send.clear();
843+
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, m_header_to_send, 0, hdr};
844+
845+
// update state
846+
m_message_to_send = std::move(msg);
847+
m_sending_header = true;
848+
m_bytes_sent = 0;
849+
return true;
850+
}
851+
852+
Transport::BytesToSend V1Transport::GetBytesToSend() const noexcept
853+
{
854+
AssertLockNotHeld(m_send_mutex);
855+
LOCK(m_send_mutex);
856+
if (m_sending_header) {
857+
return {Span{m_header_to_send}.subspan(m_bytes_sent),
858+
// We have more to send after the header if the message has payload.
859+
!m_message_to_send.data.empty(),
860+
m_message_to_send.m_type
861+
};
862+
} else {
863+
return {Span{m_message_to_send.data}.subspan(m_bytes_sent),
864+
// We never have more to send after this message's payload.
865+
false,
866+
m_message_to_send.m_type
867+
};
868+
}
869+
}
870+
871+
void V1Transport::MarkBytesSent(size_t bytes_sent) noexcept
872+
{
873+
AssertLockNotHeld(m_send_mutex);
874+
LOCK(m_send_mutex);
875+
m_bytes_sent += bytes_sent;
876+
if (m_sending_header && m_bytes_sent == m_header_to_send.size()) {
877+
// We're done sending a message's header. Switch to sending its data bytes.
878+
m_sending_header = false;
879+
m_bytes_sent = 0;
880+
} else if (!m_sending_header && m_bytes_sent == m_message_to_send.data.size()) {
881+
// We're done sending a message's data. Wipe the data vector to reduce memory consumption.
882+
m_message_to_send.data.clear();
883+
m_message_to_send.data.shrink_to_fit();
884+
m_bytes_sent = 0;
885+
}
839886
}
840887

841888
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
@@ -2910,27 +2957,40 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
29102957
msg.data.data()
29112958
);
29122959

2913-
// make sure we use the appropriate network transport format
2914-
std::vector<unsigned char> serializedHeader;
2915-
pnode->m_transport->prepareForTransport(msg, serializedHeader);
2916-
size_t nTotalSize = nMessageSize + serializedHeader.size();
2917-
29182960
size_t nBytesSent = 0;
29192961
{
29202962
LOCK(pnode->cs_vSend);
2921-
bool optimisticSend(pnode->vSendMsg.empty());
2922-
2923-
//log total amount of bytes per message type
2924-
pnode->AccountForSentBytes(msg.m_type, nTotalSize);
2925-
pnode->nSendSize += nTotalSize;
2926-
2927-
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
2928-
pnode->vSendMsg.push_back(std::move(serializedHeader));
2929-
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));
2930-
2931-
// If write queue empty, attempt "optimistic write"
2932-
bool data_left;
2933-
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
2963+
const bool queue_was_empty{pnode->vSendMsg.empty()};
2964+
2965+
// Give the message to the transport, and add all bytes it wants us to send out as byte
2966+
// vectors to vSendMsg. This is temporary code that exists to support the new transport
2967+
// sending interface using the old way of queueing data. In a future commit vSendMsg will
2968+
// be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
2969+
// will disappear.
2970+
bool queued = pnode->m_transport->SetMessageToSend(msg);
2971+
assert(queued);
2972+
// In the current transport (V1Transport), GetBytesToSend first returns a header to send,
2973+
// and then the payload data (if any), necessitating a loop.
2974+
while (true) {
2975+
const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
2976+
if (bytes.empty()) break;
2977+
// Update statistics per message type.
2978+
pnode->AccountForSentBytes(msg_type, bytes.size());
2979+
// Update number of bytes in the send buffer.
2980+
pnode->nSendSize += bytes.size();
2981+
if (pnode->nSendSize > nSendBufferMaxSize) pnode->fPauseSend = true;
2982+
pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
2983+
// Notify transport that bytes have been processed (they're not actually sent yet,
2984+
// but pushed onto the vSendMsg queue of bytes to send).
2985+
pnode->m_transport->MarkBytesSent(bytes.size());
2986+
}
2987+
2988+
// If the write queue was empty before and isn't now, attempt "optimistic write":
2989+
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
2990+
// doing a send, try sending from the calling thread if the queue was empty before.
2991+
if (queue_was_empty && !pnode->vSendMsg.empty()) {
2992+
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
2993+
}
29342994
}
29352995
if (nBytesSent) RecordBytesSent(nBytesSent);
29362996
}

src/net.h

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,10 +270,49 @@ class Transport {
270270
/** Retrieve a completed message from transport (only when ReceivedMessageComplete). */
271271
virtual CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) = 0;
272272

273-
// 2. Sending side functions:
273+
// 2. Sending side functions, for converting messages into bytes to be sent over the wire.
274274

275-
// prepare message for transport (header construction, error-correction computation, payload encryption, etc.)
276-
virtual void prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const = 0;
275+
/** Set the next message to send.
276+
*
277+
* If no message can currently be set (perhaps because the previous one is not yet done being
278+
* sent), returns false, and msg will be unmodified. Otherwise msg is enqueued (and
279+
* possibly moved-from) and true is returned.
280+
*/
281+
virtual bool SetMessageToSend(CSerializedNetMsg& msg) noexcept = 0;
282+
283+
/** Return type for GetBytesToSend, consisting of:
284+
* - Span<const uint8_t> to_send: span of bytes to be sent over the wire (possibly empty).
285+
* - bool more: whether there will be more bytes to be sent after the ones in to_send are
286+
* all sent (as signaled by MarkBytesSent()).
287+
* - const std::string& m_type: message type on behalf of which this is being sent.
288+
*/
289+
using BytesToSend = std::tuple<
290+
Span<const uint8_t> /*to_send*/,
291+
bool /*more*/,
292+
const std::string& /*m_type*/
293+
>;
294+
295+
/** Get bytes to send on the wire.
296+
*
297+
* As a const function, it does not modify the transport's observable state, and is thus safe
298+
* to be called multiple times.
299+
*
300+
* The bytes returned by this function act as a stream which can only be appended to. This
301+
* means that with the exception of MarkBytesSent, operations on the transport can only append
302+
* to what is being returned.
303+
*
304+
* Note that m_type and to_send refer to data that is internal to the transport, and calling
305+
* any non-const function on this object may invalidate them.
306+
*/
307+
virtual BytesToSend GetBytesToSend() const noexcept = 0;
308+
309+
/** Report how many bytes returned by the last GetBytesToSend() have been sent.
310+
*
311+
* bytes_sent cannot exceed to_send.size() of the last GetBytesToSend() result.
312+
*
313+
* If bytes_sent=0, this call has no effect.
314+
*/
315+
virtual void MarkBytesSent(size_t bytes_sent) noexcept = 0;
277316
};
278317

279318
class V1Transport final : public Transport
@@ -314,6 +353,17 @@ class V1Transport final : public Transport
314353
return hdr.nMessageSize == nDataPos;
315354
}
316355

356+
/** Lock for sending state. */
357+
mutable Mutex m_send_mutex;
358+
/** The header of the message currently being sent. */
359+
std::vector<uint8_t> m_header_to_send GUARDED_BY(m_send_mutex);
360+
/** The data of the message currently being sent. */
361+
CSerializedNetMsg m_message_to_send GUARDED_BY(m_send_mutex);
362+
/** Whether we're currently sending header bytes or message bytes. */
363+
bool m_sending_header GUARDED_BY(m_send_mutex) {false};
364+
/** How many bytes have been sent so far (from m_header_to_send, or from m_message_to_send.data). */
365+
size_t m_bytes_sent GUARDED_BY(m_send_mutex) {0};
366+
317367
public:
318368
V1Transport(const CChainParams& chain_params, const NodeId node_id, int nTypeIn, int nVersionIn)
319369
: m_chain_params(chain_params),
@@ -354,7 +404,9 @@ class V1Transport final : public Transport
354404

355405
CNetMessage GetReceivedMessage(std::chrono::microseconds time, bool& reject_message) override EXCLUSIVE_LOCKS_REQUIRED(!m_recv_mutex);
356406

357-
void prepareForTransport(CSerializedNetMsg& msg, std::vector<unsigned char>& header) const override;
407+
bool SetMessageToSend(CSerializedNetMsg& msg) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
408+
BytesToSend GetBytesToSend() const noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
409+
void MarkBytesSent(size_t bytes_sent) noexcept override EXCLUSIVE_LOCKS_REQUIRED(!m_send_mutex);
358410
};
359411

360412
struct CNodeOptions
@@ -369,7 +421,8 @@ struct CNodeOptions
369421
class CNode
370422
{
371423
public:
372-
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv. */
424+
/** Transport serializer/deserializer. The receive side functions are only called under cs_vRecv, while
425+
* the sending side functions are only called under cs_vSend. */
373426
const std::unique_ptr<Transport> m_transport;
374427

375428
const NetPermissionFlags m_permission_flags;

src/test/fuzz/p2p_transport_serialization.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,16 @@ FUZZ_TARGET(p2p_transport_serialization, .init = initialize_p2p_transport_serial
7979

8080
std::vector<unsigned char> header;
8181
auto msg2 = CNetMsgMaker{msg.m_recv.GetVersion()}.Make(msg.m_type, Span{msg.m_recv});
82-
send_transport.prepareForTransport(msg2, header);
82+
bool queued = send_transport.SetMessageToSend(msg2);
83+
assert(queued);
84+
std::optional<bool> known_more;
85+
while (true) {
86+
const auto& [to_send, more, _msg_type] = send_transport.GetBytesToSend();
87+
if (known_more) assert(!to_send.empty() == *known_more);
88+
if (to_send.empty()) break;
89+
send_transport.MarkBytesSent(to_send.size());
90+
known_more = more;
91+
}
8392
}
8493
}
8594
}

src/test/fuzz/process_messages.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)
6767

6868
CNode& random_node = *PickValue(fuzzed_data_provider, peers);
6969

70-
(void)connman.ReceiveMsgFrom(random_node, net_msg);
70+
(void)connman.ReceiveMsgFrom(random_node, std::move(net_msg));
7171
random_node.fPauseSend = false;
7272

7373
try {

src/test/util/net.cpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
4141
relay_txs),
4242
};
4343

44-
(void)connman.ReceiveMsgFrom(node, msg_version);
44+
(void)connman.ReceiveMsgFrom(node, std::move(msg_version));
4545
node.fPauseSend = false;
4646
connman.ProcessMessagesOnce(node);
4747
peerman.SendMessages(&node);
@@ -54,7 +54,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
5454
assert(statestats.their_services == remote_services);
5555
if (successfully_connected) {
5656
CSerializedNetMsg msg_verack{mm.Make(NetMsgType::VERACK)};
57-
(void)connman.ReceiveMsgFrom(node, msg_verack);
57+
(void)connman.ReceiveMsgFrom(node, std::move(msg_verack));
5858
node.fPauseSend = false;
5959
connman.ProcessMessagesOnce(node);
6060
peerman.SendMessages(&node);
@@ -70,14 +70,17 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
7070
}
7171
}
7272

73-
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg& ser_msg) const
73+
bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
7474
{
75-
std::vector<uint8_t> ser_msg_header;
76-
node.m_transport->prepareForTransport(ser_msg, ser_msg_header);
77-
78-
bool complete;
79-
NodeReceiveMsgBytes(node, ser_msg_header, complete);
80-
NodeReceiveMsgBytes(node, ser_msg.data, complete);
75+
bool queued = node.m_transport->SetMessageToSend(ser_msg);
76+
assert(queued);
77+
bool complete{false};
78+
while (true) {
79+
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
80+
if (to_send.empty()) break;
81+
NodeReceiveMsgBytes(node, to_send, complete);
82+
node.m_transport->MarkBytesSent(to_send.size());
83+
}
8184
return complete;
8285
}
8386

src/test/util/net.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ struct ConnmanTestMsg : public CConnman {
5454

5555
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;
5656

57-
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg& ser_msg) const;
57+
bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
5858
};
5959

6060
constexpr ServiceFlags ALL_SERVICE_FLAGS[]{

0 commit comments

Comments
 (0)