Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffer Socket Reads #167

Merged
merged 1 commit into from Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 27 additions & 30 deletions src/MySqlConnector/Protocol/Serialization/BufferedByteReader.cs
Expand Up @@ -5,6 +5,11 @@ namespace MySql.Data.Protocol.Serialization
{
internal sealed class BufferedByteReader
{
public BufferedByteReader()
{
m_buffer = new byte[16384];
}

public ValueTask<ArraySegment<byte>> ReadBytesAsync(IByteHandler byteHandler, int count, IOBehavior ioBehavior)
{
if (m_remainingData.Count >= count)
Expand All @@ -14,50 +19,42 @@ public ValueTask<ArraySegment<byte>> ReadBytesAsync(IByteHandler byteHandler, in
return new ValueTask<ArraySegment<byte>>(readBytes);
}

if (m_remainingData.Count == 0)
return ReadBytesAsync(byteHandler, default(ArraySegment<byte>), count, ioBehavior);

// save data from m_remainingData.Array because calling ReadAsync may invalidate it
var buffer = new byte[Math.Max(count, 16384)];
Buffer.BlockCopy(m_remainingData.Array, m_remainingData.Offset, buffer, 0, m_remainingData.Count);
var previousReadBytes = new ArraySegment<byte>(buffer, 0, m_remainingData.Count);
var buffer = count > m_buffer.Length ? new byte[count] : m_buffer;
if (m_remainingData.Count > 0)
{
Buffer.BlockCopy(m_remainingData.Array, m_remainingData.Offset, buffer, 0, m_remainingData.Count);
m_remainingData = new ArraySegment<byte>(buffer, 0, m_remainingData.Count);
}

return ReadBytesAsync(byteHandler, previousReadBytes, count, ioBehavior);
return ReadBytesAsync(byteHandler, new ArraySegment<byte>(buffer, m_remainingData.Count, buffer.Length - m_remainingData.Count), count, ioBehavior);
}

private ValueTask<ArraySegment<byte>> ReadBytesAsync(IByteHandler byteHandler, ArraySegment<byte> previousReadBytes, int count, IOBehavior ioBehavior)
private ValueTask<ArraySegment<byte>> ReadBytesAsync(IByteHandler byteHandler, ArraySegment<byte> buffer, int count, IOBehavior ioBehavior)
{
return byteHandler.ReadBytesAsync(count - previousReadBytes.Count, ioBehavior)
.ContinueWith(readBytes =>
return byteHandler.ReadBytesAsync(buffer, ioBehavior)
.ContinueWith(readBytesCount =>
{
if (readBytes.Count == 0)
return new ValueTask<ArraySegment<byte>>(previousReadBytes);

if (previousReadBytes.Array == null && readBytes.Count >= count)
if (readBytesCount == 0)
{
m_remainingData = readBytes.Slice(count);
return new ValueTask<ArraySegment<byte>>(readBytes.Slice(0, count));
var data = m_remainingData;
m_remainingData = default(ArraySegment<byte>);
return new ValueTask<ArraySegment<byte>>(data);
}

var previousReadBytesArray = previousReadBytes.Array;
if (previousReadBytesArray == null)
previousReadBytesArray = new byte[Math.Max(count, 16384)];
else if (previousReadBytesArray.Length < previousReadBytes.Count + readBytes.Count)
Array.Resize(ref previousReadBytesArray, Math.Max(previousReadBytesArray.Length * 2, previousReadBytes.Count + readBytes.Count));

Buffer.BlockCopy(readBytes.Array, readBytes.Offset, previousReadBytesArray, previousReadBytes.Offset + previousReadBytes.Count, readBytes.Count);
previousReadBytes = new ArraySegment<byte>(previousReadBytesArray, previousReadBytes.Offset, previousReadBytes.Count + readBytes.Count);

if (previousReadBytes.Count >= count)
var bufferSize = buffer.Offset + readBytesCount;
if (bufferSize >= count)
{
m_remainingData = previousReadBytes.Slice(count);
return new ValueTask<ArraySegment<byte>>(previousReadBytes.Slice(0, count));
var bufferBytes = new ArraySegment<byte>(buffer.Array, 0, bufferSize);
var requestedBytes = bufferBytes.Slice(0, count);
m_remainingData = bufferBytes.Slice(count);
return new ValueTask<ArraySegment<byte>>(requestedBytes);
}

return ReadBytesAsync(byteHandler, previousReadBytes, count, ioBehavior);
return ReadBytesAsync(byteHandler, new ArraySegment<byte>(buffer.Array, bufferSize, buffer.Array.Length - bufferSize), count, ioBehavior);
});
}

ArraySegment<byte> m_remainingData;
readonly byte[] m_buffer;
}
}
Expand Up @@ -14,6 +14,7 @@ public CompressedPayloadHandler(IByteHandler byteHandler)
m_uncompressedStreamByteHandler = new StreamByteHandler(m_uncompressedStream);
m_byteHandler = byteHandler;
m_bufferedByteReader = new BufferedByteReader();
m_compressedBufferedByteReader = new BufferedByteReader();
}

