Skip to content
Browse files

Major TaskHelpers.Iterate perf wins. Create a fast-path for Task so t…

…hat synchronous tasks execute immediately and we avoid any task allocations. This reduces iteration overhead to nearly 0.

Remove iteration over Task<T> since nobody used it and it's slow.

Ensure that in task.Finally(), original task exception is observed even if finally clause throws.

Add some unit tests to the task finally extensions.Ensure that the exception from a finally clause is the one that propagates.
  • Loading branch information...
1 parent 88372a0 commit 060280043fae9048b355556b5ea1951bb8a8cfb7 MikeStall committed Apr 5, 2012
View
168 src/Common/TaskHelpers.cs
@@ -1,7 +1,6 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Contracts;
-using System.Linq;
namespace System.Threading.Tasks
{
@@ -90,25 +89,82 @@ internal static Task<TResult> FromResult<TResult>(TResult result)
/// <param name="asyncIterator">collection of tasks to wait on</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>a task that signals completed when all the incoming tasks are finished.</returns>
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The exception is propagated in a Task.")]
internal static Task Iterate(IEnumerable<Task> asyncIterator, CancellationToken cancellationToken = default(CancellationToken))
{
Contract.Assert(asyncIterator != null);
- return IterateEngine.Run(asyncIterator, cancellationToken);
+ IEnumerator<Task> enumerator = null;
+ try
+ {
+ enumerator = asyncIterator.GetEnumerator();
+ Task task = IterateImpl(enumerator, cancellationToken);
+ return (enumerator != null) ? task.Finally(enumerator.Dispose) : task;
+ }
+ catch (Exception ex)
+ {
+ return TaskHelpers.FromError(ex);
+ }
}
/// <summary>
- /// Return a task that runs all the tasks inside the iterator sequentially and collects the results.
- /// It stops as soon as one of the tasks fails or cancels, or after all the tasks have run succesfully.
+ /// Provides the implementation of the Iterate method.
+ /// Contains special logic to help speed up common cases.
/// </summary>
- /// <param name="asyncIterator">collection of tasks to wait on</param>
- /// <param name="cancellationToken">cancellation token</param>
- /// <returns>A task that, upon successful completion, returns the list of results.</returns>
- internal static Task<IEnumerable<TResult>> Iterate<TResult>(IEnumerable<Task<TResult>> asyncIterator, CancellationToken cancellationToken = default(CancellationToken))
+ [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The exception is propagated in a Task.")]
+ internal static Task IterateImpl(IEnumerator<Task> enumerator, CancellationToken cancellationToken)
{
- Contract.Assert(asyncIterator != null);
+ try
+ {
+ while (true)
+ {
+ // short-circuit: iteration canceled
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return TaskHelpers.Canceled();
+ }
+
+ // short-circuit: iteration complete
+ if (!enumerator.MoveNext())
+ {
+ return TaskHelpers.Completed();
+ }
- return IterateEngine<TResult>.Run(asyncIterator, cancellationToken);
+ // fast case: Task completed synchronously & successfully
+ Task currentTask = enumerator.Current;
+ if (currentTask.Status == TaskStatus.RanToCompletion)
+ {
+ continue;
+ }
+
+ // fast case: Task completed synchronously & unsuccessfully
+ if (currentTask.IsCanceled || currentTask.IsFaulted)
+ {
+ return currentTask;
+ }
+
+ // slow case: Task isn't yet complete
+ return IterateImplIncompleteTask(enumerator, currentTask, cancellationToken);
+ }
+ }
+ catch (Exception ex)
+ {
+ return TaskHelpers.FromError(ex);
+ }
+ }
+
+ /// <summary>
+ /// Fallback for IterateImpl when the antecedent Task isn't yet complete.
+ /// </summary>
+ internal static Task IterateImplIncompleteTask(IEnumerator<Task> enumerator, Task currentTask, CancellationToken cancellationToken)
+ {
+ // There's a race condition here, the antecedent Task could complete between
+ // the check in Iterate and the call to Then below. If this happens, we could
+ // end up growing the stack indefinitely. But the chances of (a) even having
+ // enough Tasks in the enumerator in the first place and of (b) *every* one
+ // of them hitting this race condition are so extremely remote that it's not
+ // worth worrying about.
+ return currentTask.Then(() => IterateImpl(enumerator, cancellationToken));
}
/// <summary>
@@ -325,97 +381,5 @@ private static Task<TResult> GetCancelledTask()
return tcs.Task;
}
}
-
- // These classes are the engine that implements Iterate and Iterate<T>
- private static class IterateEngine
- {
- public static Task Run(IEnumerable<Task> iterator, CancellationToken cancellationToken)
- {
- // WARNING: This code uses LINQ Select to ensure that we get deferred execution (i.e., we
- // don't start running all the tasks all at once). If you touch this code, please ensure
- // that this behavior is preserved.
- return IterateEngine<AsyncVoid>.Run(iterator.Select(t => t.ToTask<AsyncVoid>()), cancellationToken);
- }
- }
-
- private class IterateEngine<TResult>
- {
- private CancellationToken _cancellationToken;
- private TaskCompletionSource<IEnumerable<TResult>> _completionSource;
- private IEnumerator<Task<TResult>> _enumerator;
- private List<TResult> _results;
- private SynchronizationContext _syncContext;
-
- public static Task<IEnumerable<TResult>> Run(IEnumerable<Task<TResult>> iterator, CancellationToken cancellationToken)
- {
- IterateEngine<TResult> engine = new IterateEngine<TResult>
- {
- _cancellationToken = cancellationToken,
- _completionSource = new TaskCompletionSource<IEnumerable<TResult>>(),
- _enumerator = iterator.GetEnumerator(),
- _results = new List<TResult>(),
- _syncContext = SynchronizationContext.Current
- };
-
- RunNext(engine);
- return engine._completionSource.Task.Finally(engine._enumerator.Dispose);
- }
-
- private static void RunNext(IterateEngine<TResult> engine)
- {
- if (engine._syncContext != null && engine._syncContext != SynchronizationContext.Current)
- {
- engine._syncContext.Post(RunNextCallback, engine);
- }
- else
- {
- RunNextCallback(engine);
- }
- }
-
- // TODO: This class can become more efficient once we take a hard 4.5 dependency. In 4.0, ContinueWith
- // does not offer you the ability to pass a state object; once it does, we can change the implementation
- // of RunNextCallback to remove the closure around "engine".
- [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The caught exception type is reflected into a faulted task.")]
- [SuppressMessage("Microsoft.WebAPI", "CR4001:DoNotCallProblematicMethodsOnTask", Justification = "This usage is known to be safe.")]
- private static void RunNextCallback(object state)
- {
- IterateEngine<TResult> engine = (IterateEngine<TResult>)state;
-
- try
- {
- if (engine._cancellationToken.IsCancellationRequested)
- {
- engine._completionSource.TrySetCanceled();
- }
- else if (engine._enumerator.MoveNext())
- {
- engine._enumerator.Current.ContinueWith(previous =>
- {
- switch (previous.Status)
- {
- case TaskStatus.Faulted:
- case TaskStatus.Canceled:
- engine._completionSource.TrySetFromTask(previous);
- break;
-
- default:
- engine._results.Add(previous.Result);
- RunNext(engine);
- break;
- }
- }, TaskContinuationOptions.ExecuteSynchronously);
- }
- else
- {
- engine._completionSource.TrySetResult(engine._results);
- }
- }
- catch (Exception e)
- {
- engine._completionSource.TrySetException(e);
- }
- }
- }
}
}
View
34 src/Common/TaskHelpersExtensions.cs
@@ -46,7 +46,6 @@ internal static Task<TResult> Catch<TResult>(this Task<TResult> task, Func<Catch
{
return task;
}
-
return task.CatchImpl(() => continuation(new CatchInfo<TResult>(task)).Task, cancellationToken);
}
@@ -81,8 +80,9 @@ private static Task<TResult> CatchImpl<TResult>(this Task task, Func<Task<TResul
{
return TaskHelpers.Canceled<TResult>();
}
+
if (task.Status == TaskStatus.RanToCompletion)
- {
+ {
TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
tcs.TrySetFromTask(task);
return tcs.Task;
@@ -255,6 +255,7 @@ internal static Task Finally(this Task task, Action continuation)
}
catch (Exception ex)
{
+ MarkExceptionsObserved(task);
return TaskHelpers.FromError(ex);
}
}
@@ -280,6 +281,7 @@ internal static Task<TResult> Finally<TResult>(this Task<TResult> task, Action c
}
catch (Exception ex)
{
+ MarkExceptionsObserved(task);
return TaskHelpers.FromError<TResult>(ex);
}
}
@@ -308,14 +310,23 @@ private static Task<TResult> FinallyImplContinuation<TResult>(Task task, Action
}
catch (Exception ex)
{
+ MarkExceptionsObserved(innerTask);
tcs.SetException(ex);
}
}, state: null);
}
else
{
- continuation();
- tcs.TrySetFromTask(innerTask);
+ try
+ {
+ continuation();
+ tcs.TrySetFromTask(innerTask);
+ }
+ catch (Exception ex)
+ {
+ MarkExceptionsObserved(innerTask);
+ tcs.SetException(ex);
+ }
}
return tcs.Task;
@@ -381,6 +392,21 @@ private static Action<Task> GetRethrowWithNoStackLossDelegate()
}
/// <summary>
+ /// Marks a Task as "exception observed". The Task is required to have been completed first.
+ /// </summary>
+ /// <remarks>
+ /// Useful for 'finally' clauses, as if the 'finally' action throws we'll propagate the new
+ /// exception and lose track of the inner exception.
+ /// </remarks>
+ [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "unused", Justification = "We only call the property getter for its side effect; we don't care about the value.")]
+ private static void MarkExceptionsObserved(this Task task)
+ {
+ Contract.Assert(task.IsCompleted);
+
+ Exception unused = task.Exception;
+ }
+
+ /// <summary>
/// Calls the given continuation, after the given task has completed, if the task successfully ran
/// to completion (i.e., was not cancelled and did not fault).
/// </summary>
View
92 test/System.Web.Http.Test/Common/TaskHelpersExtensionsTest.cs
@@ -628,6 +628,52 @@ public Task Finally_NoInputValue_CompletedTaskOfSuccess_RunsOnSameThreadAndDoesN
}
[Fact, ForceGC, PreserveSyncContext]
+ public void Finally_CompletedTaskOfFault_ExceptionInFinally()
+ {
+ Exception exception1 = new InvalidOperationException("From source");
+ Exception exception2 = new InvalidOperationException("FromFinally");
+
+ // When the finally clause throws, that's the exception that propagates.
+ // Still ensure that the original exception from the try block is observed.
+
+ // Act
+ Task faultedTask = TaskHelpers.FromError(exception1);
+ Task t =faultedTask.Finally(() => { throw exception2; });
+
+ // Assert
+ Assert.True(t.IsFaulted);
+ Assert.IsType<AggregateException>(t.Exception);
+ Assert.Equal(1, t.Exception.InnerExceptions.Count);
+ Assert.Equal(exception2, t.Exception.InnerException);
+ }
+
+ [Fact, ForceGC, PreserveSyncContext]
+ public Task Finally_IncompletedTask_ExceptionInFinally()
+ {
+ Exception exception1 = new InvalidOperationException("From source");
+ Exception exception2 = new InvalidOperationException("FromFinally");
+
+ // Like test Finally_CompletedTaskOfFault_ExceptionInFinally, but exercises when the original task doesn't complete synchronously
+
+ // Act
+ Task incompleteTask = new Task(() => { throw exception1; });
+ Task t = incompleteTask.Finally(() => { throw exception2; });
+
+ incompleteTask.Start();
+
+ // Assert
+ return t.ContinueWith(prevTask =>
+ {
+ Assert.Equal(t, prevTask);
+
+ Assert.True(t.IsFaulted);
+ Assert.IsType<AggregateException>(t.Exception);
+ Assert.Equal(1, t.Exception.InnerExceptions.Count);
+ Assert.Equal(exception2, t.Exception.InnerException);
+ });
+ }
+
+ [Fact, ForceGC, PreserveSyncContext]
public Task Finally_NoInputValue_CompletedTaskOfCancellation_RunsOnSameThreadAndDoesNotPostToSynchronizationContext()
{
// Arrange
@@ -791,6 +837,52 @@ public Task Finally_WithInputValue_CompletedTaskOfSuccess_RunsOnSameThreadAndDoe
}
[Fact, ForceGC, PreserveSyncContext]
+ public void Finally_WithInputValue_CompletedTaskOfFault_ExceptionInFinally()
+ {
+ Exception exception1 = new InvalidOperationException("From source");
+ Exception exception2 = new InvalidOperationException("FromFinally");
+
+ // When the finally clause throws, that's the exception that propagates.
+ // Still ensure that the original exception from the try block is observed.
+
+ // Act
+ Task<int> faultedTask = TaskHelpers.FromError<int>(exception1);
+ Task<int> t = faultedTask.Finally(() => { throw exception2; });
+
+ // Assert
+ Assert.True(t.IsFaulted);
+ Assert.IsType<AggregateException>(t.Exception);
+ Assert.Equal(1, t.Exception.InnerExceptions.Count);
+ Assert.Equal(exception2, t.Exception.InnerException);
+ }
+
+ [Fact, ForceGC, PreserveSyncContext]
+ public Task Finally_WithInputValue_IncompletedTask_ExceptionInFinally()
+ {
+ Exception exception1 = new InvalidOperationException("From source");
+ Exception exception2 = new InvalidOperationException("FromFinally");
+
+ // Like test Finally_WithInputValue_CompletedTaskOfFault_ExceptionInFinally, but exercises when the original task doesn't complete synchronously
+
+ // Act
+ Task<int> incompleteTask = new Task<int>(() => { throw exception1; });
+ Task<int> t = incompleteTask.Finally(() => { throw exception2; });
+
+ incompleteTask.Start();
+
+ // Assert
+ return t.ContinueWith(prevTask =>
+ {
+ Assert.Equal(t, prevTask);
+
+ Assert.True(t.IsFaulted);
+ Assert.IsType<AggregateException>(t.Exception);
+ Assert.Equal(1, t.Exception.InnerExceptions.Count);
+ Assert.Equal(exception2, t.Exception.InnerException);
+ });
+ }
+
+ [Fact, ForceGC, PreserveSyncContext]
public Task Finally_WithInputValue_CompletedTaskOfCancellation_RunsOnSameThreadAndDoesNotPostToSynchronizationContext()
{
// Arrange
View
185 test/System.Web.Http.Test/Common/TaskHelpersTest.cs
@@ -307,191 +307,6 @@ private static IEnumerable<Task> SyncContextVerifyingEnumerable(SynchronizationC
}
// -----------------------------------------------------------------
- // Task<IEnumerable<T>> TaskHelpers.Iterate(IEnumerable<Task<T>>)
-
- [Fact]
- public void Iterate_Generic_IfProvidedEnumerationContainsNullValue_ReturnsFaultedTaskWithNullReferenceException()
- {
- List<string> log = new List<string>();
-
- Task<IEnumerable<object>> result = TaskHelpers.Iterate(NullTaskEnumerable_Generic(log));
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.Faulted, result.Status);
- Assert.IsType<NullReferenceException>(result.Exception.GetBaseException());
- }
-
- private static IEnumerable<Task<object>> NullTaskEnumerable_Generic(List<string> log)
- {
- log.Add("first");
- yield return null;
- log.Add("second");
- }
-
- [Fact]
- public void Iterate_Generic_IfProvidedEnumerationThrowsException_ReturnsFaultedTask()
- {
- List<string> log = new List<string>();
- Exception exception = new Exception();
-
- Task<IEnumerable<object>> result = TaskHelpers.Iterate(ThrowingTaskEnumerable_Generic(exception, log));
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.Faulted, result.Status);
- Assert.Same(exception, result.Exception.InnerException);
- Assert.Equal(new[] { "first" }, log.ToArray());
- }
-
- private static IEnumerable<Task<object>> ThrowingTaskEnumerable_Generic(Exception e, List<string> log)
- {
- log.Add("first");
- bool a = true; // work around unreachable code warning
- if (a) throw e;
- log.Add("second");
- yield return null;
- }
-
- [Fact]
- public void Iterate_Generic_IfProvidedEnumerableExecutesCancellingTask_ReturnsCanceledTaskAndHaltsEnumeration()
- {
- List<string> log = new List<string>();
-
- Task<IEnumerable<object>> result = TaskHelpers.Iterate(CanceledTaskEnumerable_Generic(log));
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.Canceled, result.Status);
- Assert.Equal(new[] { "first" }, log.ToArray());
- }
-
- private static IEnumerable<Task<object>> CanceledTaskEnumerable_Generic(List<string> log)
- {
- log.Add("first");
- yield return TaskHelpers.Canceled<object>();
- log.Add("second");
- }
-
- [Fact]
- public void Iterate_Generic_IfProvidedEnumerableExecutesFaultingTask_ReturnsCanceledTaskAndHaltsEnumeration()
- {
- List<string> log = new List<string>();
- Exception exception = new Exception();
-
- Task<IEnumerable<object>> result = TaskHelpers.Iterate(FaultedTaskEnumerable_Generic(exception, log));
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.Faulted, result.Status);
- Assert.Same(exception, result.Exception.InnerException);
- Assert.Equal(new[] { "first" }, log.ToArray());
- }
-
- private static IEnumerable<Task<object>> FaultedTaskEnumerable_Generic(Exception e, List<string> log)
- {
- log.Add("first");
- yield return TaskHelpers.FromError<object>(e);
- log.Add("second");
- }
-
- [Fact]
- public void Iterate_Generic_ExecutesNextTaskOnlyAfterPreviousTaskSucceeded()
- {
- Task<IEnumerable<int>> result = TaskHelpers.Iterate(SuccessTaskEnumerable_Generic());
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.RanToCompletion, result.Status);
- Assert.Equal(new[] { 42, 2112 }, result.Result);
- }
-
- private static IEnumerable<Task<int>> SuccessTaskEnumerable_Generic()
- {
- yield return Task.Factory.StartNew(() => 42);
- yield return Task.Factory.StartNew(() => 2112);
- }
-
- [Fact]
- public void Iterate_Generic_TasksRunSequentiallyRegardlessOfExecutionTime()
- {
- List<string> log = new List<string>();
-
- Task<IEnumerable<object>> task = TaskHelpers.Iterate(TasksWithVaryingDelays_Generic(log, 100, 1, 50, 2));
-
- task.WaitUntilCompleted();
- Assert.Equal(TaskStatus.RanToCompletion, task.Status);
- Assert.Equal(new[] { "ENTER: 100", "EXIT: 100", "ENTER: 1", "EXIT: 1", "ENTER: 50", "EXIT: 50", "ENTER: 2", "EXIT: 2" }, log);
- }
-
- private static IEnumerable<Task<object>> TasksWithVaryingDelays_Generic(List<string> log, params int[] delays)
- {
- foreach (int delay in delays)
- yield return Task.Factory.StartNew(timeToSleep =>
- {
- log.Add("ENTER: " + timeToSleep);
- Thread.Sleep((int)timeToSleep);
- log.Add("EXIT: " + timeToSleep);
- return (object)null;
- }, delay);
- }
-
- [Fact]
- public void Iterate_Generic_StopsTaskIterationIfCancellationWasRequested()
- {
- List<string> log = new List<string>();
- CancellationTokenSource cts = new CancellationTokenSource();
-
- var result = TaskHelpers.Iterate(CancelingTaskEnumerable_Generic(log, cts), cts.Token);
-
- Assert.NotNull(result);
- result.WaitUntilCompleted();
- Assert.Equal(TaskStatus.Canceled, result.Status);
- Assert.Equal(
- new[] { "first", "Executing first task. Log size: 1" },
- log.ToArray());
- }
-
- private static IEnumerable<Task<object>> CancelingTaskEnumerable_Generic(List<string> log, CancellationTokenSource cts)
- {
- log.Add("first");
- yield return Task.Factory.StartNew(() =>
- {
- log.Add("Executing first task. Log size: " + log.Count);
- cts.Cancel();
- return (object)null;
- });
- log.Add("second");
- yield return Task.Factory.StartNew(() =>
- {
- log.Add("Executing second task. Log size: " + log.Count);
- return (object)null;
- });
- }
-
- [Fact, PreserveSyncContext]
- public Task Iterate_Generic_IteratorRunsInSynchronizationContext()
- {
- ThreadPoolSyncContext sc = new ThreadPoolSyncContext();
- SynchronizationContext.SetSynchronizationContext(sc);
-
- return TaskHelpers.Iterate(SyncContextVerifyingEnumerable_Generic(sc)).Then(result =>
- {
- Assert.Same(sc, SynchronizationContext.Current);
- Assert.Equal(new[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, result);
- });
- }
-
- private static IEnumerable<Task<int>> SyncContextVerifyingEnumerable_Generic(SynchronizationContext sc)
- {
- for (int i = 0; i < 10; i++)
- {
- Assert.Same(sc, SynchronizationContext.Current);
- yield return TaskHelpers.FromResult(i);
- }
- }
-
- // -----------------------------------------------------------------
// TaskHelpers.TrySetFromTask<T>
[Fact]

0 comments on commit 0602800

Please sign in to comment.
Something went wrong with that request. Please try again.