Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix support for sending large individual messages without fragmenting…

… the transfers when using TCPMessageConnection.
  • Loading branch information...
commit 8c92c963e1552bb3cfcf687038a0d44ad7b523d7 1 parent 08a2689
Jukka Jyl�nki authored
View
9 include/kNet/DataSerializer.h
@@ -218,7 +218,7 @@ class DataSerializer
/// @return The number of bits filled so far total.
size_t BitsFilled() const { return elemOfs * 8 + bitOfs; }
- /// @return The total capacity of the buffer we are filling into.
+ /// @return The total capacity of the buffer we are filling into, in bytes.
size_t Capacity() const { return maxBytes; }
/// Returns the current byte offset the DataSerializer is writing to.
@@ -227,6 +227,13 @@ class DataSerializer
/// Returns the current bit offset in the current byte this DataSerializer is writing to, [0, 7].
size_t BitOffset() const { return bitOfs; }
+ /// Returns the total number of bits that can still be serialized into this DataSerializer object before overflowing (which throws an exception).
+ size_t BitsLeft() const { return Capacity()*8 - BitsFilled(); }
+
+ /// Returns the total number of full bytes that can still be serialized into this DataSerializer object before overflowing (which throws an exception).
+ /// @return floor(BitsLeft()/8).
+ size_t BytesLeft() const { return BitsLeft() / 8; }
+
/// Returns the bit serialized at the given bit index of this buffer.
bool DebugReadBit(int bitIndex) const;
View
8 include/kNet/Socket.h
@@ -117,6 +117,9 @@ struct OverlappedTransferBuffer
/// specify the actual number of bytes filled to buffer.buf here.
int bytesContains;
+ /// Stores the total number of bytes allocated to the buffer in the overlapped structure.
+ int bytesAllocated;
+
sockaddr_in from;
socklen_t fromLen;
};
@@ -188,9 +191,12 @@ class Socket : public RefCountable
/// Starts the sending of new data. After having filled the data to send to the OverlappedTransferBuffer that is
/// returned here, commit the send by calling EndSend. If you have called BeginSend, but decide not to send any data,
/// call AbortSend instead (otherwise memory will leak).
+ /// @param maxBytesToSend Specifies the size of the buffer that must be returned. Specify the size (or at least an
+ /// upper limit) of the message you are sending here. Specify the actual number of bytes filled in the resulting
+ /// structure.
/// @return A transfer buffer where the data to send is to be filled in. If no new data can be sent at this time,
/// this function returns 0.
- OverlappedTransferBuffer *BeginSend();
+ OverlappedTransferBuffer *BeginSend(int maxBytesToSend);
/// Finishes and queues up the given transfer that was created with a call to BeginSend.
/// @return True if send succeeded, false otherwise. In either case, the ownership of the passed buffer send
/// is taken by this Socket and may not be accessed anymore. Discard the pointer after calling this function.
View
6 src/Network.cpp
@@ -774,23 +774,23 @@ Socket *Network::StoreSocket(const Socket &cp)
void Network::SendUDPConnectDatagram(Socket &socket, Datagram *connectMessage)
{
- OverlappedTransferBuffer *sendData = socket.BeginSend();
+ const int connectMessageSize = connectMessage ? connectMessage->size : 8;
+ OverlappedTransferBuffer *sendData = socket.BeginSend(connectMessageSize);
if (!sendData)
{
LOG(LogError, "Network::SendUDPConnectDatagram: socket.BeginSend failed! Cannot send UDP connection datagram!");
return;
}
+ sendData->bytesContains = connectMessageSize;
if (connectMessage)
{
///\todo Craft the proper connection attempt datagram.
- sendData->bytesContains = std::min<int>(connectMessage->size, sendData->buffer.len);
memcpy(sendData->buffer.buf, connectMessage->data, sendData->buffer.len);
LOG(LogVerbose, "Network::SendUDPConnectDatagram: Sending UDP connect message of size %d.", (int)sendData->buffer.len);
}
else
{
///\todo Craft the proper connection attempt datagram.
- sendData->bytesContains = std::min<int>(8, sendData->buffer.len);
memset(sendData->buffer.buf, 0, sendData->buffer.len);
LOG(LogVerbose, "Network::SendUDPConnectDatagram: Sending null UDP connect message of size %d.", (int)sendData->buffer.len);
}
View
2  src/NetworkSimulator.cpp
@@ -64,7 +64,7 @@ void NetworkSimulator::SubmitSendBuffer(kNet::OverlappedTransferBuffer *buffer,
{
QueuedBuffer b;
assert(socket);
- b.buffer = socket->BeginSend();
+ b.buffer = socket->BeginSend(buffer->bytesContains);
if (b.buffer)
{
assert(b.buffer->buffer.len >= buffer->bytesContains);
View
24 src/Socket.cpp
@@ -181,6 +181,7 @@ OverlappedTransferBuffer *AllocateOverlappedTransferBuffer(int bytes)
buffer->buffer.buf = new char[bytes];
buffer->buffer.len = bytes;
buffer->bytesContains = 0;
+ buffer->bytesAllocated = bytes;
#ifdef WIN32
buffer->overlapped.hEvent = WSACreateEvent();
if (buffer->overlapped.hEvent == WSA_INVALID_EVENT)
@@ -801,7 +802,7 @@ bool Socket::IsOverlappedSendReady()
#endif
}
-OverlappedTransferBuffer *Socket::BeginSend()
+OverlappedTransferBuffer *Socket::BeginSend(int maxBytesToSend)
{
if (!writeOpen)
return 0;
@@ -819,9 +820,21 @@ OverlappedTransferBuffer *Socket::BeginSend()
if (ret == TRUE)
{
queuedSendBuffers.PopFront();
- sentData->buffer.len = maxSendSize; // This is the number of bytes that the client is allowed to fill.
- sentData->bytesContains = 0; // No bytes currently in use.
- return sentData;
+
+ // If the buffer we pulled off was too small, free it and allocate a new one which is of the desired size.
+ if (sentData->bytesAllocated < maxBytesToSend)
+ {
+ DeleteOverlappedTransferBuffer(sentData);
+ return AllocateOverlappedTransferBuffer(maxBytesToSend); ///\todo In debug mode - track this pointer.
+ }
+ else
+ {
+ // The existing transfer buffer is large enough. Prepare it for reuse and pass back to caller.
+ sentData->buffer.len = sentData->bytesAllocated; // This is the number of bytes that the client is allowed to fill.
+ sentData->bytesContains = 0; // No bytes currently in use.
+
+ return sentData;
+ }
}
if (ret == FALSE && error != WSA_IO_INCOMPLETE)
{
@@ -837,8 +850,7 @@ OverlappedTransferBuffer *Socket::BeginSend()
#endif
// No previous send buffer has finished from use (or not using overlapped transfers) - allocate a new buffer.
- OverlappedTransferBuffer *transfer = AllocateOverlappedTransferBuffer(maxSendSize);
- return transfer; ///\todo In debug mode - track this pointer.
+ return AllocateOverlappedTransferBuffer(maxBytesToSend);
}
bool Socket::EndSend(OverlappedTransferBuffer *sendBuffer)
View
39 src/TCPMessageConnection.cpp
@@ -174,25 +174,22 @@ MessageConnection::PacketSendResult TCPMessageConnection::SendOutPacket()
return PacketSendSocketClosed;
}
+ // 'serializedMessages' is a temporary data structure used only by this member function.
+ // It caches a list of all the messages we are pushing out during this call.
+ serializedMessages.clear();
+
// In the following, we start coalescing multiple messages into a single socket send() calls.
// Get the maximum number of bytes we can coalesce for the send() call. This is only a soft limit
// in the sense that if we encounter a single message that is larger than this limit, then we try
// to send that through in one send() call.
- const size_t maxSendSize = socket->MaxSendSize();
+// const size_t maxSendSize = socket->MaxSendSize();
// Push out all the pending data to the socket.
-// assert(ContainerUniqueAndNoNullElements(serializedMessages));
-// assert(ContainerUniqueAndNoNullElements(outboundQueue));
- serializedMessages.clear(); // 'serializedMessages' is a temporary data structure used only by this member function.
- OverlappedTransferBuffer *overlappedTransfer = socket->BeginSend();
- if (!overlappedTransfer)
- {
- LOG(LogError, "TCPMessageConnection::SendOutPacket: Starting an overlapped send failed!");
- return PacketSendSocketClosed;
- }
+ OverlappedTransferBuffer *overlappedTransfer = 0;
int numMessagesPacked = 0;
- DataSerializer writer(overlappedTransfer->buffer.buf, overlappedTransfer->buffer.len);
+ DataSerializer writer;
+// assert(ContainerUniqueAndNoNullElements(outboundQueue)); // This precondition should always hold (but very heavy to test, uncomment to debug)
while(outboundQueue.Size() > 0)
{
#ifdef KNET_NO_MAXHEAP
@@ -208,12 +205,26 @@ MessageConnection::PacketSendResult TCPMessageConnection::SendOutPacket()
outboundQueue.PopFront();
continue;
}
+
const int encodedMsgIdLength = VLE8_16_32::GetEncodedBitLength(msg->id) / 8;
const size_t messageContentSize = msg->dataSize + encodedMsgIdLength; // 1 byte: Message ID. X bytes: Content.
const int encodedMsgSizeLength = VLE8_16_32::GetEncodedBitLength(messageContentSize) / 8;
const size_t totalMessageSize = messageContentSize + encodedMsgSizeLength; // 2 bytes: Content length. X bytes: Content.
- // If this message won't fit into the buffer, send out all previously gathered messages (except if there were none, then try to get the big message through).
- if (writer.BytesFilled() + totalMessageSize >= maxSendSize && numMessagesPacked > 0)
+
+ if (!overlappedTransfer)
+ {
+ overlappedTransfer = socket->BeginSend(std::max<size_t>(socket->MaxSendSize(), totalMessageSize));
+ if (!overlappedTransfer)
+ {
+ LOG(LogError, "TCPMessageConnection::SendOutPacket: Starting an overlapped send failed!");
+ assert(serializedMessages.size() == 0);
+ return PacketSendSocketClosed;
+ }
+ writer = DataSerializer(overlappedTransfer->buffer.buf, overlappedTransfer->buffer.len);
+ }
+
+ // If this message won't fit into the buffer, send out all previously gathered messages.
+ if (writer.BytesLeft() < totalMessageSize)
break;
writer.AddVLE<VLE8_16_32>(messageContentSize);
@@ -230,7 +241,7 @@ MessageConnection::PacketSendResult TCPMessageConnection::SendOutPacket()
#endif
outboundQueue.PopFront();
}
-// assert(ContainerUniqueAndNoNullElements(serializedMessages));
+// assert(ContainerUniqueAndNoNullElements(serializedMessages)); // This precondition should always hold (but very heavy to test, uncomment to debug)
if (writer.BytesFilled() == 0 && outboundQueue.Size() > 0)
LOG(LogError, "Failed to send any messages to socket %s! (Probably next message was too big to fit in the buffer).", socket->ToString().c_str());
View
4 src/UDPMessageConnection.cpp
@@ -290,7 +290,7 @@ void UDPMessageConnection::HandleFlowControl()
}
// Do a fixed flow control for testing.
- datagramSendRate = 100; ///\todo Remove.
+ datagramSendRate = 1000; ///\todo Remove.
}
void UDPMessageConnection::SendOutPackets()
@@ -335,7 +335,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
if (!CanSendOutNewDatagram())
return PacketSendThrottled;
- OverlappedTransferBuffer *data = socket->BeginSend();
+ OverlappedTransferBuffer *data = socket->BeginSend(socket->MaxSendSize());
if (!data)
return PacketSendThrottled;
Please sign in to comment.
Something went wrong with that request. Please try again.