-
-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Race? #12
Comments
Basically, because FlushAsync is named terribly (also: I didn't name it),
and doesn't do what you think it does. In particular, awaiting the flush
doesn't mean - and cannot mean - that the data has been *consumed* by the
pipe consumer: it simply asserts that it will awaken the consumer if
necessary, and apply and backoff conditions.
The flush behaviour is a bit of a sore point, all told!
What you *actually* want to await here is the consumed completion. If that
isn't currently exposed, we can probably find a way to expose it.
…On Mon, 10 Sep 2018, 23:54 Henric Jungheim, ***@***.***> wrote:
We've been seeing intermittent problems with data getting lost and have
finally narrowed it down to a simple test case that sometimes fails. It
almost always fails on a Threadripper box and perhaps one of three attempts
fail on a six year old laptop. Is this code reasonable and, if so, why does
it sometimes fail with no data in the MemoryStream?
Trying to wait for the reader to complete doesn't help since it doesn't
actually complete.
[TestMethod]
public async Task SingleByte()
{
var expected = new byte[] { 7 };
using (var ms = new MemoryStream())
{
var pipeWriter = StreamConnection.GetWriter(ms);
var memory = pipeWriter.GetMemory(expected.Length);
expected.AsMemory().CopyTo(memory);
pipeWriter.Advance(expected.Length);
await pipeWriter.FlushAsync().ConfigureAwait(false);
pipeWriter.Complete();
ms.Position = 0;
var actual = ms.ToArray();
Assert.IsNotNull(actual);
CollectionAssert.AreEqual(expected, actual);
}
}
[TestMethod]
public async Task SingleByteWithWait()
{
var expected = new byte[] { 7 };
using (var ms = new MemoryStream())
{
var pipeWriter = StreamConnection.GetWriter(ms);
var memory = pipeWriter.GetMemory(expected.Length);
expected.AsMemory().CopyTo(memory);
pipeWriter.Advance(expected.Length);
await pipeWriter.FlushAsync().ConfigureAwait(false);
pipeWriter.Complete();
var doneTcs = new TaskCompletionSource<bool>();
pipeWriter.OnReaderCompleted((ex, obj) =>
{
var tcs = (TaskCompletionSource<bool>)obj;
if (null != ex) tcs.TrySetException(ex);
else tcs.TrySetResult(true);
}, doneTcs);
await doneTcs.Task.ConfigureAwait(false);
ms.Position = 0;
var actual = ms.ToArray();
Assert.IsNotNull(actual);
CollectionAssert.AreEqual(expected, actual);
}
}
—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub
<#12>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AABDsIztvcss-UVvIG31wy4cd0JyfbE_ks5uZu2ggaJpZM4WiT08>
.
|
What we wound up doing was to have an extension method on Stream that creates a pipe, does a Task.Run() on a loop that copies from the pipe to the stream until it sees an empty buffer and with IsCompleted true. The pipe's writer is returned and a the created task is provided as an "out" parameter. A value tuple return with both the PipeWriter and a Task might be an alternative, but we couldn't see a clean way to avoid having two separate things to return. Perhaps if the reader was guaranteed to complete after the last write propagated to the stream and something like the OnReaderCompleted() callback gunk were wrapped in an extension method? |
It is certainly something I can put some thought to when I get a sec.
…On Tue, 11 Sep 2018, 00:52 Henric Jungheim, ***@***.***> wrote:
What we wound up doing was to have an extension method on Stream that
creates a pipe, does a Task.Run() on a loop that copies from the pipe to
the stream until it sees an empty buffer and with IsCompleted true. The
pipe's writer is returned and a the created task is provided as an "out"
parameter. A value tuple return with both the PipeWriter and a Task might
be an alternative, but we couldn't see a clean way to avoid having two
separate things to return.
Perhaps if the reader was guaranteed to complete after the last write
propagated to the stream and something like the OnReaderCompleted()
callback gunk were wrapped in an extension method?
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#12 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AABDsH-R2rauSF7-IFjZIxmJJVIRpZRDks5uZvtTgaJpZM4WiT08>
.
|
CopyFromWritePipeToStream could call reader.complete() on the way to oblivion. That would follow the whole pipelines thing, and PipeWriter.OnReaderCompleted will do the right thing, even if that is a bit Pipelines.Sockets.Unofficial/src/Pipelines.Sockets.Unofficial/StreamConnection.AsyncStreamPipe.cs Line 129 in 6f9e54e
(i'm new to this GitHub thing) so excuse any lack of Etiquette) |
It will do in the next build |
We've been seeing intermittent problems with data getting lost and have finally narrowed it down to a simple test case that sometimes fails. It almost always fails on a Threadripper box and perhaps one of three attempts fail on a six year old laptop. Is this code reasonable and, if so, why does it sometimes fail with no data in the MemoryStream?
Trying to wait for the reader to complete doesn't help since it doesn't actually complete.
The text was updated successfully, but these errors were encountered: