Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReplaySubject shouldn't keep it's values after all subscriptions are removed #12

Closed
sherlock1982 opened this issue Oct 30, 2019 · 2 comments

Comments

@sherlock1982
Copy link

Hello again!
Please consider the following code in RxJava:

    val subj = BehaviorSubject.createDefault(0)
    val stream = subj.switchMap {
        print("Inside map\n")
        Observable.just(1)
    }.replay(1).refCount()

    val disposable1 = stream.subscribe {
        print("Stream 1 $it\n")
    }

    disposable1.dispose()

    val disposable2 = stream.subscribe {
        print("Stream 2 $it\n")
    }

    disposable2.dispose()

It outputs:

Inside map
Stream 1 1
Inside map
Stream 2 1

Now let's check how Combine+Entwine works:

    let source = CurrentValueSubject<Bool, Never>(true)
    let stream = source.map{ _ -> AnyPublisher<Int, Never> in
        print("Inside map")
        return Just(1).eraseToAnyPublisher()
    }.switchToLatest()
     .share(replay: 1);

    let disposable1 = stream.sink(receiveValue: {
        print("Stream 1 \($0)")
    })

    disposable1.cancel()

    let disposable2 = stream.sink(receiveValue:  {
        print("Stream 2 \($0)")
    } )

    disposable2.cancel()

It outputs:

Inside map
Stream 1 1
Stream 2 1
Inside map
Stream 2 1

Here's the difference: Stream 2 received values twice which is wrong I believe.
This works properly if Source completes.
I believe the difference is that Combine doesn't recreate a Subject in case there are no subscribers left but Source is not completed.
Though RxJava uses new Subject in this case.

I suggest to clean data inside Subject when there are no subscribers left to be consistent.

@tcldr
Copy link
Owner

tcldr commented Oct 30, 2019

Hi @sherlock1982,

Yes, that's because RxJava and other implementations use referenceCounted() instead of autoconnect() in their share(replay:) implementations.

With a referenceCounted() operator there is no need for additional logic in ReplaySubject and therefore any subject that wants to provide this kind of functionality is able to.

I was unsure whether or not Combine had a method of recreating the behaviour of the refCount() operator and after further investigation it appears it does not.

So, in the meantime I've added a new referenceCounted() operator specifically for Multicast publishers. (Ideally it would work for all ConnectablePublishers but the protocol doesn't provide the necessary API for the task.)

Currently available on the master brach.

@sherlock1982
Copy link
Author

Thank you very much. Finally it works as expected at least for me :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants