[BUG] IsExecuting becomes False when cancellation is requested instead of task actually finishes #2153

Closed
dtoppani-twist opened this issue Aug 29, 2019 · 14 comments

Comments

@dtoppani-twist

dtoppani-twist commented Aug 29, 2019

Platform: Windows/WPF
ReactiveUI 9.19.0.0
ReactiveUI.WPF 9.19.0.0

It appears the behavior of IsExecuting is not completely accurate. Assuming the command:

SomeCommand = ReactiveCommand.CreateFromTask(async (token) =>
{
    Debug.WriteLine("started command");
    try
    {
        await Task.Delay(10000, token);
    }
    catch (OperationCanceledException)
    {
        // Simulate cleanup work that runs after cancellation was requested.
        Debug.WriteLine("starting cancelling command");
        await Task.Delay(5000);
        Debug.WriteLine("finished cancelling command");
        throw;
    }
    Debug.WriteLine("finished command");
});

SomeCommand.IsExecuting.Subscribe(isExecuting =>
{
    Debug.WriteLine($"command executing = {isExecuting}");
});

This simulates a task that needs to do some cleanup work after cancellation has been requested. While this cleanup work is happening, IsExecuting should still be true, because the task is still running and the command should not allow another execution until cleanup has completed.

And the code to test the command:

  1. executing the command:
    disposable = SomeCommand.Execute().Subscribe();

  2. cancelling the command while the command runs:
    disposable.Dispose();

This is the debug output:
started command
command executing = True
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
starting cancelling command
command executing = False
finished cancelling command
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll

IsExecuting becomes False as soon as cancellation of the command is requested. Instead, it should stay True for an extra 5000 ms, until the cleanup has finished. The debug output should be:

started command
command executing = True
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
starting cancelling command
finished cancelling command
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
command executing = False

I am trying to trace the code inside ReactiveCommand but still don't understand why this would happen.


@glennawatson
Contributor

Can you please add details that were removed from the issue template such as platform, which UI platform you are developing for etc. Thanks

@dtoppani-twist
Author

Looking at the ReactiveCommand source code, I am trying to reduce the problem. I hope this helps isolate the bug faster.

This creates the same observable that ReactiveCommand.Execute builds, but with simple Debug outputs:

private IObservable<Unit> MakeObs(Func<Unit, IObservable<Unit>> execute)
{
    return Observable
        .Defer(
            () =>
            {
                Debug.WriteLine("begin");
                return Observable.Empty<Unit>();
            })
        .Concat(execute(Unit.Default))
        .Do(result => Debug.WriteLine("create result"))
        .Catch<Unit, Exception>(
            ex =>
            {
                Debug.WriteLine("exception");
                return Observable.Throw<Unit>(ex);
            })
        .Finally(() => Debug.WriteLine("end"))
        .PublishLast()
        .RefCount()
        .ObserveOn(RxApp.MainThreadScheduler);
}

And this is the code to try it:

ob = MakeObs(u => Observable.StartAsync(async (token) =>
{
    try
    {
        Debug.WriteLine("start");
        await Task.Delay(5000, token);
        Debug.WriteLine("end");
    }
    catch (OperationCanceledException)
    {
        Debug.WriteLine("start cancel");
        await Task.Delay(2000);
        Debug.WriteLine("end cancel");
        throw;
    }
}));

disposable = ob.Subscribe();

The cancellation is done by running:

disposable.Dispose();

The debug output is:

start
begin
end
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll
start cancel
end cancel
Exception thrown: 'System.Threading.Tasks.TaskCanceledException' in mscorlib.dll

AFAICT IsExecuting is calculated based on the number of "begin" and "end" markers. The "end" marker is generated on cancellation instead of when the task is actually completed.
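
To make that counting concrete, here is a minimal sketch of how a begin/end count can be turned into an IsExecuting-style boolean. It is not the ReactiveCommand source: the class name and the +1/-1 encoding of the markers are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class InFlightCountSketch
{
    // Hypothetical encoding: +1 stands for a "begin" marker, -1 for an "end" marker.
    public static List<bool> Run()
    {
        var markers = new Subject<int>();
        var isExecutingValues = new List<bool>();

        markers
            .Scan(0, (inFlight, delta) => inFlight + delta) // running count of executions
            .Select(inFlight => inFlight > 0)               // executing while count > 0
            .StartWith(false)
            .DistinctUntilChanged()
            .Subscribe(value => isExecutingValues.Add(value));

        markers.OnNext(+1); // "begin"
        markers.OnNext(-1); // "end": the boolean flips back as soon as this arrives
        return isExecutingValues; // false, true, false
    }
}
```

The boolean depends only on when the "end" marker is pushed, so an "end" emitted at cancellation time flips it back to false regardless of what the task is still doing.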

Observable.Finally is supposed to happen when the observable completes or throws an exception, so theoretically the "end" marker should happen after Concat(execute) completes. So far the code looks right but it still does not behave as expected...

@dtoppani-twist
Author

dtoppani-twist commented Aug 30, 2019

I think Observable.Finally is also triggered when the last subscription is disposed, which would explain the bug.

The Observable.Finally documentation does not specifically state that this is the case, but the source code relies on the disposal of the subscription, which happens automatically on termination, so a user-driven disposal would also trigger Finally to execute.

In a way, this makes sense because if the observable is disposed, Finally has to run at this point since there won't be another chance to do so.
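
A small sketch can confirm this behaviour in isolation. It uses Observable.Never so that the source never completes or fails, which means the only thing that can trigger Finally is the explicit Dispose. The class and method names are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class FinallyOnDisposeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Never<int>() produces no OnNext, OnError or OnCompleted,
        // so Finally cannot be triggered by termination of the source.
        IDisposable subscription = Observable.Never<int>()
            .Finally(() => log.Add("finally"))
            .Subscribe();

        log.Add("disposing");
        subscription.Dispose(); // the Finally action runs as part of this call
        log.Add("disposed");

        return log; // disposing, finally, disposed
    }
}
```

Calling FinallyOnDisposeSketch.Run() from a console project that references System.Reactive shows "finally" logged between "disposing" and "disposed", even though the source never terminated.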

@dtoppani-twist
Author

I can work around the limitation with a special version of the ReactiveCommand implementation for tasks: the task is wrapped in a try/finally, and the "end" marker is emitted there instead of from the observable.

_execute = new Func<Unit, IObservable<Unit>>((u) => Observable.StartAsync(async (token) =>
{
    try
    {
        await execute(token);
    }
    finally
    {
        // Emit the "end" marker only once the task itself has finished.
        _synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateEnd());
    }
}));

If the task has already been transformed into an observable, there is no obvious way to fix the IsExecuting behavior.

@ChrisPulman
Member

We are trying to come up with a resolution for this, but the root cause is in the System.Reactive code:
dotnet/reactive#1256
This issue has been outstanding for quite some time.

@idg10

idg10 commented Apr 4, 2023

A basic tenet of Rx is that once you have called Dispose on a subscription, you cannot rely on receiving any further notifications. While it's true that in the window between you calling Dispose and Dispose returning, you might receive notifications, you cannot rely on this. (And in practice, Rx typically shuts off the delivery of notifications for a subscription very early on when you call Dispose.)

This means that there is a problem with trying to use Observable.FromAsync in the way ReactiveCommand.CreateFromTask tries to use it. The basic issue is that ReactiveCommand.CreateFromTask is attempting to use the IObservable<T> returned by Observable.FromAsync in two ways:

  1. as the means of triggering cancellation
  2. to discover what the task did after cancellation has been triggered

The fundamental problem here is that 1 works by calling Dispose on the very same source that 2 relies on for getting notifications from the source after the call to Dispose. (And the reason you are relying on getting notifications after the call to Dispose is that cancellation is asynchronous. It could take a long time to finish: in the repro in this issue it takes 5 seconds for cancellation to be fully handled.) This violates the fundamental tenet that from the instant that you have initiated the process of unsubscribing from a source by calling Dispose, the source is free to stop sending you notifications immediately, and must stop sending them once it returns from Dispose.

Furthermore, the problem being reported in dotnet/reactive#1256 is really about something slightly different. The problem you are describing here is that you aren't getting enough information about the post-cancellation outcome of the task, whereas Rx issue 1256 is actually the opposite: it's a complaint that Rx produces too much information, and the request is for Rx to ignore failures that occur after cancellation. More specifically, failures of that kind are currently reported globally as unobserved exceptions, but if you look in the main text of the issue itself you'll find this:

no subscription means we are no longer interested in errors and we shouldn't get them anywhere.

In other words, the person who submitted that issue is saying that we need to be more comprehensive in our application of the fundamental tenet that when you unsubscribe from a source, you are making a statement that you no longer want to hear anything from that source.

@bartdesmet proposed a solution to that Rx issue which entailed:

an optional flag (which defaults to the current behavior, as some people may rely on handling these exceptions "globally", so one would have to opt in to such an ignoreExceptionsAfterDispose flag)

That is entirely in the spirit of what the original poster asked for. And it doesn't help you in the slightest.

I don't believe the problems you've described here can be fixed with any adjustments to the behaviour of the existing Observable.FromAsync methods (at least, not without violating some fundamental assumptions that Rx has embodied for over a decade).

I think what you actually need is a completely different mechanism. I think what you need is something with signatures more like this:

public static IObservable<(IObservable<Unit> Result, Action Cancel)> FromAsyncWithCancel(
    Func<CancellationToken, Task> actionAsync);

public static IObservable<(IObservable<TResult> Result, Action Cancel)> FromAsyncWithCancel<TResult>(
    Func<CancellationToken, Task<TResult>> actionAsync);

My thinking here is that the root cause of the problems you're describing is that you are attempting to use a single IObservable<T> for two different purposes, and that those purposes are fundamentally incompatible. The idea with the method signatures above is that they split the two features you need into two different pieces.

Each time you subscribe to an observable source representing a task, you need an IObservable<T> that enables you to discover the outcome of the underlying task. And you also need some mechanism by which to cancel that task without also unsubscribing from the source that's going to provide you with the outcome. The problems described in the present issue are caused by the fact that these two things are not separate: it's not possible for you to cancel the underlying task without also unsubscribing from the observable source that was going to keep you informed about the task's progress.

The signatures above might look overcomplicated. Why do we need nested observables? I did initially think of this:

// Won't work for multiple invocations
public static (IObservable<Unit> Result, Action Cancel) FromAsyncWithCancel(
    Func<CancellationToken, Task> actionAsync);

That's simpler, but the problem is that you get only a single cancellation callback. That's no good, because a feature of the observables you get back from FromAsync is that if you subscribe to them multiple times, it invokes the callback each time, meaning that if you want, you can return a new task every time. Each one of those should get its own cancellation token. The problem with this simplified signature is that once you've used cancellation once, you've essentially pre-cancelled all subsequent tasks.

So that won't work.

That's why I'm proposing the more complex design. It resembles the existing design in which subscribing to the observable source representing the task will never produce more than one item. The change is that instead of getting a notification after the task is done, this observable now produces a single notification to tell you that the work has begun, and at the same time it hands you a separate IObservable<TResult> that enables you to discover the outcome, along with a callback you can use to cancel the method.

(Aside: it did occur to me that we could actually just return an IObservable<IObservable<TResult>>, and to decide by convention that unsubscribing from the outer observable has the effect of requesting cancellation, and that you could remain subscribed to the inner observable to get the outcome. But this struck me as unnecessarily obscure, and easy to get wrong.)

The crucial difference is that you would now have a way to cancel the underlying task without also unsubscribing from notifications that would have kept you informed about the task's progress.

This shows an outline of how you could use this to provide the behaviour you want:

(UPDATE 2023/06/02: I've now created a complete illustration at #3556 - it's not quite the same as the code shown here, because there were a few details I hadn't considered when I sketched this out back in April.)

public static class ReactiveCommand
{
    public static ReactiveCommand<Unit, Unit> CreateFromTask(
        Func<CancellationToken, Task> execute,
        IObservable<bool>? canExecute = null,
        IScheduler? outputScheduler = null)
    {
        if (execute is null)
        {
            throw new ArgumentNullException(nameof(execute));
        }

        return CreateFromObservable(() => ObservableEx.FromAsyncWithCancel(execute), canExecute, outputScheduler);
    }

    public static ReactiveCommand<Unit, TResult> CreateFromObservable<TResult>(
        Func<IObservable<(IObservable<TResult> Result, Action Cancel)>> execute,
        IObservable<bool>? canExecute = null,
        IScheduler? outputScheduler = null)
    {
        if (execute is null)
        {
            throw new ArgumentNullException(nameof(execute));
        }

        return new ReactiveCommand<Unit, TResult>(
            _ => execute(),
            canExecute ?? Observable.Return(true),
            outputScheduler);
    }
}

public class ReactiveCommand<TParam, TResult>
{
    private readonly Subject<Exception> _exceptions = new();
    private readonly ISubject<ExecutionInfo, ExecutionInfo> _synchronizedExecutionInfo;
    private readonly Func<TParam, IObservable<(IObservable<TResult> Result, Action Cancel)>> _execute;
    private readonly IObservable<bool> _isExecuting;
    private readonly IObservable<bool> _canExecute;

    public ReactiveCommand(
        Func<TParam, IObservable<(IObservable<TResult> Result, Action Cancel)>> execute,
        IObservable<bool> canExecute,
        IScheduler? scheduler)
    {
        ArgumentNullException.ThrowIfNull(canExecute);
        _execute = execute;

        Subject<ExecutionInfo> executionInfoSubject = new();
        _synchronizedExecutionInfo = Subject.Synchronize(executionInfoSubject, scheduler ?? Scheduler.Default);

        _isExecuting = _synchronizedExecutionInfo
            .Scan(
                0,
                (acc, next) =>
                {
                    return next.Demarcation switch
                    {
                        ExecutionDemarcation.Begin => acc + 1,
                        ExecutionDemarcation.End => acc - 1,
                        _ => acc
                    };
                })
            .Select(inFlightCount => inFlightCount > 0)
            .StartWith(false)
            .DistinctUntilChanged()
            .Replay(1)
            .RefCount();

        _canExecute = canExecute
            .Catch<bool, Exception>(
                ex =>
                {
                    _exceptions.OnNext(ex);
                    return Observable.Return(false);
                })
            .StartWith(false)
            .CombineLatest(_isExecuting, (canEx, isEx) => canEx && !isEx)
            .DistinctUntilChanged()
            .Replay(1)
            .RefCount();

    }

    public IObservable<bool> IsExecuting => _isExecuting;

    public IObservable<TResult> Execute()
    {
        IObservable<(IObservable<TResult> Result, Action Cancel)> sourceAndCancellation = _execute(default!);

        return Observable.Defer(
            () =>
            {
                // We want to subscribe to the sourceAndCancellation, and we essentially need to
                // make the nested IObservable<TResult> that it returns our effective result.
                // However, we need to ensure that we don't unsubscribe from the underlying source until it's done,
                // so we can't just return it directly. Moreover, the IObservable we return must not
                // be the only thing subscribed to that source.
                // We essentially want the part that sends notifications to the
                // _synchronizedExecutionInfo to survive until the source says its done, even
                // if the code that subscribed to the IObservable<TResult> that we returned
                // decides to cancel the operation by unsubscribing early. This means that the
                // observable we return must be distinct from the subscription that makes
                // IsExecuting and Exceptions work so that the caller can Dispose the
                // subscription we return without us losing the notifications that keep us
                // informed about the progress of the underlying task.

                IConnectableObservable<(IObservable<TResult> Result, Action Cancel)> sharedSource =
                    sourceAndCancellation.PublishLast();

                // This enables separation between our internal subscription that we
                // use to monitor the progress of the task, and the caller's subscription.
                Subject<TResult> executeResult = new();

                _synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateBegin());
                sharedSource
                    .SelectMany(sourceAndCancellation => sourceAndCancellation.Result)
                    .Do(result =>
                    {
                        _synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateResult(result));
                        executeResult.OnNext(result);
                    })
                    .Catch<TResult, Exception>(
                        ex =>
                        {
                            _exceptions.OnNext(ex);
                            return Observable.Throw<TResult>(ex);
                        })
                    .Finally(() => _synchronizedExecutionInfo.OnNext(ExecutionInfo.CreateEnd()))
                    .Subscribe();  // Will remain subscribed until the underlying task says it's done.

                // We can't just return the executeResult directly. We need to ensure that if the
                // caller unsubscribes from the observable we return, we trigger cancellation.
                return sharedSource
                    .RefCount()
                    .SelectMany(
                        sourceAndCancellation =>
                        {
                            // It's important that we don't directly subscribe to the source passed in
                            // here, because that would trigger a second execution of the callback that
                            // runs the task.
                            (_, Action cancel) = sourceAndCancellation;
                            return executeResult
                                .Finally(() => cancel());
                        });
            });
    }
}

That's a bit scrappy, and will need some work to get it to production quality, but it does demonstrate the concept. I used that from this program:

Stopwatch sw = Stopwatch.StartNew();

var cmd = RxCommand.CreateFromTask(async (token) =>
{
    Console.WriteLine($"{sw.Elapsed}: started command");
    try
    {
        await Task.Delay(10000, token);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine($"{sw.Elapsed}: starting cancelling command");
        await Task.Delay(5000);
        Console.WriteLine($"{sw.Elapsed}: finished cancelling command");
        throw;
    }
    Console.WriteLine($"{sw.Elapsed}: finished command");
});

cmd.IsExecuting.Subscribe(isExecuting =>
{
    Console.WriteLine($"command executing = {isExecuting}");
});

Console.WriteLine($"{sw.Elapsed}: Starting command");
IDisposable subscription = cmd.Execute().Subscribe(
    _ => { Console.WriteLine($"{sw.Elapsed} Execute.OnNext"); },
    ex => { Console.WriteLine($"{sw.Elapsed} Execute.OnError {ex}"); },
    () => { Console.WriteLine($"{sw.Elapsed} Execute.OnCompleted"); });
await Task.Delay(1000);
Console.WriteLine($"{sw.Elapsed}: Unsubscribing from IObservable returned by Execute");
subscription.Dispose();
Console.WriteLine($"{sw.Elapsed}: Unsubscribed");

Console.ReadLine();

and got the following output:

command executing = False
00:00:00.0349055: Starting command
command executing = True
00:00:00.0506291: started command
00:00:01.0690447: Unsubscribing from IObservable returned by Execute
00:00:01.0706055: Unsubscribed
00:00:01.0754488: starting cancelling command
00:00:06.0860115: finished cancelling command
command executing = False

The critical point here is that the cmd.IsExecuting source doesn't change until after the underlying task has run to completion.

If you agree with me that merely fixing Rx issue 1256 won't help you at all, and that you need a mechanism of this kind, in which cancellation of the underlying task is separated from the observation of the task's progress, then I would ask you to create a new issue in Rx asking for this new feature.

Note that you don't technically need the feature to be built into Rx. The spike I created to do the research behind this message just referenced the Rx libraries and implemented its own FromAsyncWithCancel method as a layer over Rx. So I don't think this is a blocker to you being able to close this and the various other issues that you are currently saying require a fix to Rx issue 1256. (But it might be possible to implement more efficiently as a built-in feature.)
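
For readers who want a feel for what such a layer over Rx could look like, the following is a minimal sketch of the non-generic signature proposed earlier. It is not the spike referred to above and not part of System.Reactive: the class name, the use of Task.ToObservable to expose the outcome, and the RunAsync demo are all assumptions. The CancellationTokenSource is deliberately left undisposed to keep the sketch short.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class FromAsyncWithCancelSketch
{
    // Hypothetical layer over Rx: each subscription starts a new task with its own token.
    public static IObservable<(IObservable<Unit> Result, Action Cancel)> FromAsyncWithCancel(
        Func<CancellationToken, Task> actionAsync)
    {
        return Observable.Defer(() =>
        {
            var cts = new CancellationTokenSource();
            // The task keeps its final state, so Result can be subscribed to at any time.
            IObservable<Unit> result = actionAsync(cts.Token).ToObservable();
            Action cancel = cts.Cancel;
            return Observable.Return((Result: result, Cancel: cancel));
        });
    }

    // Demo: cancel the task, yet still observe how it finished.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var source = FromAsyncWithCancel(async token =>
        {
            try
            {
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
                log.Add("cleanup started");
                await Task.Delay(100); // simulated cleanup work
                log.Add("cleanup finished");
                throw;
            }
        });

        var (result, cancel) = await source; // starts the task
        cancel();                            // request cancellation without unsubscribing
        try
        {
            await result;                    // waits for the task's real outcome
        }
        catch (OperationCanceledException)
        {
            log.Add("outcome: cancelled");
        }
        return log;
    }
}
```

Because cancellation goes through the Cancel callback rather than through Dispose, the subscription to Result stays alive and the cancellation outcome is only reported after the cleanup has finished.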

@ChrisPulman
Member

@idg10 Hi Ian, I know that you have spent a lot of time over the past few days looking into this, and I am very grateful for your assistance in getting this issue resolved. As you can see, it's been open for a while.

Personally I agree that a different function is required due to the underlying functionality of StartAsync not fitting the requirements of ReactiveCommand.CreateFromTask with Cancellation Token.

This is the sequence of events that I understand we require to ensure that every execution of the command returns a value.

Normal Execution run till Task completion

command executing = false
Start Task execution
command executing = true
Task Completes
OnNext - Task Result is returned
OnComplete
Finally
command executing = false

Cancel Execution to Task Cancellation is complete

command executing = false
Start Task execution
command executing = true
Task is Cancelled by disposing the Execute IObservable
Task Cancellation Exception is thrown internally
starting cancelling command
finished cancelling command
OnError with Task Cancellation Exception
Finally
command executing = false

In the event of cancellation, the user may have the command locked for a longer period than normal. At present, however, the operation never returns, because the cancellation exception is swallowed internally in StartAsync, which leaves the command locked out indefinitely.

I am currently in Saudi Arabia, but should return to the UK on Friday evening

@idg10

idg10 commented Apr 5, 2023

I'm going to copy out your final sequence of events but with numbers, because I want to ask a question:

  1. command executing = false
  2. Start Task execution
  3. command executing = true
  4. Task is Cancelled by disposing the Execute IObservable
  5. Task Cancellation Exception is thrown internally
  6. starting cancelling command
  7. finished cancelling command
  8. OnError with Task Cancellation Exception
  9. Finally
  10. command executing = false

Which observable is handling the OnError at step 8? Is it the same observable you already unsubscribed by calling Dispose at step 4?

I think it is possible for users of ReactiveCommand to be able to observe delayed completion or errors that occur after cancellation even if cancellation of the command is triggered by calling Dispose on the observable returned by Execute. But it won't be possible to observe these things through that observable, because you burned the relevant subscription by calling Dispose. I think they can only be visible through the command's IsExecuting and ThrownExceptions observables.

If you want the IObservable<T> returned by Execute to be able to report post-cancellation completion (or, should they occur, post-cancellation errors) then you won't also be able to use that IObservable<T> to trigger the cancellation—designs have to make a choice between either enabling cancellation to be triggered by unsubscription, or being able to observe what happened after cancellation was triggered. It's not possible to have both, which is why Observable.FromAsync doesn't and can't do what you want. And your current design prevents post-cancellation notifications from being supplied through the IObservable<T> returned by Execute for exactly the same reason the currently available Observable.FromAsync methods are incompatible with observing anything that happens after you've triggered cancellation, unless you just decide to ignore the rule that unsubscription means that an observer stops receiving events.

Of course nothing stops you from writing an observable source that disobeys the rules. Nothing stops you from writing an observable source that calls OnNext after calling OnCompleted for example, but this will confuse any subscriber that was expecting you to abide by the rules of Rx. Likewise, you could call OnError or OnCompleted on an observer long after the relevant subscription was Disposed, but again, this will confuse any subscriber that was expecting you to abide by the rules of Rx.

This text has been in the Rx Design Guidelines at least as far back as 2010 (my emphasis):

Any work that is already in progress might still complete as it is not always safe to abort work that is in progress. Results from this work will not be signaled to any previously subscribed observer instances.

This is in section 4.4. And although the document technically only defines "guidelines", I'd point out that the grammar rules for calling IObserver<T> methods (section 4.1) are also in here, and I don't think those are realistically up for debate. My view is that all of Section 4 (The Rx Contract) defines rules that are, in practice, immutable. Users of Rx have been relying on the rules in this contract for over a decade.

I don't intend to modify System.Reactive to supply observable sources that break these rules. (That's why I proposed a completely different mechanism from Observable.FromAsync.) I don't believe it would be a good idea for ReactiveUI to break these rules either. I'm happy to continue to pursue the addition of a new feature that wraps tasks in observables in a way that enables cancellation without unsubscription, but if ReactiveUI is going to insist on a design that is incompatible with Rx's rules (e.g., by invoking either OnError or OnCompleted on an observer that has already been unsubscribed) then System.Reactive isn't going to help you with that.

(BTW, I'll be off work all of next week, so I'll be unavailable just as you get back...)

@ChrisPulman
Member

Yes, it's the same Execute observable. Section 5.9 of the Rx Design Guidelines says that messages can continue to be pushed until the Dispose (unsubscribe) method has returned, OnError has been raised, or OnCompleted has been raised; after this point no further messages are expected.

In my mind this still gives us the opportunity, without breaking these rules, to push an OnError with an OperationCanceledException, whose inner exception is an AggregateException of any errors from the Task being executed, before the unsubscribe has completed.

In very simple terms, this is what I am proposing:

try
{
    await Task.Delay(10000, token);
    observer.OnNext(Unit.Default);
    observer.OnCompleted();
}
catch (OperationCanceledException ex)
{
    // Disposing: run the task's cleanup, then report the cancellation.
    TaskCleanUpAction();
    observer.OnError(ex);
}

This is basically what happens with Observable.FromAsync:

try
{
    await Task.Delay(10000, token);
    observer.OnNext(Unit.Default);
    observer.OnCompleted();
}
catch (Exception)
{
    // Disposing
    // Nothing happens, exceptions are swallowed
}

I understand that in some cases task cancellation may take some time. Giving the function an optional timeout parameter would allow the end user to control how long they wish to wait for the Task to clean up before giving up. OnError should still be fired at that point, with something like a FailedToCleanUpException as the inner exception of the OperationCanceledException.

Modifying Observable.FromAsync to operate this way would be a breaking change, so a new function is required to enable the errors to be propagated upon cancellation (disposal). Your idea of Observable.FromAsyncWithCancel, or a similar name, would be excellent if it were able to handle the tasks in such a way that a result is always returned. Catch and Finally could then be chained and triggered, allowing ReactiveCommand.CreateFromTask to operate as expected, in line with all the other implementations that do not take a cancellation token.

@idg10

idg10 commented Apr 19, 2023

The exact wording from section 5.9 is:

Messages can be in flight while calling unsubscribe. These messages can still come through while the call to unsubscribe is in progress.

The critical point here is that it says "can" and not "will".

And reading through this all again after having been away, I've realised that it might not be completely clear that there are two important dimensions to consider:

  • what can we presume about whether notifications will reach our observers?
  • what can we presume about the state of the underlying process represented by the observable source?

In cases where we remain subscribed until the source completes it's straightforward. Early unsubscription is where it gets complex.

Taking that document as a whole, the implication of sections 4 and 5 is that there are three phases for a subscription:

  1. before you've called Dispose to unsubscribe
  2. after you've called Dispose but before Dispose has returned
  3. after Dispose has returned

And here's what you can presume about the two aspects above (assuming we unsubscribed before the source completed naturally):

Phase | Will we get notifications? | Is the underlying process still active?
1     | Yes                        | Yes
2     | Maybe                      | Maybe
3     | No                         | Maybe

The implication of both section 4 and 5 is that there are no guarantees in 2. For as long as phase 2 persists, it's possible that you might get notifications (because unsubscription takes some time to complete; 5.9 makes this explicit) but it's also possible that you won't even though the underlying process represented by the observable source might still be in progress (section 4.4 makes this clear). You also can't detect anything about whether the underlying process is complete either in phase 2 or phase 3 (as section 4.4 makes clear). The only way you know anything about the state of the underlying process is if you remain subscribed until the source completes or fails.

Any design that makes assumptions about what will emerge during phase 2 is broken.

That's true whether you presume that notifications will continue to emerge, or you presume that they won't. Both of those presumptions will be wrong at least some of the time. Any design that relies on either presumption will fail at least some of the time.

Furthermore, any design that makes assumptions about the state of the underlying process represented by an IObservable (e.g., a Task as in these discussions) when you unsubscribed before OnError or OnCompleted was called is also broken.

That's true because there's no guarantee that the underlying process represented by the observable source has actually been stopped just because you've reached phase 3. (Section 4.4 makes it explicitly clear that work in progress might continue to run even after Dispose returns.)
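The three phases can be observed with a small console sketch. It assumes the System.Reactive package is referenced; the class name `EarlyUnsubscribeDemo`, the delays and the flag names are made up for illustration. It wraps a task with `Observable.FromAsync`, unsubscribes early, and records whether any notification arrived and whether the task's cleanup was still running after `Dispose` had returned.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class EarlyUnsubscribeDemo
{
    // Returns (was the observer notified?, was cleanup still running after Dispose returned?).
    public static async Task<(bool Notified, bool CleanupOutlivedDispose)> RunAsync()
    {
        var notified = false;
        var disposeReturned = false;
        var cleanupOutlivedDispose = false;
        var cleanupDone = new TaskCompletionSource<bool>();

        IObservable<int> source = Observable.FromAsync(async (CancellationToken token) =>
        {
            try
            {
                await Task.Delay(10_000, token);
                return 42;
            }
            catch (OperationCanceledException)
            {
                await Task.Delay(200);                     // simulated cleanup work
                cleanupOutlivedDispose = disposeReturned;  // had Dispose already returned?
                cleanupDone.TrySetResult(true);
                throw;
            }
        });

        IDisposable subscription = source.Subscribe(
            _ => notified = true,
            _ => notified = true,
            () => notified = true);

        await Task.Delay(100);     // phase 1: subscribed, task running
        subscription.Dispose();    // phase 2: Dispose in progress (this also cancels the token)
        disposeReturned = true;    // phase 3: Dispose has returned

        await cleanupDone.Task;    // the underlying work only finishes here
        await Task.Delay(100);     // give any late notification a chance to arrive
        return (notified, cleanupOutlivedDispose);
    }

    public static async Task Main()
    {
        var (notified, cleanupOutlivedDispose) = await RunAsync();
        Console.WriteLine($"observer notified: {notified}");
        Console.WriteLine($"cleanup outlived Dispose: {cleanupOutlivedDispose}");
    }
}
```

With the behaviour described in this thread, I would expect the observer never to be notified while the cleanup demonstrably continues after `Dispose` has returned, which is the "No / Maybe" row of the table.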

As it happens, the Rx.NET implementation has always erred heavily on the side of shutting down notifications early: for over a decade it has done that before making any attempt to stop the underlying activity.

If you need to be able to know how a task finished, then you simply cannot use the subscription that achieves this as the mechanism by which you also cancel a task. These are mutually exclusive usage patterns.

The final row of the table above makes it clear why you can never achieve both cancellation and notification of completion with a single subscription: once you're in phase 3 you know for certain that you're never receiving any more notifications, but as far as you know, the underlying work might still be in progress. Since completion might not occur until after you've entered phase 3, you're not going to be notified of it.
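One way to keep the two concerns apart, sketched here under assumptions (System.Reactive referenced; `SeparateCancellationDemo` and `SlowWorkAsync` are hypothetical names), is to cancel through your own CancellationTokenSource and stay subscribed until the task has really finished. The subscription is then only a notification channel, never the cancellation mechanism.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class SeparateCancellationDemo
{
    // Hypothetical slow operation that needs some cleanup time when cancelled.
    private static async Task<int> SlowWorkAsync(CancellationToken token)
    {
        try
        {
            await Task.Delay(10_000, token);
            return 42;
        }
        catch (OperationCanceledException)
        {
            await Task.Delay(200); // simulated cleanup work
            throw;
        }
    }

    // Returns a description of the notification that ended the subscription.
    public static async Task<string> RunAsync()
    {
        using var cts = new CancellationTokenSource();
        var outcome = new TaskCompletionSource<string>();

        // We start the task ourselves with our own token, so unsubscribing
        // is not what cancels it.
        IObservable<int> source = SlowWorkAsync(cts.Token).ToObservable();

        using IDisposable subscription = source.Subscribe(
            value => outcome.TrySetResult($"OnNext({value})"),
            error => outcome.TrySetResult($"OnError({error.GetType().Name})"),
            () => outcome.TrySetResult("OnCompleted"));

        await Task.Delay(100);
        cts.Cancel();              // request cancellation, but remain subscribed

        return await outcome.Task; // resolves only once the task has finished its cleanup
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());
    }
}
```

Because the subscription is still live when the task ends, the cancellation should surface as an OnError notification rather than being silently dropped.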

Now I think you might still be hoping for an implementation in which we a) prevent Dispose from returning until work really has completed and b) build on the leniency that 5.9 offers by stating that particular operators will in fact emit either an OnError or OnCompleted while in phase 2. But there are two major problems with this.

First, a) creates lots of exciting new opportunities for deadlocks. (There are reasons Rx currently skews heavily in the opposite direction.)

Second, even if one particular bit of Rx were modified to attempt to guarantee to deliver particular notifications during phase 2 (i.e. if we do b), all the rest of Rx's plumbing presumes that it's absolutely fine not to do this. As a case in point, the safe observers that Rx typically wraps user-supplied observers with presume that 4.4 always applies. So even if the SlowTaskObservable did what you wanted, it wouldn't help you because you'd be defeated by other bits of Rx plumbing. And even if we somehow special-cased it so that you don't get wrapped in a SafeObserver when observing a FromTask observable, it would all break again as soon as you put anything else in the way. (E.g., even if you have a source that goes to extraordinary lengths to ensure that it delivers OnCompleted before Dispose returns, it will all be for nothing as soon as you wrap that in a Where or ObserveOn, or any other Rx operator, because almost all of them take actions that rely on the fact that they're allowed to stop sending you notifications as soon as you've called Dispose.)
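The point about intervening operators can be illustrated with a deliberately odd source. This is a sketch, assuming System.Reactive is referenced; `LateSource`, `RecordingObserver` and `OperatorDropDemo` are hypothetical types written for this example, not anything from Rx or ReactiveUI. The source delivers its final notifications from inside Dispose. An observer subscribed directly receives them, while the same source behind a `Where` should not.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical source that insists on notifying while Dispose is in progress.
public sealed class LateSource : IObservable<string>
{
    public IDisposable Subscribe(IObserver<string> observer) =>
        Disposable.Create(() =>
        {
            observer.OnNext("cleanup finished");
            observer.OnCompleted();
        });
}

// Records every notification it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(string value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add("OnError");
    public void OnCompleted() => Log.Add("OnCompleted");
}

public static class OperatorDropDemo
{
    // Returns how many notifications arrived (directly, through Where).
    public static (int Direct, int ThroughWhere) Run()
    {
        var direct = new RecordingObserver();
        // Instance method call: no Rx plumbing between source and observer.
        new LateSource().Subscribe(direct).Dispose();

        var throughWhere = new RecordingObserver();
        // The Where operator sits between the source and the observer.
        new LateSource().Where(_ => true).Subscribe(throughWhere).Dispose();

        return (direct.Log.Count, throughWhere.Log.Count);
    }

    public static void Main()
    {
        var (directCount, whereCount) = Run();
        Console.WriteLine($"direct: {directCount} notifications");
        Console.WriteLine($"through Where: {whereCount} notifications");
    }
}
```

As far as I can tell from current System.Reactive behaviour, the operator detaches the downstream observer before it disposes its upstream subscription, so the late notifications have nowhere to go.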

idg10 added a commit to idg10/ReactiveUI that referenced this issue Jun 2, 2023
This is a response to reactiveui#2153

For a while now, the ReactiveUI team has asserted that the root cause for this issue (and some other related ones) is that if you ask Rx to wrap a Task<T> as an IObservable<T>, you don't get completion or error notifications if you unsubscribe before the task completes.

This behaviour is by design, but it is problematic in cases where you want to cancel the task, because task cancellation is triggered by unsubscription.

Our view is that if you're using Rx's task wrapping, then you can either get notifications through to completion, or you can unsubscribe early to cancel the task, but you can't do both. (Rx always shuts down notifications as the first act of unsubscription. This is permitted within the rules of IObservable, and is deeply baked into the framework. Although it is not technically illegal to implement an IObservable that continues to produce notifications up until the call to Dispose returns, you shouldn't rely on that, because the rules intentionally leave implementations some wiggle room. Basically, once you've called Dispose, all bets are off.)

But since we want to engage constructively with Rx users, we don't just want to say "by design" and leave it at that. So this PR is an illustrative spike showing one way to solve this that doesn't rely on post-Dispose notifications.
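As a rough illustration of the general idea (this is not the code from the spike; `IsExecutingDemo`, `WorkAsync` and the log strings are hypothetical, and System.Reactive is assumed to be referenced), an IsExecuting-style signal can be derived from a subscription that is never disposed early, with cancellation requested through a separate CancellationTokenSource:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class IsExecutingDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        void Log(string message) { lock (log) { log.Add(message); } }

        // Hypothetical command body that needs cleanup time after cancellation.
        async Task<int> WorkAsync(CancellationToken token)
        {
            try
            {
                await Task.Delay(10_000, token);
                return 42;
            }
            catch (OperationCanceledException)
            {
                await Task.Delay(200); // simulated cleanup work
                Log("cleanup finished");
                throw;
            }
        }

        using var cts = new CancellationTokenSource();
        var finished = new TaskCompletionSource<bool>();

        // true while the task runs, false only once it has completed,
        // failed, or finished cancelling.
        IObservable<bool> isExecuting = WorkAsync(cts.Token)
            .ToObservable()
            .Select(_ => true)
            .IgnoreElements()
            .Catch((Exception _) => Observable.Empty<bool>())
            .StartWith(true)
            .Concat(Observable.Return(false));

        using IDisposable subscription = isExecuting.Subscribe(
            value => Log($"isExecuting = {value}"),
            () => finished.TrySetResult(true));

        await Task.Delay(100);
        Log("cancel requested");
        cts.Cancel();           // cancel via the token, not via Dispose

        await finished.Task;
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```

In this arrangement the false value cannot appear before the cleanup has finished, because it is only emitted after the task-backed observable terminates.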

idg10 commented Jun 2, 2023

To demonstrate the viability of what I'm proposing, I've created a spike showing the basic idea at #3556

I'm not proposing we merge that in its current state. It's mainly to:

  • prove that it's possible to use the technique I described earlier in this thread in context, and get all tests passing
  • provide a way to discuss details of the code

The code has a lot of comments in to explain my thinking, because the point of this PR is to explain the idea, rather than to submit actual code.

If the ReactiveUI team decides that they are happy with this direction, there are a few things we could do as next steps. Firstly, note that this includes an ObservableEx class with a more cancellation-capable alternative to FromTask. I'd be open to adding this to Rx if you believe you would use this method. (You don't strictly need it in Rx though—as this spike illustrates, it's perfectly possible to implement it without System.Reactive baking it in.)

Then there's the code in the spike itself. We could try to hammer it into a shape where you'd be happy to accept the PR. Or you could use it just as an illustration for developing your own solution.

Then again, there might be something terribly wrong with this approach that I haven't spotted, in which case please let me know and I'll have another think.

ChrisPulman added a commit that referenced this issue Jan 1, 2024
…Task (#3704)

**What kind of change does this PR introduce?**

Fix for #1245
Fix for #2153
Fix for #3450

**What is the current behavior?**

ReactiveCommand does not properly support cancellation tokens
for CreateFromTask due to an underlying issue in System.Reactive.

**What is the new behavior?**

Fixes the issues in the base functions of ReactiveCommand, which stem
from an issue with Observable.FromAsync in System.Reactive, by using the
new ObservableMixins.FromAsyncWithAllNotifications function instead. It
extends Observable.FromAsync and handles error bubbling as required.

ObservableMixins.FromAsyncWithAllNotifications can be used to transform
a cancellable Task into an Observable that produces the expected
cancellation, errors and results.

**What might this PR break?**

ReactiveCommand.CreateFromTask will now handle exceptions as expected.
Any existing workarounds could be removed once the fix has been tested
in end users' code.

**Please check if the PR fulfills these requirements**
- [x] Tests for the changes have been added (for bug fixes / features)
- [ ] Docs have been added / updated (for bug fixes / features)

**Other information**:

Co-authored-by: @idg10 - created the base code in #3556
@ChrisPulman
Member

This should now be resolved. Please raise a new issue with any new findings. Many thanks.
Thanks to @idg10 for creating the base code for the fix. Hopefully we will have a shiny new function in System.Reactive based on the FromAsyncWithPostCancelNotifications function demonstrated.

This issue has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jan 16, 2024