Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.2.4'
ModuleVersion = '1.2.5'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand Down
17 changes: 11 additions & 6 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Management.Automation;
using System.Management.Automation.Runspaces;
using System.Threading;
using PSParallelPipeline.Poly;

namespace PSParallelPipeline.Commands;

Expand Down Expand Up @@ -80,12 +79,15 @@
ScriptBlock.GetUsingParameters(this));

_worker = new Worker(poolSettings, workerSettings, _cts.Token);
_worker.Run();
}

protected override void ProcessRecord()
{
Dbg.Assert(_worker is not null);
if (_worker is null)
{
return;

Check warning on line 88 in src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

View check run for this annotation

Codecov / codecov/patch

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs#L87-L88

Added lines #L87 - L88 were not covered by tests
}

InputObject.ThrowIfInputObjectIsScriptBlock(this);

try
Expand All @@ -103,14 +105,17 @@
}
catch (OperationCanceledException exception)
{
CancelAndWait();
_worker.WaitForCompletion();
exception.WriteTimeoutError(this);
}
}

protected override void EndProcessing()
{
Dbg.Assert(_worker is not null);
if (_worker is null)
{
return;

Check warning on line 117 in src/PSParallelPipeline/Commands/InvokeParallelCommand.cs

View check run for this annotation

Codecov / codecov/patch

src/PSParallelPipeline/Commands/InvokeParallelCommand.cs#L116-L117

Added lines #L116 - L117 were not covered by tests
}

try
{
Expand All @@ -129,7 +134,7 @@
}
catch (OperationCanceledException exception)
{
CancelAndWait();
_worker.WaitForCompletion();
exception.WriteTimeoutError(this);
}
}
Expand Down
20 changes: 17 additions & 3 deletions src/PSParallelPipeline/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,28 @@ private static string ResolvePath(this PSCmdlet cmdlet, string path)
.InvokeReturnAsIs();
}

internal static Task InvokePowerShellAsync(
internal static Task InvokePowerShellAsync<TOut>(
this PowerShell powerShell,
PSDataCollection<PSObject> output)
PSDataCollection<TOut> output)
=> Task.Factory.FromAsync(
powerShell.BeginInvoke<PSObject, PSObject>(null, output),
powerShell.BeginInvoke<PSObject, TOut>(null, output),
powerShell.EndInvoke);

internal static ConfiguredTaskAwaitable NoContext(this Task task) => task.ConfigureAwait(false);

internal static ConfiguredTaskAwaitable<T> NoContext<T>(this Task<T> task) => task.ConfigureAwait(false);

public static IEnumerable<TSource> DistinctBy<TSource, TKey>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector)
{
HashSet<TKey> seenKeys = [];
foreach (TSource element in source)
{
if (seenKeys.Add(keySelector(element)))
{
yield return element;
}
}
}
}
16 changes: 8 additions & 8 deletions src/PSParallelPipeline/ModuleCompleter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Management.Automation;
using System.Management.Automation.Language;

Expand All @@ -15,17 +16,16 @@ public IEnumerable<CompletionResult> CompleteArgument(
CommandAst commandAst,
IDictionary fakeBoundParameters)
{
using PowerShell ps = PowerShell
using PowerShell powershell = PowerShell
.Create(RunspaceMode.CurrentRunspace)
.AddCommand("Get-Module")
.AddParameter("ListAvailable");

foreach (PSModuleInfo module in ps.Invoke<PSModuleInfo>())
{
if (module.Name.StartsWith(wordToComplete, StringComparison.InvariantCultureIgnoreCase))
{
yield return new CompletionResult(module.Name);
}
}
return powershell
.Invoke<PSModuleInfo>()
.DistinctBy(e => e.Name)
.Where(e => e.Name.StartsWith(wordToComplete, StringComparison.InvariantCultureIgnoreCase))
.OrderBy(e => e.Name)
.Select(e => new CompletionResult(e.Name));
}
}
4 changes: 2 additions & 2 deletions src/PSParallelPipeline/PSTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ private PSTask AddUsingStatements(Dictionary<string, object?> usingParams)

private void CompleteTask()
{
_powershell.Dispose();
if (_canceled)
{
_runspace?.Dispose();
return;
}

_powershell.Dispose();
if (_runspace is not null)
{
_pool.PushRunspace(_runspace);
Expand All @@ -129,7 +129,7 @@ private void CompleteTask()

internal void Cancel()
{
_powershell.Dispose();
_powershell.BeginStop(null, null);
_canceled = true;
}
}
11 changes: 0 additions & 11 deletions src/PSParallelPipeline/Poly/Dbg.cs

This file was deleted.

150 changes: 0 additions & 150 deletions src/PSParallelPipeline/Poly/Nullable.cs

This file was deleted.

12 changes: 4 additions & 8 deletions src/PSParallelPipeline/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace PSParallelPipeline;

internal sealed class Worker
{
private Task? _worker;
private readonly Task _worker;

private readonly TaskSettings _taskSettings;

Expand All @@ -31,9 +31,10 @@ internal Worker(
_taskSettings = taskSettings;
_streams = new PSOutputStreams(_output);
_pool = new RunspacePool(poolSettings, _streams, _token);
_worker = Task.Run(Start, cancellationToken: _token);
}

internal void WaitForCompletion() => _worker?.GetAwaiter().GetResult();
internal void WaitForCompletion() => _worker.GetAwaiter().GetResult();

internal void Enqueue(object? input) => _input.Add(input, _token);

Expand All @@ -43,8 +44,6 @@ internal Worker(

internal IEnumerable<PSOutputData> GetConsumingEnumerable() => _output.GetConsumingEnumerable(_token);

internal void Run() => _worker = Task.Run(Start, cancellationToken: _token);

private async Task Start()
{
List<Task> tasks = new(_pool.MaxRunspaces);
Expand All @@ -55,10 +54,7 @@ private async Task Start()
{
if (tasks.Count == tasks.Capacity)
{
Task task = await Task
.WhenAny(tasks)
.NoContext();

Task task = await Task.WhenAny(tasks).NoContext();
tasks.Remove(task);
await task.NoContext();
}
Expand Down