
Composite fixes (#5553)
(cherry picked from commit fb2bceb)

# Conflicts:
#	src/Npgsql/Internal/PgWriter.cs
NinoFloris committed Jan 31, 2024
1 parent 62701ba commit 6c2f1b7
Showing 5 changed files with 34 additions and 23 deletions.
@@ -37,6 +37,7 @@ private protected CompositeFieldInfo(string name, PgTypeInfo typeInfo, PgTypeId
return;
}
_binaryBufferRequirements = bufferRequirements;
+ Converter = resolution.Converter;
}
}

@@ -48,7 +49,6 @@ public PgConverter GetReadInfo(out Size readRequirement)
return Converter;
}

- // TODO this is effectively static work, we could optimize this away.
if (!PgTypeInfo.TryBind(new Field(Name, PgTypeInfo.PgTypeId.GetValueOrDefault(), -1), DataFormat.Binary, out var converterInfo))
ThrowHelper.ThrowInvalidOperationException("Converter must support binary format to participate in composite types.");

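The removed TODO notes that per-read binding is effectively static work; the change above resolves and caches the converter in the constructor instead. A minimal sketch of that caching pattern, using made-up names (FieldInfoSketch and the bind delegate are illustrative, not Npgsql's internal types):

    using System;

    // Hypothetical simplification: cache the converter when the resolution is
    // already known at construction time, otherwise bind lazily on first read.
    public sealed class FieldInfoSketch
    {
        readonly Func<object> _bind; // stands in for the per-field binding work
        object? _converter;          // stands in for the cached converter

        public FieldInfoSketch(Func<object> bind, object? eagerlyResolved = null)
        {
            _bind = bind;
            _converter = eagerlyResolved;
        }

        public object GetReadConverter() => _converter ??= _bind(); // bind only when nothing was cached
    }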
14 changes: 3 additions & 11 deletions src/Npgsql/Internal/Converters/CompositeConverter.cs
@@ -28,27 +28,19 @@ public CompositeConverter(CompositeInfo<T> composite)
writeReq = writeReq.Combine(Size.CreateUpperBound(0));
}

- req = req.Combine(
- // If a read is Unknown (streaming) we can map it to zero as we just want a minimum buffered size.
- readReq is { Kind: SizeKind.Unknown } ? Size.Zero : readReq,
- // For writes Unknown means our size is dependent on the value so we can't ignore it.
- writeReq);
+ req = req.Combine(readReq, writeReq);
}

// We have to put a limit on the requirements we report otherwise smaller buffer sizes won't work.
req = BufferRequirements.Create(Limit(req.Read), Limit(req.Write));

_bufferRequirements = req;

+ // Return unknown if we hit the limit.
Size Limit(Size requirement)
{
const int maxByteCount = 1024;
- return requirement switch
- {
- { Kind: SizeKind.UpperBound } => Size.CreateUpperBound(Math.Min(maxByteCount, requirement.Value)),
- { Kind: SizeKind.Exact } => Size.Create(Math.Min(maxByteCount, requirement.Value)),
- _ => Size.Unknown
- };
+ return requirement.GetValueOrDefault() > maxByteCount ? requirement.Combine(Size.Unknown) : requirement;
}
}

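The rewritten Limit no longer caps oversized requirements at 1024 bytes; anything above the cap is combined with Size.Unknown, i.e. reported as value-dependent rather than as a misleadingly small capped size. A hedged sketch of that decision with a hypothetical SizeSketch type (not Npgsql's internal Size, whose Combine semantics may differ):

    enum SizeKindSketch { Exact, UpperBound, Unknown }

    readonly record struct SizeSketch(SizeKindSketch Kind, int Value)
    {
        public static SizeSketch Exact(int value) => new(SizeKindSketch.Exact, value);
        public static SizeSketch Unknown => new(SizeKindSketch.Unknown, 0);
    }

    static class LimitSketch
    {
        const int MaxByteCount = 1024;

        // Requirements that fit under the cap are reported unchanged; anything larger
        // degrades to Unknown so callers stream instead of trusting a capped number.
        public static SizeSketch Limit(SizeSketch requirement)
            => requirement.Kind != SizeKindSketch.Unknown && requirement.Value > MaxByteCount
                ? SizeSketch.Unknown
                : requirement;
    }

Under this reading, a composite whose fields sum to, say, 4 KB now reports Unknown instead of a capped 1024-byte requirement that a small write buffer could not actually honor.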
9 changes: 5 additions & 4 deletions src/Npgsql/Internal/PgWriter.cs
@@ -461,19 +461,20 @@ internal ValueTask Flush(bool async, CancellationToken cancellationToken = defau
internal ValueTask<NestedWriteScope> BeginNestedWrite(bool async, Size bufferRequirement, int byteCount, object? state, CancellationToken cancellationToken)
{
Debug.Assert(bufferRequirement != -1);
- if (ShouldFlush(bufferRequirement))
- return Core(async, bufferRequirement, byteCount, state, cancellationToken);
+ // ShouldFlush depends on the current size for upper bound requirements, so we must set it beforehand.
+ _current = new() { Format = _current.Format, Size = byteCount, BufferRequirement = bufferRequirement, WriteState = state };
+ if (ShouldFlush(bufferRequirement))
+ return Core(async, cancellationToken);
return new(new NestedWriteScope());
#if NET6_0_OR_GREATER
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
#endif
- async ValueTask<NestedWriteScope> Core(bool async, Size bufferRequirement, int byteCount, object? state, CancellationToken cancellationToken)
+ async ValueTask<NestedWriteScope> Core(bool async, CancellationToken cancellationToken)
{
await Flush(async, cancellationToken).ConfigureAwait(false);
- _current = new() { Format = _current.Format, Size = byteCount, BufferRequirement = bufferRequirement, WriteState = state };
return new();
}
}
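The added comment pins down an ordering requirement: ShouldFlush consults the current nested-write state for upper-bound requirements, so that state has to be installed before the check rather than inside the flush path as before. A hedged sketch of the ordering, with hypothetical simplified types (WriterSketch is illustrative, not PgWriter's actual API):

    using System;
    using System.Threading.Tasks;

    sealed class WriterSketch
    {
        (int Size, int BufferRequirement) _current;
        int _buffered;
        const int Capacity = 8192;

        public ValueTask BeginNestedWrite(int bufferRequirement, int byteCount)
        {
            // Install the frame first; ShouldFlush below reads it for upper-bound requirements.
            _current = (byteCount, bufferRequirement);
            return ShouldFlush(bufferRequirement) ? FlushAsync() : default;
        }

        bool ShouldFlush(int bufferRequirement)
            // An upper-bound requirement is effectively capped by the nested value's own
            // size, which lives in _current - hence the set-before-check ordering.
            => Math.Min(bufferRequirement, _current.Size) > Capacity - _buffered;

        ValueTask FlushAsync()
        {
            _buffered = 0; // pretend the buffered bytes were written out
            return default;
        }
    }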
6 changes: 5 additions & 1 deletion test/Npgsql.Tests/Types/CompositeHandlerTests.Read.cs
@@ -19,11 +19,15 @@ async Task Read<T>(T composite, Action<Func<T>, T> assert, string? schema = null
{
await using var dataSource = await OpenAndMapComposite(composite, schema, nameof(Read), out var name);
await using var connection = await dataSource.OpenConnectionAsync();
- await using var command = new NpgsqlCommand($"SELECT ROW({composite.GetValues()})::{name}", connection);
+
+ var literal = $"ROW({composite.GetValues()})::{name}";
+ var arrayLiteral = $"ARRAY[{literal}]::{name}[]";
+ await using var command = new NpgsqlCommand($"SELECT {literal}, {arrayLiteral}", connection);
await using var reader = command.ExecuteReader();

await reader.ReadAsync();
assert(() => reader.GetFieldValue<T>(0), composite);
+ assert(() => reader.GetFieldValue<T[]>(1)[0], composite);
}

[Test]
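The Read helper now checks composite arrays as well as single composites. For orientation, a sketch of the equivalent application-level read, assuming a PostgreSQL composite type named inventory_item (name text, price numeric); the type and field names are made up, while MapComposite and GetFieldValue are existing public Npgsql APIs:

    using System.Threading.Tasks;
    using Npgsql;

    public class InventoryItem
    {
        public string Name { get; set; } = "";
        public decimal Price { get; set; }
    }

    public static class ReadCompositeSample
    {
        public static async Task Run(string connectionString)
        {
            var builder = new NpgsqlDataSourceBuilder(connectionString);
            builder.MapComposite<InventoryItem>("inventory_item"); // map the PostgreSQL composite type
            await using var dataSource = builder.Build();

            await using var connection = await dataSource.OpenConnectionAsync();
            await using var command = new NpgsqlCommand(
                "SELECT ROW('apple', 1.50)::inventory_item, ARRAY[ROW('apple', 1.50)::inventory_item]", connection);
            await using var reader = await command.ExecuteReaderAsync();
            await reader.ReadAsync();

            var item = reader.GetFieldValue<InventoryItem>(0);     // single composite
            var items = reader.GetFieldValue<InventoryItem[]>(1);  // composite array, as the updated test covers
        }
    }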
26 changes: 20 additions & 6 deletions test/Npgsql.Tests/Types/CompositeHandlerTests.Write.cs
@@ -19,14 +19,28 @@ async Task Write<T>(T composite, Action<NpgsqlDataReader, T>? assert = null, str
{
await using var dataSource = await OpenAndMapComposite(composite, schema, nameof(Write), out var _);
await using var connection = await dataSource.OpenConnectionAsync();
- await using var command = new NpgsqlCommand("SELECT (@c).*", connection);
+ {
+ await using var command = new NpgsqlCommand("SELECT (@c).*", connection);
+
+ command.Parameters.AddWithValue("c", composite);
+ await using var reader = await command.ExecuteReaderAsync();
+ await reader.ReadAsync();
+
+ if (assert is not null)
+ assert(reader, composite);
+ }

+ {
+ await using var command = new NpgsqlCommand("SELECT (@arrayc)[1].*", connection);
+
+ command.Parameters.AddWithValue("arrayc", new[] { composite });
+ await using var reader = await command.ExecuteReaderAsync();
+ await reader.ReadAsync();
+
- command.Parameters.AddWithValue("c", composite);
- await using var reader = await command.ExecuteReaderAsync();
- await reader.ReadAsync();

+ if (assert is not null)
+ assert(reader, composite);
- if (assert is not null)
- assert(reader, composite);
+ }
}

[Test]
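The Write helper now also round-trips a composite array parameter, indexing its first element with "(@arrayc)[1].*". A sketch of the same pattern from application code, reusing the hypothetical InventoryItem mapping from the read sketch above:

    using System.Threading.Tasks;
    using Npgsql;

    public static class WriteCompositeSample
    {
        // Assumes the data source was built with MapComposite<InventoryItem>("inventory_item").
        public static async Task Run(NpgsqlDataSource dataSource, InventoryItem item)
        {
            await using var connection = await dataSource.OpenConnectionAsync();

            // Single composite parameter, expanded server-side into its fields.
            await using (var command = new NpgsqlCommand("SELECT (@c).*", connection))
            {
                command.Parameters.AddWithValue("c", item);
                await using var reader = await command.ExecuteReaderAsync();
                await reader.ReadAsync();
            }

            // Composite array parameter; index the first element and expand it, mirroring the new test.
            await using (var command = new NpgsqlCommand("SELECT (@arrayc)[1].*", connection))
            {
                command.Parameters.AddWithValue("arrayc", new[] { item });
                await using var reader = await command.ExecuteReaderAsync();
                await reader.ReadAsync();
            }
        }
    }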
