Skip to content

Commit

Permalink
Add cancellation support.
Browse files Browse the repository at this point in the history
  • Loading branch information
kentcb committed Mar 19, 2016
1 parent f3ea193 commit 398ebec
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 3 deletions.
23 changes: 23 additions & 0 deletions ReactiveUI.Tests/ReactiveCommandTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ public void ExecuteAsyncTicksAnyLambdaException()
Assert.IsType<InvalidOperationException>(exception);
}

[Fact]
public void ExecuteAsyncCanBeCancelled()
{
(new TestScheduler()).With(sched => {
var execute = Observable.Return(Unit.Default).Delay(TimeSpan.FromSeconds(1), sched);
var fixture = ReactiveCommand.CreateFromObservable(() => execute, outputScheduler: sched);
var executed = fixture
.CreateCollection();
var sub1 = fixture.ExecuteAsync().Subscribe();
var sub2 = fixture.ExecuteAsync().Subscribe();
sched.AdvanceByMs(999);
Assert.True(fixture.IsExecuting.FirstAsync().Wait());
Assert.Empty(executed);
sub1.Dispose();
sched.AdvanceByMs(2);
Assert.Equal(1, executed.Count);
Assert.False(fixture.IsExecuting.FirstAsync().Wait());
});
}

[Fact]
public void ExceptionsAreDeliveredOnOutputScheduler()
{
Expand Down
130 changes: 127 additions & 3 deletions ReactiveUI/ReactiveCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace ReactiveUI
{
/// <summary>
/// Encapsulates a user interaction behind a reactive interface.
/// Encapsulates a user action behind a reactive interface.
/// </summary>
/// <remarks>
/// <para>
Expand Down Expand Up @@ -264,6 +264,35 @@ public static ReactiveCommand<Unit, TResult> CreateFromTask<TResult>(
outputScheduler);
}

/// <summary>
/// Creates a parameterless, cancellable <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous execution logic.
/// </summary>
/// <param name="executeAsync">
/// Provides a <see cref="Task"/> representing the command's asynchronous execution logic.
/// </param>
/// <param name="canExecute">
/// An optional observable that dictates the availability of the command for execution.
/// </param>
/// <param name="outputScheduler">
/// An optional scheduler that is used to surface events. Defaults to <c>RxApp.MainThreadScheduler</c>.
/// </param>
/// <returns>
/// The <c>ReactiveCommand</c> instance.
/// </returns>
/// <typeparam name="TResult">
/// The type of the command's result.
/// </typeparam>
public static ReactiveCommand<Unit, TResult> CreateFromTask<TResult>(
Func<CancellationToken, Task<TResult>> executeAsync,
IObservable<bool> canExecute = null,
IScheduler outputScheduler = null)
{
return CreateFromObservable(
() => Observable.StartAsync(ct => executeAsync(ct)),
canExecute,
outputScheduler);
}

/// <summary>
/// Creates a parameterless <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous execution logic.
/// </summary>
Expand All @@ -290,6 +319,32 @@ public static ReactiveCommand<Unit, Unit> CreateFromTask(
outputScheduler);
}

/// <summary>
/// Creates a parameterless, cancellable <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous execution logic.
/// </summary>
/// <param name="executeAsync">
/// Provides a <see cref="Task"/> representing the command's asynchronous execution logic.
/// </param>
/// <param name="canExecute">
/// An optional observable that dictates the availability of the command for execution.
/// </param>
/// <param name="outputScheduler">
/// An optional scheduler that is used to surface events. Defaults to <c>RxApp.MainThreadScheduler</c>.
/// </param>
/// <returns>
/// The <c>ReactiveCommand</c> instance.
/// </returns>
public static ReactiveCommand<Unit, Unit> CreateFromTask(
Func<CancellationToken, Task> executeAsync,
IObservable<bool> canExecute = null,
IScheduler outputScheduler = null)
{
return CreateFromObservable(
() => Observable.StartAsync(ct => executeAsync(ct)),
canExecute,
outputScheduler);
}

