Skip to content

Commit

Permalink
Work around binary writing support (#2445)
Browse files Browse the repository at this point in the history
* Support writing bytea from ReadOnlyMemory<byte> and Memory<byte>
* Support async I/O when making direct writes

Closes #2392, #2444
  • Loading branch information
roji committed May 3, 2019
1 parent 31c1415 commit 951dc83
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Npgsql/NpgsqlConnector.FrontendMessages.cs
Expand Up @@ -401,7 +401,7 @@ internal async Task WritePassword(byte[] payload, int offset, int count, bool as
}

await WriteBuffer.Flush(async);
WriteBuffer.DirectWrite(payload, offset, count);
await WriteBuffer.DirectWrite(payload, offset, count, async);
}

internal async Task WriteSASLInitialResponse(string mechanism, byte[] initialResponse, bool async)
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/NpgsqlRawCopyStream.cs
Expand Up @@ -112,7 +112,7 @@ public override void Write(byte[] buffer, int offset, int count)
}

// Value is too big even after a flush - bypass the buffer and write directly.
_writeBuf.DirectWrite(buffer, offset, count);
_writeBuf.DirectWrite(buffer, offset, count, false).GetAwaiter().GetResult();
} catch {
_connector.Break();
Cleanup();
Expand Down
60 changes: 54 additions & 6 deletions src/Npgsql/NpgsqlWriteBuffer.cs
Expand Up @@ -140,27 +140,37 @@ public async Task Flush(bool async)
[CanBeNull]
internal NpgsqlCommand CurrentCommand { get; set; }

internal void DirectWrite(byte[] buffer, int offset, int count)
#endregion

#region Direct write

internal async Task DirectWrite(byte[] buffer, int offset, int count, bool async)
{
await Flush(async);

if (_copyMode)
{
// Flush has already written the CopyData header, need to update the length
// Flush has already written the CopyData header for us, but write the CopyData
// header to the socket with the write length before we can start writing the data directly.
Debug.Assert(WritePosition == 5);

WritePosition = 1;
WriteInt32(count + 4);
WritePosition = 5;
_copyMode = false;
Flush();
await Flush(async);
_copyMode = true;
WriteCopyDataHeader();
WriteCopyDataHeader(); // And ready the buffer after the direct write completes
}
else
Debug.Assert(WritePosition == 0);

try
{
Underlying.Write(buffer, offset, count);
if (async)
await Underlying.WriteAsync(buffer, offset, count);
else
Underlying.Write(buffer, offset, count);
}
catch (Exception e)
{
Expand All @@ -169,7 +179,45 @@ internal void DirectWrite(byte[] buffer, int offset, int count)
}
}

#endregion
#if !NETSTANDARD2_0 && !NET461
internal async Task DirectWrite(ReadOnlyMemory<byte> memory, bool async)
{
await Flush(async);

if (_copyMode)
{
// Flush has already written the CopyData header for us, but write the CopyData
// header to the socket with the write length before we can start writing the data directly.
Debug.Assert(WritePosition == 5);

WritePosition = 1;
WriteInt32(memory.Length + 4);
WritePosition = 5;
_copyMode = false;
await Flush(async);
_copyMode = true;
WriteCopyDataHeader(); // And ready the buffer after the direct write completes
}
else
Debug.Assert(WritePosition == 0);


try
{
if (async)
await Underlying.WriteAsync(memory);
else
Underlying.Write(memory.Span);
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while writing to stream", e);
}
}
#endif

#endregion Direct write

#region Write Simple

Expand Down
95 changes: 67 additions & 28 deletions src/Npgsql/TypeHandlers/ByteaHandler.cs
Expand Up @@ -12,8 +12,22 @@ namespace Npgsql.TypeHandlers
/// <remarks>
/// http://www.postgresql.org/docs/current/static/datatype-binary.html
/// </remarks>
[TypeMapping("bytea", NpgsqlDbType.Bytea, DbType.Binary, new[] { typeof(byte[]), typeof(ArraySegment<byte>) })]
[TypeMapping(
"bytea",
NpgsqlDbType.Bytea,
DbType.Binary,
new[] {
typeof(byte[]),
typeof(ArraySegment<byte>),
#if !NETSTANDARD2_0 && !NET461
typeof(ReadOnlyMemory<byte>),
typeof(Memory<byte>)
#endif
})]
public class ByteaHandler : NpgsqlTypeHandler<byte[]>, INpgsqlTypeHandler<ArraySegment<byte>>
#if !NETSTANDARD2_0 && !NET461
, INpgsqlTypeHandler<ReadOnlyMemory<byte>>, INpgsqlTypeHandler<Memory<byte>>
#endif
{
/// <inheritdoc />
public override async ValueTask<byte[]> Read(NpgsqlReadBuffer buf, int len, bool async, FieldDescription fieldDescription = null)
Expand All @@ -38,59 +52,84 @@ ValueTask<ArraySegment<byte>> INpgsqlTypeHandler<ArraySegment<byte>>.Read(Npgsql
throw new NpgsqlSafeReadException(new NotSupportedException("Only writing ArraySegment<byte> to PostgreSQL bytea is supported, no reading."));
}

#region Write
int ValidateAndGetLength(int bufferLen, NpgsqlParameter parameter)
=> parameter == null || parameter.Size <= 0 || parameter.Size >= bufferLen
? bufferLen
: parameter.Size;

/// <inheritdoc />
public override int ValidateAndGetLength(byte[] value, ref NpgsqlLengthCache lengthCache, NpgsqlParameter parameter)
=> parameter == null || parameter.Size <= 0 || parameter.Size >= value.Length
? value.Length
: parameter.Size;
=> ValidateAndGetLength(value.Length, parameter);

/// <inheritdoc />
public int ValidateAndGetLength(ArraySegment<byte> value, ref NpgsqlLengthCache lengthCache, NpgsqlParameter parameter)
=> parameter == null || parameter.Size <= 0 || parameter.Size >= value.Count
? value.Count
: parameter.Size;
=> ValidateAndGetLength(value.Count, parameter);

/// <inheritdoc />
public override async Task Write(byte[] value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, [CanBeNull] NpgsqlParameter parameter, bool async)
{
var len = parameter == null || parameter.Size <= 0 || parameter.Size >= value.Length
? value.Length
: parameter.Size;
public override Task Write(byte[] value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, [CanBeNull] NpgsqlParameter parameter, bool async)
=> Write(value, buf, 0, ValidateAndGetLength(value.Length, parameter), async);

// The entire array fits in our buffer, copy it into the buffer as usual.
if (len <= buf.WriteSpaceLeft)
/// <inheritdoc />
public Task Write(ArraySegment<byte> value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, [CanBeNull] NpgsqlParameter parameter, bool async)
=> Write(value.Array, buf, value.Offset, ValidateAndGetLength(value.Count, parameter), async);

async Task Write(byte[] value, NpgsqlWriteBuffer buf, int offset, int count, bool async)
{
// The entire segment fits in our buffer, copy it as usual.
if (count <= buf.WriteSpaceLeft)
{
buf.WriteBytes(value, 0, len);
buf.WriteBytes(value, offset, count);
return;
}

// The segment is larger than our buffer. Flush whatever is currently in the buffer and
// write the array directly to the socket.
await buf.Flush(async);
buf.DirectWrite(value, 0, len);
await buf.DirectWrite(value, offset, count, async);
}

#if !NETSTANDARD2_0 && !NET461
/// <inheritdoc />
public int ValidateAndGetLength(Memory<byte> value, ref NpgsqlLengthCache lengthCache, NpgsqlParameter parameter)
=> ValidateAndGetLength(value.Length, parameter);

/// <inheritdoc />
public async Task Write(ArraySegment<byte> value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, [CanBeNull] NpgsqlParameter parameter, bool async)
public int ValidateAndGetLength(ReadOnlyMemory<byte> value, ref NpgsqlLengthCache lengthCache, NpgsqlParameter parameter)
=> ValidateAndGetLength(value.Length, parameter);

/// <inheritdoc />
public async Task Write(ReadOnlyMemory<byte> value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, [CanBeNull] NpgsqlParameter parameter, bool async)
{
if (!(parameter == null || parameter.Size <= 0 || parameter.Size >= value.Count))
value = new ArraySegment<byte>(value.Array, value.Offset, Math.Min(parameter.Size, value.Count));
if (parameter != null && parameter.Size > 0 && parameter.Size < value.Length)
value = value.Slice(0, parameter.Size);

// The entire segment fits in our buffer, copy it as usual.
if (value.Count <= buf.WriteSpaceLeft)
// The entire segment fits in our buffer, copy it into the buffer as usual.
if (value.Length <= buf.WriteSpaceLeft)
{
buf.WriteBytes(value.Array, value.Offset, value.Count);
buf.WriteBytes(value.Span);
return;
}

// The segment is larger than our buffer. Flush whatever is currently in the buffer and
// write the array directly to the socket.
await buf.Flush(async);
buf.DirectWrite(value.Array, value.Offset, value.Count);
// The segment is larger than our buffer. Perform a direct write, flushing whatever is currently in the buffer
// and then writing the array directly to the socket.
await buf.DirectWrite(value, async);
}

/// <inheritdoc />
public Task Write(Memory<byte> value, NpgsqlWriteBuffer buf, NpgsqlLengthCache lengthCache, NpgsqlParameter parameter, bool async)
=> Write((ReadOnlyMemory<byte>)value, buf, lengthCache, parameter, async);

ValueTask<ReadOnlyMemory<byte>> INpgsqlTypeHandler<ReadOnlyMemory<byte>>.Read(NpgsqlReadBuffer buf, int len, bool async, FieldDescription fieldDescription)
{
buf.Skip(len);
throw new NpgsqlSafeReadException(new NotSupportedException("Only writing ReadOnlyMemory<byte> to PostgreSQL bytea is supported, no reading."));
}

#endregion
ValueTask<Memory<byte>> INpgsqlTypeHandler<Memory<byte>>.Read(NpgsqlReadBuffer buf, int len, bool async, FieldDescription fieldDescription)
{
buf.Skip(len);
throw new NpgsqlSafeReadException(new NotSupportedException("Only writing Memory<byte> to PostgreSQL bytea is supported, no reading."));
}
#endif
}
}
21 changes: 21 additions & 0 deletions test/Npgsql.Tests/Types/ByteaTests.cs
Expand Up @@ -173,6 +173,27 @@ public void ArrayOfBytea()
}
}

#if !NETSTANDARD2_0 && !NET461
[Test]
public void Memory()
{
using (var conn = OpenConnection())
using (var cmd = new NpgsqlCommand("SELECT @p1, @p2", conn))
{
var bytes = new byte[] { 1, 2, 3 };
cmd.Parameters.AddWithValue("p1", new ReadOnlyMemory<byte>(bytes));
cmd.Parameters.AddWithValue("p2", new Memory<byte>(bytes));
using (var reader = cmd.ExecuteReader())
{
reader.Read();
Assert.That(reader[0], Is.EqualTo(bytes));
Assert.That(reader[1], Is.EqualTo(bytes));
Assert.That(() => reader.GetFieldValue<ReadOnlyMemory<byte>>(0), Throws.Exception.TypeOf<NotSupportedException>());
Assert.That(() => reader.GetFieldValue<Memory<byte>>(0), Throws.Exception.TypeOf<NotSupportedException>());
}
}
}
#endif

// Older tests from here

Expand Down

0 comments on commit 951dc83

Please sign in to comment.