diff --git a/module/PSParallelPipeline.psd1 b/module/PSParallelPipeline.psd1 index 4662810..470784e 100644 --- a/module/PSParallelPipeline.psd1 +++ b/module/PSParallelPipeline.psd1 @@ -11,7 +11,7 @@ RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll' # Version number of this module. - ModuleVersion = '1.2.1' + ModuleVersion = '1.2.2' # Supported PSEditions # CompatiblePSEditions = @() @@ -77,7 +77,7 @@ VariablesToExport = @() # Aliases to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no aliases to export. - AliasesToExport = @('parallel') + AliasesToExport = @('parallel', 'asparallel') # DSC resources to export from this module # DscResourcesToExport = @() diff --git a/src/PSParallelPipeline/Commands/InvokeParallelCommand.cs b/src/PSParallelPipeline/Commands/InvokeParallelCommand.cs index 3613a18..ba4a810 100644 --- a/src/PSParallelPipeline/Commands/InvokeParallelCommand.cs +++ b/src/PSParallelPipeline/Commands/InvokeParallelCommand.cs @@ -2,17 +2,20 @@ using System.Collections; using System.Management.Automation; using System.Management.Automation.Runspaces; +using System.Threading; using PSParallelPipeline.Poly; namespace PSParallelPipeline.Commands; [Cmdlet(VerbsLifecycle.Invoke, "Parallel")] -[Alias("parallel")] +[Alias("parallel", "asparallel")] [OutputType(typeof(object))] public sealed class InvokeParallelCommand : PSCmdlet, IDisposable { private Worker? _worker; + private readonly CancellationTokenSource _cts = new(); + [Parameter(Position = 0, Mandatory = true)] public ScriptBlock ScriptBlock { get; set; } = null!; @@ -46,6 +49,11 @@ public sealed class InvokeParallelCommand : PSCmdlet, IDisposable protected override void BeginProcessing() { + if (TimeoutSeconds > 0) + { + _cts.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds)); + } + InitialSessionState iss = InitialSessionState .CreateDefault2() .AddFunctions(Functions, this) @@ -55,28 +63,27 @@ protected override void BeginProcessing() { MaxRunspaces = ThrottleLimit, UseNewRunspace = UseNewRunspace, - InitialSessionState = iss, - UsingStatements = ScriptBlock.GetUsingParameters(this) + InitialSessionState = iss }; - _worker = new Worker(poolSettings); - - if (TimeoutSeconds > 0) + TaskSettings workerSettings = new() { - _worker.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds)); - } + Script = ScriptBlock.ToString(), + UsingStatements = ScriptBlock.GetUsingParameters(this) + }; + _worker = new Worker(poolSettings, workerSettings, _cts.Token); _worker.Run(); } protected override void ProcessRecord() { Dbg.Assert(_worker is not null); - this.ThrowIfInputObjectIsScriptBlock(InputObject); + InputObject.ThrowIfInputObjectIsScriptBlock(this); try { - _worker.Enqueue(InputObject, ScriptBlock); + _worker.Enqueue(InputObject); while (_worker.TryTake(out PSOutputData data)) { ProcessOutput(data); @@ -84,8 +91,7 @@ protected override void ProcessRecord() } catch (Exception _) when (_ is PipelineStoppedException or FlowControlException) { - _worker.Cancel(); - _worker.Wait(); + CancelAndWait(); throw; } catch (OperationCanceledException exception) @@ -93,10 +99,6 @@ protected override void ProcessRecord() _worker.Wait(); exception.WriteTimeoutError(this); } - catch (Exception exception) - { - exception.WriteUnspecifiedError(this); - } } protected override void EndProcessing() @@ -110,12 +112,12 @@ protected override void EndProcessing() { ProcessOutput(data); } + _worker.Wait(); } catch (Exception _) when (_ is PipelineStoppedException or FlowControlException) { - _worker.Cancel(); - _worker.Wait(); + CancelAndWait(); throw; } catch (OperationCanceledException exception) @@ -123,10 +125,6 @@ protected override void EndProcessing() _worker.Wait(); exception.WriteTimeoutError(this); } - catch (Exception exception) - { - exception.WriteUnspecifiedError(this); - } } private void ProcessOutput(PSOutputData data) @@ -166,11 +164,18 @@ private void ProcessOutput(PSOutputData data) } } - protected override void StopProcessing() => _worker?.Cancel(); + private void CancelAndWait() + { + _cts.Cancel(); + _worker?.Wait(); + } + + protected override void StopProcessing() => CancelAndWait(); public void Dispose() { _worker?.Dispose(); + _cts.Dispose(); GC.SuppressFinalize(this); } } diff --git a/src/PSParallelPipeline/ExceptionHelpers.cs b/src/PSParallelPipeline/ExceptionHelper.cs similarity index 75% rename from src/PSParallelPipeline/ExceptionHelpers.cs rename to src/PSParallelPipeline/ExceptionHelper.cs index 4d3b213..bd60e9a 100644 --- a/src/PSParallelPipeline/ExceptionHelpers.cs +++ b/src/PSParallelPipeline/ExceptionHelper.cs @@ -3,7 +3,7 @@ namespace PSParallelPipeline; -internal static class ExceptionHelpers +internal static class ExceptionHelper { private const string _notsupported = "Passed-in script block variables are not supported, and can result in undefined behavior."; @@ -15,17 +15,33 @@ internal static void WriteTimeoutError(this Exception exception, PSCmdlet cmdlet ErrorCategory.OperationTimeout, cmdlet)); - internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cmdlet) => - cmdlet.WriteError(new ErrorRecord( - exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet)); - internal static PSOutputData CreateProcessingTaskError(this Exception exception, object context) => PSOutputData.WriteError(new ErrorRecord( exception, "ProcessingTask", ErrorCategory.NotSpecified, context)); + internal static void ThrowFunctionNotFoundError( + this CommandNotFoundException exception, + Cmdlet cmdlet, + string function) => + cmdlet.ThrowTerminatingError(new ErrorRecord( + exception, "FunctionNotFound", ErrorCategory.ObjectNotFound, function)); + private static bool ValueIsNotScriptBlock(object? value) => value is not ScriptBlock and not PSObject { BaseObject: ScriptBlock }; + internal static CommandInfo ThrowIfFunctionNotFoundError( + this CommandInfo? command, + string function) + { + if (command is not null) + { + return command; + } + + throw new CommandNotFoundException( + $"The function with name '{function}' could not be found."); + } + internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object? value) { if (ValueIsNotScriptBlock(value)) @@ -40,7 +56,7 @@ internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object? value)); } - internal static void ThrowIfInputObjectIsScriptBlock(this PSCmdlet cmdlet, object? value) + internal static void ThrowIfInputObjectIsScriptBlock(this object? value, PSCmdlet cmdlet) { if (ValueIsNotScriptBlock(value)) { diff --git a/src/PSParallelPipeline/Extensions.cs b/src/PSParallelPipeline/Extensions.cs index 2bf35f1..c014ea1 100644 --- a/src/PSParallelPipeline/Extensions.cs +++ b/src/PSParallelPipeline/Extensions.cs @@ -20,18 +20,22 @@ internal static InitialSessionState AddFunctions( { foreach (string function in functionsToAdd) { - CommandInfo? commandInfo = cmdlet - .InvokeCommand - .GetCommand(function, CommandTypes.Function); - - if (commandInfo is null) + try { - continue; + CommandInfo commandInfo = cmdlet + .InvokeCommand + .GetCommand(function, CommandTypes.Function) + .ThrowIfFunctionNotFoundError(function); + + initialSessionState.Commands.Add( + new SessionStateFunctionEntry( + name: function, + definition: commandInfo.Definition)); + } + catch (CommandNotFoundException exception) + { + exception.ThrowFunctionNotFoundError(cmdlet, function); } - - initialSessionState.Commands.Add(new SessionStateFunctionEntry( - name: function, - definition: commandInfo.Definition)); } } diff --git a/src/PSParallelPipeline/PSOutputStreams.cs b/src/PSParallelPipeline/PSOutputStreams.cs index c3c8eb6..6b16f74 100644 --- a/src/PSParallelPipeline/PSOutputStreams.cs +++ b/src/PSParallelPipeline/PSOutputStreams.cs @@ -1,13 +1,12 @@ using System; using System.Collections.Concurrent; using System.Management.Automation; -using System.Threading; namespace PSParallelPipeline; internal sealed class PSOutputStreams : IDisposable { - private BlockingCollection OutputPipe { get => _worker.OutputPipe; } + private BlockingCollection Output { get; } internal PSDataCollection Success { get; } = []; @@ -23,17 +22,15 @@ internal sealed class PSOutputStreams : IDisposable internal PSDataCollection Warning { get; } = []; - private readonly Worker _worker; - - internal PSOutputStreams(Worker worker) + internal PSOutputStreams(BlockingCollection output) { - _worker = worker; - SetStreamHandlers(); + Output = output; + SetHandlers(); } - internal void AddOutput(PSOutputData data) => OutputPipe.Add(data); + internal void AddOutput(PSOutputData data) => Output.Add(data); - private void SetStreamHandlers() + private void SetHandlers() { Success.DataAdding += (s, e) => AddOutput(PSOutputData.WriteObject(e.ItemAdded)); @@ -66,7 +63,6 @@ public void Dispose() Progress.Dispose(); Verbose.Dispose(); Warning.Dispose(); - OutputPipe.Dispose(); GC.SuppressFinalize(this); } } diff --git a/src/PSParallelPipeline/PSTask.cs b/src/PSParallelPipeline/PSTask.cs index 87247a2..dc396aa 100644 --- a/src/PSParallelPipeline/PSTask.cs +++ b/src/PSParallelPipeline/PSTask.cs @@ -7,7 +7,7 @@ namespace PSParallelPipeline; -internal sealed class PSTask : IDisposable +internal sealed class PSTask { private readonly PowerShell _powershell; @@ -15,29 +15,37 @@ internal sealed class PSTask : IDisposable private readonly RunspacePool _pool; - private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; } + private PSOutputStreams OutputStreams { get => _pool.Streams; } - internal Runspace Runspace + private Runspace Runspace { get => _powershell.Runspace; set => _powershell.Runspace = value; } - private PSTask(RunspacePool runspacePool) + private PSTask(RunspacePool pool) { _powershell = PowerShell.Create(); _internalStreams = _powershell.Streams; - _pool = runspacePool; + _pool = pool; } - static internal PSTask Create(RunspacePool runspacePool) + static internal async Task CreateAsync( + object? input, + RunspacePool runspacePool, + TaskSettings settings) { PSTask ps = new(runspacePool); - HookStreams(ps._internalStreams, runspacePool.PSOutputStreams); - return ps; + SetStreams(ps._internalStreams, runspacePool.Streams); + ps.Runspace = await runspacePool.GetRunspaceAsync(); + + return ps + .AddInput(input) + .AddScript(settings.Script) + .AddUsingStatements(settings.UsingStatements); } - private static void HookStreams( + private static void SetStreams( PSDataStreams streams, PSOutputStreams outputStreams) { @@ -56,36 +64,40 @@ private static Task InvokePowerShellAsync( powerShell.BeginInvoke(null, output), powerShell.EndInvoke); - internal PSTask AddInputObject(Dictionary inputObject) + private PSTask AddInput(object? inputObject) { - _powershell - .AddCommand("Set-Variable", useLocalScope: true) - .AddParameters(inputObject); + if (inputObject is not null) + { + _powershell + .AddCommand("Set-Variable", useLocalScope: true) + .AddArgument("_") + .AddArgument(inputObject); + } + return this; } - internal PSTask AddScript(ScriptBlock script) + private PSTask AddScript(string script) { - _powershell.AddScript(script.ToString(), useLocalScope: true); + _powershell.AddScript(script, useLocalScope: true); return this; } - internal void AddUsingStatements(Dictionary usingParams) + private PSTask AddUsingStatements(Dictionary usingParams) { - if (usingParams.Count > 0 ) + if (usingParams.Count > 0) { - _powershell.AddParameters(new Dictionary> - { - ["--%"] = usingParams - }); + _powershell.AddParameter("--%", usingParams); } + + return this; } internal async Task InvokeAsync() { try { - using CancellationTokenRegistration _ = _pool.RegisterCancellation(CancelCallback(this)); + using CancellationTokenRegistration _ = _pool.RegisterCancellation(Cancel); await InvokePowerShellAsync(_powershell, OutputStreams.Success); } catch (Exception exception) @@ -94,20 +106,14 @@ internal async Task InvokeAsync() } finally { - _pool.CompleteTask(this); - _pool.Release(); + _powershell.Dispose(); + _pool.PushRunspace(Runspace); } } - private static Action CancelCallback(PSTask psTask) => delegate - { - psTask.Dispose(); - psTask.Runspace.Dispose(); - }; - - public void Dispose() + private void Cancel() { _powershell.Dispose(); - GC.SuppressFinalize(this); + Runspace.Dispose(); } } diff --git a/src/PSParallelPipeline/PoolSettings.cs b/src/PSParallelPipeline/PoolSettings.cs index c7b8557..bf82319 100644 --- a/src/PSParallelPipeline/PoolSettings.cs +++ b/src/PSParallelPipeline/PoolSettings.cs @@ -1,4 +1,3 @@ -using System.Collections.Generic; using System.Management.Automation.Runspaces; namespace PSParallelPipeline; @@ -6,5 +5,4 @@ namespace PSParallelPipeline; internal record struct PoolSettings( int MaxRunspaces, bool UseNewRunspace, - InitialSessionState InitialSessionState, - Dictionary UsingStatements); + InitialSessionState InitialSessionState); diff --git a/src/PSParallelPipeline/RunspacePool.cs b/src/PSParallelPipeline/RunspacePool.cs index 0f14b2e..bdb8535 100644 --- a/src/PSParallelPipeline/RunspacePool.cs +++ b/src/PSParallelPipeline/RunspacePool.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Management.Automation.Runspaces; using System.Threading; using System.Threading.Tasks; @@ -9,89 +8,68 @@ namespace PSParallelPipeline; internal sealed class RunspacePool : IDisposable { - private CancellationToken Token { get => _worker.Token; } + private readonly CancellationToken _token; - private InitialSessionState InitialSessionState { get => _settings.InitialSessionState; } + private readonly InitialSessionState _iss; - private Dictionary UsingStatements { get => _settings.UsingStatements; } + private readonly ConcurrentQueue _pool = []; - private int MaxRunspaces { get => _settings.MaxRunspaces; } + private readonly ConcurrentDictionary _created; - private readonly ConcurrentQueue _pool; + private readonly bool _useNew; - private readonly PoolSettings _settings; - - private readonly Worker _worker; - - private readonly List _tasks; - - private bool UseNewRunspace { get => _settings.UseNewRunspace; } + private readonly SemaphoreSlim _semaphore; - internal PSOutputStreams PSOutputStreams { get => _worker.OutputStreams; } + internal PSOutputStreams Streams { get; } - private readonly SemaphoreSlim _semaphore; + internal int MaxRunspaces { get; } - internal RunspacePool(PoolSettings settings, Worker worker) + internal RunspacePool( + PoolSettings settings, + PSOutputStreams streams, + CancellationToken token) { - _settings = settings; - _worker = worker; - _pool = new ConcurrentQueue(); - _tasks = new List(MaxRunspaces); + (MaxRunspaces, _useNew, _iss) = settings; + Streams = streams; + _token = token; _semaphore = new SemaphoreSlim(MaxRunspaces, MaxRunspaces); + _created = new ConcurrentDictionary( + Environment.ProcessorCount, + MaxRunspaces); } - internal void Release() => _semaphore.Release(); - - internal void CompleteTask(PSTask psTask) + internal void PushRunspace(Runspace runspace) { - psTask.Dispose(); - - if (UseNewRunspace) + if (_token.IsCancellationRequested) { - psTask.Runspace.Dispose(); return; } - _pool.Enqueue(psTask.Runspace); - } - - internal async Task EnqueueAsync(PSTask psTask) - { - psTask.AddUsingStatements(UsingStatements); - psTask.Runspace = await GetRunspaceAsync(); - _tasks.Add(psTask.InvokeAsync()); - } - - internal async Task ProcessAllAsync() - { - while (_tasks.Count > 0) + if (_useNew) { - await ProcessAnyAsync(); + runspace.Dispose(); + _created.TryRemove(runspace.InstanceId, out _); + runspace = CreateRunspace(); } + + _pool.Enqueue(runspace); + _semaphore.Release(); } internal CancellationTokenRegistration RegisterCancellation(Action callback) => - Token.Register(callback); - - internal async Task WaitOnCancelAsync() => await Task.WhenAll(_tasks); - - private async Task ProcessAnyAsync() - { - Task task = await Task.WhenAny(_tasks); - _tasks.Remove(task); - await task; - } + _token.Register(callback); private Runspace CreateRunspace() { - Runspace rs = RunspaceFactory.CreateRunspace(InitialSessionState); + Runspace rs = RunspaceFactory.CreateRunspace(_iss); + _created[rs.InstanceId] = rs; rs.Open(); return rs; } - private async Task GetRunspaceAsync() + internal async Task GetRunspaceAsync() { - await _semaphore.WaitAsync(Token); + await _semaphore.WaitAsync(_token); if (_pool.TryDequeue(out Runspace runspace)) { return runspace; @@ -102,7 +80,7 @@ private async Task GetRunspaceAsync() public void Dispose() { - while (_pool.TryDequeue(out Runspace runspace)) + foreach (Runspace runspace in _created.Values) { runspace.Dispose(); } diff --git a/src/PSParallelPipeline/TaskSettings.cs b/src/PSParallelPipeline/TaskSettings.cs new file mode 100644 index 0000000..462cccf --- /dev/null +++ b/src/PSParallelPipeline/TaskSettings.cs @@ -0,0 +1,7 @@ +using System.Collections.Generic; + +namespace PSParallelPipeline; + +internal record struct TaskSettings( + string Script, + Dictionary UsingStatements); diff --git a/src/PSParallelPipeline/Worker.cs b/src/PSParallelPipeline/Worker.cs index fa84336..64dd15f 100644 --- a/src/PSParallelPipeline/Worker.cs +++ b/src/PSParallelPipeline/Worker.cs @@ -3,92 +3,93 @@ using System.Threading.Tasks; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Management.Automation; namespace PSParallelPipeline; -internal sealed class Worker : IDisposable +internal sealed class Worker { - private readonly BlockingCollection _inputQueue = []; - - private readonly CancellationTokenSource _cts; + private Task? _worker; - private readonly RunspacePool _runspacePool; + private readonly TaskSettings _taskSettings; - private Task? _worker; + private readonly BlockingCollection _input = []; - private readonly Dictionary _inputObject = new() - { - ["Name"] = "_" - }; + private readonly BlockingCollection _output = []; - internal BlockingCollection OutputPipe { get; } = []; + private readonly RunspacePool _pool; - internal CancellationToken Token { get => _cts.Token; } + private readonly CancellationToken _token; - internal PSOutputStreams OutputStreams { get; } + private readonly PSOutputStreams _streams; - internal Worker(PoolSettings settings) + internal Worker( + PoolSettings poolSettings, + TaskSettings taskSettings, + CancellationToken token) { - _cts = new CancellationTokenSource(); - OutputStreams = new PSOutputStreams(this); - _runspacePool = new RunspacePool(settings, this); + _token = token; + _taskSettings = taskSettings; + _streams = new PSOutputStreams(_output); + _pool = new RunspacePool(poolSettings, _streams, _token); } internal void Wait() => _worker?.GetAwaiter().GetResult(); - internal void Cancel() => _cts.Cancel(); - - internal void CancelAfter(TimeSpan span) => _cts.CancelAfter(span); - - internal void Enqueue(object? input, ScriptBlock script) - { - _inputObject["Value"] = input; - _inputQueue.Add( - item: PSTask - .Create(_runspacePool) - .AddInputObject(_inputObject) - .AddScript(script), - cancellationToken: Token); - } + internal void Enqueue(object? input) => _input.Add(input, _token); internal bool TryTake(out PSOutputData output) => - OutputPipe.TryTake(out output, 0, Token); + _output.TryTake(out output, 0, _token); - internal void CompleteInputAdding() => _inputQueue.CompleteAdding(); + internal void CompleteInputAdding() => _input.CompleteAdding(); internal IEnumerable GetConsumingEnumerable() => - OutputPipe.GetConsumingEnumerable(Token); + _output.GetConsumingEnumerable(_token); - internal void Run() => _worker = Task.Run(Start, cancellationToken: Token); + internal void Run() => _worker = Task.Run(Start, cancellationToken: _token); private async Task Start() { + List tasks = new(_pool.MaxRunspaces); + try { - while (!_inputQueue.IsCompleted) + foreach (object? input in _input.GetConsumingEnumerable(_token)) { - if (_inputQueue.TryTake(out PSTask ps, 0, Token)) + if (tasks.Count == tasks.Capacity) { - await _runspacePool.EnqueueAsync(ps); + await ProcessAnyAsync(tasks); } - } - await _runspacePool.ProcessAllAsync(); - OutputPipe.CompleteAdding(); + PSTask task = await PSTask.CreateAsync( + input: input, + runspacePool: _pool, + settings: _taskSettings); + + tasks.Add(task.InvokeAsync()); + } } catch + { } + finally { - await _runspacePool.WaitOnCancelAsync(); + await Task.WhenAll(tasks); + _output.CompleteAdding(); } } + private static async Task ProcessAnyAsync(List tasks) + { + Task task = await Task.WhenAny(tasks); + tasks.Remove(task); + await task; + } + public void Dispose() { - OutputStreams.Dispose(); - _inputQueue.Dispose(); - _cts.Dispose(); - _runspacePool.Dispose(); + _pool.Dispose(); + _input.Dispose(); + _streams.Dispose(); + _output.Dispose(); GC.SuppressFinalize(this); } } diff --git a/tests/PSParallelPipeline.tests.ps1 b/tests/PSParallelPipeline.tests.ps1 index 84a9ff6..fccad13 100644 --- a/tests/PSParallelPipeline.tests.ps1 +++ b/tests/PSParallelPipeline.tests.ps1 @@ -92,9 +92,10 @@ Describe PSParallelPipeline { Context 'UseNewRunspace Parameter' { It 'Should reuse runspaces by default' { - $runspaces = 0..10 | Invoke-Parallel { [runspace]::DefaultRunspace } | + $rs = 0..10 | Invoke-Parallel { [runspace]::DefaultRunspace } | Select-Object -ExpandProperty InstanceId -Unique - $runspaces.Count | Should -BeLessOrEqual 5 + + $rs.Count| Should -BeLessOrEqual 5 } It 'Should use a new runspace when the -UseNewRunspace is used' { @@ -121,6 +122,11 @@ Describe PSParallelPipeline { Sort-Object | Should -BeExactly @(0..10 | ForEach-Object { Test-Function $_ }) } + + It 'Should throw if a function could not be found' { + { Invoke-Parallel -Functions Test-NotExist { } } | + Should -Throw -ExceptionType ([CommandNotFoundException]) + } } Context 'ThrottleLimit Parameter' { @@ -343,14 +349,14 @@ Describe PSParallelPipeline { Assert-RunspaceCount { { 0..1000 | Invoke-Parallel @invokeParallelSplat } | - Should -Throw + Should -Throw -ExceptionType ([TimeoutException]) } -TestCount 50 # -WaitSeconds 1 Assert-RunspaceCount { $invokeParallelSplat['UseNewRunspace'] = $true $invokeParallelSplat['ThrottleLimit'] = 1001 { 0..1000 | Invoke-Parallel @invokeParallelSplat } | - Should -Throw + Should -Throw -ExceptionType ([TimeoutException]) } -TestCount 50 # -WaitSeconds 1 } }