/// <summary>
/// Creates a <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous execution logic that takes a parameter of type <typeparamref name="TParam"/>.
/// </summary>
Expand Down Expand Up @@ -354,6 +409,38 @@ public static ReactiveCommand<TParam, TResult> CreateFromTask<TParam, TResult>(
outputScheduler);
}

/// <summary>
/// Creates a <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous, cancellable execution logic that takes a parameter of type <typeparamref name="TParam"/>.
/// </summary>
/// <param name="executeAsync">
/// Provides a <see cref="Task"/> representing the command's asynchronous execution logic.
/// </param>
/// <param name="canExecute">
/// An optional observable that dictates the availability of the command for execution.
/// </param>
/// <param name="outputScheduler">
/// An optional scheduler that is used to surface events. Defaults to <c>RxApp.MainThreadScheduler</c>.
/// </param>
/// <returns>
/// The <c>ReactiveCommand</c> instance.
/// </returns>
/// <typeparam name="TParam">
/// The type of the parameter passed through to command execution.
/// </typeparam>
/// <typeparam name="TResult">
/// The type of the command's result.
/// </typeparam>
public static ReactiveCommand<TParam, TResult> CreateFromTask<TParam, TResult>(
Func<TParam, CancellationToken, Task<TResult>> executeAsync,
IObservable<bool> canExecute = null,
IScheduler outputScheduler = null)
{
return CreateFromObservable<TParam, TResult>(
param => Observable.StartAsync(ct => executeAsync(param, ct)),
canExecute,
outputScheduler);
}

/// <summary>
/// Creates a <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous execution logic that takes a parameter of type <typeparamref name="TParam"/>.
/// </summary>
Expand Down Expand Up @@ -383,6 +470,35 @@ public static ReactiveCommand<TParam, Unit> CreateFromTask<TParam>(
outputScheduler);
}

/// <summary>
/// Creates a <see cref="ReactiveCommand{TParam, TResult}"/> with asynchronous, cancellable execution logic that takes a parameter of type <typeparamref name="TParam"/>.
/// </summary>
/// <param name="executeAsync">
/// Provides a <see cref="Task"/> representing the command's asynchronous execution logic.
/// </param>
/// <param name="canExecute">
/// An optional observable that dictates the availability of the command for execution.
/// </param>
/// <param name="outputScheduler">
/// An optional scheduler that is used to surface events. Defaults to <c>RxApp.MainThreadScheduler</c>.
/// </param>
/// <returns>
/// The <c>ReactiveCommand</c> instance.
/// </returns>
/// <typeparam name="TParam">
/// The type of the parameter passed through to command execution.
/// </typeparam>
public static ReactiveCommand<TParam, Unit> CreateFromTask<TParam>(
Func<TParam, CancellationToken, Task> executeAsync,
IObservable<bool> canExecute = null,
IScheduler outputScheduler = null)
{
return CreateFromObservable<TParam, Unit>(
param => Observable.StartAsync(ct => executeAsync(param, ct)),
canExecute,
outputScheduler);
}

/// <summary>
/// Creates a <see cref="CombinedReactiveCommand{TParam, TResult}"/> that composes all the provided child commands.
/// </summary>
Expand Down Expand Up @@ -691,7 +807,9 @@ public override IDisposable Subscribe(IObserver<TResult> observer)
try {
return this
.executeAsync(parameter)
.Do(result => this.synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateResult(result)))
.Do(
result => this.synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateResult(result)),
() => this.synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateEnded()))
.Catch<TResult, Exception>(ex => {
this.synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateFail());
exceptions.OnNext(ex);
Expand All @@ -716,7 +834,8 @@ private enum ExecutionDemarcation
{
Begin,
EndWithResult,
EndWithException
EndWithException,
Ended
}

private struct ExecutionInfo
Expand Down Expand Up @@ -754,6 +873,11 @@ public static ExecutionInfo CreateFail()
{
return new ExecutionInfo(ExecutionDemarcation.EndWithException, default(TResult));
}

public static ExecutionInfo CreateEnded()
{
return new ExecutionInfo(ExecutionDemarcation.Ended, default(TResult));
}
}
}

Expand Down

0 comments on commit 398ebec

Please sign in to comment.