Skip to content

Commit

Permalink
Revise MessagePackSerializer.Deserialize<T>(Stream) to support multip…
Browse files Browse the repository at this point in the history
…le reads
  • Loading branch information
AArnott committed Dec 2, 2019
1 parent 1ee72d0 commit 5f1b3e6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,46 +297,79 @@ public static T Deserialize<T>(ReadOnlyMemory<byte> buffer, MessagePackSerialize
}

/// <summary>
/// Deserializes the entire content of a <see cref="Stream"/>.
/// Deserializes a MessagePack structure from a <see cref="Stream"/> that is entirely made up of messagepack data.
/// </summary>
/// <typeparam name="T">The type of value to deserialize.</typeparam>
/// <param name="stream">
/// The stream to deserialize from.
/// The entire stream will be read, and the first msgpack token deserialized will be returned.
/// If <see cref="Stream.CanSeek"/> is true on the stream, its position will be set to just after the last deserialized byte.
/// The stream to deserialize from. More bytes may be read than those that define a single messagepack structure.
/// </param>
/// <param name="state">
/// State to carry from one call of this method to the next, allowing sequential reading of messagepack structures from the stream.
/// The first time this method is called, this value should be provided by a variable initialized with the default value of the structure.
/// Each subsequent call to this method with the same <paramref name="stream"/> argument should use the same variable for this state argument as well.
/// </param>
/// <param name="options">The options. Use <c>null</c> to use default options.</param>
/// <param name="cancellationToken">A cancellation token.</param>
/// <returns>The deserialized value.</returns>
/// <exception cref="MessagePackSerializationException">Thrown when any error occurs during deserialization.</exception>
public static T Deserialize<T>(Stream stream, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
public static T Deserialize<T>(Stream stream, ref MessagePackStreamReadingState state, MessagePackSerializerOptions options = null, CancellationToken cancellationToken = default)
{
// TODO: wrap all (non-local argument) exceptions
if (TryDeserializeFromMemoryStream(stream, options, cancellationToken, out T result))
{
return result;
}

using (var sequenceRental = SequencePool.Shared.Rent())
if (state.IsDefault)
{
var sequence = sequenceRental.Value;
state = new MessagePackStreamReadingState(stream);
}
else if (state.Stream != stream)
{
throw new ArgumentException("State mismatched with stream.", nameof(state));
}

if (state.EndOfLastMessage.HasValue)
{
// A previously returned message can now be safely recycled since the caller wants more.
state.ReadData.AdvanceTo(state.EndOfLastMessage.Value);
state.EndOfLastMessage = null;
}

while (true)
{
// Check if we have a complete message and return it if we have it.
// We do this before reading anything since a previous read may have brought in several messages.
cancellationToken.ThrowIfCancellationRequested();
if (state.ReadData.Length > 0)
{
var reader = new MessagePackReader(state.ReadData);
if (reader.TrySkip())
{
state.EndOfLastMessage = reader.Position;
ReadOnlySequence<byte> completeMessage = reader.Sequence.Slice(0, reader.Position);
return Deserialize<T>(completeMessage, options, cancellationToken);
}
}

cancellationToken.ThrowIfCancellationRequested();
Span<byte> buffer = state.ReadData.GetSpan(sizeHint: 0);
int bytesRead = 0;
try
{
int bytesRead;
do
bytesRead = state.Stream.Read(buffer);
if (bytesRead == 0)
{
cancellationToken.ThrowIfCancellationRequested();
Span<byte> span = sequence.GetSpan(stream.CanSeek ? (int)(stream.Length - stream.Position) : 0);
bytesRead = stream.Read(span);
sequence.Advance(bytesRead);
// We've reached the end of the stream.
// We already checked for a complete message with what we already had, so evidently it's not a complete message.
throw new EndOfStreamException();
}
while (bytesRead > 0);
}
catch (Exception ex)
finally
{
throw new MessagePackSerializationException("Error occurred while reading from the stream.", ex);
// Keep our state clean in case the caller wants to call us again.
state.ReadData.Advance(bytesRead);
}

return DeserializeFromSequenceAndRewindStreamIfPossible<T>(stream, options, sequence, cancellationToken);
}
}

Expand Down Expand Up @@ -399,6 +432,10 @@ private static bool TryDeserializeFromMemoryStream<T>(Stream stream, MessagePack
ms.Seek(bytesRead, SeekOrigin.Current);
return true;
}
else if (stream is null)
{
throw new ArgumentNullException(nameof(stream));
}

result = default;
return false;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) All contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Nerdbank.Streams;

namespace MessagePack
{
public struct MessagePackStreamReadingState
{
internal readonly Stream Stream;
internal SequencePool.Rental SequenceRental;
internal SequencePosition? EndOfLastMessage;

/// <summary>
/// Initializes a new instance of the <see cref="MessagePackStreamReadingState"/> struct.
/// </summary>
/// <param name="stream">The stream to read from.</param>
internal MessagePackStreamReadingState(Stream stream)
{
this.Stream = stream ?? throw new ArgumentNullException(nameof(stream));
this.SequenceRental = SequencePool.Shared.Rent();
this.EndOfLastMessage = null;
}

/// <summary>
/// Gets the sequence that we read data from the <see cref="Stream"/> into.
/// </summary>
internal Sequence<byte> ReadData => this.SequenceRental.Value;

internal bool IsDefault => this.Stream == null;

/// <summary>
/// Gets any bytes that have been read since the last complete message returned from <see cref="ReadAsync(CancellationToken)"/>.
/// </summary>
public ReadOnlySequence<byte> RemainingBytes => this.EndOfLastMessage.HasValue ? this.ReadData.AsReadOnlySequence.Slice(this.EndOfLastMessage.Value) : this.ReadData.AsReadOnlySequence;
}
}

0 comments on commit 5f1b3e6

Please sign in to comment.