Skip to content

Commit

Permalink
GetChars fix and infra cleanup (#5618)
Browse files Browse the repository at this point in the history
Fixes #5616

(cherry picked from commit 74178a4)
  • Loading branch information
NinoFloris committed Mar 28, 2024
1 parent 07a0702 commit 1b93828
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 85 deletions.
30 changes: 13 additions & 17 deletions src/Npgsql/Internal/Converters/Primitive/TextConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@ public override ValueTask<TextReader> ReadAsync(PgReader reader, CancellationTok
public GetChars(int read) => Read = read;
}

sealed class GetCharsTextConverter : PgStreamingConverter<GetChars>, IResumableRead
sealed class GetCharsTextConverter : PgStreamingConverter<GetChars>
{
readonly Encoding _encoding;
public GetCharsTextConverter(Encoding encoding) => _encoding = encoding;

public override GetChars Read(PgReader reader)
=> reader.IsCharsRead
=> reader.CharsReadActive
? ResumableRead(reader)
: throw new NotSupportedException();

Expand All @@ -230,27 +230,25 @@ public override ValueTask<GetChars> ReadAsync(PgReader reader, CancellationToken
GetChars ResumableRead(PgReader reader)
{
reader.GetCharsReadInfo(_encoding, out var charsRead, out var textReader, out var charsOffset, out var buffer);
if (charsOffset < charsRead || (buffer is null && charsRead > 0))

// With variable length encodings, moving backwards based on bytes means we have to start over.
if (charsRead > charsOffset)
{
// With variable length encodings, moving backwards based on bytes means we have to start over.
reader.ResetCharsRead(out charsRead);
reader.RestartCharsRead();
charsRead = 0;
}

// First seek towards the charsOffset.
// If buffer is null read the entire thing and report the length, see sql client remarks.
// https://learn.microsoft.com/en-us/dotnet/api/system.data.sqlclient.sqldatareader.getchars
int read;
var read = ConsumeChars(textReader, buffer is null ? null : charsOffset - charsRead);
Debug.Assert(buffer is null || read == charsOffset - charsRead);
reader.AdvanceCharsRead(read);
if (buffer is null)
{
read = ConsumeChars(textReader, null);
}
else
{
var consumed = ConsumeChars(textReader, charsOffset - charsRead);
Debug.Assert(consumed == charsOffset - charsRead);
read = textReader.ReadBlock(buffer.GetValueOrDefault().Array!, buffer.GetValueOrDefault().Offset, buffer.GetValueOrDefault().Count);
}
return new(read);

read = textReader.ReadBlock(buffer.GetValueOrDefault().Array!, buffer.GetValueOrDefault().Offset, buffer.GetValueOrDefault().Count);
reader.AdvanceCharsRead(read);
return new(read);

static int ConsumeChars(TextReader reader, int? count)
Expand Down Expand Up @@ -283,8 +281,6 @@ static int ConsumeChars(TextReader reader, int? count)
return totalRead;
}
}

bool IResumableRead.Supported => true;
}

// Moved out for code size/sharing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace Npgsql.Internal.Converters;

sealed class VersionPrefixedTextConverter<T> : PgStreamingConverter<T>, IResumableRead
sealed class VersionPrefixedTextConverter<T> : PgStreamingConverter<T>
{
readonly byte _versionPrefix;
readonly PgConverter<T> _textConverter;
Expand Down Expand Up @@ -52,8 +52,6 @@ async ValueTask Write(bool async, PgWriter writer, [DisallowNull]T value, Cancel
else
_textConverter.Write(writer, value);
}

bool IResumableRead.Supported => _textConverter is IResumableRead { Supported: true };
}

static class VersionPrefixedTextConverter
Expand Down
5 changes: 0 additions & 5 deletions src/Npgsql/Internal/PgConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@ internal static PgConverter<T> UnsafeDowncast<T>(this PgConverter converter)
}
}

interface IResumableRead
{
bool Supported { get; }
}

