Redo binary exporter column reading (npgsql#5464)
NinoFloris committed Dec 3, 2023
1 parent 7a288d8 commit cdf9841
Showing 3 changed files with 148 additions and 80 deletions.
8 changes: 4 additions & 4 deletions src/Npgsql/Internal/PgReader.cs
@@ -12,6 +12,9 @@ namespace Npgsql.Internal;
 
 public class PgReader
 {
+    // We don't want to add a ton of memory pressure for large strings.
+    internal const int MaxPreparedTextReaderSize = 1024 * 64;
+
     readonly NpgsqlReadBuffer _buffer;
 
     bool _resumable;
@@ -210,11 +213,8 @@ public ValueTask<TextReader> GetTextReaderAsync(Encoding encoding, CancellationT
 
     async ValueTask<TextReader> GetTextReader(bool async, Encoding encoding, CancellationToken cancellationToken)
     {
-        // We don't want to add a ton of memory pressure for large strings.
-        const int maxPreparedSize = 1024 * 64;
-
         _requiresCleanup = true;
-        if (CurrentRemaining > _buffer.ReadBytesLeft || CurrentRemaining > maxPreparedSize)
+        if (CurrentRemaining > _buffer.ReadBytesLeft || CurrentRemaining > MaxPreparedTextReaderSize)
            return new StreamReader(GetColumnStream(), encoding, detectEncodingFromByteOrderMarks: false);
 
        if (_preparedTextReader is { IsDisposed: false })
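For context, the 64 KiB cap now exposed as a constant governs which text values can be served from the pooled prepared TextReader; anything larger (or not fully buffered) streams through a StreamReader over the column stream. A minimal consumer-side sketch of what that means — the table, column and connection string below are illustrative, not part of this commit:

using Npgsql;

var connString = "Host=localhost;Username=postgres;Database=postgres"; // assumed
await using var conn = new NpgsqlConnection(connString);
await conn.OpenAsync();

await using var cmd = new NpgsqlCommand("SELECT body FROM documents", conn);
await using var dbReader = await cmd.ExecuteReaderAsync();
while (await dbReader.ReadAsync())
{
    // Values at or under the 64 KiB cap can reuse a pooled reader; larger
    // values stream from the socket instead of materializing a full string.
    using var text = dbReader.GetTextReader(0);
    var buffer = new char[8192];
    int n;
    while ((n = await text.ReadAsync(buffer, 0, buffer.Length)) > 0)
        Console.Out.Write(buffer.AsSpan(0, n)); // process each chunk incrementally
}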
153 changes: 77 additions & 76 deletions src/Npgsql/NpgsqlBinaryExporter.cs
@@ -35,7 +35,7 @@ public sealed class NpgsqlBinaryExporter : ICancelable
     /// <summary>
     /// The number of columns, as returned from the backend in the CopyInResponse.
     /// </summary>
-    internal int NumColumns { get; private set; }
+    int NumColumns { get; set; }
 
     PgConverterInfo[] _columnInfoCache;
 
@@ -140,16 +140,18 @@ async Task ReadHeader(bool async)
 
     async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken = default)
     {
-
-        CheckDisposed();
+        ThrowIfDisposed();
        if (_isConsumed)
            return -1;
 
        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);
 
        // Consume and advance any active column.
        if (_column >= 0)
-            await Commit(async, resumableOp: false).ConfigureAwait(false);
+        {
+            await Commit(async).ConfigureAwait(false);
+            _column++;
+        }
 
        // The very first row (i.e. _column == -1) is included in the header's CopyData message.
        // Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
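For orientation, the loop below is the public surface this method backs: StartRow()/StartRowAsync() returns the number of columns in the next row, or -1 once the COPY stream is exhausted. A sketch with an illustrative query and connection string:

using Npgsql;

var connString = "Host=localhost;Username=postgres;Database=postgres"; // assumed
await using var conn = new NpgsqlConnection(connString);
await conn.OpenAsync();

using var exporter = conn.BeginBinaryExport(
    "COPY (VALUES ('foo', 1), ('bar', 2)) TO STDOUT BINARY");
while (await exporter.StartRowAsync() != -1)
{
    var name = await exporter.ReadAsync<string>();
    var number = await exporter.ReadAsync<int>();
    Console.WriteLine($"{name}: {number}");
}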
@@ -210,29 +212,6 @@ public ValueTask<T> ReadAsync<T>(CancellationToken cancellationToken = default)
     ValueTask<T> Read<T>(bool async, CancellationToken cancellationToken = default)
         => Read<T>(async, null, cancellationToken);
 
-    PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
-    {
-        var options = _connector.SerializerOptions;
-        PgTypeId? pgTypeId = null;
-        if (npgsqlDbType.HasValue)
-        {
-            pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
-                ? options.GetCanonicalTypeId(name)
-                // Handle plugin types via lookup.
-                : GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
-        }
-        var info = options.GetTypeInfo(type, pgTypeId)
-            ?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
-        // Binary export has no type info so we only do caller-directed interpretation of data.
-        return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);
-
-        PgTypeId GetRepresentationalOrDefault(string dataTypeName)
-        {
-            var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
-            return options.ToCanonicalTypeId(type.GetRepresentationalType());
-        }
-    }
-
     /// <summary>
     /// Reads the current column, returns its value according to <paramref name="type"/> and
     /// moves ahead to the next column.
@@ -269,39 +248,22 @@ public ValueTask<T> ReadAsync<T>(NpgsqlDbType type, CancellationToken cancellati
 
     async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken cancellationToken)
     {
-        CheckDisposed();
-        if (_column is BeforeRow)
-            ThrowHelper.ThrowInvalidOperationException("Not reading a row");
+        ThrowIfNotOnRow();
 
        using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
 
-        // Allow one more read if the field is a db null.
-        // We cannot allow endless rereads otherwise it becomes quite unclear when a column advance happens.
-        if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
-        {
-            await Commit(async, resumableOp: false).ConfigureAwait(false);
-            return DbNullOrThrow();
-        }
+        if (!IsInitializedAndAtStart)
+            await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);
 
-        // We must commit the current column before reading the next one unless it was an IsNull call.
-        PgConverterInfo info;
-        bool asObject;
-        if (!PgReader.Initialized || !PgReader.Resumable || PgReader.CurrentRemaining != PgReader.FieldSize)
+        if (PgReader.FieldSize is (-1 or 0) and var fieldSize)
        {
-            await Commit(async, resumableOp: false).ConfigureAwait(false);
-            info = GetInfo(type, out asObject);
-
-            // We need to get info after potential I/O as we don't know beforehand at what column we're at.
-            var columnLen = await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
-            if (_column == NumColumns)
-                ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
-
-            if (columnLen is -1)
+            // Commit, otherwise we'll have no way of knowing this column is finished.
+            await Commit(async).ConfigureAwait(false);
+            if (fieldSize is -1)
                return DbNullOrThrow();
-
        }
-        else
-            info = GetInfo(type, out asObject);
 
+        var info = GetInfo(type, out var asObject);
+
        T result;
        if (async)
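The rewritten read path pins down the null contract: IsNull peeks at the current field without consuming it, and Read<T> on a NULL field returns null when T is nullable but throws InvalidCastException otherwise. A hedged sketch of both caller patterns (the DumpRows helper is hypothetical; the exporter is assumed to come from conn.BeginBinaryExport(...)):

using Npgsql;

static async Task DumpRows(NpgsqlBinaryExporter exporter)
{
    while (await exporter.StartRowAsync() != -1)
    {
        // Pattern 1: peek, then skip or read; IsNull does not consume the field.
        string? name = exporter.IsNull ? null : await exporter.ReadAsync<string>();
        if (name is null)
            await exporter.SkipAsync();

        // Pattern 2: read a nullable value type directly; NULL maps to null for
        // Nullable<T>, while a non-nullable T would throw InvalidCastException.
        int? value = await exporter.ReadAsync<int?>();
        Console.WriteLine($"{name ?? "<null>"}: {value?.ToString() ?? "<null>"}");
    }
}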
@@ -323,6 +285,14 @@ async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken can
 
        return result;
 
+        static T DbNullOrThrow()
+        {
+            // When T is a Nullable<T>, we support returning null
+            if (default(T) is null && typeof(T).IsValueType)
+                return default!;
+            throw new InvalidCastException("Column is null");
+        }
+
        PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
        {
            ref var cachedInfo = ref _columnInfoCache[_column];
@@ -331,12 +301,27 @@ PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
            return converterInfo;
        }
 
-        T DbNullOrThrow()
+        PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
        {
-            // When T is a Nullable<T>, we support returning null
-            if (default(T) is null && typeof(T).IsValueType)
-                return default!;
-            throw new InvalidCastException("Column is null");
+            var options = _connector.SerializerOptions;
+            PgTypeId? pgTypeId = null;
+            if (npgsqlDbType.HasValue)
+            {
+                pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
+                    ? options.GetCanonicalTypeId(name)
+                    // Handle plugin types via lookup.
+                    : GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
+            }
+            var info = options.GetTypeInfo(type, pgTypeId)
+                ?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
+            // Binary export has no type info so we only do caller-directed interpretation of data.
+            return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);
+
+            PgTypeId GetRepresentationalOrDefault(string dataTypeName)
+            {
+                var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
+                return options.ToCanonicalTypeId(type.GetRepresentationalType());
+            }
        }
    }
 
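Because a binary COPY stream carries no per-field type metadata, CreateConverterInfo resolves converters purely from what the caller requests: a CLR type, optionally disambiguated by an NpgsqlDbType. A hedged usage sketch — the query is illustrative and conn is assumed to be an open NpgsqlConnection:

using var exporter = conn.BeginBinaryExport(
    "COPY (SELECT '{\"a\": 1}'::jsonb, now()) TO STDOUT BINARY");
while (await exporter.StartRowAsync() != -1)
{
    // The caller directs decoding: jsonb read back in its string form,
    // timestamptz as a UTC DateTime.
    var doc = await exporter.ReadAsync<string>(NpgsqlDbType.Jsonb);
    var at = await exporter.ReadAsync<DateTime>(NpgsqlDbType.TimestampTz);
}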
@@ -347,8 +332,11 @@ public bool IsNull
    {
        get
        {
-            Commit(async: false, resumableOp: true);
-            return ReadColumnLenIfNeeded(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;
+            ThrowIfNotOnRow();
+            if (!IsInitializedAndAtStart)
+                return MoveNextColumn(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;
+
+            return PgReader.FieldSize is -1;
        }
    }
 
@@ -365,46 +353,59 @@ public Task SkipAsync(CancellationToken cancellationToken = default)
 
    async Task Skip(bool async, CancellationToken cancellationToken = default)
    {
-        CheckDisposed();
+        ThrowIfNotOnRow();
 
        using var registration = _connector.StartNestedCancellableOperation(cancellationToken);
 
-        // We allow IsNull to have been called before skip.
-        if (PgReader.Initialized && PgReader is not { Resumable: true, FieldSize: -1 })
-            await Commit(async, resumableOp: false).ConfigureAwait(false);
-        await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
+        if (!IsInitializedAndAtStart)
+            await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);
 
        await PgReader.Consume(async, cancellationToken: cancellationToken).ConfigureAwait(false);
+
+        // Commit, otherwise we'll have no way of knowing this column is finished.
+        if (PgReader.FieldSize is -1 or 0)
+            await Commit(async).ConfigureAwait(false);
    }
 
    #endregion
 
    #region Utilities
 
-    ValueTask Commit(bool async, bool resumableOp)
-    {
-        var resuming = PgReader is { Initialized: true, Resumable: true } && resumableOp;
-        if (!resuming)
-            _column++;
+    bool IsInitializedAndAtStart => PgReader.Initialized && (PgReader.FieldSize is -1 || PgReader.FieldOffset is 0);
 
+    ValueTask Commit(bool async)
+    {
        if (async)
-            return PgReader.CommitAsync(resuming);
+            return PgReader.CommitAsync(resuming: false);
 
-        PgReader.Commit(resuming);
+        PgReader.Commit(resuming: false);
        return new();
    }
 
-    async ValueTask<int> ReadColumnLenIfNeeded(bool async, bool resumableOp)
+    async ValueTask<int> MoveNextColumn(bool async, bool resumableOp)
    {
-        if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
-            return -1;
+        if (async)
+            await PgReader.CommitAsync(resuming: false).ConfigureAwait(false);
+        else
+            PgReader.Commit(resuming: false);
+
+        if (_column + 1 == NumColumns)
+            ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
+        _column++;
        await _buf.Ensure(4, async).ConfigureAwait(false);
        var columnLen = _buf.ReadInt32();
        PgReader.Init(columnLen, DataFormat.Binary, resumableOp);
        return PgReader.FieldSize;
    }
 
-    void CheckDisposed()
+    void ThrowIfNotOnRow()
+    {
+        ThrowIfDisposed();
+        if (_column is BeforeRow)
+            ThrowHelper.ThrowInvalidOperationException("Not reading a row");
+    }
+
+    void ThrowIfDisposed()
    {
        if (_isDisposed)
            ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlBinaryExporter), "The COPY operation has already ended.");
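MoveNextColumn is where the wire framing shows: in PostgreSQL's binary COPY format every field is prefixed with a signed 32-bit big-endian length, -1 denoting NULL, which is exactly the four bytes it ensures and reads before handing the payload to PgReader. A standalone sketch of that framing (TryReadField is a hypothetical helper, not part of this commit):

using System.Buffers.Binary;

// Reads one field header plus payload from a buffered binary COPY row.
// Returns false for a NULL field (length prefix -1, no payload bytes follow).
static bool TryReadField(ref ReadOnlySpan<byte> data, out ReadOnlySpan<byte> payload)
{
    var length = BinaryPrimitives.ReadInt32BigEndian(data); // big-endian int32 prefix
    data = data[4..];
    if (length == -1)
    {
        payload = default;
        return false; // SQL NULL
    }
    payload = data[..length];
    data = data[length..];
    return true;
}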
67 changes: 67 additions & 0 deletions test/Npgsql.Tests/CopyTests.cs
@@ -510,6 +510,73 @@ public async Task Wrong_table_definition_binary_export()
        Assert.That(await conn.ExecuteScalarAsync("SELECT 1"), Is.EqualTo(1));
    }
 
+    [Test, IssueLink("https://github.com/npgsql/npgsql/issues/5457")]
+    public async Task MixedOperations()
+    {
+        if (IsMultiplexing)
+            Assert.Ignore("Multiplexing: fails");
+        using var conn = await OpenConnectionAsync();
+
+        var reader = conn.BeginBinaryExport("""
+            COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
+            """);
+        while (reader.StartRow() != -1)
+        {
+            string? col1 = null;
+            if (reader.IsNull)
+                reader.Skip();
+            else
+                col1 = reader.Read<string>();
+            int? col2 = null;
+            if (reader.IsNull)
+                reader.Skip();
+            else
+                col2 = reader.Read<int>();
+        }
+    }
+
+    [Test]
+    public async Task ReadMoreColumnsThanExist()
+    {
+        if (IsMultiplexing)
+            Assert.Ignore("Multiplexing: fails");
+        using var conn = await OpenConnectionAsync();
+
+        var reader = conn.BeginBinaryExport("""
+            COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
+            """);
+        while (reader.StartRow() != -1)
+        {
+            string? col1 = null;
+            if (reader.IsNull)
+                reader.Skip();
+            else
+                col1 = reader.Read<string>();
+            int? col2 = null;
+            if (reader.IsNull)
+                reader.Skip();
+            else
+                col2 = reader.Read<int>();
+
+            Assert.Throws<InvalidOperationException>(() => _ = reader.IsNull);
+        }
+    }
+
+    [Test]
+    public async Task StreamingRead()
+    {
+        if (IsMultiplexing)
+            Assert.Ignore("Multiplexing: fails");
+        using var conn = await OpenConnectionAsync();
+
+        var str = new string('a', PgReader.MaxPreparedTextReaderSize + 1);
+        var reader = conn.BeginBinaryExport($"""COPY (values ('{str}')) TO STDOUT BINARY""");
+        while (reader.StartRow() != -1)
+        {
+            using var _ = reader.Read<TextReader>(NpgsqlDbType.Text);
+        }
+    }
+
    [Test, IssueLink("https://github.com/npgsql/npgsql/issues/2330")]
    public async Task Wrong_format_binary_export()
    {