Finalize is not called at end of pipe, different results with delay #5357

Closed
FritzHerbers opened this issue Mar 19, 2020 · 20 comments
Labels
bug Confirmed bug

Comments

@FritzHerbers

Bug Report

Current Behavior
We noticed that finalize is not called at the end of a pipe, in combination with concatMap.

Reproduction
We could reproduce our production problem when using a delay in the pipe. We are using delayWhen, maybe it has the same side-effect as delay:
https://stackblitz.com/edit/typescript-dhe1y9?file=index.ts

In production we use a very simplified version of the following SB:
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts

With delay, the finalize is called before next value or on complete.

Expected behavior
Finalize should be called at the end of a pipe, and before a next value is emitted from concatMap.

Environment

  • RxJS version: 6.5.4
@Andrei0872

Andrei0872 commented Mar 20, 2020

The same things happen in both StackBlitz examples:

When using delay(ms), every complete/next notification will be scheduled as a macrotask, that will be invoked after ms milliseconds. By default, delay uses AsyncScheduler, which is why the notification will be scheduled as a macrotask. In order to visualize this, try using delay(0, queueScheduler), in both examples and you should see the expected results.

Note: delay(msGreaterThanZero, queueScheduler) === delay(ms).


concatMap(fn) === mergeMap(fn, concurrent = 1). When mergeMap has the concurrent set and it is exceeded, it will buffer the extra values. When one of its active inner observables completes, it will take the oldest buffered value and will create an inner observable out of it, using the provided fn.


finalize(cb): the callback will be invoked when its destination subscriber (more on this in the snippet below) unsubscribes. When the source completes or emits an error notification, the subscriber's unsubscribe method is called as well.

// S#1 -> Subscriber#1
src$.pipe(
 a(), // S#4
 b(), // S#3
 finalize(cb), // S#2;
).subscribe(observer) // S#1 - the `observer` is turned into a `Subscriber`

// S#1 is the destination subscriber for S#2

The finalize's callback will be called when S#1 unsubscribes, which can happen when the source completes or emits an error, or when you manually call S#1.unsubscribe().


Now let's see how this applies to your examples.

import { of, queueScheduler } from "rxjs";
import { concatMap, delay, finalize, tap } from "rxjs/operators";

const source = of(2000, 1000).pipe(concatMap(val => process(val)));

function process(val) {
  return of(val).pipe(
    tap(val => console.log("process-1", val)),
    delay(val, queueScheduler),
    tap(val => console.log("process-2", val)),
    finalize(() => console.log("finalize", val))
  );
}

When 2000 arrives, concatMap (more precisely, mergeMap) will create an inner observable from it. Notice that the inner observable (what process returns) contains the delay operator, which means that the inner observable will not complete in this tick. 1000 comes next, but as the inner observable has not completed yet, the value (1000) will be buffered.

After 2000 ms pass, the mergeMap's inner observable will emit the value and complete (because it was created with of).

Here's what happens when the inner observable completes:

  protected _complete(): void {
    // parent refers to `mergeMap` in this case
    this.parent.notifyComplete(this);

    // When this happens, the `finalize`'s callback will be invoked as well
    this.unsubscribe();
  }

mergeMap's notifyComplete looks like this:

  notifyComplete(innerSub: Subscription): void {
    const buffer = this.buffer;
    this.remove(innerSub);
    this.active--;
    if (buffer.length > 0) {
      this._next(buffer.shift()!); // Create an inner obs from the oldest buffered value
    } /* ... */
  }

So, as you can see, when the inner observable completes, it will first notify the parent (mergeMap in this case), which will in turn create an inner observable from the oldest buffered value (1000 in this case). During the creation process, which involves calling the process function, the line where process-1 1000 is logged will be reached.

If you removed the delay() completely, you'd see the expected result because all would happen synchronously. This means that when the inner observable created from 2000 completes, the buffer will still be empty, because 1000 hasn't arrived yet.
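
The sequence described above can be sketched without RxJS. This is only a simplified model, assuming a concurrency limit of 1: ConcatModel, timers and trace are hypothetical names, and flushing a timer by hand stands in for the delay firing. It is not the library's implementation, but it shows why process-1 1000 is logged before finalize 2000.

```typescript
// Simplified, dependency-free model of the ordering; not RxJS's real classes.
const trace: string[] = [];

// Pending "delayed" completions; running one stands in for a timer firing.
const timers: Array<() => void> = [];

class ConcatModel {
  private buffer: number[] = [];
  private active = 0;

  next(value: number): void {
    if (this.active >= 1) {
      this.buffer.push(value); // concurrency limit reached: buffer the value
      return;
    }
    this.active++;
    this.subscribeInner(value);
  }

  private subscribeInner(value: number): void {
    trace.push(`process-1 ${value}`); // the tap before the delay
    timers.push(() => {
      trace.push(`process-2 ${value}`); // the tap after the delay
      // Inner completion: the parent is notified first ...
      this.notifyComplete();
      // ... and only then does the inner subscriber unsubscribe,
      // which is where the finalize callback runs.
      trace.push(`finalize ${value}`);
    });
  }

  private notifyComplete(): void {
    this.active--;
    const nextValue = this.buffer.shift();
    if (nextValue !== undefined) {
      this.next(nextValue); // create an inner "observable" from the oldest buffered value
    }
  }
}

const model = new ConcatModel();
model.next(2000);
model.next(1000);
while (timers.length > 0) {
  timers.shift()!();
}

console.log(trace);
// ["process-1 2000", "process-2 2000", "process-1 1000",
//  "finalize 2000", "process-2 1000", "finalize 1000"]
```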

@FritzHerbers
Author

Thanks Andrei, for shedding some light on the implementation.

I think your note should read:
Note: delay(msGreaterThanZero, asyncScheduler) === delay(ms).

We have a delayWhen, acting as a semaphore, to synchronize that other async tasks have all subscribed.

Independent of the scheduler used, the behavior will be the same: the inner finalize will be called at next or complete/error of the outer subscription. This makes it dependent on the outer subscription, instead of being the last part that runs to finish a pipe.

This is not expected. Many other programming languages use a try/catch/finally or using/destroy structure, where the final part is called before continuing to the next block (read: the next value in RxJS).

@ devs
I just need a final statement: Is it a bug, or as-designed, or will-not-change.

If it is not accepted as a bug, finalize cannot be used. As a quick fix, I could move the finalize code to a catchError. As I have a defined end in the expand, I could throw an Error.
Is catchError run before the end of the pipe, or does it have the same implementation as finalize, running on the next value and on complete/error of the outer subscription?

@Andrei0872

I think your note should read:
Note: delay(msGreaterThanZero, asyncScheduler) === delay(ms).

I'd disagree. queueScheduler's actions are QueueAction instances. When you schedule a QueueAction with a delay, this line will be reached:

    if (delay > 0) {
      return super.schedule(state, delay);
    }

where super.schedule points to AsyncAction.schedule. The AsyncScheduler uses AsyncAction instances.
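
A minimal sketch of that delegation, assuming nothing beyond the quoted lines: AsyncActionModel and QueueActionModel are hypothetical stand-ins that return a description instead of scheduling real work. It only illustrates why a positive delay on the queue scheduler ends up on the async path.

```typescript
// Dependency-free stand-ins for the two action classes; not RxJS's real code.
class AsyncActionModel {
  schedule(delay: number): string {
    return `timer scheduled for ${delay} ms`;
  }
}

class QueueActionModel extends AsyncActionModel {
  schedule(delay: number = 0): string {
    if (delay > 0) {
      // Same branch as in the quoted snippet: hand over to the async parent.
      return super.schedule(delay);
    }
    return "run synchronously in the queue";
  }
}

const zeroDelay = new QueueActionModel().schedule(0);
const positiveDelay = new QueueActionModel().schedule(2000);

console.log(zeroDelay);     // "run synchronously in the queue"
console.log(positiveDelay); // "timer scheduled for 2000 ms"
```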


IMO, the problem in this case is that the inner observable notifies its parent outer observable about a complete notification before it(the inner one) unsubscribes.

If you want to run the callback when the complete notification is sent, you can use tap's third argument, which is a callback function. So you could replace finalize(() => console.log("finalize", val)) with tap(null, null, () => console.log("finalize", val)). This should comply with the expected behavior.

@FritzHerbers
Author

Thanks again Andrei, for your exhaustive answer.

About the scheduler, I was referencing:
https://rxjs-dev.firebaseapp.com/guide/scheduler
Time-related operators like bufferTime, debounceTime, delay, auditTime, sampleTime, throttleTime, timeInterval, timeout, timeoutWith, windowTime all take a Scheduler as the last argument, and otherwise operate by default on the asyncScheduler.

I think, you wanted to point me to something else.

Thanks for the tip, this is a "real" finalize, how I expect it to be, and will solve my problem:
tap(null, null, () => console.log("finalize", val))
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts

@ devs
Would still be nice to have a final statement on this issue: Is it a bug, or as-designed, or will-not-change.

@cartant
Collaborator

cartant commented Mar 21, 2020

The repro seems to exhibit expected behaviour, IMO and, FWIW, I don't understand what is meant by this:

Finalize should be called at the end of a pipe, and before a next value is emitted from concatMap.

@FritzHerbers
Author

FritzHerbers commented Mar 21, 2020

I expected that finalize is called before leaving a block (in RxJS: a pipe) and going on to the next block (that is, reading the next value for concatMap). That is also how other programming languages implement a finalize.

Without a delay / delayWhen, it behaves as expected. With them, finalize is called on the next value and on complete/error of the outer subscription.

An application can work for a long time, and then just by adding a delay / delayWhen (or others) the behavior changes. If a function returning an Observable is used and later changed, it will not be obvious that the change has an impact on a pipe with a finalize.

If you tell me it is as designed, I will never use finalize again and will use Andrei's tip with tap.

I was just lucky that I kept a console.log in finalize. For a long time I had seen that the last two finalize calls came at the same time (at the end of the outer subscription). Because some branches (if/else) within the expand had no console.log, it took a while before I doubted the correctness of two finalize calls arriving at the same time (and of the others being called at the "wrong" time).

I would urge you, to add this changing behavior, when adding a delay, to the documentation.

EDIT: The last SB in my previous post was not saved, I updated it, showing the side-effect, explanation inside SB
https://stackblitz.com/edit/typescript-pf9zt7?file=index.ts

@Andrei0872

I would urge you, to add this changing behavior, when adding a delay, to the documentation.

I think the unexpected behavior will occur whenever you're dealing with observables that don't emit in the same tick they are created. For example: delay will emit the value in the next tick, a click$ observable will emit sometime in the future, but not in this tick.

What I'm trying to emphasize is that it's not finalize()'s fault, it's just how mergeMap handles inner observables. If one inner observable completes, it will first notify the parent (mergeMap), which may cause a new inner observable to be created (if the buffer is not empty), then the inner subscriber will unsubscribe.
The buffer will be empty if everything is synchronous.

@cartant
Collaborator

cartant commented Mar 21, 2020

@FritzHerbers I'll have another look at this tomorrow, when I am less tired. I think this is the same issue that was raised in #4138 and #4931 and discussed in #4222

... Andrei's tip with tap

That won't run the callback on explicit unsubscription - which might or might not be what you want - so it's not the same thing. However, it is possible to write a finalize-like operator that will ensure the callbacks are run in a less surprising order - i.e. that source subscriptions are finalized before their sinks.
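
The difference can be illustrated with a small dependency-free model. InnerModel is a hypothetical class, and the two pushed strings merely stand in for the complete callback passed to tap and for the finalize callback; it is a sketch of the distinction, not of how either operator is implemented.

```typescript
// Hypothetical model: one subscriber that can end by completing
// or by being unsubscribed explicitly.
class InnerModel {
  readonly calls: string[] = [];
  private closed = false;

  complete(): void {
    if (this.closed) {
      return;
    }
    this.calls.push("tap complete callback"); // runs only on a complete notification
    this.unsubscribe(); // completion is followed by teardown
  }

  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    this.calls.push("finalize callback"); // runs on every teardown
  }
}

const completed = new InnerModel();
completed.complete();

const cancelled = new InnerModel();
cancelled.unsubscribe();

console.log(completed.calls); // ["tap complete callback", "finalize callback"]
console.log(cancelled.calls); // ["finalize callback"]
```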

@FritzHerbers
Author

@cartant
Did you have time to look into this issue?
What would the code of such a finalize-like operator look like?

@cartant
Collaborator

cartant commented Apr 14, 2020

@FritzHerbers It's still on my list of things to do. I know exactly what's going on and I'll write the operator tomorrow and explain how it works, etc.

@cartant
Collaborator

cartant commented Apr 14, 2020

There is a dispose operator here.

The difference between it and finalize is this bit:

  unsubscribe() {
    super.unsubscribe();
    const { callback } = this;
    if (callback) {
      callback();
      this.callback = undefined!;
    }
  }

This differs from finalize because the finalize operator adds the callback to the Subscription:

class FinallySubscriber<T> extends Subscriber<T> {
  constructor(destination: Subscriber<T>, callback: () => void) {
    super(destination);
    this.add(new Subscription(callback));
  }
}

It's added in the FinallySubscriber constructor, so the callback is the first teardown in the subscription. The subscription to the source is going to be added after that:

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
  return source.subscribe(new FinallySubscriber(subscriber, this.callback));
}

const sink = toSubscriber(observerOrNext, error, complete);
if (operator) {
  sink.add(operator.call(sink, this.source));
} else {
  sink.add(
    this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
      this._subscribe(sink) :
      this._trySubscribe(sink)
  );
}

And that means that - unlike in dispose - the finalize callback will be called before the unsubscription from the child subscription occurs.
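
The two orderings can be compared in a small dependency-free model. MiniSubscription, FinalizeLike and DisposeLike are hypothetical classes written for this sketch, on the assumption that teardowns run in the order they were added; they are not the RxJS or rxjs-etc sources.

```typescript
// Dependency-free model of teardown ordering; not RxJS's actual classes.
class MiniSubscription {
  private teardowns: Array<() => void> = [];
  private closed = false;

  add(teardown: () => void): void {
    if (this.closed) {
      teardown(); // adding to a closed subscription runs the teardown at once
      return;
    }
    this.teardowns.push(teardown);
  }

  unsubscribe(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    for (const teardown of this.teardowns) {
      teardown(); // teardowns run in the order they were added
    }
  }
}

// finalize-like: the callback is registered in the constructor, so it is first.
class FinalizeLike extends MiniSubscription {
  constructor(callback: () => void) {
    super();
    this.add(callback);
  }
}

// dispose-like: the callback runs after every other teardown has run.
class DisposeLike extends MiniSubscription {
  constructor(private callback: (() => void) | undefined) {
    super();
  }

  unsubscribe(): void {
    super.unsubscribe();
    const { callback } = this;
    if (callback) {
      this.callback = undefined;
      callback();
    }
  }
}

const finalizeOrder: string[] = [];
const finalizeSub = new FinalizeLike(() => finalizeOrder.push("finalize callback"));
finalizeSub.add(() => finalizeOrder.push("source teardown")); // stands in for the source subscription
finalizeSub.unsubscribe();

const disposeOrder: string[] = [];
const disposeSub = new DisposeLike(() => disposeOrder.push("dispose callback"));
disposeSub.add(() => disposeOrder.push("source teardown"));
disposeSub.unsubscribe();

console.log(finalizeOrder); // ["finalize callback", "source teardown"]
console.log(disposeOrder);  // ["source teardown", "dispose callback"]
```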

IMO - well, in my changed opinion - this is a bug, but fixing it would be a breaking change. It's also possible that others might not consider this to be a bug.

@gogakoreli

gogakoreli commented Apr 30, 2020

[Edited] Based on the feedback from @cartant, my issue is not the same as the author's, but I guess someone might end up here.


I have a similar problem too; have a look at the following code from StackBlitz:

import { interval } from "rxjs";
import { finalize, mapTo, shareReplay, take, tap } from "rxjs/operators";

const getData = () =>
  interval(200).pipe(
    take(1),
    mapTo("Network Data")
  );

let loading = true;
const source = getData().pipe(
  finalize(() => (loading = false)),
  shareReplay()
);

source.pipe(
  // when data arrives, loading should be false
  tap(data => console.log('loading', loading)) // console output: loading true
).subscribe();

In my example, loading starts, data is fetched from the network, and with the help of finalize loading stops. I use shareReplay to cache the fetched data. When the subscription to source happens and data arrives, I expect loading to have stopped already. However, as seen in the example, finalize hasn't been executed yet and loading is still true at this point in the tap.
I know that I can use tap to stop loading, but finalize does more than tap.
I agree with FritzHerbers: when the source Observable completes or errors, finalize should be executed before any subsequent code from the chain starts to execute.

@cartant
Collaborator

cartant commented Apr 30, 2020

@gogakoreli yours is not the same problem. Your tap is going to be invoked on a next notification and that is guaranteed to happen before the finalize.

@gogakoreli

@cartant thanks for the feedback. I made this suggestion because if execution is synchronous finalize happens first and then tap. It behaves differently based on the source Observable. Can you suggest something to overcome this issue and be able to utilize functionality similar to finalize?

@cartant
Collaborator

cartant commented Apr 30, 2020

@gogakoreli I see what you mean. The behaviour is surprising when the source is synchronous, but it's unrelated to finalize. It's because the destination subscribes to the subject after the subject subscribes to the source.
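
That subscription order can be modelled without RxJS. In this sketch MiniReplaySubject and runShareModel are hypothetical names, and the pushed "finalize" string stands in for the source's teardown; it only shows how the order of the two subscriptions changes what the destination sees first.

```typescript
// Dependency-free model; MiniReplaySubject is hypothetical, not RxJS's ReplaySubject.
class MiniReplaySubject<T> {
  private buffer: T[] = [];
  private observers: Array<(value: T) => void> = [];

  next(value: T): void {
    this.buffer.push(value);
    for (const observer of this.observers) {
      observer(value);
    }
  }

  subscribe(observer: (value: T) => void): void {
    for (const value of this.buffer) {
      observer(value); // replay what was emitted before this subscriber joined
    }
    this.observers.push(observer);
  }
}

function runShareModel(sourceIsSynchronous: boolean): string[] {
  const events: string[] = [];
  const subject = new MiniReplaySubject<string>();

  const emitAndComplete = () => {
    subject.next("Network Data");
    events.push("finalize"); // teardown of the source after it completes
  };

  // Step 1: the subject subscribes to the source.
  let pending: (() => void) | undefined;
  if (sourceIsSynchronous) {
    emitAndComplete(); // a synchronous source emits and completes right here
  } else {
    pending = emitAndComplete; // an asynchronous source emits later
  }

  // Step 2: the destination (the tap) subscribes to the subject.
  subject.subscribe(value => events.push(`tap ${value}`));

  if (pending) {
    pending();
  }
  return events;
}

const syncOrder = runShareModel(true);
const asyncOrder = runShareModel(false);

console.log(syncOrder);  // ["finalize", "tap Network Data"]
console.log(asyncOrder); // ["tap Network Data", "finalize"]
```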

@gogakoreli

@cartant I was experimenting with it more and looks like the issue still happens without shareReplay. (shareReplay really doesn't seem to matter for the issue, I wanted to just showcase closely my real example)

@benlesh
Member

benlesh commented May 14, 2020

@cartant This doesn't look like an issue with finalize, rather it seems to be a side effect of the implementation of mergeMap with a concurrency limit and how it interacts with synchronous observables like of.

In the majority of cases, observables should not be synchronous, however, so this would be a non-issue.

Here's what's playing out:

  1. finalize always happens during the teardown phase, which happens after error or complete are called, OR during unsubscription.
  2. concatMap is really mergeMap with a concurrency limit of 1.
  3. The implementation of mergeMap with a concurrency limit:
    a. When an inner observable completes, that complete step will synchronously check to see if there is another value buffered.
    b. If there is a buffered value, it will synchronously "project" it with the mapping function, and subscribe to the returned observable.
    c. IF that returned observable is synchronous, it will emit before the inner observable gets to the teardown (because we're still technically synchronously in the first inner observable's complete event).

In the case of the first example in the original post, there's a tap before the delay in the inner observable, and our developer is confused by the outcome. However, this is the expected outcome, and we can't really get it to be any different unless we force microtask scheduling everywhere (ala Promises). In terms of some sort of standardization track maybe that's the right choice... however, in terms of performance, it would hurt us pretty bad.

I'm going to close this issue as "expected behavior". And chalk it up to another case of "synchronous observables produce surprising outcomes because they're synchronous".

I hope my explanation helps, @FritzHerbers.

@benlesh benlesh closed this as completed May 14, 2020
@FritzHerbers
Author

@benlesh
Thanks for your reply and summarizing the implementation.

From a programmer's perspective the "expected behavior" is the one I know from various other programming languages. Not being disrespectful, for me it is "as implemented" or "as designed".

That the behavior changes when a delay (or a similar operator, maybe even one added deep inside a stream) is added, and that this might go unnoticed, is a second "misbehavior" in this issue.

@cartant made a "dispose" operator, hooking up things differently. Would this be the way to go for finalize, or should "dispose" be added to rxjs?
Didn't try the operator (I noticed some days later it was your own repository), but will, as I was hoping others would comment on your implementation and this issue would resolve.

At the moment we are using the "extended" tap proposed by @Andrei0872, which works just fine and "as expected". We do not need the callback to run on explicit unsubscription, it just needs to be called at the end of the "block".

@benlesh benlesh reopened this May 14, 2020
@benlesh
Member

benlesh commented May 14, 2020

I see @cartant ... so it's called during teardown, but we should be calling it last, I agree. The fix for this should be as simple as moving the addition of the callback to the operator call instead of in the constructor.

It is a breaking change, however I doubt it will break many people because it'll only really be noticeable in synchronous cases, or cases with multiple finalize actions touching shared state in ways that don't behave well if they're out of order. Seems unlikely.
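
One reading of that change, sketched as a dependency-free model: TeardownList and subscribeWithFinalize are hypothetical names, and the only assumption is that teardowns run in the order they were registered. It is not the code of the eventual fix, just an illustration of how the registration point decides the order.

```typescript
// Hypothetical model of a subscriber's teardown list; not RxJS's Subscription.
class TeardownList {
  private teardowns: Array<() => void> = [];

  add(teardown: () => void): void {
    this.teardowns.push(teardown);
  }

  unsubscribe(): void {
    const pending = this.teardowns;
    this.teardowns = [];
    for (const teardown of pending) {
      teardown(); // registration order is execution order
    }
  }
}

type RegisterAt = "in constructor" | "after source subscription";

function subscribeWithFinalize(registerAt: RegisterAt): string[] {
  const calls: string[] = [];
  const subscriber = new TeardownList();
  const callback = () => {
    calls.push("finalize callback");
  };
  const sourceTeardown = () => {
    calls.push("source teardown");
  };

  if (registerAt === "in constructor") {
    subscriber.add(callback); // current behaviour: callback is the first teardown
  }
  subscriber.add(sourceTeardown); // stands in for adding the source subscription
  if (registerAt === "after source subscription") {
    subscriber.add(callback); // proposed behaviour: callback is the last teardown
  }

  subscriber.unsubscribe();
  return calls;
}

const currentOrder = subscribeWithFinalize("in constructor");
const proposedOrder = subscribeWithFinalize("after source subscription");

console.log(currentOrder);  // ["finalize callback", "source teardown"]
console.log(proposedOrder); // ["source teardown", "finalize callback"]
```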

@benlesh benlesh added the bug Confirmed bug label May 14, 2020
cartant added a commit to cartant/rxjs that referenced this issue May 14, 2020
@cartant
Collaborator

cartant commented May 15, 2020

@FritzHerbers

Didn't try the operator (I noticed some days later it was your own repository), but will, as I was hoping others would comment on your implementation and this issue would resolve.

It's in the rxjs-etc repo 'cause it's a breaking change and any changes to finalize won't be released until the next major version. It's MIT licensed, so if it solves a problem for you, you can copy dispose and the licence to your project, etc. without having to wait for the next major version.

As for the change to finalize, see #5433.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 24, 2020