public readonly struct SizeContext
{
[SetsRequiredMembers]
Expand Down
84 changes: 37 additions & 47 deletions src/Npgsql/Internal/PgReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ internal PgReader(NpgsqlReadBuffer buffer)

ArrayPool<byte> ArrayPool => ArrayPool<byte>.Shared;

[MemberNotNullWhen(true, nameof(_charsReadReader))]
internal bool IsCharsRead => _charsReadOffset is not null;

// Here for testing purposes
internal void BreakConnection() => throw _buffer.Connector.Break(new Exception("Broken"));

Expand Down Expand Up @@ -193,7 +190,7 @@ public string ReadNullTerminatedString(Encoding encoding)
NpgsqlReadBuffer.ColumnStream GetColumnStream(bool canSeek = false, int? length = null)
{
if (length > CurrentRemaining)
throw new ArgumentOutOfRangeException(nameof(length), "Length is larger than the current remaining value size");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(length), "Length is larger than the current remaining value size");

_requiresCleanup = true;
// This will cause any previously handed out StreamReaders etc to throw, as intended.
Expand Down Expand Up @@ -348,10 +345,10 @@ public void Rewind(int count)
DisposeUserActiveStream(async: false).GetAwaiter().GetResult();

if (_buffer.ReadPosition < count)
throw new ArgumentOutOfRangeException("Cannot rewind further than the buffer start");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Cannot rewind further than the buffer start");

if (CurrentOffset < count)
throw new ArgumentOutOfRangeException("Cannot rewind further than the current field offset");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(count), "Cannot rewind further than the current field offset");

_buffer.ReadPosition -= count;
}
Expand All @@ -374,32 +371,24 @@ async ValueTask DisposeUserActiveStream(bool async)
_userActiveStream = null;
}

internal bool GetCharsReadInfo(Encoding encoding, out int charsRead, out TextReader reader, out int charsOffset, out ArraySegment<char>? buffer)
{
if (!IsCharsRead)
throw new InvalidOperationException("No active chars read");
internal int CharsRead => _charsRead;
internal bool CharsReadActive => _charsReadOffset is not null;

if (_charsReadReader is null)
{
charsRead = 0;
reader = _charsReadReader = GetTextReader(encoding);
charsOffset = _charsReadOffset ??= 0;
buffer = _charsReadBuffer;
return true;
}
internal void GetCharsReadInfo(Encoding encoding, out int charsRead, out TextReader reader, out int charsOffset, out ArraySegment<char>? buffer)
{
if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");

charsRead = _charsRead;
reader = _charsReadReader;
charsOffset = _charsReadOffset!.Value;
reader = _charsReadReader ??= GetTextReader(encoding);
charsOffset = _charsReadOffset ?? 0;
buffer = _charsReadBuffer;

return false;
}

internal void ResetCharsRead(out int charsRead)
internal void RestartCharsRead()
{
if (!IsCharsRead)
throw new InvalidOperationException("No active chars read");
if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");

switch (_charsReadReader)
{
Expand All @@ -411,26 +400,32 @@ internal void ResetCharsRead(out int charsRead)
reader.DiscardBufferedData();
break;
}
_charsRead = charsRead = 0;
_charsRead = 0;
}

internal void AdvanceCharsRead(int charsRead)
{
_charsRead += charsRead;
_charsReadOffset = null;
_charsReadBuffer = null;
}
internal void AdvanceCharsRead(int charsRead) => _charsRead += charsRead;

internal void InitCharsRead(int dataOffset, ArraySegment<char>? buffer, out int? charsRead)
internal void StartCharsRead(int dataOffset, ArraySegment<char>? buffer)
{
if (!Resumable)
throw new InvalidOperationException("Wasn't initialized as resumed");
ThrowHelper.ThrowInvalidOperationException("Wasn't initialized as resumed");

charsRead = _charsReadReader is null ? null : _charsRead;
_charsReadOffset = dataOffset;
_charsReadBuffer = buffer;
}

internal void EndCharsRead()
{
if (!Resumable)
ThrowHelper.ThrowInvalidOperationException("Wasn't initialized as resumed");

if (!CharsReadActive)
ThrowHelper.ThrowInvalidOperationException("No active chars read");

_charsReadOffset = null;
_charsReadBuffer = null;
}

internal PgReader Init(int fieldLength, DataFormat format, bool resumable = false)
{
if (Initialized)
Expand Down Expand Up @@ -511,10 +506,10 @@ internal ValueTask EndReadAsync()
internal async ValueTask<NestedReadScope> BeginNestedRead(bool async, int size, Size bufferRequirement, CancellationToken cancellationToken = default)
{
if (size > CurrentRemaining)
throw new ArgumentOutOfRangeException(nameof(size), "Cannot begin a read for a larger size than the current remaining size.");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(size), "Cannot begin a read for a larger size than the current remaining size.");

if (size < 0)
throw new ArgumentOutOfRangeException(nameof(size), "Cannot be negative");
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(size), "Cannot be negative");

var previousSize = CurrentSize;
var previousStartPos = _currentStartPos;
Expand Down Expand Up @@ -766,19 +761,14 @@ public bool ShouldBuffer(int byteCount)
bool ShouldBufferSlow()
{
if (byteCount > _buffer.Size)
ThrowArgumentOutOfRange();
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the buffer size, this can never succeed by buffering data but requires a larger buffer size instead.");
if (byteCount > CurrentRemaining)
ThrowArgumentOutOfRangeOfValue();
ThrowHelper.ThrowArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the remaining length of the value, make sure the value is always at least this size or use an upper bound requirement instead.");

return true;
}

static void ThrowArgumentOutOfRange()
=> throw new ArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the buffer size, this can never succeed by buffering data but requires a larger buffer size instead.");
static void ThrowArgumentOutOfRangeOfValue()
=> throw new ArgumentOutOfRangeException(nameof(byteCount),
"Buffer requirement is larger than the remaining length of the value, make sure the value is always at least this size or use an upper bound requirement instead.");
}

