concatAll() unexpectedly overlaps inner Observable subscriptions #3338

Closed
mattflix opened this issue Feb 22, 2018 · 34 comments · Fixed by #6010

Comments

@mattflix

mattflix commented Feb 22, 2018

EDIT: Note that the following repro code always produces the expected output with RxJS 4.x. The output with RxJS 5.x is unintuitive (IMO) and can actually vary depending on the formulation of the inner observables (see further discussion of such cases in subsequent comments).


RxJS version: 5.5.0 (and all previous 5.x versions, seemingly)

Code to reproduce:

const { Observable } = require(`rxjs`);

function Task(name) {
  return (
    Observable
      .defer(() => {
        console.log(`begin task ${ name }`);
        return (
          Observable
            .of(`simulated work`)
            .delay(100)
            .finally(() => {
              console.log(`end task ${ name }`);
            })
          );
      })
  );
}

Observable
  .of(
    Task(`1`),
    Task(`2`),
    Task(`3`)
  )
  .concatAll()
  .subscribe();

Expected behavior:

begin task 1
end task 1
begin task 2
end task 2
begin task 3
end task 3

Actual behavior:

begin task 1
begin task 2
end task 1
begin task 3
end task 2
end task 3

Additional information:
I discovered this behavior when using concatAll() as the basis for a serialized "task queue" for executing tasks (represented as Observables) one at a time, much like my (simplified) sample code above. Some of these tasks would allocate resources in defer() (or similar manner) and release them in finally() (or similar manner).

My task queue implementation all seemed to make logical sense, yet I was running into errors caused by concurrent access to resources that were only supposed to be accessed in a mutually exclusive manner. I was stumped: I couldn't see the problem in my task queue or my tasks that was leading to this unexpected concurrent access.

After much head-scratching and debugging, I eventually tracked the problem to the InnerSubscriber implementation within rxjs, which contains logic that subscribes to the next sequence before unsubscribing from the previous sequence:

    InnerSubscriber.prototype._error = function (error) {
        this.parent.notifyError(error, this); // <== subscribes to the next sequence
        this.unsubscribe();
    };
    InnerSubscriber.prototype._complete = function () {
        this.parent.notifyComplete(this); // <== subscribes to the next sequence
        this.unsubscribe();
    };

Ah-ha.

This causes the "creation" logic (defer() callback or other invocation) of the next sequence to execute before the "disposal" logic (finally() or other invocation) of the previous sequence has executed, making it impossible for the inner observables managed by concatAll() to behave like serialized tasks -- since rxjs actually "overlaps" the tasks at this critical moment.

As an experiment, I simply reversed the logic:

    InnerSubscriber.prototype._error = function (error) {
        this.unsubscribe(); // <==  unsubscribe from previous sequence first!
        this.parent.notifyError(error, this);
    };
    InnerSubscriber.prototype._complete = function () {
        this.unsubscribe(); // <== unsubscribe from previous sequence first!
        this.parent.notifyComplete(this);
    };

Then, using the original "Code to reproduce" above, I actually see the expected behavior.

I don't know, however, what other ramifications this change might have on other use cases. In the case of concatAll() (and variants) however, it would definitely seem "more correct" and "less surprising" for rxjs to always unsubscribe from the previous sequence before subscribing to the next.
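
The two orderings can be reproduced without RxJS. The following is a minimal sketch under stated assumptions, not RxJS's actual implementation: `makeTask`, `runQueue`, and the `unsubscribeFirst` flag are hypothetical names used only for illustration. Each toy task logs when it is subscribed to and when it is torn down, and the toy queue either starts the next task before tearing down the previous one (the 5.x order) or tears down first (the reversed order from the experiment).

```javascript
// Toy model of the ordering inside an inner subscriber (NOT RxJS's real code).
// All names here (makeTask, runQueue, unsubscribeFirst) are hypothetical.

function makeTask(name, log) {
  return {
    // "Subscribing" runs the creation logic and returns a handle.
    subscribe(onComplete) {
      log.push(`begin task ${name}`);
      return {
        // Simulates the asynchronous work finishing.
        finish: onComplete,
        // Teardown logic, comparable to a finally() callback.
        unsubscribe() {
          log.push(`end task ${name}`);
        },
      };
    },
  };
}

function runQueue(names, { unsubscribeFirst }) {
  const log = [];
  const pending = names.map((name) => makeTask(name, log));
  const active = []; // subscriptions whose work has not finished yet

  function subscribeNext() {
    const task = pending.shift();
    if (!task) return;
    const subscription = task.subscribe(() => {
      if (unsubscribeFirst) {
        subscription.unsubscribe(); // tear down the previous task first
        subscribeNext();            // then start the next one
      } else {
        subscribeNext();            // start the next task first
        subscription.unsubscribe(); // then tear down the previous one
      }
    });
    active.push(subscription);
  }

  subscribeNext();
  // Finish each task in order; each finish may subscribe to the next task.
  while (active.length > 0) {
    active.shift().finish();
  }
  return log;
}

const overlapping = runQueue(["1", "2", "3"], { unsubscribeFirst: false });
const serialized = runQueue(["1", "2", "3"], { unsubscribeFirst: true });
console.log(overlapping.join("\n"));
console.log(serialized.join("\n"));
```

In this model, `overlapping` matches the "Actual behavior" listing above and `serialized` matches the "Expected behavior" listing. It says nothing about what else in RxJS might depend on the original order.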

@mattflix
Author

mattflix commented Feb 22, 2018

As an aside, for the meantime, does anybody have any idea how to obtain the desired behavior (truly serialized inner observables) without having to write my own version of concatAll() or modify rxjs?

@kwonoj
Member

kwonoj commented Feb 22, 2018

This is an interesting question. While our operators do guarantee the order of emission for next, they don't really guarantee how the order of inner subscriptions is managed. For those reasons, I personally do not think this should be considered a bug.

Putting aside the debate over whether it's expected or not, achieving the desired behavior seems somewhat tricky, mostly because most operators might behave similarly. If you can guarantee that the inner observables do not overlap when emitted, you could consider using switch, i.e.:

... same Task()

Observable
  .of(
    Task(`1`),
    Task(`2`),
    Task(`3`)
  )
  .switch()
  .subscribe();

will give you each task's execution occurring after the previous teardown, because by nature the switch operator unsubscribes from the prior inner observable (which executes its teardown) and then moves to the next. But if the inner observables have overlapping emissions and you can't lose any, switch might not be a fit for you. Otherwise you'd need something more complex, like actually delaying each subsequent task until the previous task completes. That will work, but I guess it will be quite verbose.

@mattflix
Author

mattflix commented Feb 22, 2018

I agree that one might not view this behavior as a "bug" with regard to the combined emissions of the inner observables, since the emissions still occur as expected, independent of any subtleties in the subscribe/unsubscribe order of the inner observables.

That said... in cases where the only thing that matters about the inner observables is their execution (esp. if they represent tasks, or periods of time, etc., have no emissions, and only complete/error), there is a strong case to be made that the operation of "concatenation" should actually take care to execute the inner observables in a truly "back to back" fashion.

So, I still find it "surprising", and quite unintuitive really, that the subscription of the next observable is overlapped with the teardown of the previous. Certainly, in my case, I wrote a bunch of code (my task queue) thinking that concatAll() couldn't possibly work that way -- as it wasn't congruent with the notion of "concatenation" in my mind.

Per my experiment, the subscribe/unsubscribe order within InnerSubscriber seems (at least at first glance) to be completely immaterial to the proper functioning of concatAll(), or any other operator, with regard to the combined emissions. The choice of order seems to have been arbitrary?

And, if that choice was indeed arbitrary, and could actually be re-made more purposefully, then choosing to explicitly avoid overlapping subscriptions within InnerSubscriber, or at least for concatAll() (and variants), could be viewed as the "least surprising", more intuitive, and potentially more useful implementation.

I hope that change can be considered.

@kwonoj
Member

kwonoj commented Feb 22, 2018

I'll let others chime in with opinions, but my personal opinion is still that inner subscription management is internal to the operator implementation, and it's not expected to behave in any particular way. (Still, that's just personal.)

Again, putting that aside, I feel like the crux of this issue is that you'd like to queue certain tasks in a given order, including their teardown. Am I understanding that right? In that case, have you considered using complete to indicate teardown for each inner observable instead of unsubscription? For example, if we reuse your example and make some rough pseudocode:

const { Observable } = require(`rxjs`);

function Task(name) {
  return (
    Observable
      .defer(() => {
        console.log(`begin task ${ name }`);
        return (
          Observable
            .of(`simulated work`)
            .delay(100)
            .materialize()
            .do((x) => {
              if (x.kind === 'C') {
                console.log(`end task ${ name }`);
              }
            })
            .dematerialize()
          );
      })
  );
}

Observable
  .of(
    Task(`1`),
    Task(`2`),
    Task(`3`)
  )
  .concatAll()
  .subscribe();

you'll get

begin task 1
end task 1
begin task 2
end task 2
begin task 3
end task 3

since the teardown logic now runs on each inner observable's completion notification (exposed as notification metadata) rather than on unsubscription.

@mattflix
Author

mattflix commented Feb 22, 2018

Thanks for the idea. Seems like it could work. One note: The inner observables may not always complete "cleanly" though, so error would also have to be "hooked" for this to work.

Still, it feels quite unintuitive and unergonomic to have to resort to such esoteric interfaces (materialize()) to successfully manage observable "lifecycle" issues, when higher-level, more intuitive interfaces (e.g., finally()) could be used instead, if it weren't for a seemingly arbitrary internal implementation choice within InnerSubscriber.

@kwonoj
Member

kwonoj commented Feb 22, 2018

may not always complete "cleanly" though, so error would also have to be "hooked" for this to work.

This doesn't matter, because the notification metadata gives you access to either error or complete (or the next value, if needed) without blowing up the observable chain.

Still, this feels quite unintuitive, and unergonomic to have to resort...

On this I've expressed my opinion sufficiently; I'll leave it to other members to weigh in.

@mattflix
Author

If you can guarantee inner observables are not overlapping when emitted, you could consider using switch

switch() is not suitable because I don't want to "abort" the current task just because another has arrived. The purpose of the queue is to execute the current task to completion (or error) before allowing the next task to begin.

Indeed, if I could already "guarantee inner observables are not overlapping when emitted", then I would not need a queue.

@kwonoj
Member

kwonoj commented Feb 22, 2018

switch() is not suitable

Yeah, ignore that. I've already backed that suggestion out.

@kwonoj
Member

kwonoj commented Feb 22, 2018

I vaguely recall discussing some operators' subscription behavior in the last core meeting. Maybe this operator is one of those cases? /cc @benlesh

@mattflix
Author

An alternative solution might be to provide another operator, perhaps named serializeAll(), that does guarantee subscription/unsubscription order. But this would seem to be needlessly duplicative, as it would be concatAll() in virtually all ways but name.

@mattflix
Author

mattflix commented Feb 22, 2018

Also note that you do (somewhat unexpectedly ;) get the expected output when queuing up synchronous tasks.

That is, make the following small change in my original example code:

function Task(name) {
  return (
    Observable
      .defer(() => {
        console.log(`begin task ${ name }`);
        return (
          Observable
            .of(`simulated work`)
            // .delay(100) <== remove the asynchronous element
            .finally(() => {
              console.log(`end task ${ name }`);
            })
          );
      })
  );
}

...and the output is as expected.

This is another way in which the current behavior of concatAll() is surprising, and fighting the user's intuition about what "concatenation of observables" should always mean (in my opinion).

@mattflix
Author

mattflix commented Feb 22, 2018

Lastly, if it provides any motivation, or consideration of this behavior as a bug, note that I see the expected output from my repro code in all cases (discussed so far) when running with RxJS 4.x.


I have added an "EDIT" about this to the original issue description above.

@benlesh
Member

benlesh commented Feb 23, 2018

It's because the complete of the previous observable is what triggers subscription to the next observable in the queue. This is more of a quirk of the behavior of finally.

complete basically executes completion callback and THEN unsubscribes (which triggers finally).

To get the behavior you want, you'll need to use do or tap, like tap({ complete() { console.log('this is done'); } })

We could look into triggering the next subscription in concat, merge, etc after the unsubscribe of the previous observable. That seems reasonable.
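
The sequence described in this comment can be pictured with a small sketch. It is a toy model under stated assumptions, not RxJS source code: `completeInner` and its three callbacks are hypothetical names standing in for the completion handlers (where a do/tap complete handler would fire), the operator's reaction to complete (subscribing to the next inner observable), and the unsubscribe step (which is what triggers finally).

```javascript
// Toy model (NOT RxJS's real code) of one inner observable completing inside
// a concat-like operator. All names are hypothetical.
function completeInner({ onComplete, subscribeNext, onFinalize }) {
  onComplete();    // 1. completion callbacks run (where do/tap handlers fire)
  subscribeNext(); // 2. the operator reacts to complete by subscribing to the next inner
  onFinalize();    // 3. only then is the inner unsubscribed, which triggers finally()
}

const completionLog = [];
completeInner({
  onComplete: () => completionLog.push("tap complete: task 1"),
  subscribeNext: () => completionLog.push("begin task 2"),
  onFinalize: () => completionLog.push("finally: task 1"),
});
console.log(completionLog.join("\n"));
```

In this model the complete handler runs before the next task begins, while the finally-style callback runs after it. A complete or error handler only covers completion and error, though; unlike finally, it would not run if the subscription were cancelled mid-task.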

@benlesh benlesh added bug Confirmed bug and removed type: question labels Feb 23, 2018
@benlesh
Member

benlesh commented Feb 23, 2018

I think this is a bug.

@jayphelps, this is actually a use-case for the marble changes we were discussing. We need the ability to test that we're synchronously unsubscribing from one observable and THEN subscribing to the next in our tests.

@kwonoj
Member

kwonoj commented Feb 23, 2018

@benlesh do we have any guarantee around inner subscription overlapping in operators?

@benlesh
Member

benlesh commented Feb 23, 2018

@kwonoj we've never offered a "guarantee", but I think we need to start, because I agree this is a bug.

@kwonoj
Member

kwonoj commented Feb 23, 2018

I'm still a bit on the fence about whether this should be considered a bug rather than a nice-to-have behavior. Operators guarantee the emission of values, that's for sure, but what makes this a bug when it concerns internal behavior?

@benlesh
Member

benlesh commented Feb 23, 2018

it's a really minor bug if it's a bug, honestly.

@benlesh benlesh removed the bug Confirmed bug label Feb 23, 2018
@benlesh
Member

benlesh commented Feb 23, 2018

But you're right, @kwonoj ... ugh... maybe it's not a bug per se.

One of the reasons that older versions of Rx didn't have this issue is they were all scheduling via a queue, I think. Which we're not doing for performance reasons.

In fact, looking at this, I'm not sure we can make this behave the way it's desired without queue scheduling, which is a non-starter. I'll have to think about it.
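
One way to picture what queue scheduling would change is a trampoline: work requested while another action is running is queued and only runs once the current action has returned. The sketch below is a hypothetical illustration of that idea, not the scheduler of any RxJS version; `createTrampoline` and `schedule` are made-up names.

```javascript
// Hypothetical trampoline, only to illustrate the idea of queue scheduling.
// It is NOT the scheduler used by any RxJS version.
function createTrampoline() {
  const queue = [];
  let draining = false;
  return function schedule(action) {
    queue.push(action);
    if (draining) return; // called from inside a running action: defer it
    draining = true;
    try {
      while (queue.length > 0) {
        queue.shift()();
      }
    } finally {
      draining = false;
    }
  };
}

const trampolineLog = [];
const schedule = createTrampoline();

schedule(() => {
  trampolineLog.push("task 1 completes");
  // The next subscription is requested here, but it is only queued.
  schedule(() => trampolineLog.push("begin task 2"));
  // Teardown of task 1 still runs before the queued subscription.
  trampolineLog.push("end task 1");
});

console.log(trampolineLog.join("\n"));
```

Because the nested `schedule` call is deferred, the teardown line is logged before the next task begins. The price is that every subscription goes through the queue, which is the kind of overhead the comment above refers to.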

@mattflix
Author

mattflix commented Feb 23, 2018

@benlesh wrote:

it's a really minor bug if it's a bug, honestly.

Depends on your use case. It breaks using Rx to represent mutually exclusive tasks in the most intuitive way (IMO). For me, that is a major bug.

With other operators that draw parallels to the world of "tasks", such as forkJoin(), it would be nice if Rx could provide a guarantee of true Observable serialization in cases where it makes logical sense.

As I mentioned earlier, the fact that concatAll() didn't already guarantee that was a bit of a surprise. To me, this operation should definitely concatenate the execution of the observables, not "merely" logically concatenate their emissions.

Otherwise, the high-level Observable lifecycle management afforded by otherwise intuitive subscription and disposal entry-points (create(), finally()) loses some very real meaning/value.

@mattflix
Author

mattflix commented Feb 23, 2018

@benlesh wrote:

In fact, looking at this, I'm not sure we can make this behave the way it's desired without queue scheduling, which is a non-starter. I'll have to think about it.

Simply reversing the logic (as I did in my "experiment" mentioned above) is not sufficient?

Perhaps I don't understand some other internal implementation detail, but if the unsubscribe is always executed first (which I assume will always execute to completion synchronously), before sending the notification of completion or error, how can the unsubscribe (aka, disposal logic) ever overlap with the following subscribe (aka, initialization logic)?

@benlesh
Member

benlesh commented Feb 24, 2018

No... complete is executed first, then unsubscribe... finally relies on unsubscribe to be executed (ensuring it's after both complete and error)... HOWEVER... the internal observable's complete is what notifies concatAll that it's time to subscribe to the next observable in the buffer.

This is simply how concat is implemented... subscribe to one, when it's complete subscribe to the next one. Not when it's torn down, which happens synchronously right afterward. We might be able to change this behavior by moving the logic for the next subscription into the unsubscribe handler instead of complete. I don't know.

This is really edge-casey though. Until then, just use tap or do's complete or error handlers to execute whatever logging/side-effect you wanted to do.

@mattflix
Author

@benlesh, I think something got confused.

Sorry, when I said:

but if the unsubscribe is always executed first

...I was not referring to how things currently work. I was referring to the experiment where I simply reversed the logic within InnerSubscriber so that unsubscribe() was always executed first.

This change seemed to cleanly restore the expected behavior and, logically, appears to make more sense as an implementation choice. Would not such a change be sufficient to guarantee "serialized" subscription behavior for concatAll() and variants?

(The experimental code change appears in my original issue description above.)

@mattflix
Author

@benlesh, if you agree with my last comment, can we consider getting this issue marked as a bug?

@benlesh
Member

benlesh commented Feb 26, 2018

@mattflix I'm sorry, I don't quite follow your last two comments. Do you have a PR I can review?

@mattflix
Author

mattflix commented Feb 26, 2018

(I was referring, overall, to our exchange in the prior 5 comments in this issue, which were only between you and me, all on 2/23.)

I don't have a PR. I could try to create one, but I am not knowledgeable enough to author any tests, or whatever else, for a "complete" PR in this repo.

That said, the change I am proposing, or at least just trying to point out, is trivial.

I literally just reversed the lines that are (apparently) responsible for the ordering "subscribe -> unsubscribe" between all the Observables handled by concatAll() (via InnerSubscriber) and, with that change, now get "unsubscribe -> subscribe" instead.

The description of this issue includes a snippet from InnerSubscriber -- which shows the entire change -- and which restored the expected behavior, in all cases, as far as I could see.

Does what I am proposing/referencing not solve the issue?

(Again, I am not knowledgeable enough to know whether such a change will or will not have other ramifications, or how to verify that. But it seems simple enough, and logically correct, to me.)

@mattflix
Author

mattflix commented Mar 7, 2018

@benlesh, this thread seems stalled. Any thoughts with respect to my last comment on Feb 26?

@samal-rasmussen
Contributor

So I just ran into this issue. I have a list of Observables that need to do some cleanup in a finally block before the next one can run. I cannot use Observable.concat to run these Observables in sequence, because the first Observable hasn't cleaned up when the next one is run.

@benlesh I don't understand what you are suggesting when you say "Until then, just use tap or do's complete or error handlers to execute whatever logging/side-effect you wanted to do". Right now I am at a loss how I should work around this issue.

I don't know how often people run into this issue, but this is definitely a major issue for me right now. It is stopping me from doing something I need to do.

@samal-rasmussen
Contributor

OK, I figured out one workaround, which is to use observeOn with another scheduler to get back the RxJS 4 behavior that Ben describes.

function Task(name) {
  return Rx.Observable
    .defer(() => {
      console.log(`begin task ${ name }`);
      return (
        Rx.Observable
          .of(`simulated work`)
          .delay(100)
          .finally(() => {
            console.log(`end task ${ name }`);
          })
      );
    })
    .observeOn(Rx.Scheduler.asap);
}

Rx.Observable
  .of(
    Task(`1`),
    Task(`2`),
    Task(`3`)
  )
  .concatAll()
  .subscribe();

https://jsbin.com/yevayuf/1/edit?html,js,console

@mattflix
Author

mattflix commented Mar 21, 2018

@samal84, yeah, I figured out the same observeOn() workaround, but it seems kinda hacky and makes code work "merely by luck". I hate writing code that only works by luck (and may be subject to "running out of luck" in the future).

Another workaround is to insert "dummy" Observables between the "real" (in my case, the serialized Tasks) Observables that simply introduce an artificial asynchronous delay. Again, this is a hack and feels totally unnecessary.

@benlesh, can you weigh in on whether this issue can be considered a bug?

@cartant
Collaborator

cartant commented Feb 3, 2019

@samal84 @mattflix I came across this issue when reading through old issues to see whether any could be closed. A slightly better alternative to observeOn - which will see every emission dispatched on the specified scheduler - would be to use subscribeOn instead - which will see only the subscription to the source dispatched by a scheduler. It's obviously not a fix for the issue; it's just a more efficient workaround.

@samal-rasmussen
Contributor

Thanks for the tip @cartant !

I had the same concern, so I made this variant that I am currently using:

import {
    concat,
    Observable,
} from 'rxjs';

export function startObservableAfterAsyncWait<T>(observable: Observable<T>): Observable<T> {
    return concat(
        new Observable<never>((observer) => {
            // Wait for this timeout, in order to make sure that this observable stream is properly async,
            // so that we are sure all subscriptions are ready and set up before we emit any values
            const handle = setTimeout(() => {
                observer.complete();
            });
            // Cancel the pending timeout if the subscriber unsubscribes early
            return () => clearTimeout(handle);
        }),
        observable,
    );
}

Good to know that subscribeOn will have the same effect. Less lines for me to maintain :)

@cartant
Collaborator

cartant commented Feb 5, 2019

@samal84

Less lines for me to maintain

Deleting code is the best!

benlesh added a commit to benlesh/rxjs that referenced this issue Feb 9, 2021
… next

Resolves an issue where inner observables would not finalize before the next inner observable got subscribed to. This happened in concat variants, and merge variants with concurrency limits, and could be surprising behavior to some users.

fixes ReactiveX#3338
@benlesh
Member

benlesh commented Feb 9, 2021

I finally have a fix for this (and all concat variants) and it will likely land in v7: #6010

benlesh added a commit that referenced this issue Feb 10, 2021
… next (#6010)

Resolves an issue where inner observables would not finalize before the next inner observable got subscribed to. This happened in concat variants, and merge variants with concurrency limits, and could be surprising behavior to some users.

fixes #3338