Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first/tap operators should unsubscribe before emit next value #5487

Closed
Enuvid opened this issue Jun 13, 2020 · 8 comments · Fixed by #6394 or #6396
Closed

first/tap operators should unsubscribe before emit next value #5487

Enuvid opened this issue Jun 13, 2020 · 8 comments · Fixed by #6394 or #6396
Assignees
Labels
8.x Issues and PRs for version 8.x bug Confirmed bug

Comments

@Enuvid
Copy link
Contributor

Enuvid commented Jun 13, 2020

Bug Report

Current Behavior

Currently, we are unsubscribing from source observables after emit the last message next to the chain. In some cases, it can raise incorrect and unexpected behavior (it seems to me).
In example below, we are still have processing new messages second observable in merge after first have emitted the first message. But expected behavior is that first operator must unsubscribe from all sources on receiving the first message.

Also, we have strange behavior: we can emit error from second observable and this exception will go through first operator and handle (in my example) in catchError, but if we try to pass just a value through first, this operator (actually tap operator inside of first) will omit this message.

Reproduction

const notificator = new Subject();

merge(
  timer(1)
    .pipe(
      tap((m) => console.log('value from first: ', m)),
    ),
  notificator.pipe(
    tap(() => console.log('value from notificator')),
    switchMap(() => throwError(new Error('Some error'))),
))
  .pipe(
    first(),
    catchError((err) => {
      console.log('Catching error: ', err)
      return throwError('Continue receive error after first value');
    }),
    tap((value) => {
      console.log('value: ', value);
      notificator.next(null);
    })
  ).subscribe(
    (value) => console.log('Next value: ', value),
    (err) => console.log('Error: ', err),
  );

Output:

value from first: 0
value: 0
value from notificator
Catching error: Error {}
Error: Continue receive error after first value

You can try it here.

Expected behavior

As I told before, I think that we shouldn't get any effects (include errors) from source observables after the limit of allowed values in stream reached. And more, we should unsubscribe from them, to prevent any calculations inside source chain, we should unsubscribe before passing the last value next.

Environment

  • Runtime: Chrome/83.0.4103.97
  • RxJS version: 6.5.2

Possible Solution

In take operator we overriding next handler to:

protected _next(value: T): void {
const total = this.count;
const count = ++this._valueCount;
if (count <= total) {
this.destination.next(value);
if (count === total) {
this.destination.complete();
this.unsubscribe();
}
}
}

I suggest changing them to make unsubscribe stuff before emitting next value. Also, this will allow us to omit if (count <= total) this strange check, because if we already unsubscribed from source that we will no longer receive a message.

If this change can affect something (backward compatibility), I think we should at least override _error handler to prevent passing errors. But I think, more correct just unsubscribe before next.

I will be glad to make a pull request if you agree with my arguments.

@backbone87
Copy link
Contributor

this behavior is kind of expected.

a workaround is to use a scheduler:

notificator.pipe(
  observeOn(asapScheduler),
  tap(() => console.log('value from notificator')),
  switchMap(() => throwError(new Error('Some error'))),
)

@Enuvid
Copy link
Contributor Author

Enuvid commented May 10, 2021