public void Buffer(Size bufferRequirement)
Expand Down Expand Up @@ -828,7 +818,7 @@ internal NestedReadScope(bool async, PgReader reader, int previousSize, int prev
public void Dispose()
{
if (_async)
throw new InvalidOperationException("Cannot synchronously dispose async scopes, call DisposeAsync instead.");
ThrowHelper.ThrowInvalidOperationException("Cannot synchronously dispose async scopes, call DisposeAsync instead.");
DisposeAsync().GetAwaiter().GetResult();
}

Expand Down
25 changes: 12 additions & 13 deletions src/Npgsql/NpgsqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1486,29 +1486,28 @@ public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
throw new IndexOutOfRangeException($"length must be between 0 and {buffer.Length - bufferOffset}");

// Check whether we can do resumable reads.
// Check whether we have a GetChars implementation for this column type.
var field = GetInfo(ordinal, typeof(GetChars), out var converter, out var bufferRequirement, out var asObject);
if (converter is not IResumableRead { Supported: true })
throw new NotSupportedException("The GetChars method is not supported for this column type");

var columnLength = SeekToColumn(async: false, ordinal, field, resumableOp: true).GetAwaiter().GetResult();
if (columnLength == -1)
ThrowHelper.ThrowInvalidCastException_NoValue(CheckRowAndGetField(ordinal));

var reader = PgReader;
dataOffset = buffer is null ? 0 : dataOffset;
PgReader.InitCharsRead(checked((int)dataOffset),
buffer is not null ? new ArraySegment<char>(buffer, bufferOffset, length) : (ArraySegment<char>?)null,
out var previousDataOffset);

if (_isSequential && previousDataOffset > dataOffset)
if (_isSequential && reader.CharsRead > dataOffset)
ThrowHelper.ThrowInvalidOperationException("Attempt to read a position in the column which has already been read");

PgReader.StartRead(bufferRequirement);
reader.StartCharsRead(checked((int)dataOffset),
buffer is not null ? new ArraySegment<char>(buffer, bufferOffset, length) : (ArraySegment<char>?)null);

reader.StartRead(bufferRequirement);
var result = asObject
? (GetChars)converter.ReadAsObject(PgReader)
: ((PgConverter<GetChars>)converter).Read(PgReader);
PgReader.AdvanceCharsRead(result.Read);
PgReader.EndRead();
? (GetChars)converter.ReadAsObject(reader)
: ((PgConverter<GetChars>)converter).Read(reader);
reader.EndRead();

reader.EndCharsRead();
return result.Read;
}

Expand Down
24 changes: 24 additions & 0 deletions test/Npgsql.Tests/ReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,30 @@ public async Task GetChars()
reader.GetChars(6, 0, actual, 0, 2);
}

[Test]
public async Task GetChars_AdvanceConsumed()
{
const string value = "01234567";

using var conn = await OpenConnectionAsync();
using var cmd = new NpgsqlCommand($"SELECT '{value}'", conn);
using var reader = await cmd.ExecuteReaderAsync(Behavior);
reader.Read();

var buffer = new char[2];
// Don't start at the beginning of the column.
reader.GetChars(0, 2, buffer, 0, 2);
reader.GetChars(0, 4, buffer, 0, 2);
reader.GetChars(0, 6, buffer, 0, 2);

// Ask for data past the start and the previous point, exercising restart logic.
if (!IsSequential)
{
reader.GetChars(0, 4, buffer, 0, 2);
reader.GetChars(0, 6, buffer, 0, 2);
}
}

[Test]
public async Task GetTextReader([Values(true, false)] bool isAsync)
{
Expand Down

0 comments on commit 1b93828

Please sign in to comment.