Skip to content

Commit

Permalink
Refactoring based on feedback from Jeremy Kuhne
Browse files Browse the repository at this point in the history
  • Loading branch information
stevejgordon committed Sep 5, 2019
1 parent e02189c commit 5683653
Showing 1 changed file with 41 additions and 25 deletions.
66 changes: 41 additions & 25 deletions SequenceReaderSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,38 @@ namespace SequenceReaderSample
{
internal class Program
{
private const int MaxStackLength = 128;
private const int ItemCount = 10;
private const byte Comma = (byte)',';

private static async Task Main()
{
var stream = CreateStreamOfItems();

var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 64)); // forcing a small buffer
var pipeReader = PipeReader.Create(stream, new StreamPipeReaderOptions(bufferSize: 32)); // forcing a small buffer for this sample

while (true)
{
var result = await pipeReader.ReadAsync();
var result = await pipeReader.ReadAsync(); // read from the pipe

var buffer = result.Buffer;

var position = ReadItems(buffer, result.IsCompleted); // read complete items from the current buffer

if (result.IsCompleted && buffer.Length > 0)
{
ReadLastItem(buffer);
break;
}
if (result.IsCompleted)
break; // exit if we've read everything from the pipe

var position = ReadItem(buffer);
pipeReader.AdvanceTo(position, buffer.End);
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}

pipeReader.Complete();
pipeReader.Complete(); // mark the PipeReader as complete

Console.ReadKey();
}

/// <summary>
/// Setup a stream. For this sample we can imagine came from a HttpResponseMessage.
/// </summary>
private static Stream CreateStreamOfItems()
{
var stream = new MemoryStream();
Expand All @@ -59,17 +61,23 @@ private static Stream CreateStreamOfItems()
return stream;
}

private static SequencePosition ReadItem(in ReadOnlySequence<byte> sequence)
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
{
var reader = new SequenceReader<byte>(sequence);

while (!reader.End)
while (!reader.End) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, Comma, true))
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, Comma, advancePastDelimiter: true)) // we have an item to handle
{
var stringLine = Encoding.UTF8.GetString(itemBytes);
Console.WriteLine(stringLine);
}
else if (isCompleted) // read last item which has no final delimiter
{
var stringLine = ReadLastItem(sequence.Slice(reader.Position));
Console.WriteLine(stringLine);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
Expand All @@ -79,26 +87,34 @@ private static SequencePosition ReadItem(in ReadOnlySequence<byte> sequence)
return reader.Position;
}

private static void ReadLastItem(in ReadOnlySequence<byte> sequence)
private static string ReadLastItem(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;

var reader = new SequenceReader<byte>(sequence);

var bytes = ArrayPool<byte>.Shared.Rent(length);
string stringLine;

var byteSpan = bytes.AsSpan(0, length);

try
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
reader.TryCopyTo(byteSpan);
var stringLine = Encoding.UTF8.GetString(byteSpan);
Console.WriteLine(stringLine);
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
stringLine = Encoding.UTF8.GetString(byteBuffer);
}
finally
else // otherwise we'll rent an array to use as the buffer
{
ArrayPool<byte>.Shared.Return(bytes);
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

try
{
sequence.CopyTo(byteBuffer);
stringLine = Encoding.UTF8.GetString(byteBuffer.AsSpan().Slice(0, length));
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}

return stringLine;
}
}
}

0 comments on commit 5683653

Please sign in to comment.