Skip to content

Commit

Permalink
Shave off buffer copies in networking code (#11607)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfan5 authored Sep 17, 2021
1 parent ea250ff commit fd8a850
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 92 deletions.
72 changes: 38 additions & 34 deletions src/network/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,12 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
return i;
}

RPBSearchResult ReliablePacketBuffer::notFound()
{
return m_list.end();
}

bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
const BufferedPacket &p = *m_list.begin();
const BufferedPacket &p = m_list.front();
result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
return true;
}
Expand All @@ -220,14 +215,14 @@ BufferedPacket ReliablePacketBuffer::popFirst()
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
BufferedPacket p = *m_list.begin();
m_list.erase(m_list.begin());
BufferedPacket p = std::move(m_list.front());
m_list.pop_front();

if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
m_oldest_non_answered_ack =
readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
}
return p;
}
Expand All @@ -241,28 +236,20 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
<< " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer");
}
BufferedPacket p = *r;


RPBSearchResult next = r;
++next;
if (next != notFound()) {
u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
m_oldest_non_answered_ack = s;
}
BufferedPacket p = std::move(*r);

m_list.erase(r);

if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
m_oldest_non_answered_ack =
readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
}
return p;
}

void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
Expand Down Expand Up @@ -355,7 +342,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
}

/* update last packet number */
m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
}

void ReliablePacketBuffer::incrementTimeouts(float dtime)
Expand All @@ -367,17 +354,19 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
}
}

std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
unsigned int max_packets)
std::list<BufferedPacket>
ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{
MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs;
for (BufferedPacket &bufferedPacket : m_list) {
if (bufferedPacket.time >= timeout) {
// caller will resend packet so reset time and increase counter
bufferedPacket.time = 0.0f;
bufferedPacket.resend_count++;

timed_outs.push_back(bufferedPacket);

//this packet will be sent right afterwards reset timeout here
bufferedPacket.time = 0.0f;
if (timed_outs.size() >= max_packets)
break;
}
Expand Down Expand Up @@ -1051,20 +1040,20 @@ bool UDPPeer::processReliableSendCommand(
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);

toadd.push(p);
toadd.push(std::move(p));
}

if (have_sequence_number) {
volatile u16 pcount = 0;
while (!toadd.empty()) {
BufferedPacket p = toadd.front();
BufferedPacket p = std::move(toadd.front());
toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
chan.queued_reliables.push(p);
chan.queued_reliables.push(std::move(p));
pcount++;
}
sanity_check(chan.queued_reliables.size() < 0xFFFF);
Expand Down Expand Up @@ -1208,12 +1197,19 @@ Connection::~Connection()
}

/* Internal stuff */
void Connection::putEvent(ConnectionEvent &e)

void Connection::putEvent(const ConnectionEvent &e)
{
assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}

void Connection::putEvent(ConnectionEvent &&e)
{
assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(std::move(e));
}

void Connection::TriggerSend()
{
m_sendThread->Trigger();
Expand Down Expand Up @@ -1299,14 +1295,22 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms)
}
}

void Connection::putCommand(ConnectionCommand &c)
void Connection::putCommand(const ConnectionCommand &c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
m_sendThread->Trigger();
}
}

void Connection::putCommand(ConnectionCommand &&c)
{
if (!m_shutting_down) {
m_command_queue.push_back(std::move(c));
m_sendThread->Trigger();
}
}

void Connection::Serve(Address bind_addr)
{
ConnectionCommand c;
Expand Down Expand Up @@ -1408,7 +1412,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
ConnectionCommand c;

c.send(peer_id, channelnum, pkt, reliable);
putCommand(c);
putCommand(std::move(c));
}

Address Connection::GetPeerAddress(session_t peer_id)
Expand Down Expand Up @@ -1508,12 +1512,12 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl);

ConnectionCommand cmd;
SharedBuffer<u8> reply(4);
Buffer<u8> reply(4);
writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new);
cmd.createPeer(peer_id_new,reply);
putCommand(cmd);
putCommand(std::move(cmd));

