Mono violates Reactive Streams rule 2.7 #3117

Closed
dkhalanskyjb opened this issue Jul 13, 2022 · 6 comments
Labels: type/enhancement (A general enhancement), wide-change

@dkhalanskyjb

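   import java.util.concurrent.locks.ReentrantLock
   import org.reactivestreams.Subscription
   import reactor.core.publisher.Mono

   val TestMono: Mono<Int> = Mono.from { s ->
       val lock = ReentrantLock()
       s.onSubscribe(object : Subscription {
           override fun request(n: Long) {
               // Fails if another thread currently holds the lock, i.e. is inside request or cancel.
               check(lock.tryLock())
               s.onNext(42)
               lock.unlock()
           }

           override fun cancel() {
               // Same check: cancel must not overlap with a call made from another thread.
               check(lock.tryLock())
               lock.unlock()
           }
       })
   }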

The subscription here checks rule 2.7 (https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2.7), which requires subscribers to call request and cancel in a serialized manner. In practice, this means that a subscriber whose cancellation may happen in parallel has to guard its calls to request and cancel with a lock.

An example of such a safety measure:

    mono.subscribe(object : Subscriber<T> {
        private var seenValue = false

        override fun onSubscribe(s: Subscription) {
            invokeOnCancellation { synchronized(this) { s.cancel() } }
            synchronized(this) { s.request(Long.MAX_VALUE) }
        }

        override fun onComplete() {
            if (!seenValue) resume(null)
        }

        override fun onNext(t: T) {
            seenValue = true
            resume(t)
        }

        override fun onError(error: Throwable) { resumeWithException(error) }
    })

The problem is that, with Mono, this measure is not sufficient, because the lock around s.request is useless: Mono first calls onSubscribe, remembers the requested number of elements, and only then calls request on the actual Subscription. By that time, the lock is no longer held.
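
To make the mechanism concrete, here is a minimal, dependency-free sketch. It is not Reactor's code: `Subscription` and `Subscriber` are hypothetical stand-ins for the `org.reactivestreams` interfaces, and `DeferringSubscription`, `RecordingSubscription`, `LockingSubscriber`, and `onSubscribeReturned` are names invented for illustration. It only assumes the behavior described above, namely that demand signalled during onSubscribe is remembered and forwarded afterwards.

```kotlin
// Hypothetical stand-ins for the org.reactivestreams interfaces,
// declared locally so that the sketch has no dependencies.
interface Subscription {
    fun request(n: Long)
    fun cancel()
}

interface Subscriber<T> {
    fun onSubscribe(s: Subscription)
    fun onNext(t: T)
}

// Upstream subscription that records whether the calling thread held
// `monitor` at the moment request() was invoked.
class RecordingSubscription(private val monitor: Any) : Subscription {
    var requestSawLock: Boolean? = null
        private set

    override fun request(n: Long) {
        requestSawLock = Thread.holdsLock(monitor)
    }

    override fun cancel() {}
}

// Illustrative wrapper (an assumption, not Reactor's implementation):
// remembers demand signalled while onSubscribe is running and forwards
// it to the upstream only after onSubscribe has returned.
class DeferringSubscription(private val upstream: Subscription) : Subscription {
    private var pending = 0L
    private var deferring = true

    override fun request(n: Long) {
        if (deferring) {
            val sum = pending + n
            pending = if (sum < 0) Long.MAX_VALUE else sum // cap on overflow
        } else {
            upstream.request(n)
        }
    }

    override fun cancel() = upstream.cancel()

    fun onSubscribeReturned() {
        deferring = false
        if (pending > 0) upstream.request(pending)
    }
}

// Subscriber that guards request() with its own monitor, like the example above.
class LockingSubscriber : Subscriber<Int> {
    override fun onSubscribe(s: Subscription) {
        synchronized(this) { s.request(Long.MAX_VALUE) }
    }

    override fun onNext(t: Int) {}
}

// The subscriber talks to the upstream directly: request() runs under the lock.
fun requestSawLockDirect(): Boolean? {
    val subscriber = LockingSubscriber()
    val upstream = RecordingSubscription(subscriber)
    subscriber.onSubscribe(upstream)
    return upstream.requestSawLock
}

// The subscriber talks to the deferring wrapper: the upstream request() runs
// after onSubscribe has returned, so the lock has already been released.
fun requestSawLockDeferred(): Boolean? {
    val subscriber = LockingSubscriber()
    val upstream = RecordingSubscription(subscriber)
    val wrapper = DeferringSubscription(upstream)
    subscriber.onSubscribe(wrapper)
    wrapper.onSubscribeReturned()
    return upstream.requestSawLock
}
```

In this sketch `requestSawLockDirect()` returns `true` and `requestSawLockDeferred()` returns `false`: the subscriber's code is identical in both cases, but in the second one the upstream request happens outside the synchronized block, so a concurrent cancel guarded by the same lock can overlap with it.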

Expected Behavior

When a Subscription provided by Mono is accessed in a serialized manner, Mono should guarantee that the subscription passed to CoreSubscriber in its subscribe method is also accessed in a serialized manner.

Actual Behavior

Even when a subscriber calls cancel and request in a serialized manner, a Mono can forward these calls in such a way that the underlying Subscription is not accessed in a serialized manner.

Steps to Reproduce

Run https://github.com/Kotlin/kotlinx.coroutines/blob/b0d1aaee024794116a62e161cbd3eb14016d12ed/reactive/kotlinx-coroutines-reactor/test/MonoAwaitCancellationStressTest.kt and observe the failures filling the build log. Optionally, replace the test Mono with the more realistic version provided above.

  • Reactor version(s) used: 3.4.1
@reactorbot added the ❓need-triage label Jul 13, 2022
@OlegDokuka added the type/enhancement and wide-change labels and removed the ❓need-triage label Jul 13, 2022
@OlegDokuka
Contributor

OlegDokuka commented Jul 13, 2022

@dkhalanskyjb, it is a known issue and it is definitely on the roadmap (we do have races on the request methods). A fix is also in progress. It is not going to come soon in 3.5, but stay tuned (the main reason is that the scope is too wide, so we cannot rework everything at once).

Also, one thing to clarify. Serialization here is about the request and cancel methods independently. The spec does not mandate serializing both together. The library must ensure that request is serialized with other request calls and that cancel is serialized with other cancel calls (related PR: reactive-streams/reactive-streams-jvm#489).
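
Under the reading described in this comment, a checking subscription would verify request and cancel separately instead of sharing one lock between them. The following is a minimal sketch of that idea only; `Subscription` is a hypothetical local stand-in for `org.reactivestreams.Subscription`, and `IndependentlySerializedSubscription` and its `emit` callback are invented for illustration, not taken from Reactor or the spec.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for org.reactivestreams.Subscription,
// declared locally so that the sketch has no dependencies.
interface Subscription {
    fun request(n: Long)
    fun cancel()
}

// Checks that request never overlaps with another request and that cancel
// never overlaps with another cancel, while allowing cancel to run during
// a request. Unlike a ReentrantLock, the guards also reject reentrant
// request calls made from inside `emit` on the same thread.
class IndependentlySerializedSubscription(
    private val emit: (Int) -> Unit
) : Subscription {
    private val inRequest = AtomicBoolean(false)
    private val inCancel = AtomicBoolean(false)

    @Volatile
    var cancelled = false
        private set

    override fun request(n: Long) {
        check(inRequest.compareAndSet(false, true)) { "overlapping request calls" }
        try {
            // The volatile flag makes a cancel from another thread visible here.
            if (!cancelled) emit(42)
        } finally {
            inRequest.set(false)
        }
    }

    override fun cancel() {
        check(inCancel.compareAndSet(false, true)) { "overlapping cancel calls" }
        try {
            cancelled = true
        } finally {
            inCancel.set(false)
        }
    }
}
```

With this variant, a cancel that arrives while request is still running is accepted, and only two simultaneous request calls (or two simultaneous cancel calls) trip the check. Whether that matches rule 2.7 is exactly the question discussed in the linked PR.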

@OlegDokuka
Contributor

Also, please see the related PR in the RS spec: reactive-streams/reactive-streams-jvm#489

@OlegDokuka
Contributor

Superseded by #3120

@qwwdfsad

qwwdfsad commented Oct 5, 2022

Hi, which issue can we follow to keep track of the changes in 3.5?

@OlegDokuka
Contributor

@qwwdfsad, check this one: #3120

@qwwdfsad

qwwdfsad commented Oct 5, 2022

Thanks!
