Skip to content

Commit

Permalink
Break dependency from CoW to System.Threading.Tasks.Dataflow to avoid…
Browse files Browse the repository at this point in the history
… dependency load problems in a few projects (#525)
  • Loading branch information
erikmav committed Jan 10, 2024
1 parent 1fd6d02 commit 90e9f3e
Showing 1 changed file with 17 additions and 35 deletions.
52 changes: 17 additions & 35 deletions src/CopyOnWrite/Copy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using Task = Microsoft.Build.Utilities.Task;

#nullable enable annotations

Expand Down Expand Up @@ -554,19 +555,18 @@ private void MakeFileWriteable(FileState file, bool logActivity)

// Lockless flags updated from each thread - each needs to be a processor word for atomicity.
var successFlags = new IntPtr[DestinationFiles.Length];
var actionBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = parallelism,
CancellationToken = _cancellationTokenSource.Token,
EnsureOrdered = parallelism == 1,
};
var partitionCopyActionBlock = new ActionBlock<List<int>>(
async (List<int> partition) =>
{
// Break from synchronous thread context of caller to get onto thread pool thread.
await System.Threading.Tasks.Task.Yield();

for (int partitionIndex = 0; partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested; partitionIndex++)
Parallel.ForEach(partitionsByDestination.Values,
new ParallelOptions
{
CancellationToken = _cancellationTokenSource.Token,
MaxDegreeOfParallelism = parallelism,
},
(List<int> partition) =>
{
for (int partitionIndex = 0;
partitionIndex < partition.Count && !_cancellationTokenSource.IsCancellationRequested;
partitionIndex++)
{
int fileIndex = partition[partitionIndex];
ITaskItem sourceItem = SourceFiles[fileIndex];
Expand All @@ -583,9 +583,9 @@ private void MakeFileWriteable(FileState file, bool logActivity)
if (!copyComplete)
{
if (DoCopyIfNecessary(
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
new FileState(sourceItem.ItemSpec),
new FileState(destItem.ItemSpec),
copyFile))
{
copyComplete = true;
}
Expand All @@ -602,25 +602,7 @@ private void MakeFileWriteable(FileState file, bool logActivity)
successFlags[fileIndex] = (IntPtr)1;
}
}
},
actionBlockOptions);

foreach (List<int> partition in partitionsByDestination.Values)
{
bool partitionAccepted = partitionCopyActionBlock.Post(partition);
if (_cancellationTokenSource.IsCancellationRequested)
{
break;
}
else if (!partitionAccepted)
{
// Retail assert...
LogError("Failed posting a file copy to an ActionBlock. Should not happen with block at max int capacity.");
}
}

partitionCopyActionBlock.Complete();
partitionCopyActionBlock.Completion.GetAwaiter().GetResult();
});

// Assemble an in-order list of destination items that succeeded.
destinationFilesSuccessfullyCopied = new List<ITaskItem>(DestinationFiles.Length);
Expand Down

0 comments on commit 90e9f3e

Please sign in to comment.