Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Proc.Tests.Binary/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static int Main(string[] args)

if (testCase == nameof(ReadKeyFirst).ToLowerInvariant()) return ReadKeyFirst();
if (testCase == nameof(ReadKeyAfter).ToLowerInvariant()) return ReadKeyAfter();
if (testCase == nameof(SlowOutput).ToLowerInvariant()) return SlowOutput();
if (testCase == nameof(ReadLineFirst).ToLowerInvariant()) return ReadLineFirst();
if (testCase == nameof(ReadLineAfter).ToLowerInvariant()) return ReadLineAfter();
if (testCase == nameof(MoreText).ToLowerInvariant()) return MoreText();
Expand Down Expand Up @@ -92,6 +93,15 @@ private static int ReadKeyAfter()
Console.Write(nameof(ReadKeyAfter));
return 21;
}
private static int SlowOutput()
{
for (var i = 1; i <= 10; i++)
{
Console.WriteLine("x:" + i);
Thread.Sleep(500);
}
return 121;
}
private static int ReadLineFirst()
{
Console.ReadLine();
Expand Down
44 changes: 44 additions & 0 deletions src/Proc.Tests/AsyncReadsStartStopTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using ProcNet.Std;
using Xunit;

namespace ProcNet.Tests
{
public class AsyncReadsStartStopTests : TestsBase
{
[Fact]
public void SlowOutput()
{
var args = TestCaseArguments(nameof(SlowOutput));
var process = new ObservableProcess(args);
var consoleOut = new List<LineOut>();
Exception seenException = null;
process.SubscribeLines(c =>
{
consoleOut.Add(c);
if (!c.Line.EndsWith("3")) return;

process.CancelAsyncReads();
Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(2));
process.StartAsyncReads();
});
}, e=> seenException = e);

process.WaitForCompletion(TimeSpan.FromSeconds(20));

process.ExitCode.Should().HaveValue().And.Be(121);
seenException.Should().BeNull();
consoleOut.Should().NotBeEmpty()
//we stopped reads after 3 or 2 seconds
.And.NotContain(l => l.Line.EndsWith("4"))
//each line is delayed 500ms so after 2 seconds
//and subscribing again we should see 9
.And.Contain(l => l.Line.EndsWith("9"));
}
}
}
5 changes: 1 addition & 4 deletions src/Proc.Tests/ReadInOrderTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
using System.Linq;
using FluentAssertions;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Serialization;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Host;
using FluentAssertions;
using Xunit;

namespace ProcNet.Tests
Expand Down
2 changes: 2 additions & 0 deletions src/Proc.sln
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ Global
{733EB608-B8B4-47FE-AB63-A4C7856C4209}.Debug|Any CPU.Build.0 = Debug|Any CPU
{733EB608-B8B4-47FE-AB63-A4C7856C4209}.Release|Any CPU.ActiveCfg = Release|Any CPU
{733EB608-B8B4-47FE-AB63-A4C7856C4209}.Release|Any CPU.Build.0 = Release|Any CPU
{D6997ADC-E933-418E-831C-DE1A78897493}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D6997ADC-E933-418E-831C-DE1A78897493}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
83 changes: 71 additions & 12 deletions src/Proc/BufferedObservableProcess.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.CodeDom;
using System.Diagnostics;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using ProcNet.Extensions;
using ProcNet.Std;
Expand All @@ -26,7 +29,6 @@ namespace ProcNet
/// </summary>
public class BufferedObservableProcess : ObservableProcessBase<CharactersOut>
{

/// <summary>
/// How long we should wait for the output stream readers to finish when the process exits before we call
/// <see cref="ObservableProcessBase{TConsoleOut}.OnCompleted"/> is called. By default waits for 5 seconds.
Expand All @@ -42,19 +44,24 @@ public BufferedObservableProcess(string binary, params string[] arguments) : bas

public BufferedObservableProcess(StartArguments startArguments) : base(startArguments) { }

private CancellationTokenSource _ctx = new CancellationTokenSource();
private Task _stdOutSubscription;
private Task _stdErrSubscription;
private IObserver<CharactersOut> _observer;

protected override IObservable<CharactersOut> CreateConsoleOutObservable()
{
if (this.NoWrapInThread)
return Observable.Create<CharactersOut>(observer =>
{
KickOff(observer);
return Disposable.Empty;
var disposable = KickOff(observer);
return disposable;
});

return Observable.Create<CharactersOut>(async observer =>
{
await Task.Run(() => KickOff(observer));
return Disposable.Empty;
var disposable = await Task.Run(() => KickOff(observer));
return disposable;
});
}

Expand All @@ -64,28 +71,80 @@ protected override IObservable<CharactersOut> CreateConsoleOutObservable()
/// <returns></returns>
protected virtual bool ContinueReadingFromProcessReaders() => true;

private void KickOff(IObserver<CharactersOut> observer)
private IDisposable KickOff(IObserver<CharactersOut> observer)
{
if (!this.StartProcess(observer)) return;
if (!this.StartProcess(observer)) return Disposable.Empty;

this.Started = true;

if (this.Process.HasExited)
{
OnExit(observer);
return;
return Disposable.Empty;
}

var stdOutSubscription = this.Process.ObserveStandardOutBuffered(observer, BufferSize, ContinueReadingFromProcessReaders);
var stdErrSubscription = this.Process.ObserveErrorOutBuffered(observer, BufferSize, ContinueReadingFromProcessReaders);
this._observer = observer;
this.StartAsyncReads();

this.Process.Exited += (o, s) =>
{
WaitForEndOfStreams(observer, stdOutSubscription, stdErrSubscription);

WaitForEndOfStreams(observer, _stdOutSubscription, _stdErrSubscription);
OnExit(observer);
};

var disposable = Disposable.Create(() =>
{
if (this.Started && !this.StopRequested)
_ctx.Cancel();
_ctx.Dispose();
});

return disposable;
}

private readonly object _lock = new object();
private bool _reading = false;

/// <summary>
/// Allows you to stop reading the console output after subscribing on the observable while leaving the underlying
/// process running.
/// </summary>
public void CancelAsyncReads()
{
if (!this._reading) return;
lock (_lock)
{
if (!this._reading) return;
try
{
this._ctx.Cancel();
}
catch (ObserveOutputExtensions.ObservableProcessAsyncReadCancelledException) { }
catch (AggregateException ae) when (ae.InnerException is ObserveOutputExtensions.ObservableProcessAsyncReadCancelledException)
{
}
finally
{
this._ctx = new CancellationTokenSource();
this._reading = false;
}
}
}

/// <summary>
/// Start reading the console output again after calling <see cref="CancelAsyncReads"/>
/// </summary>
public void StartAsyncReads()
{
if (this._reading) return;
lock (_lock)
{
if (this._reading) return;

this._stdOutSubscription = this.Process.ObserveStandardOutBuffered(_observer, BufferSize, ContinueReadingFromProcessReaders, _ctx.Token);
this._stdErrSubscription = this.Process.ObserveErrorOutBuffered(_observer, BufferSize, ContinueReadingFromProcessReaders, _ctx.Token);
this._reading = true;
}
}

private void WaitForEndOfStreams(IObserver<CharactersOut> observer, Task stdOutSubscription, Task stdErrSubscription)
Expand Down
24 changes: 17 additions & 7 deletions src/Proc/Extensions/ObserveOutputExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,31 @@ public static IObservable<LineOut> ObserveStandardOutLineByLine(this Process pro
});
}

