Skip to content
Permalink
Browse files

Improve protocol-level receiving code (#9617)

  • Loading branch information
sfan5 committed Apr 20, 2020
1 parent c2ac7b1 commit 8ef239b448c52485cf94d334c1d8b1c6de37d976
Showing with 126 additions and 130 deletions.
  1. +3 −1 src/network/connection.cpp
  2. +122 −128 src/network/connectionthreads.cpp
  3. +1 −1 src/network/connectionthreads.h
@@ -1173,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
m_bc_peerhandler(peerhandler)

{
m_udpSocket.setTimeoutMs(5);
/* Amount of time Receive() will wait for data, this is entirely different
* from the connection timeout */
m_udpSocket.setTimeoutMs(500);

m_sendThread->setParent(this);
m_receiveThread->setParent(this);
@@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run()
ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");

// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// infrastructure
const unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);

bool packet_queued = true;

#ifdef DEBUG_CONNECTION_KBPS
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
@@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run()
#endif

/* receive packets */
receive();
receive(packetdata, packet_queued);

#ifdef DEBUG_CONNECTION_KBPS
debug_print_timer += dtime;
@@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run()
}

// Receive packets from the network and buffers and create ConnectionEvents
void ConnectionReceiveThread::receive()
void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
bool &packet_queued)
{
// use IPv6 minimum allowed MTU as receive buffer size as this is
// theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// infrastructure
unsigned int packet_maxsize = 1500;
SharedBuffer<u8> packetdata(packet_maxsize);

bool packet_queued = true;

unsigned int loop_count = 0;

/* first of all read packets from socket */
/* check for incoming data available */
while ((loop_count < 10) &&
(m_connection->m_udpSocket.WaitData(50))) {
loop_count++;
try {
if (packet_queued) {
bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
}
catch (ProcessedSilentlyException &e) {
/* try reading again */
try {
// First, see if there any buffered packets we can process now
if (packet_queued) {
bool data_left = true;
session_t peer_id;
SharedBuffer<u8> resultdata;
while (data_left) {
try {
data_left = getFromBuffers(peer_id, resultdata);
if (data_left) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
}
packet_queued = false;
}

Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
packet_maxsize);

if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
continue;
catch (ProcessedSilentlyException &e) {
/* try reading again */
}
}
packet_queued = false;
}

session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);
// Call Receive() to wait for incoming data
Address sender;
s32 received_size = m_connection->m_udpSocket.Receive(sender,
*packetdata, packetdata.getSize());
if (received_size < 0)
return;

if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
throw InvalidIncomingDataException("Channel doesn't exist");
}
if ((received_size < BASE_HEADER_SIZE) ||
(readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid incoming packet, "
<< "size: " << received_size
<< ", protocol: "
<< ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
<< std::endl);
return;
}

/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}
session_t peer_id = readPeerId(*packetdata);
u8 channelnum = readChannel(*packetdata);

/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}
if (channelnum > CHANNEL_COUNT - 1) {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): Invalid channel " << (u32)channelnum << std::endl);
return;
}

PeerHelper peer = m_connection->getPeerNoEx(peer_id);
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
// We do not have to remind the peer of its
// peer id as the CONTROLTYPE_SET_PEER_ID
// command was sent reliably.
}

if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
continue;
}
/* The peer was not found in our lists. Add it. */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
}

// Validate peer address
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
LOG(dout_con << m_connection->getDesc()
<< " got packet from unknown peer_id: "
<< peer_id << " Ignoring." << std::endl);
return;
}

Address peer_address;
// Validate peer address

if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
continue;
}
} else {

bool invalid_address = true;
if (invalid_address) {
LOG(derr_con << m_connection->getDesc()
<< m_connection->getDesc()
<< " Peer " << peer_id << " unknown."
" Ignoring." << std::endl);
continue;
}
Address peer_address;
if (peer->getAddress(MTP_UDP, peer_address)) {
if (peer_address != sender) {
LOG(derr_con << m_connection->getDesc()
<< " Peer " << peer_id << " sending from different address."
" Ignoring." << std::endl);
return;
}
} else {
LOG(derr_con << m_connection->getDesc()
<< " Peer " << peer_id << " doesn't have an address?!"
" Ignoring." << std::endl);
return;
}

peer->ResetTimeout();

Channel *channel = 0;
peer->ResetTimeout();

if (dynamic_cast<UDPPeer *>(&peer) != 0) {
channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
}
Channel *channel = nullptr;
if (dynamic_cast<UDPPeer *>(&peer)) {
channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
} else {
LOG(derr_con << m_connection->getDesc()
<< "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
" Ignoring." << std::endl);
return;
}

if (channel != 0) {
channel->UpdateBytesReceived(received_size);
}
channel->UpdateBytesReceived(received_size);

// Throw the received packet to channel->processPacket()
// Throw the received packet to channel->processPacket()

// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());
// Make a new SharedBuffer from the data without the base headers
SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
strippeddata.getSize());

try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);
try {
// Process it (the result is some data with no headers made by us)
SharedBuffer<u8> resultdata = processPacket
(channel, strippeddata, peer_id, channelnum, false);

LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);
LOG(dout_con << m_connection->getDesc()
<< " ProcessPacket from peer_id: " << peer_id
<< ", channel: " << (u32)channelnum << ", returned "
<< resultdata.getSize() << " bytes" << std::endl);

ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
catch (ProcessedQueued &e) {
packet_queued = true;
}
}
catch (InvalidIncomingDataException &e) {
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
catch (ProcessedQueued &e) {
// we set it to true anyway (see below)
}

/* Every time we receive a packet it can happen that a previously
* buffered packet is now ready to process. */
packet_queued = true;
}
catch (InvalidIncomingDataException &e) {
}
}

@@ -1189,7 +1182,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
m_connection->TriggerSend();
} catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc()
<< "WARNING: ACKed packet not in outgoing queue" << std::endl);
<< "WARNING: ACKed packet not in outgoing queue"
<< " seqnum=" << seqnum << std::endl);
channel->UpdatePacketTooLateCounter();
}

@@ -101,7 +101,7 @@ class ConnectionReceiveThread : public Thread
}

private:
void receive();
void receive(SharedBuffer<u8> &packetdata, bool &packet_queued);

// Returns next data from a buffer if possible
// If found, returns true; if not, false.

0 comments on commit 8ef239b

Please sign in to comment.
You can’t perform that action at this time.