public void StartNewConversation()
Expand Down Expand Up @@ -53,26 +54,26 @@ public ValueTask<int> WritePayloadAsync(ArraySegment<byte> payload, IOBehavior i
});
}

private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
private ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, ProtocolErrorBehavior protocolErrorBehavior, IOBehavior ioBehavior)
{
// satisfy the read from cache if possible
if (m_remainingData.Count > 0)
{
int bytesToRead = Math.Min(m_remainingData.Count, count);
var result = new ArraySegment<byte>(m_remainingData.Array, m_remainingData.Offset, bytesToRead);
var bytesToRead = Math.Min(m_remainingData.Count, buffer.Count);
Buffer.BlockCopy(m_remainingData.Array, m_remainingData.Offset, buffer.Array, buffer.Offset, bytesToRead);
m_remainingData = m_remainingData.Slice(bytesToRead);
return new ValueTask<ArraySegment<byte>>(result);
return new ValueTask<int>(bytesToRead);
}

// read the compressed header (seven bytes)
return m_bufferedByteReader.ReadBytesAsync(m_byteHandler, 7, ioBehavior)
return m_compressedBufferedByteReader.ReadBytesAsync(m_byteHandler, 7, ioBehavior)
.ContinueWith(headerReadBytes =>
{
if (headerReadBytes.Count < 7)
{
return protocolErrorBehavior == ProtocolErrorBehavior.Ignore ?
default(ValueTask<ArraySegment<byte>>) :
ValueTaskExtensions.FromException<ArraySegment<byte>>(new EndOfStreamException("Wanted to read 7 bytes but only read {0} when reading compressed packet header".FormatInvariant(headerReadBytes.Count)));
default(ValueTask<int>) :
ValueTaskExtensions.FromException<int>(new EndOfStreamException("Wanted to read 7 bytes but only read {0} when reading compressed packet header".FormatInvariant(headerReadBytes.Count)));
}

var payloadLength = (int) SerializationUtility.ReadUInt32(headerReadBytes.Array, headerReadBytes.Offset, 3);
Expand All @@ -84,10 +85,10 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
if (packetSequenceNumber != expectedSequenceNumber)
{
if (protocolErrorBehavior == ProtocolErrorBehavior.Ignore)
return default(ValueTask<ArraySegment<byte>>);
return default(ValueTask<int>);

var exception = new InvalidOperationException("Packet received out-of-order. Expected {0}; got {1}.".FormatInvariant(expectedSequenceNumber, packetSequenceNumber));
return ValueTaskExtensions.FromException<ArraySegment<byte>>(exception);
return ValueTaskExtensions.FromException<int>(exception);
}

// MySQL protocol resets the uncompressed sequence number back to the sequence number of this compressed packet.
Expand All @@ -100,14 +101,14 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
// except this doesn't happen when uncompressed packets need to be broken up across multiple compressed packets
m_isContinuationPacket = payloadLength == ProtocolUtility.MaxPacketSize || uncompressedLength == ProtocolUtility.MaxPacketSize;

return m_bufferedByteReader.ReadBytesAsync(m_byteHandler, payloadLength, ioBehavior)
return m_compressedBufferedByteReader.ReadBytesAsync(m_byteHandler, payloadLength, ioBehavior)
.ContinueWith(payloadReadBytes =>
{
if (payloadReadBytes.Count < payloadLength)
{
return protocolErrorBehavior == ProtocolErrorBehavior.Ignore ?
default(ValueTask<ArraySegment<byte>>) :
ValueTaskExtensions.FromException<ArraySegment<byte>>(new EndOfStreamException("Wanted to read {0} bytes but only read {1} when reading compressed payload".FormatInvariant(payloadLength, payloadReadBytes.Count)));
default(ValueTask<int>) :
ValueTaskExtensions.FromException<int>(new EndOfStreamException("Wanted to read {0} bytes but only read {1} when reading compressed payload".FormatInvariant(payloadLength, payloadReadBytes.Count)));
}

if (uncompressedLength == 0)
Expand All @@ -126,8 +127,8 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
// FLG & 0x40: has preset dictionary (not supported)
// CMF*256+FLG is a multiple of 31: header checksum
return protocolErrorBehavior == ProtocolErrorBehavior.Ignore ?
default(ValueTask<ArraySegment<byte>>) :
ValueTaskExtensions.FromException<ArraySegment<byte>>(new NotSupportedException("Unsupported zlib header: {0:X2}{1:X2}".FormatInvariant(cmf, flg)));
default(ValueTask<int>) :
ValueTaskExtensions.FromException<int>(new NotSupportedException("Unsupported zlib header: {0:X2}{1:X2}".FormatInvariant(cmf, flg)));
}

// zlib format (https://www.ietf.org/rfc/rfc1950.txt) is: [two header bytes] [deflate-compressed data] [four-byte checksum]
Expand All @@ -144,20 +145,22 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
var checksum = ComputeAdler32Checksum(uncompressedData, 0, bytesRead);
int adlerStartOffset = payloadReadBytes.Offset + payloadReadBytes.Count - 4;
if (payloadReadBytes.Array[adlerStartOffset + 0] != ((checksum >> 24) & 0xFF) ||
payloadReadBytes.Array[adlerStartOffset + 1] != ((checksum >> 16) & 0xFF) ||
payloadReadBytes.Array[adlerStartOffset + 2] != ((checksum >> 8) & 0xFF) ||
payloadReadBytes.Array[adlerStartOffset + 3] != (checksum & 0xFF))
payloadReadBytes.Array[adlerStartOffset + 1] != ((checksum >> 16) & 0xFF) ||
payloadReadBytes.Array[adlerStartOffset + 2] != ((checksum >> 8) & 0xFF) ||
payloadReadBytes.Array[adlerStartOffset + 3] != (checksum & 0xFF))
{
return protocolErrorBehavior == ProtocolErrorBehavior.Ignore ?
default(ValueTask<ArraySegment<byte>>) :
ValueTaskExtensions.FromException<ArraySegment<byte>>(new NotSupportedException("Invalid Adler-32 checksum of uncompressed data."));
default(ValueTask<int>) :
ValueTaskExtensions.FromException<int>(new NotSupportedException("Invalid Adler-32 checksum of uncompressed data."));
}
}
}

var result = m_remainingData.Slice(0, count);
m_remainingData = m_remainingData.Slice(count);
return new ValueTask<ArraySegment<byte>>(result);
var bytesToRead = Math.Min(m_remainingData.Count, buffer.Count);
Buffer.BlockCopy(m_remainingData.Array, m_remainingData.Offset, buffer.Array, buffer.Offset, bytesToRead);
m_remainingData = m_remainingData.Slice(bytesToRead);
return new ValueTask<int>(bytesToRead);

});
});
}
Expand Down Expand Up @@ -237,8 +240,8 @@ public CompressedByteHandler(CompressedPayloadHandler compressedPayloadHandler,
m_protocolErrorBehavior = protocolErrorBehavior;
}

public ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, IOBehavior ioBehavior) =>
m_compressedPayloadHandler.ReadBytesAsync(count, m_protocolErrorBehavior, ioBehavior);
public ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior) =>
m_compressedPayloadHandler.ReadBytesAsync(buffer, m_protocolErrorBehavior, ioBehavior);

public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBehavior)
{
Expand All @@ -253,6 +256,7 @@ public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBeha
readonly IByteHandler m_uncompressedStreamByteHandler;
readonly IByteHandler m_byteHandler;
readonly BufferedByteReader m_bufferedByteReader;
readonly BufferedByteReader m_compressedBufferedByteReader;
int m_compressedSequenceNumber;
int m_uncompressedSequenceNumber;
ArraySegment<byte> m_remainingData;
Expand Down
8 changes: 4 additions & 4 deletions src/MySqlConnector/Protocol/Serialization/IByteHandler.cs
Expand Up @@ -8,13 +8,13 @@ internal interface IByteHandler
/// <summary>
/// Reads data from this byte handler.
/// </summary>
/// <param name="count">The number of bytes to read.</param>
/// <param name="buffer">The buffer to read into.</param>
/// <param name="ioBehavior">The <see cref="IOBehavior"/> to use when reading data.</param>
/// <returns>An <see cref="ArraySegment{Byte}"/> containing the data that was read. This will contain at most <paramref name="count"/> bytes.
/// If not all the data was available, fewer than <paramref name="count"/> bytes may be returned. If reading failed, zero bytes will be returned. This
/// <returns>A <see cref="ValueTask{Int32}"/>. Number of bytes read.</returns>
/// If reading failed, zero bytes will be returned. This
/// <see cref="ArraySegment{Byte}"/> will be valid to read from until the next time <see cref="ReadBytesAsync"/> or
/// <see cref="WriteBytesAsync"/> is called.</returns>
ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, IOBehavior ioBehavior);
ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior);

/// <summary>
/// Writes data to this byte handler.
Expand Down
29 changes: 9 additions & 20 deletions src/MySqlConnector/Protocol/Serialization/SocketByteHandler.cs
Expand Up @@ -11,20 +11,18 @@ public SocketByteHandler(Socket socket)
m_socket = socket;
var socketEventArgs = new SocketAsyncEventArgs();
m_socketAwaitable = new SocketAwaitable(socketEventArgs);
m_buffer = new byte[16384];
}

public ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, IOBehavior ioBehavior)
public ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior)
{
var buffer = count < m_buffer.Length ? m_buffer : new byte[count];
if (ioBehavior == IOBehavior.Asynchronous && m_socket.Available < count)
if (ioBehavior == IOBehavior.Asynchronous)
{
return new ValueTask<ArraySegment<byte>>(DoReadBytesAsync(buffer, 0, count));
return new ValueTask<int>(DoReadBytesAsync(buffer));
}
else
{
var bytesRead = m_socket.Receive(buffer, 0, count, SocketFlags.None);
return new ValueTask<ArraySegment<byte>>(new ArraySegment<byte>(buffer, 0, bytesRead));
var bytesRead = m_socket.Receive(buffer.Array, buffer.Offset, buffer.Count, SocketFlags.None);
return new ValueTask<int>(bytesRead);
}
}

Expand All @@ -41,30 +39,21 @@ public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBeha
}
}

private async Task<ArraySegment<byte>> DoReadBytesAsync(byte[] buffer, int offset, int count)
private async Task<int> DoReadBytesAsync(ArraySegment<byte> buffer)
{
m_socketAwaitable.EventArgs.SetBuffer(buffer, offset, count);
m_socketAwaitable.EventArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
await m_socket.ReceiveAsync(m_socketAwaitable);
return new ArraySegment<byte>(buffer, 0, m_socketAwaitable.EventArgs.BytesTransferred);
return m_socketAwaitable.EventArgs.BytesTransferred;
}

private async Task<int> DoWriteBytesAsync(ArraySegment<byte> payload)
{
if (payload.Count <= m_buffer.Length)
{
Buffer.BlockCopy(payload.Array, payload.Offset, m_buffer, 0, payload.Count);
m_socketAwaitable.EventArgs.SetBuffer(m_buffer, 0, payload.Count);
}
else
{
m_socketAwaitable.EventArgs.SetBuffer(payload.Array, payload.Offset, payload.Count);
}
m_socketAwaitable.EventArgs.SetBuffer(payload.Array, payload.Offset, payload.Count);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous code was intentional: d705e20

I wonder if keeping m_buffer in this new code and copying in/out of it would be an improvement or not.

await m_socket.SendAsync(m_socketAwaitable);
return 0;
}

readonly Socket m_socket;
readonly SocketAwaitable m_socketAwaitable;
readonly byte[] m_buffer;
}
}
17 changes: 7 additions & 10 deletions src/MySqlConnector/Protocol/Serialization/StreamByteHandler.cs
Expand Up @@ -9,20 +9,18 @@ internal sealed class StreamByteHandler : IByteHandler
public StreamByteHandler(Stream stream)
{
m_stream = stream;
m_buffer = new byte[16384];
}

public ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, IOBehavior ioBehavior)
public ValueTask<int> ReadBytesAsync(ArraySegment<byte> buffer, IOBehavior ioBehavior)
{
var buffer = count < m_buffer.Length ? m_buffer : new byte[count];
if (ioBehavior == IOBehavior.Asynchronous)
{
return new ValueTask<ArraySegment<byte>>(DoReadBytesAsync(buffer, count));
return new ValueTask<int>(DoReadBytesAsync(buffer));
}
else
{
var bytesRead = m_stream.Read(buffer, 0, count);
return new ValueTask<ArraySegment<byte>>(new ArraySegment<byte>(buffer, 0, bytesRead));
var bytesRead = m_stream.Read(buffer.Array, buffer.Offset, buffer.Count);
return new ValueTask<int>(bytesRead);
}
}

Expand All @@ -39,10 +37,10 @@ public ValueTask<int> WriteBytesAsync(ArraySegment<byte> data, IOBehavior ioBeha
}
}

private async Task<ArraySegment<byte>> DoReadBytesAsync(byte[] buffer, int count)
private async Task<int> DoReadBytesAsync(ArraySegment<byte> buffer)
{
var bytesRead = await m_stream.ReadAsync(buffer, 0, count).ConfigureAwait(false);
return new ArraySegment<byte>(buffer, 0, bytesRead);
var bytesRead = await m_stream.ReadAsync(buffer.Array, buffer.Offset, buffer.Count).ConfigureAwait(false);
return bytesRead;
}

private async Task<int> DoWriteBytesAsync(ArraySegment<byte> payload)
Expand All @@ -52,6 +50,5 @@ private async Task<int> DoWriteBytesAsync(ArraySegment<byte> payload)
}

readonly Stream m_stream;
readonly byte[] m_buffer;
}
}