public static Task ObserveErrorOutBuffered(this Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering) =>
RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.ErrorOut, process.StandardError);
public static Task ObserveErrorOutBuffered(this Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering, CancellationToken token) =>
RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.ErrorOut, process.StandardError, token);

public static Task ObserveStandardOutBuffered(this Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering) =>
RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.Out, process.StandardOutput);
public static Task ObserveStandardOutBuffered(this Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering, CancellationToken token) =>
RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.Out, process.StandardOutput, token);

private static Task RunBufferedRead(Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering, Func<char[], CharactersOut> m, StreamReader reader) =>
Task.Factory.StartNew(() => BufferedRead(process, reader, observer, bufferSize, m, keepBuffering), LongRunning);
private static Task RunBufferedRead(Process process, IObserver<CharactersOut> observer, int bufferSize, Func<bool> keepBuffering, Func<char[], CharactersOut> m,
StreamReader reader, CancellationToken token) =>
Task.Factory.StartNew(() => BufferedRead(process, reader, observer, bufferSize, m, keepBuffering, token), token, LongRunning, TaskScheduler.Current);

private static async Task BufferedRead(Process p, StreamReader r, IObserver<CharactersOut> o, int b, Func<char[], CharactersOut> m, Func<bool> keepBuffering)

public class ObservableProcessAsyncReadCancelledException : Exception
{

}

private static async Task BufferedRead(Process p, StreamReader r, IObserver<CharactersOut> o, int b, Func<char[], CharactersOut> m, Func<bool> keepBuffering,
CancellationToken token)
{
using(token.Register(()=> throw new ObservableProcessAsyncReadCancelledException(), useSynchronizationContext: false))
while (keepBuffering() && !r.EndOfStream)
{
var buffer = new char[b];
var read = await r.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
token.ThrowIfCancellationRequested();
if (read > 0)
o.OnNext(m(buffer));
else
Expand Down
29 changes: 25 additions & 4 deletions src/ScratchPad/Class1.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using ProcNet;
using ProcNet.Std;

Expand All @@ -8,11 +9,31 @@ public static class Program
{
public static int Main()
{
var result = Proc.Start(new StartArguments("ipconfig", "/all")
var tessCase = TestBinary.TestCaseArguments("SlowOutput");
var process = new ObservableProcess(tessCase);
process.SubscribeLines(c =>
{
WorkingDirectory = @"c:\Projects\proc\src\Proc.Tests.Binary",
WaitForStreamReadersTimeout = TimeSpan.FromMinutes(4)
}, TimeSpan.FromMinutes(1), new ConsoleOutColorWriter());
if (c.Line.EndsWith("3"))
{
process.CancelAsyncReads();
Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(4));
process.StartAsyncReads();
});
}
Console.WriteLine(c.Line);
}, e=> Console.Error.WriteLine(e));


process.WaitForCompletion(TimeSpan.FromSeconds(20));
Console.WriteLine("exitCode:" + process.ExitCode);

// var result = Proc.Start(new StartArguments("ipconfig", "/all")
// {
// WorkingDirectory = @"c:\Projects\proc\src\Proc.Tests.Binary",
// WaitForStreamReadersTimeout = TimeSpan.FromMinutes(4)
// }, TimeSpan.FromMinutes(1), new ConsoleOutColorWriter());

return 0;
}
Expand Down