Skip to content

Commit

Permalink
Fix sequential buffered seek (#5440)
Browse files Browse the repository at this point in the history
Fixes #5439
  • Loading branch information
NinoFloris committed Nov 28, 2023
1 parent 5ba1592 commit 6cfd399
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 15 deletions.
45 changes: 30 additions & 15 deletions src/Npgsql/NpgsqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1921,10 +1921,13 @@ int SeekToColumnNonSequential(int ordinal, DataFormat dataFormat, bool resumable
Debug.Assert(ordinal != currentColumn);
if (ordinal > currentColumn)
{
for (; currentColumn < ordinal - 1; currentColumn++)
// Written as a while to be able to increment _column directly after reading into it.
while (_column < ordinal - 1)
{
columnLength = buffer.ReadInt32();
if (columnLength is not -1)
_column++;
Debug.Assert(columnLength >= -1);
if (columnLength > 0)
buffer.Skip(columnLength);
}
columnLength = buffer.ReadInt32();
Expand Down Expand Up @@ -1962,7 +1965,7 @@ int SeekBackwards()
for (var lastColumnRead = _columns.Count; ordinal >= lastColumnRead; lastColumnRead++)
{
(Buffer.ReadPosition, var lastLen) = _columns[lastColumnRead - 1];
if (lastLen is not -1)
if (lastLen > 0)
buffer.Skip(lastLen);
var len = Buffer.ReadInt32();
_columns.Add((Buffer.ReadPosition, len));
Expand Down Expand Up @@ -2026,17 +2029,21 @@ async ValueTask<int> Core(bool async, bool commit, int ordinal, DataFormat dataF
}

// Seek to the requested column
int columnLength;
var buffer = Buffer;
for (; _column < ordinal - 1; _column++)
// Written as a while to be able to increment _column directly after reading into it.
while (_column < ordinal - 1)
{
await buffer.Ensure(4, async).ConfigureAwait(false);
var len = buffer.ReadInt32();
if (len != -1)
await buffer.Skip(len, async).ConfigureAwait(false);
columnLength = buffer.ReadInt32();
_column++;
Debug.Assert(columnLength >= -1);
if (columnLength > 0)
await buffer.Skip(columnLength, async).ConfigureAwait(false);
}

await buffer.Ensure(4, async).ConfigureAwait(false);
var columnLength = buffer.ReadInt32();
columnLength = buffer.ReadInt32();
_column = ordinal;

PgReader.Init(columnLength, dataFormat, resumableOp);
Expand All @@ -2054,11 +2061,14 @@ bool TrySeekBuffered(int ordinal, out int columnLength)
// Skip over unwanted fields
columnLength = -1;
var buffer = Buffer;
for (; _column < ordinal - 1; _column++)
// Written as a while to be able to increment _column directly after reading into it.
while (_column < ordinal - 1)
{
if (buffer.ReadBytesLeft < 4)
return false;
columnLength = buffer.ReadInt32();
_column++;
Debug.Assert(columnLength >= -1);
if (columnLength > 0)
{
if (buffer.ReadBytesLeft < columnLength)
Expand All @@ -2085,7 +2095,7 @@ bool TrySeekBuffered(int ordinal, out int columnLength)

Task ConsumeRow(bool async)
{
Debug.Assert(State == ReaderState.InResult || State == ReaderState.BeforeResult);
Debug.Assert(State is ReaderState.InResult or ReaderState.BeforeResult);

if (!_canConsumeRowNonSequentially)
return ConsumeRowSequential(async);
Expand All @@ -2100,13 +2110,18 @@ async Task ConsumeRowSequential(bool async)
await PgReader.CommitAsync(resuming: false).ConfigureAwait(false);
else
PgReader.Commit(resuming: false);

// Skip over the remaining columns in the row
for (; _column < ColumnCount - 1; _column++)
var buffer = Buffer;
// Written as a while to be able to increment _column directly after reading into it.
while (_column < ColumnCount - 1)
{
await Buffer.Ensure(4, async).ConfigureAwait(false);
var len = Buffer.ReadInt32();
if (len != -1)
await Buffer.Skip(len, async).ConfigureAwait(false);
await buffer.Ensure(4, async).ConfigureAwait(false);
var columnLength = buffer.ReadInt32();
_column++;
Debug.Assert(columnLength >= -1);
if (columnLength > 0)
await buffer.Skip(columnLength, async).ConfigureAwait(false);
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions test/Npgsql.Tests/ReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -894,6 +894,22 @@ public async Task Interval_as_TimeSpan()
var ts = dr.GetTimeSpan(0);
}

[Test, IssueLink("https://github.com/npgsql/npgsql/issues/5439")]
public async Task SequentialBufferedSeek()
{
await using var conn = await OpenConnectionAsync();
using var cmd = conn.CreateCommand();
cmd.CommandText = """select v.i, jsonb_build_object(), current_timestamp + make_interval(0, 0, 0, 0, 0, 0, v.i), null::jsonb, '{"value": 42}'::jsonb from generate_series(1, 1000) as v(i)""";
var rdr = await cmd.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
while (await rdr.ReadAsync()) {
var v1 = rdr[0];
var v2 = rdr[1];
//_ = rdr[2]; // uncomment line for successful execution
var v3 = rdr[3];
var v4 = rdr[4];
}
}

[Test]
public async Task Close_connection_in_middle_of_row()
{
Expand Down

0 comments on commit 6cfd399

Please sign in to comment.