// Create peer addition event
ConnectionEvent e;
Expand Down Expand Up @@ -1560,7 +1564,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
writeU16(&ack[2], seqnum);

c.ack(peer_id, channelnum, ack);
putCommand(c);
putCommand(std::move(c));
m_sendThread->Trigger();
}

Expand Down
40 changes: 16 additions & 24 deletions src/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,

#pragma once

#include "irrlichttypes_bloated.h"
#include "irrlichttypes.h"
#include "peerhandler.h"
#include "socket.h"
#include "constants.h"
Expand All @@ -29,7 +29,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "util/numeric.h"
#include "networkprotocol.h"
#include <iostream>
#include <fstream>
#include <vector>
#include <map>

Expand Down Expand Up @@ -242,20 +241,19 @@ class ReliablePacketBuffer

BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum);
void insert(BufferedPacket &p, u16 next_expected);
void insert(const BufferedPacket &p, u16 next_expected);

void incrementTimeouts(float dtime);
std::list<BufferedPacket> getTimedOuts(float timeout,
unsigned int max_packets);
std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);

void print();
bool empty();
RPBSearchResult notFound();
u32 size();


private:
RPBSearchResult findPacket(u16 seqnum); // does not perform locking
inline RPBSearchResult notFound() { return m_list.end(); }

std::list<BufferedPacket> m_list;

Expand Down Expand Up @@ -329,18 +327,6 @@ struct ConnectionCommand
bool raw = false;

ConnectionCommand() = default;
ConnectionCommand &operator=(const ConnectionCommand &other)
{
type = other.type;
address = other.address;
peer_id = other.peer_id;
channelnum = other.channelnum;
// We must copy the buffer here to prevent race condition
data = SharedBuffer<u8>(*other.data, other.data.getSize());
reliable = other.reliable;
raw = other.raw;
return *this;
}

void serve(Address address_)
{
Expand All @@ -364,7 +350,7 @@ struct ConnectionCommand

void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);

void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
{
type = CONCMD_ACK;
peer_id = peer_id_;
Expand All @@ -373,7 +359,7 @@ struct ConnectionCommand
reliable = false;
}

void createPeer(session_t peer_id_, const SharedBuffer<u8> &data_)
void createPeer(session_t peer_id_, const Buffer<u8> &data_)
{
type = CONCMD_CREATE_PEER;
peer_id = peer_id_;
Expand Down Expand Up @@ -707,7 +693,7 @@ struct ConnectionEvent

ConnectionEvent() = default;

std::string describe()
const char *describe() const
{
switch(type) {
case CONNEVENT_NONE:
Expand All @@ -724,7 +710,7 @@ struct ConnectionEvent
return "Invalid ConnectionEvent";
}

void dataReceived(session_t peer_id_, const SharedBuffer<u8> &data_)
void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
{
type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_;
Expand Down Expand Up @@ -763,7 +749,9 @@ class Connection

/* Interface */
ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c);
// Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
void putCommand(const ConnectionCommand &c);
void putCommand(ConnectionCommand &&c);

void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr);
Expand Down Expand Up @@ -802,11 +790,14 @@ class Connection
}

UDPSocket m_udpSocket;
// Command queue: user -> SendThread
MutexedQueue<ConnectionCommand> m_command_queue;

bool Receive(NetworkPacket *pkt, u32 timeout);

void putEvent(ConnectionEvent &e);
// Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
void putEvent(const ConnectionEvent &e);
void putEvent(ConnectionEvent &&e);

void TriggerSend();

Expand All @@ -815,6 +806,7 @@ class Connection
return getPeerNoEx(PEER_ID_SERVER) != nullptr;
}
private:
// Event queue: ReceiveThread -> user
MutexedQueue<ConnectionEvent> m_event_queue;

session_t m_peer_id = 0;
Expand Down
Loading

0 comments on commit fd8a850

Please sign in to comment.