Structured concurrency and cancellation #482

Closed
qwwdfsad opened this issue Feb 25, 2020 · 5 comments

Comments

@qwwdfsad

qwwdfsad commented Feb 25, 2020

The general question is "How to maintain structured concurrency between publisher and subscriber in the face of cancellation?"

One of the ideas is to have the ability to merge concurrent (and potentially parallel) control flow.
For example, consider a finite Publisher that creates a resource per subscriber. It could be a thread, a socket, or anything else that requires an explicit close, which may take an indefinitely long time.

In a basic scenario (let's say the subscriber's request is always unbounded), a subscriber receives all the data, then the Publisher (or its Subscription, to be more precise) closes the underlying resource (again: it may take an indefinite amount of time!) and only then invokes Subscriber.onComplete.
After receiving the onComplete event, the subscriber knows for sure that all resources associated with its subscription have been properly released (i.e. the control flow is merged back into a single point).

In a more advanced scenario, the Subscriber cancels its subscription in the middle of the stream, e.g. because it no longer cares about the rest of the elements once the corresponding user's session has been terminated, or for whatever other reason.
At this point, the protocol explicitly allows omitting onComplete: onSubscribe onNext* (onError | onComplete)? (also, untested_spec108_possiblyCanceledSubscriptionShouldNotReceiveOnErrorOrOnCompleteSignals only confirms my suspicions).
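
To make the grammar above concrete, here is a minimal sketch of a checker for `onSubscribe onNext* (onError | onComplete)?`. The `Signal` enum and `isValidSignalSequence` are hypothetical names introduced only for illustration; they are not part of Reactive Streams or its TCK.

```kotlin
// Hypothetical signal model, for illustration only (not a Reactive Streams type).
enum class Signal { ON_SUBSCRIBE, ON_NEXT, ON_ERROR, ON_COMPLETE }

// Checks a sequence against: onSubscribe onNext* (onError | onComplete)?
fun isValidSignalSequence(signals: List<Signal>): Boolean {
    if (signals.firstOrNull() != Signal.ON_SUBSCRIBE) return false
    val rest = signals.drop(1)
    // Index of the first terminal signal, or -1 if the stream never terminated.
    val terminalIndex = rest.indexOfFirst { it == Signal.ON_ERROR || it == Signal.ON_COMPLETE }
    val body = if (terminalIndex == -1) rest else rest.subList(0, terminalIndex)
    // Everything before the terminal signal must be onNext, and nothing may follow it.
    val bodyOk = body.all { it == Signal.ON_NEXT }
    val nothingAfterTerminal = terminalIndex == -1 || terminalIndex == rest.lastIndex
    return bodyOk && nothingAfterTerminal
}

fun main() {
    val completed = listOf(Signal.ON_SUBSCRIBE, Signal.ON_NEXT, Signal.ON_COMPLETE)
    // What a subscriber may observe after cancelling: no terminal signal at all.
    val cancelled = listOf(Signal.ON_SUBSCRIBE, Signal.ON_NEXT)
    println(isValidSignalSequence(completed))
    println(isValidSignalSequence(cancelled))
}
```

Both sequences are accepted, which is the point: a trace that simply stops after the last onNext is just as legal as one that ends with onComplete.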

This means that, after the call to Subscription.cancel, the subscriber cannot be sure that the corresponding resource has been properly released, nor does it have an API to be notified of that. This creates a potentially unbounded memory leak (e.g. when a new subscription is established as soon as the previous one completes or is cancelled), or unbounded resource consumption with no ability to throttle it at the subscriber level.
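
The gap can be reproduced with a small sketch. It uses the JDK's `java.util.concurrent.Flow` interfaces so that it runs without extra dependencies. `SlowResource`, `ResourcePublisher` and `CancellingSubscriber` are hypothetical names, the latch only stands in for slow cleanup, and the publisher is deliberately simplified (single-threaded, no validation of `request` arguments), so it is not a spec-compliant implementation.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Flow
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical resource whose close() can take arbitrarily long.
class SlowResource(private val closeGate: CountDownLatch) {
    val closed = AtomicBoolean(false)
    fun close() {
        closeGate.await() // stands in for slow cleanup work
        closed.set(true)
    }
}

// Hypothetical, simplified publisher: one resource per subscriber, emits 1..count.
class ResourcePublisher(
    private val count: Int,
    private val resource: SlowResource
) : Flow.Publisher<Int> {
    override fun subscribe(subscriber: Flow.Subscriber<in Int>) {
        subscriber.onSubscribe(object : Flow.Subscription {
            private var next = 1
            private var done = false // true after completion or cancellation

            override fun request(n: Long) {
                var remaining = n
                while (remaining > 0 && !done && next <= count) {
                    subscriber.onNext(next++)
                    remaining--
                }
                if (!done && next > count) {
                    done = true
                    resource.close()        // release the resource first...
                    subscriber.onComplete() // ...and only then signal completion
                }
            }

            override fun cancel() {
                if (done) return
                done = true
                // Cleanup continues in the background; no signal reports when it ends.
                Thread { resource.close() }.start()
            }
        })
    }
}

// Subscriber that cancels after `limit` items and records every signal it sees.
class CancellingSubscriber(private val limit: Int) : Flow.Subscriber<Int> {
    val events = mutableListOf<String>()
    private lateinit var subscription: Flow.Subscription

    override fun onSubscribe(subscription: Flow.Subscription) {
        this.subscription = subscription
        subscription.request(Long.MAX_VALUE)
    }
    override fun onNext(item: Int) {
        events += "onNext($item)"
        if (events.size == limit) subscription.cancel()
    }
    override fun onError(throwable: Throwable) { events += "onError" }
    override fun onComplete() { events += "onComplete" }
}

fun main() {
    val gate = CountDownLatch(1)
    val resource = SlowResource(gate)
    val subscriber = CancellingSubscriber(limit = 2)
    ResourcePublisher(count = 5, resource = resource).subscribe(subscriber)

    println(subscriber.events)     // two onNext signals and no terminal signal
    println(resource.closed.get()) // false: cleanup is still pending
    gate.countDown()               // let the background cleanup finish
}
```

After `cancel()` returns, the subscriber has seen only the two `onNext` signals and has nothing to wait on, while the resource is still open. If it resubscribed at this point, a second resource would be created before the first one is released.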

Could you please elaborate on whether this is designed behaviour, a grey area of the specification, or whether I am just missing something and subscription termination can be properly detected?

@rkuhn
Member

rkuhn commented Feb 25, 2020

Your observation can be summarised with “Reactive Streams is not a resource management framework”, which is true as per the mission statement: Reactive Streams only concerns itself with transmitting unbounded streams of data over asynchronous boundaries. How these streams are generated or consumed is of no concern to this communication protocol.

In particular, signalling onComplete to the Subscriber does not imply that the Publisher has released resources. For example the Publisher could statically hold a finite stream in memory and send that to any Subscriber that is attached to it, now or in the future.

Cancellation in the same vein only transmits the intent of dropping all future stream elements. How the Publisher reacts to that is left intentionally vague.

I recommend using a resource-managed entity like an Actor for your Publisher such that you can control its lifecycle and resource usage.

@akarnokd
Contributor

This level of resource-awareness is not part of the specification or part of the design goals. (In contrast, this was somewhat considered with IAsyncEnumerables.)

The general workaround is to bring the computation to the resource, like an actor (or event loop) that can correlate the subscriber's actions with the resource's lifecycle, as @rkuhn mentioned.
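
One possible reading of "bring the computation to the resource", sketched with JDK classes only: a single-threaded executor plays the role of the event loop, the consumer hands its computation to the owner, and the returned future completes only after cleanup has run. `ResourceOwner`, `withResource` and `log` are hypothetical names, and this is an assumption about what such a design could look like, not something prescribed by Reactive Streams or by the commenters.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.function.Supplier

// Hypothetical owner: the resource is only ever touched on one event-loop thread.
class ResourceOwner {
    private val loop = Executors.newSingleThreadExecutor()
    val log = CopyOnWriteArrayList<String>() // records lifecycle steps for the demo

    // Runs `work` next to the resource. The future completes only after the
    // finally block has run, so the caller observes cleanup even on early exit.
    fun <T> withResource(work: (Sequence<Int>) -> T): CompletableFuture<T> =
        CompletableFuture.supplyAsync(Supplier {
            log += "open"                             // stands in for acquiring the resource
            try {
                work(generateSequence(1) { it + 1 })  // infinite source backed by the resource
            } finally {
                log += "close"                        // stands in for a slow release
            }
        }, loop)

    fun shutdown() { loop.shutdown() }
}

fun main() {
    val owner = ResourceOwner()
    // take(2) stops consuming early, which plays the role of cancellation here.
    val result = owner.withResource { items -> items.take(2).toList() }.get()
    println(result)    // [1, 2]
    println(owner.log) // [open, close]
    owner.shutdown()
}
```

The trade-off is that the lifecycle signal travels outside the Publisher/Subscriber protocol: the consumer learns about release from the owner's future, not from `onComplete`.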

@qwwdfsad
Author

qwwdfsad commented Feb 26, 2020

Thanks for the detailed response, @rkuhn and @akarnokd.

Unfortunately, I cannot use any resource management framework. I will describe my use-case just to give an additional data-point for your consideration, not expecting a solution or anything to be fixed.

I am working on Kotlin's Flow: an analogue of reactive streams, but with backpressure supported at the language level via suspension (ability to "block" the control flow, but not the underlying thread).

It allows you to do things like this:

val flow = flow { // Simpler analogue of Flowable.generate
    delay(1000) // Suspending analogue of Thread.sleep()
    emit(1) // onNext
}
.onCompletion { delay(100); println("Done [1]") } 
.flowOn(Dispatchers.IO) // Analogue of Schedulers.io()

The consumer now can do the following:

flow.collect { value -> // ~suspending subscribe
    println(value)
}
// At this point, the consumer knows that flow, its block and its onCompletion are completed, 
// even though the flow was executed on a different thread (IO)

Now, taking into account the popularity of RS and the similarities between Flow and Publisher, we provide transformations to and from Publisher. But with these transformations, the nice property of the consumer knowing that the original source has completed is gone:

val transformed = flow
.asPublisher()
.map { it + 1 } 
.asFlow()

transformed.collect {
}
// Here it is impossible to implement asPublisher + asFlow in such a way that the consumer completes only after the source flow finishes all its cleanup phases

@viktorklang
Contributor

@qwwdfsad The Reactive Streams interfaces are not designed for, or intended as, an end-user API—it is more of an SPI to facilitate spec-conformant interoperability.

@qwwdfsad
Author

Well, the same question applies to java.util.concurrent.Flow, Project Reactor, and others that comply with the RS specification.
Also, it looks like it is used in public API anyway, e.g. https://www.graphql-java.com/documentation/master/subscriptions/