I am not sure that it kind of expected that a simple operator, which should control values number in the stream and unsubscribe, can emit errors and process messages (I believe they shouldn't calculate) in some cases even if the maximum count was reached.

Maybe nice to unsubscribe source (because we don't need this stream anymore and calculation inside) before put last value next.

@benlesh
Copy link
Member

benlesh commented May 10, 2021

You're synchronously erroring before your next handler ever even sees the first value

    tap((value) => {
      console.log('value: ', value);
      notificator.next(null); // <--- right here
    })

That goes through the operators applied to notificator here...

notificator.pipe(
    tap(() => console.log('value from notificator')),
    switchMap(() => throwError(new Error('Some error'))),

...synchronously, emitting an error which then makes it past that tap first, because it all happens synchronously before you can go to the next step from that first emission. I'll admit that's confusing, but honestly so is the code.

If I were to psuedo-code inline it it would be more obvious:

timer next 0
merge next 0
tap 0 -> log 'value from first: 0'
first next 0
catchError next // passthrough
tap next 0 -> log 'value: 0'
       -> notificator next null
            tap null -> log 'value from notificator'
            switchMap null -> subscribe to throwError(new Error('some error'))
                 <-
          merge error Error('some error');
          first error // passthrough
          catchError error Error('some error') -> log 'Catching error: ', Error('some error')
                 -> subscribe to throwError('Continue receive error after first value');
                 <-
          tap error // passthrough (no handler)
          subscriber error 'Continue receive error after first value' -> log 'Error: Continue receive error after first value'
###### EVERYTHING IS CLOSED AFTER THIS POINT AND ALTHOUGH EXECUTED WILL NOT YIELD ANYTHING ######
subscriber next 0 // After the `tap next 0` above ... but noop because closed.

So I'm going to close this as it's not a bug, just some confusion around the order of execution.

@benlesh benlesh closed this as completed May 10, 2021
@benlesh
Copy link
Member

benlesh commented May 10, 2021

Well @cartant has other opinions. So I'm reopening this to hear him out.

@benlesh benlesh reopened this May 10, 2021
@benlesh
Copy link
Member

benlesh commented May 10, 2021

I suspect the argument is that the first operator should close after it gets the nexted value? That's an interesting thing to think about... I'll think about it more... perhaps you're right. It's certainly a weird case though.

@benlesh benlesh added the bug Confirmed bug label May 10, 2021
@benlesh benlesh self-assigned this May 10, 2021
@benlesh
Copy link
Member

benlesh commented May 10, 2021

I see the issue, and I can fix it. Shouldn't be too bad. first, take, takeWhile, et al, will need to be addressed.

@backbone87
Copy link
Contributor

was closed unintentionally, because of linked PR which just contained the failing test case was merged

benlesh added a commit to benlesh/rxjs that referenced this issue May 10, 2021
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue May 10, 2021
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487
@benlesh benlesh reopened this May 10, 2021
benlesh added a commit to benlesh/rxjs that referenced this issue May 11, 2021
- adds test, fixed by changes to `take`.

Related ReactiveX#5487
@benlesh benlesh added the 8.x Issues and PRs for version 8.x label May 12, 2021
@benlesh
Copy link
Member

benlesh commented May 12, 2021

Unfortunately, the fix for this is a breaking change, and we're unable to have it until v8.

benlesh added a commit to benlesh/rxjs that referenced this issue Jan 4, 2022
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 4, 2022
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 4, 2022
- adds test, fixed by changes to `take`.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Dec 3, 2022
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Dec 3, 2022
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Dec 3, 2022
- adds test, fixed by changes to `take`.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- adds test, fixed by changes to `take`.

Related ReactiveX#5487
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion notification, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before the takeWhile notifier is notified.
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after `take` or `first` notice a completion, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `take` or `first` notice the completion.
benlesh added a commit to benlesh/rxjs that referenced this issue Jan 20, 2023
- adds test, fixed by changes to `take`.

Related ReactiveX#5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion from `elementAt`, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `elementAt` finds the element at the index.
benlesh added a commit that referenced this issue Mar 6, 2023
- Resolves an issue where a reentrant error notification would short circuit the completion.
- Adds additional tests.

Related #5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion notification, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before the takeWhile notifier is notified.
benlesh added a commit that referenced this issue Mar 6, 2023
- Resolves an issue with both `first` and `take` where an error emitted by a reentrant source at the moment the last value it taken would result in superceding the expected emitted value and completion.
- Resolves a similar issue where a reentrant completion would supercede the expected last value and completion

Fixes #5487

BREAKING CHANGE: If a the source synchronously errors after `take` or `first` notice a completion, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `take` or `first` notice the completion.
benlesh added a commit that referenced this issue Mar 6, 2023
- adds test, fixed by changes to `take`.

Related #5487

BREAKING CHANGE: If a the source synchronously errors after it recieves a completion from `elementAt`, the error will no longer be emitted. This is a bug fix, but may be a breaking change for those relying on this behavior. If you need to mimic the behavior, you'll need to throw the error before `elementAt` finds the element at the index.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
8.x Issues and PRs for version 8.x bug Confirmed bug
Projects
None yet
3 participants