Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplified and refactored Netty-ReactiveStreams integration #3636

Merged
merged 7 commits into from
Mar 26, 2024

Conversation

ghik
Copy link
Contributor

@ghik ghik commented Mar 25, 2024

No description provided.

@ghik ghik requested a review from kciesielski March 25, 2024 09:35
private[netty] class SimpleSubscriber(contentLength: Option[Int]) extends PromisingSubscriber[Array[Byte], HttpContent] {
private[netty] class SimpleSubscriber(contentLength: Option[Long]) extends PromisingSubscriber[Array[Byte], HttpContent] {
// These don't need to be volatile as Reactive Streams guarantees that onSubscribe/onNext/onError/onComplete are
// called serially (https://github.com/reactive-streams/reactive-streams-jvm?tab=readme-ov-file#1-publisher-code - rule 3)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Isn't the serial guarantee a different thing than memory barrier concerns? We have a guarantee that there will be no parallel access to these values, but they can still happen on different threads, thus a change performed on thread may not be always immediately visible to another thread?

Copy link
Contributor Author

@ghik ghik Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU, it includes memory visibility safety (the "happens-before relationship" which is a term also used by the JMM in the context of synchronized blocks and volatile semantics):

The definition of "serially" says:

In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.

The way I see it, every "serial" guarantee must include proper happens-before relationship or it doesn't really guarantee anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood this as a guarantee that calls to onNext, onComplete, etc. never overlap, so we are safe to perform check internal state and perform updates without worrying that these values will be changed in a parallel onNext call. I think you may be right regarding these guarantees extending to visibility, at least internally. For example, checking some other open source subscribers: https://github.com/playframework/netty-reactive-streams/blob/89aff754154a118ea134d5ca64cce3dc44c78fc6/netty-reactive-streams/src/main/java/org/playframework/netty/HandlerSubscriber.java#L84
Most mutable members are normal variables. The Subscription and ChannelHandlerContext are volatile, possibly because they are provided from the outside?

Copy link
Contributor Author

@ghik ghik Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood this as a guarantee that calls to onNext, onComplete, etc. never overlap, so we are safe to perform check internal state and perform updates without worrying that these values will be changed in a parallel onNext call.

Without proper happens-before relationship this "non-overlapping" alone doesn't really give you any safety at all because even though subsequent onNext calls happen one after another in real time, they can still see some not-fully-thread-published broken state of a previous invocation, requiring the same synchronization as if they were concurrent. OK, I take it back - there actually is some value in it, e.g. like you said, even though you have to use volatiles, you can assume that noone else will modify these variables in the middle of your invocation.

Most mutable members are normal variables. The Subscription and ChannelHandlerContext are volatile, possibly because they are provided from the outside?

I think subscription must be volatile because it is accessed from ChannelDuplexHandler handler methods. There is no happens-before relationship between onSubscribe (where subscription is set) and these methods.

There's a similar situation with ctx, which is set by a Netty handler method but accessed in Subscriber methods.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the HandlerSubscriber is a Netty channel handler, so its onNext and other reactive methods are supposed to be called only from Netty handler context, which guarantees the same thread, thus visibility. In case of our SimpleSubscriber it's similar - it is the Netty's underlying publisher that calls these methods, and totalLength and buffers are accessed only there, so I guess it's OK to remove volatile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess this is the way Netty implements reactivestreams spec's requirements.

@@ -46,7 +53,7 @@ class InputStreamPublisher[F[_]](range: InputStreamRange, chunkSize: Int)(implic
case _ => chunkSize
}

val _ = monad
runAsync(monad
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: I think we still should add a comment that in case of Id this isn't really async and we are aware that this case violates reactive streams.

@ghik ghik marked this pull request as ready for review March 25, 2024 10:55
@ghik ghik merged commit 758d521 into master Mar 26, 2024
28 checks passed
@ghik ghik deleted the netty-refactors branch March 26, 2024 09:18
object RunAsync {
type Id[A] = A

final val Id: RunAsync[Id] = new RunAsync[Id] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity: anything better in final val Id over object Id extends RunAsync[Id]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, no strong opinions.

It arguably generates somewhat simpler bytecode, i.e.

  • no lazy initialization associated with object
  • no separate class associated with object (there's an anonymous class but it will probably be compiled to a lambda so it won't have a classfile)

It also works more naturally with type inference:

  • when it's a val, type of Id will be inferred as RunAsync[Id] (unless explicitly requested to be typed as Id.type)
  • when it's an object, it will be inferred as Id.type, with API potentially extended over RunAsync[Id]

Neither the laziness nor introducing a subtype was my intention, so a val is much closer to expressing exactly what I want, which is just having a plain implementation of RunAsync.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good, thanks :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants