Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.2.1'
ModuleVersion = '1.2.2'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand Down Expand Up @@ -77,7 +77,7 @@
VariablesToExport = @()

# Aliases to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no aliases to export.
AliasesToExport = @('parallel')
AliasesToExport = @('parallel', 'asparallel')

# DSC resources to export from this module
# DscResourcesToExport = @()
Expand Down
51 changes: 28 additions & 23 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@
using System.Collections;
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using System.Threading;
using PSParallelPipeline.Poly;

namespace PSParallelPipeline.Commands;

[Cmdlet(VerbsLifecycle.Invoke, "Parallel")]
[Alias("parallel")]
[Alias("parallel", "asparallel")]
[OutputType(typeof(object))]
public sealed class InvokeParallelCommand : PSCmdlet, IDisposable
{
private Worker? _worker;

private readonly CancellationTokenSource _cts = new();

[Parameter(Position = 0, Mandatory = true)]
public ScriptBlock ScriptBlock { get; set; } = null!;

Expand Down Expand Up @@ -46,6 +49,11 @@

protected override void BeginProcessing()
{
if (TimeoutSeconds > 0)
{
_cts.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds));
}

InitialSessionState iss = InitialSessionState
.CreateDefault2()
.AddFunctions(Functions, this)
Expand All @@ -55,48 +63,42 @@
{
MaxRunspaces = ThrottleLimit,
UseNewRunspace = UseNewRunspace,
InitialSessionState = iss,
UsingStatements = ScriptBlock.GetUsingParameters(this)
InitialSessionState = iss
};

_worker = new Worker(poolSettings);

if (TimeoutSeconds > 0)
TaskSettings workerSettings = new()
{
_worker.CancelAfter(TimeSpan.FromSeconds(TimeoutSeconds));
}
Script = ScriptBlock.ToString(),
UsingStatements = ScriptBlock.GetUsingParameters(this)
};

_worker = new Worker(poolSettings, workerSettings, _cts.Token);
_worker.Run();
}

protected override void ProcessRecord()
{
Dbg.Assert(_worker is not null);
this.ThrowIfInputObjectIsScriptBlock(InputObject);
InputObject.ThrowIfInputObjectIsScriptBlock(this);

try
{
_worker.Enqueue(InputObject, ScriptBlock);
_worker.Enqueue(InputObject);
while (_worker.TryTake(out PSOutputData data))
{
ProcessOutput(data);
}
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Cancel();
_worker.Wait();
CancelAndWait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.Wait();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
{
exception.WriteUnspecifiedError(this);
}
}

protected override void EndProcessing()
Expand All @@ -110,23 +112,19 @@
{
ProcessOutput(data);
}

_worker.Wait();
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Cancel();
_worker.Wait();
CancelAndWait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.Wait();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
{
exception.WriteUnspecifiedError(this);
}
}

private void ProcessOutput(PSOutputData data)
Expand Down Expand Up @@ -166,11 +164,18 @@
}
}

protected override void StopProcessing() => _worker?.Cancel();
private void CancelAndWait()
{
_cts.Cancel();
_worker?.Wait();
}

protected override void StopProcessing() => CancelAndWait();

Check warning on line 173 in src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

View check run for this annotation

Codecov / codecov/patch

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs#L173

Added line #L173 was not covered by tests

public void Dispose()
{
_worker?.Dispose();
_cts.Dispose();
GC.SuppressFinalize(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace PSParallelPipeline;

internal static class ExceptionHelpers
internal static class ExceptionHelper
{
private const string _notsupported =
"Passed-in script block variables are not supported, and can result in undefined behavior.";
Expand All @@ -15,17 +15,33 @@ internal static void WriteTimeoutError(this Exception exception, PSCmdlet cmdlet
ErrorCategory.OperationTimeout,
cmdlet));

internal static void WriteUnspecifiedError(this Exception exception, PSCmdlet cmdlet) =>
cmdlet.WriteError(new ErrorRecord(
exception, "UnspecifiedCmdletError", ErrorCategory.NotSpecified, cmdlet));

internal static PSOutputData CreateProcessingTaskError(this Exception exception, object context) =>
PSOutputData.WriteError(new ErrorRecord(
exception, "ProcessingTask", ErrorCategory.NotSpecified, context));

internal static void ThrowFunctionNotFoundError(
this CommandNotFoundException exception,
Cmdlet cmdlet,
string function) =>
cmdlet.ThrowTerminatingError(new ErrorRecord(
exception, "FunctionNotFound", ErrorCategory.ObjectNotFound, function));

private static bool ValueIsNotScriptBlock(object? value) =>
value is not ScriptBlock and not PSObject { BaseObject: ScriptBlock };

internal static CommandInfo ThrowIfFunctionNotFoundError(
this CommandInfo? command,
string function)
{
if (command is not null)
{
return command;
}

throw new CommandNotFoundException(
$"The function with name '{function}' could not be found.");
}

internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object? value)
{
if (ValueIsNotScriptBlock(value))
Expand All @@ -40,7 +56,7 @@ internal static void ThrowIfVariableIsScriptBlock(this PSCmdlet cmdlet, object?
value));
}

internal static void ThrowIfInputObjectIsScriptBlock(this PSCmdlet cmdlet, object? value)
internal static void ThrowIfInputObjectIsScriptBlock(this object? value, PSCmdlet cmdlet)
{
if (ValueIsNotScriptBlock(value))
{
Expand Down
24 changes: 14 additions & 10 deletions src/PSParallelPipeline/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@ internal static InitialSessionState AddFunctions(
{
foreach (string function in functionsToAdd)
{
CommandInfo? commandInfo = cmdlet
.InvokeCommand
.GetCommand(function, CommandTypes.Function);

if (commandInfo is null)
try
{
continue;
CommandInfo commandInfo = cmdlet
.InvokeCommand
.GetCommand(function, CommandTypes.Function)
.ThrowIfFunctionNotFoundError(function);

initialSessionState.Commands.Add(
new SessionStateFunctionEntry(
name: function,
definition: commandInfo.Definition));
}
catch (CommandNotFoundException exception)
{
exception.ThrowFunctionNotFoundError(cmdlet, function);
}

initialSessionState.Commands.Add(new SessionStateFunctionEntry(
name: function,
definition: commandInfo.Definition));
}
}

Expand Down
16 changes: 6 additions & 10 deletions src/PSParallelPipeline/PSOutputStreams.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
using System;
using System.Collections.Concurrent;
using System.Management.Automation;
using System.Threading;

namespace PSParallelPipeline;

internal sealed class PSOutputStreams : IDisposable
{
private BlockingCollection<PSOutputData> OutputPipe { get => _worker.OutputPipe; }
private BlockingCollection<PSOutputData> Output { get; }

internal PSDataCollection<PSObject> Success { get; } = [];

Expand All @@ -23,17 +22,15 @@ internal sealed class PSOutputStreams : IDisposable

internal PSDataCollection<WarningRecord> Warning { get; } = [];

private readonly Worker _worker;

internal PSOutputStreams(Worker worker)
internal PSOutputStreams(BlockingCollection<PSOutputData> output)
{
_worker = worker;
SetStreamHandlers();
Output = output;
SetHandlers();
}

internal void AddOutput(PSOutputData data) => OutputPipe.Add(data);
internal void AddOutput(PSOutputData data) => Output.Add(data);

private void SetStreamHandlers()
private void SetHandlers()
{
Success.DataAdding += (s, e) =>
AddOutput(PSOutputData.WriteObject(e.ItemAdded));
Expand Down Expand Up @@ -66,7 +63,6 @@ public void Dispose()
Progress.Dispose();
Verbose.Dispose();
Warning.Dispose();
OutputPipe.Dispose();
GC.SuppressFinalize(this);
}
}
Loading
Loading