diff --git a/src/Proc.Tests.Binary/Program.cs b/src/Proc.Tests.Binary/Program.cs index dc27e4c..25c2995 100644 --- a/src/Proc.Tests.Binary/Program.cs +++ b/src/Proc.Tests.Binary/Program.cs @@ -26,6 +26,7 @@ public static int Main(string[] args) if (testCase == nameof(ReadKeyFirst).ToLowerInvariant()) return ReadKeyFirst(); if (testCase == nameof(ReadKeyAfter).ToLowerInvariant()) return ReadKeyAfter(); + if (testCase == nameof(SlowOutput).ToLowerInvariant()) return SlowOutput(); if (testCase == nameof(ReadLineFirst).ToLowerInvariant()) return ReadLineFirst(); if (testCase == nameof(ReadLineAfter).ToLowerInvariant()) return ReadLineAfter(); if (testCase == nameof(MoreText).ToLowerInvariant()) return MoreText(); @@ -92,6 +93,15 @@ private static int ReadKeyAfter() Console.Write(nameof(ReadKeyAfter)); return 21; } + private static int SlowOutput() + { + for (var i = 1; i <= 10; i++) + { + Console.WriteLine("x:" + i); + Thread.Sleep(500); + } + return 121; + } private static int ReadLineFirst() { Console.ReadLine(); diff --git a/src/Proc.Tests/AsyncReadsStartStopTests.cs b/src/Proc.Tests/AsyncReadsStartStopTests.cs new file mode 100644 index 0000000..9ffdcc4 --- /dev/null +++ b/src/Proc.Tests/AsyncReadsStartStopTests.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using FluentAssertions; +using ProcNet.Std; +using Xunit; + +namespace ProcNet.Tests +{ + public class AsyncReadsStartStopTests : TestsBase + { + [Fact] + public void SlowOutput() + { + var args = TestCaseArguments(nameof(SlowOutput)); + var process = new ObservableProcess(args); + var consoleOut = new List(); + Exception seenException = null; + process.SubscribeLines(c => + { + consoleOut.Add(c); + if (!c.Line.EndsWith("3")) return; + + process.CancelAsyncReads(); + Task.Run(async () => + { + await Task.Delay(TimeSpan.FromSeconds(2)); + process.StartAsyncReads(); + }); + }, e=> seenException = e); + + process.WaitForCompletion(TimeSpan.FromSeconds(20)); + + process.ExitCode.Should().HaveValue().And.Be(121); + seenException.Should().BeNull(); + consoleOut.Should().NotBeEmpty() + //we stopped reads after 3 or 2 seconds + .And.NotContain(l => l.Line.EndsWith("4")) + //each line is delayed 500ms so after 2 seconds + //and subscribing again we should see 9 + .And.Contain(l => l.Line.EndsWith("9")); + } + } +} diff --git a/src/Proc.Tests/ReadInOrderTests.cs b/src/Proc.Tests/ReadInOrderTests.cs index 7088091..97512b7 100644 --- a/src/Proc.Tests/ReadInOrderTests.cs +++ b/src/Proc.Tests/ReadInOrderTests.cs @@ -1,7 +1,4 @@ -using System.Linq; -using FluentAssertions; -using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Serialization; -using Microsoft.VisualStudio.TestPlatform.ObjectModel.Host; +using FluentAssertions; using Xunit; namespace ProcNet.Tests diff --git a/src/Proc.sln b/src/Proc.sln index 4ae3d0f..bdb8346 100644 --- a/src/Proc.sln +++ b/src/Proc.sln @@ -55,6 +55,8 @@ Global {733EB608-B8B4-47FE-AB63-A4C7856C4209}.Debug|Any CPU.Build.0 = Debug|Any CPU {733EB608-B8B4-47FE-AB63-A4C7856C4209}.Release|Any CPU.ActiveCfg = Release|Any CPU {733EB608-B8B4-47FE-AB63-A4C7856C4209}.Release|Any CPU.Build.0 = Release|Any CPU + {D6997ADC-E933-418E-831C-DE1A78897493}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D6997ADC-E933-418E-831C-DE1A78897493}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/Proc/BufferedObservableProcess.cs b/src/Proc/BufferedObservableProcess.cs index 3cc5d8d..80b3d85 100644 --- a/src/Proc/BufferedObservableProcess.cs +++ b/src/Proc/BufferedObservableProcess.cs @@ -1,8 +1,11 @@ using System; +using System.CodeDom; using System.Diagnostics; using System.IO; using System.Reactive.Disposables; using System.Reactive.Linq; +using System.Security.Cryptography; +using System.Threading; using System.Threading.Tasks; using ProcNet.Extensions; using ProcNet.Std; @@ -26,7 +29,6 @@ namespace ProcNet /// public class BufferedObservableProcess : ObservableProcessBase { - /// /// How long we should wait for the output stream readers to finish when the process exits before we call /// is called. By default waits for 5 seconds. @@ -42,19 +44,24 @@ public BufferedObservableProcess(string binary, params string[] arguments) : bas public BufferedObservableProcess(StartArguments startArguments) : base(startArguments) { } + private CancellationTokenSource _ctx = new CancellationTokenSource(); + private Task _stdOutSubscription; + private Task _stdErrSubscription; + private IObserver _observer; + protected override IObservable CreateConsoleOutObservable() { if (this.NoWrapInThread) return Observable.Create(observer => { - KickOff(observer); - return Disposable.Empty; + var disposable = KickOff(observer); + return disposable; }); return Observable.Create(async observer => { - await Task.Run(() => KickOff(observer)); - return Disposable.Empty; + var disposable = await Task.Run(() => KickOff(observer)); + return disposable; }); } @@ -64,28 +71,80 @@ protected override IObservable CreateConsoleOutObservable() /// protected virtual bool ContinueReadingFromProcessReaders() => true; - private void KickOff(IObserver observer) + private IDisposable KickOff(IObserver observer) { - if (!this.StartProcess(observer)) return; + if (!this.StartProcess(observer)) return Disposable.Empty; this.Started = true; if (this.Process.HasExited) { OnExit(observer); - return; + return Disposable.Empty; } - var stdOutSubscription = this.Process.ObserveStandardOutBuffered(observer, BufferSize, ContinueReadingFromProcessReaders); - var stdErrSubscription = this.Process.ObserveErrorOutBuffered(observer, BufferSize, ContinueReadingFromProcessReaders); + this._observer = observer; + this.StartAsyncReads(); this.Process.Exited += (o, s) => { - WaitForEndOfStreams(observer, stdOutSubscription, stdErrSubscription); - + WaitForEndOfStreams(observer, _stdOutSubscription, _stdErrSubscription); OnExit(observer); }; + var disposable = Disposable.Create(() => + { + if (this.Started && !this.StopRequested) + _ctx.Cancel(); + _ctx.Dispose(); + }); + + return disposable; + } + + private readonly object _lock = new object(); + private bool _reading = false; + + /// + /// Allows you to stop reading the console output after subscribing on the observable while leaving the underlying + /// process running. + /// + public void CancelAsyncReads() + { + if (!this._reading) return; + lock (_lock) + { + if (!this._reading) return; + try + { + this._ctx.Cancel(); + } + catch (ObserveOutputExtensions.ObservableProcessAsyncReadCancelledException) { } + catch (AggregateException ae) when (ae.InnerException is ObserveOutputExtensions.ObservableProcessAsyncReadCancelledException) + { + } + finally + { + this._ctx = new CancellationTokenSource(); + this._reading = false; + } + } + } + + /// + /// Start reading the console output again after calling + /// + public void StartAsyncReads() + { + if (this._reading) return; + lock (_lock) + { + if (this._reading) return; + + this._stdOutSubscription = this.Process.ObserveStandardOutBuffered(_observer, BufferSize, ContinueReadingFromProcessReaders, _ctx.Token); + this._stdErrSubscription = this.Process.ObserveErrorOutBuffered(_observer, BufferSize, ContinueReadingFromProcessReaders, _ctx.Token); + this._reading = true; + } } private void WaitForEndOfStreams(IObserver observer, Task stdOutSubscription, Task stdErrSubscription) diff --git a/src/Proc/Extensions/ObserveOutputExtensions.cs b/src/Proc/Extensions/ObserveOutputExtensions.cs index d9e0311..ad5b7f5 100644 --- a/src/Proc/Extensions/ObserveOutputExtensions.cs +++ b/src/Proc/Extensions/ObserveOutputExtensions.cs @@ -43,21 +43,31 @@ public static IObservable ObserveStandardOutLineByLine(this Process pro }); } - public static Task ObserveErrorOutBuffered(this Process process, IObserver observer, int bufferSize, Func keepBuffering) => - RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.ErrorOut, process.StandardError); + public static Task ObserveErrorOutBuffered(this Process process, IObserver observer, int bufferSize, Func keepBuffering, CancellationToken token) => + RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.ErrorOut, process.StandardError, token); - public static Task ObserveStandardOutBuffered(this Process process, IObserver observer, int bufferSize, Func keepBuffering) => - RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.Out, process.StandardOutput); + public static Task ObserveStandardOutBuffered(this Process process, IObserver observer, int bufferSize, Func keepBuffering, CancellationToken token) => + RunBufferedRead(process, observer, bufferSize, keepBuffering, ConsoleOut.Out, process.StandardOutput, token); - private static Task RunBufferedRead(Process process, IObserver observer, int bufferSize, Func keepBuffering, Func m, StreamReader reader) => - Task.Factory.StartNew(() => BufferedRead(process, reader, observer, bufferSize, m, keepBuffering), LongRunning); + private static Task RunBufferedRead(Process process, IObserver observer, int bufferSize, Func keepBuffering, Func m, + StreamReader reader, CancellationToken token) => + Task.Factory.StartNew(() => BufferedRead(process, reader, observer, bufferSize, m, keepBuffering, token), token, LongRunning, TaskScheduler.Current); - private static async Task BufferedRead(Process p, StreamReader r, IObserver o, int b, Func m, Func keepBuffering) + + public class ObservableProcessAsyncReadCancelledException : Exception + { + + } + + private static async Task BufferedRead(Process p, StreamReader r, IObserver o, int b, Func m, Func keepBuffering, + CancellationToken token) { + using(token.Register(()=> throw new ObservableProcessAsyncReadCancelledException(), useSynchronizationContext: false)) while (keepBuffering() && !r.EndOfStream) { var buffer = new char[b]; var read = await r.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false); + token.ThrowIfCancellationRequested(); if (read > 0) o.OnNext(m(buffer)); else diff --git a/src/ScratchPad/Class1.cs b/src/ScratchPad/Class1.cs index 37ba492..012c309 100644 --- a/src/ScratchPad/Class1.cs +++ b/src/ScratchPad/Class1.cs @@ -1,4 +1,5 @@ using System; +using System.Threading.Tasks; using ProcNet; using ProcNet.Std; @@ -8,11 +9,31 @@ public static class Program { public static int Main() { - var result = Proc.Start(new StartArguments("ipconfig", "/all") + var tessCase = TestBinary.TestCaseArguments("SlowOutput"); + var process = new ObservableProcess(tessCase); + process.SubscribeLines(c => { - WorkingDirectory = @"c:\Projects\proc\src\Proc.Tests.Binary", - WaitForStreamReadersTimeout = TimeSpan.FromMinutes(4) - }, TimeSpan.FromMinutes(1), new ConsoleOutColorWriter()); + if (c.Line.EndsWith("3")) + { + process.CancelAsyncReads(); + Task.Run(async () => + { + await Task.Delay(TimeSpan.FromSeconds(4)); + process.StartAsyncReads(); + }); + } + Console.WriteLine(c.Line); + }, e=> Console.Error.WriteLine(e)); + + + process.WaitForCompletion(TimeSpan.FromSeconds(20)); + Console.WriteLine("exitCode:" + process.ExitCode); + +// var result = Proc.Start(new StartArguments("ipconfig", "/all") +// { +// WorkingDirectory = @"c:\Projects\proc\src\Proc.Tests.Binary", +// WaitForStreamReadersTimeout = TimeSpan.FromMinutes(4) +// }, TimeSpan.FromMinutes(1), new ConsoleOutColorWriter()); return 0; }