
Supporting reactor-kafka #2293

Closed
sobychacko opened this issue Mar 10, 2022 · 21 comments

Comments

@sobychacko
Contributor

@ghahramani commented on Tue May 04 2021

Hi,

After some discussion in the Gitter Spring Cloud Stream channel, I realized that to get sequential pause/resume behavior in reactive Kafka we need a backpressure mechanism, which is not supported by WebFlux and Spring Cloud Stream right now because there is no binder for reactor-kafka yet. I would like to raise this feature request for supporting reactor-kafka in Spring Cloud Stream so we can easily use its functionality.
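The request above hinges on demand-driven backpressure: the consumer receives the next record only after it signals that it is ready. Reactor Kafka gets this from Reactor's `Flux`, which implements the Reactive Streams protocol. As a stdlib-only sketch of that same protocol (not the reactor-kafka API), the JDK's `java.util.concurrent.Flow` and `SubmissionPublisher` can show a subscriber that pulls one record at a time. `OneAtATimeSubscriber`, `BackpressureDemo`, and the buffer size of 2 are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber: pulls exactly one record at a time. The next request(1)
// is signalled only after the current record has been processed.
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // initial demand: a single record
    }

    @Override
    public void onNext(String item) {
        processed.add(item.toUpperCase()); // stand-in for slow per-record work
        subscription.request(1);           // ask for the next record only when ready
    }

    @Override
    public void onError(Throwable t) {
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }
}

class BackpressureDemo {
    static List<String> run(List<String> records) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Buffer of 2: submit() blocks the producer while the subscriber is behind,
        // instead of piling up unprocessed records in memory.
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, 2);
        try {
            publisher.subscribe(subscriber);
            for (String r : records) {
                publisher.submit(r);
            }
            publisher.close(); // onComplete follows once buffered records are delivered
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(subscriber.processed);
    }
}
```

Because the subscriber never asks for more than one record, a slow processing step pauses intake and resumes it when done, which is the sequential pause/resume behavior described in the issue, without any explicit pause or resume calls.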


@garyrussell commented on Tue May 04 2021

@sabbyanandan @olegz This may not be trivial, but I think if we can pull it off, it should become the only way to support reactive applications in SCSt.

Running Reactor apps on top of non-reactive binders (especially Kafka) is a fool's errand, in my opinion.

There is also a reactor-rabbitmq project.

Perhaps we should spike this?


@olegz commented on Wed May 05 2021

@garyrussell I completely agree, and I believe I have made that clear internally and externally. I believe the same would need to be done for Rabbit: https://github.com/reactor/reactor-rabbitmq. This would also allow us to partition the binder API into reactive and imperative, where reactive can still support the imperative model but not the other way around, as that doesn't make any sense (as it is today).
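The point that a reactive binder API can still serve the imperative model, but not the other way around, can be illustrated with a stdlib-only sketch: an imperative per-message handler (`java.util.function.Consumer`) is wrapped as a `java.util.concurrent.Flow.Subscriber` and driven by a minimal publisher that honors `request(n)` demand. `ImperativeAdapter`, `ListPublisher`, and `AdapterDemo` are hypothetical names, not part of the Spring Cloud Stream binder API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical adapter: lets an imperative per-message handler run on a reactive stream.
final class ImperativeAdapter<T> implements Flow.Subscriber<T> {
    private final Consumer<T> handler;
    private Flow.Subscription subscription;

    ImperativeAdapter(Consumer<T> handler) {
        this.handler = handler;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // one message at a time, like an imperative listener
    }

    @Override
    public void onNext(T item) {
        handler.accept(item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace(); // a real binder would route this to its error handling
    }

    @Override
    public void onComplete() {
    }
}

// Minimal single-threaded publisher over a list that emits only as much as was requested.
final class ListPublisher<T> implements Flow.Publisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) {
        this.items = items;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private long demand = 0;
            private boolean emitting = false; // absorbs re-entrant request() calls from onNext
            private boolean done = false;

            @Override
            public void request(long n) {
                demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                if (emitting) {
                    return; // the loop below picks up the added demand
                }
                emitting = true;
                while (!done && demand > 0 && index < items.size()) {
                    demand--;
                    subscriber.onNext(items.get(index++));
                }
                if (!done && index == items.size()) {
                    done = true;
                    subscriber.onComplete();
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

final class AdapterDemo {
    static List<String> run(List<String> input) {
        List<String> handled = new ArrayList<>();
        new ListPublisher<>(input).subscribe(new ImperativeAdapter<String>(s -> handled.add("handled:" + s)));
        return handled;
    }
}
```

The reverse direction is what breaks down: a reactive function such as `Function<Flux<String>, Flux<String>>` expects to compose over the whole stream, which a binder that invokes user code once per message cannot supply.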


@sabbyanandan commented on Tue May 18 2021

@ghahramani: When you get a chance, can you describe your use case, and why backpressure and being fully reactive would help in that scenario? What are you trying to achieve, and what are you OK to compromise on or live with?

I am curious to learn more about the real-world scenarios to validate and back my own hypothesis.


@ghahramani commented on Tue May 18 2021

(Replying to @sabbyanandan's questions above.)

So basically, it started from this Gitter conversation. Feel free to read it if anything is unclear; I would be happy to clarify things.

https://gitter.im/spring-cloud/spring-cloud-stream?at=608c38f32cc8c84d85013db4


@UgiR commented on Mon Jun 28 2021

Out of curiosity, has any decision been made on this?


@sobychacko commented on Mon Jun 28 2021

@UgiR Nothing concrete has been done towards this initiative yet.


@roger751 commented on Mon Dec 06 2021

Is there any progress on this issue? It seems to me like the most needed feature when using Project Reactor with Spring to consume from Apache Kafka, doesn't it? Can someone explain how the current situation even works? For example, how do I currently consume reactively from Kafka with Spring Cloud Stream if I can't use Reactor Kafka?


@sobychacko commented on Mon Dec 06 2021

A related issue for this one: spring-cloud/spring-cloud-function#779


@garyrussell commented on Mon Dec 06 2021

Also see this discussion reactor/reactor-kafka#252 (comment)


@roger751 commented on Tue Dec 07 2021

Replying to @artembilan's comment:

"So, you need to decide for yourself if you stick with integration flow solution with whatever reactive capabilities are present there"
Are there any missing reactive capabilities? I generally don't understand how Spring Integration helps in this use case. My main question is:

Would you say that implementing a Spring Integration channel adapter is a good option if you want to use the Spring Cloud ecosystem and read with Reactor Kafka? And why?

And also, as I've mentioned, I don't really understand: does Spring Integration have full reactive support, or do you consider it a risky move?

One last thing, about the implementation with Spring Integration. Do you consider implementing a reactive channel adapter to be the same concept as implementing the future reactive spring-cloud-stream-binder-kafka? I mean, is the user's workaround for this case the same as the future implementation plans?


@artembilan commented on Tue Dec 07 2021

  1. With the current Spring Cloud Stream binder implementations there won't be a technical difference if you just switch to a plain Spring Integration flow solution,
    simply because the current Spring Cloud Stream binders are fully based on Spring Integration.
    There is probably a conceptual difference, though, since you won't have bindings with your custom solution: everything is going to be in your hands from that point.

  2. Spring Integration is just a library, a set of components you can combine into a flow, in most cases a distributed async computation. So it is wrong to judge it from the programming-model perspective, since that is not a goal of this library.
    However, with some convenient hooks it does allow you to implement a reactive solution. It just might be that not all the protocols Spring Integration has channel adapters for provide a reactive API.
    And even if they do, it doesn't mean we are able to catch up with everything that is out there on the market. As in this case with reactor-kafka: we just haven't implemented channel adapters for its abstractions yet.

  3. No, a Spring Integration channel adapter implementation has nothing to do with the binder abstraction. A binder might be based on a channel adapter, but it might not be, like the existing binder for Apache Kafka Streams: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka#apache-kafka-streams-binder.
    And no, the workaround for your reactor-kafka move cannot be treated as what is going to happen in the binder. You are probably missing many Spring Cloud Stream bits by seeing only a channel adapter from the implementation perspective. There is binder configuration to distinguish between binders, there are bindings to handle, provisioning, etc. The short and easy solution won't be able to benefit from the rest of the Spring Cloud Stream infrastructure. At least that's how I feel about it...


@roger751 commented on Sun Dec 12 2021

@artembilan is there any example of using Reactor Kafka with Spring Integration? It would be very helpful.

Also, @sobychacko, is there any news about the progress or plans for this issue? Can you tell us what needs to be implemented for this to be released?


@artembilan commented on Mon Dec 13 2021

@roger751 ,

here is a sample of Reactor Kafka and Spring Integration interactions: https://github.com/artembilan/sandbox/tree/master/spring-integration-reactor-kafka.

I didn't add any README (yet), but the behavior is simple:

  1. Reactor Kafka producer used from the @ServiceActivator
  2. Reactor Kafka consumer used from the IntegrationFlow
  3. The test covers their behavior against embedded Kafka.

@roger751 commented on Wed Dec 15 2021

@artembilan looks awesome!!! Thank you very much!! Do you think it is worth adding to the spring-integration / spring-integration-kafka project as an official example or official feature? It looks pretty important to me and, in my opinion, is worth being an official channel adapter. What do you think?

And more related to the official feature request of this very important issue: @olegz @sobychacko @garyrussell do you think that a solution for writing a reactive Spring Cloud Stream application will be available anytime soon? Are there any plans to work on this? I think it's pretty misleading that the Spring ecosystem encourages devs to write Kafka consumer applications and reactive applications, but doesn't offer a basic solution for doing both. And if the decision is not to use spring-cloud-stream, wouldn't it be more appropriate to have some blog posts about the subject and its alternative solutions?

It is just very unclear right now what a developer who just wants to consume from and produce to Kafka with Reactor and Spring Boot should do, and it seems to me like the most obvious use case. Spring Cloud Stream is a great project for synchronous applications, but if the Spring ecosystem so strongly encourages developing reactive applications, there should be an official and easy solution for this too.


@sobychacko commented on Thu Dec 16 2021

@roger751 The plan is to rethink this idea of a reactive binder for Kafka as a new feature for the Spring Cloud Stream 4.0 roadmap. We will have more updates on this after the holidays.


@roger751 commented on Thu Dec 16 2021

@sobychacko ok. I'd be glad if you could update us as soon as there is progress. I think that 100% of Reactor Kafka users are waiting for this, and probably lots of Spring Cloud Stream users would be glad to have such an integration between the two libraries as well. This integration is super important and in huge demand.


@lynch19 commented on Sat Dec 18 2021

+1


@kairurooj commented on Sun Dec 19 2021

@sobychacko what's the timeline for Spring Cloud Stream 4.0?


@thepaep commented on Tue Dec 21 2021

I would also like to know about progress in this area. Some blog posts with updates would be great. If contributors' help is needed, I would also like to know that.


@roger751 commented on Mon Jan 03 2022

@sobychacko for those of us who are waiting for this feature and need to implement data flows in the meantime, would you suggest implementing non-Spring Cloud Stream applications using the Function interfaces? I'm not sure how best to implement applications so that the migration to Spring Cloud Stream 4.0.0 is as easy as possible...


@ipanimnim commented on Fri Jan 07 2022

Definitely an important feature (in my opinion, the most important feature of this project).
Has any work started? @sobychacko @artembilan @olegz

Isn't it ironic that SCSt supports all of Apache Kafka's clients (Consumer API and Streams API) except the one that has been implemented at Pivotal?


@sobychacko commented on Fri Jan 07 2022

We are actively thinking through this right now and coming up with some design considerations for a new reactive binder. As soon as we have something shareable, we will do so. We hope to reach that state in a few weeks. Please stay tuned. Thank you!!


@roger751 commented on Sat Jan 08 2022

@sobychacko Great news! Looking forward to this. I also have a question about the conversation above. What’s the meaning of the Spring Cloud Sleuth support for “Reactive Spring Cloud Stream” if there are no reactive binders? spring-cloud/spring-cloud-sleuth@90417f1


@fluxstirngchanle commented on Wed Jan 12 2022

A very anticipated issue. Any timelines?


@rancherz commented on Sun Jan 16 2022

Hey. I'd like to know if I understand correctly. I have a few questions:

  1. In the current situation, is it safe to use Reactor Kafka with Spring Integration but not with Spring Cloud Stream? I saw in the Gitter conversation that there was an issue with SCSt. https://gitter.im/spring-cloud/spring-cloud-stream?at=608c38f32cc8c84d85013db4
  2. I want to write a non-blocking (NIO) data pipeline that reads from Kafka, transforms the data (using out-of-order commits from Reactor Kafka), and saves the output into Mongo. I wonder what the most recommended approach to writing such a pipeline is. Should I use Spring Integration (and inject queues for each writer and transformer), or just use Reactor Kafka with a Processor class that serves as the "main"?

Any examples of applications that read with Reactor Kafka and write to some sink would be much appreciated (not just for Mongo).

@artembilan @ghahramani @garyrussell
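The out-of-order commits mentioned in the question rest on a simple rule: records may finish in any order, but the committed offset can only move past a contiguous run of completed records, otherwise a crash could skip a record that was never saved. The following stdlib-only sketch shows that bookkeeping for a single partition. `OutOfOrderCommitTracker` is a hypothetical class illustrating the idea, not reactor-kafka's internal implementation; in reactor-kafka the feature is switched on through the receiver options, so check its reference guide for the exact setting.

```java
import java.util.TreeSet;

// Hypothetical per-partition tracker: offsets may be acknowledged in any order,
// but the committable position only advances across a contiguous run of acks.
final class OutOfOrderCommitTracker {
    private long nextToCommit;                              // first offset not yet safely processed
    private final TreeSet<Long> pending = new TreeSet<>();  // acked offsets waiting behind a gap

    OutOfOrderCommitTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Marks one record as processed and returns the offset that may now be committed.
    // Kafka convention: the committed offset is the next offset to read.
    long acknowledge(long offset) {
        if (offset >= nextToCommit) {
            pending.add(offset);
        }
        while (!pending.isEmpty() && pending.first() == nextToCommit) {
            pending.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }

    // Number of acknowledged offsets held back by an earlier, unfinished record.
    int deferred() {
        return pending.size();
    }
}
```

In a read-transform-save pipeline, each completed Mongo write would acknowledge its offset, and only the returned position would be committed. Records after a gap may be redelivered after a restart, which gives at-least-once semantics.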


@ipanimnim commented on Fri Feb 04 2022

Any updates? I'd like to deploy a reactive Spring Integration application with Spring Cloud Data Flow, and this issue blocks my migration from a regular Spring Integration application to Spring Cloud Stream.


@devlap2 commented on Sat Feb 26 2022

Does the existence of this issue mean that there is also no real reactive support for other binders in Spring Cloud Stream (like Azure Event Hubs and more)?


@garyrussell commented on Tue Mar 01 2022

We've made some progress and should have an initial PR for this soon.

@devlap2 Those binders are maintained by third parties.

garyrussell added a commit to garyrussell/spring-cloud-stream that referenced this issue Mar 10, 2022
Note that it requires the message channel to be a `FluxMessageChannel`.

It currently only supports `Consumer<ReceiverRecord<?, ?>>` (no conversion).

The function binding should use the message channel directly instead of bridging
the direct channel to a reactive channel.

Producer Side PoC

Binder specific changes in ReactorKafkaBinder

Binder configuration

More changes to the reactive binder

Move KafkaBinderEnvironmentPostProcessor to the core module

Change binder key in spring.binders

Adding an e2e IT test for the reactive binder
@garyrussell
Contributor

I have ported the initial PoC commits to this branch: https://github.com/garyrussell/spring-cloud-stream/tree/GH-2293

Need to fix a couple of TODOs before issuing a PR.

sobychacko pushed a commit to sobychacko/spring-cloud-stream that referenced this issue Mar 10, 2022
Note that it requires the message channel to be a `FluxMessageChannel`.

It currently only supports `Consumer<ReceiverRecord<?, ?>>` (no conversion).

The function binding should use the message channel directly instead of bridging
the direct channel to a reactive channel.

Producer Side PoC

Binder specific changes in ReactorKafkaBinder

Binder configuration

More changes to the reactive binder

Move KafkaBinderEnvironmentPostProcessor to the core module

Change binder key in spring.binders

Adding an e2e IT test for the reactive binder

Add lifecycle to integration adapters.

Remove unnecessary doStop().

Fix import order.
@sobychacko
Contributor Author

Hi everyone @ghahramani @UgiR @roger751 @lynch19 @kairurooj @thepaep @ipanimnim @fluxstirngchanle @rancherz @devlap2 The initial version of a reactive binder for Kafka is now merged upstream to the main branch. Please see this commit for details: f3ccc7b

See the end-to-end integration test for a basic idea of its usage from an end-user application perspective.

Thanks to @garyrussell for this great foundation.

Core Spring Cloud Stream now has the necessary foundations for supporting reactive binders (although the reactive Kafka binder is the only implementation available).

This is an initial step. We welcome and value any suggestions and feedback that we could use to enhance this initial effort. Please send your comments and feedback in the form of GitHub issues and pull requests! Thank you all.

cc @olegz @artembilan

@roger751

Great news. Waiting for the TODOs to be resolved, but that seems like amazing progress. Thank you !!!!

@devlap2

devlap2 commented Mar 11, 2022

That's awesome. There should now be some more samples of this and references in the docs. Adding a sample to the samples repo could fill the gap.

@markusherbert

I wonder why DefaultKafkaConsumerFactory is used rather than KafkaReceiver & ReceiverOptions (and SenderOptions, etc.). How does this test show the usage of SCSt with Reactor Kafka? I think some explanatory docs would indeed help.

@garyrussell
Contributor

garyrussell commented Mar 12, 2022

I wonder why there is a use of DefaultKafkaConsumerFactory...

It's just using that to get the properties to create the ReceiverOptions and SenderOptions; it will eventually expose properties for them too; this is a very early implementation.

There is an open issue in Boot spring-projects/spring-boot#29080

When that is resolved, this will utilize that code instead.

garyrussell added a commit to garyrussell/spring-cloud-stream that referenced this issue Mar 12, 2022
garyrussell added a commit to garyrussell/spring-cloud-stream that referenced this issue Mar 12, 2022
Add test to consume full `ReceiverRecord`.
olegz pushed a commit that referenced this issue Mar 15, 2022
Add test to consume full `ReceiverRecord`.

Resolves #2295
@devlap2

devlap2 commented Apr 8, 2022

When will this issue get resolved?

@markusherbert

Any updates?

@sobychacko
Contributor Author

@devlap2 @ipanimnim Did you get a chance to try out the reactive Kafka binder based on the 4.0.0-M1 (or SNAPSHOT)? We would love to receive some feedback on this as we are preparing for the next milestone release.

@markusherbert

@sobychacko I was struggling to try out this feature. What's the artifact name? Can you attach a dead-simple sample? (A nice addition to the samples dir would be awesome.)

@sobychacko
Contributor Author

sobychacko commented Jun 13, 2022

@markusherbert Sure, we will add a sample there. But in the meantime, please take a look at this test to get a basic idea.

@dajemet379

@sobychacko Is there any dependency that contains the binder?

@rolkhas2

Is this feature deprecated?

@AttitudeL

AttitudeL commented Oct 26, 2022

Hi @sobychacko, sorry if this sounds pushy; I'm just wondering if this support will be available in the 4.x version of spring-cloud-stream? Once it is available, will there be new docs for the reactive Kafka binder? I can't wait to try it out! You guys rock!

@sobychacko
Contributor Author

@AttitudeL Yes, the reactive Kafka binder is only available on the 4.x version of Spring Cloud Stream. We are adding this as a new feature in the 4.0.0 line. Docs are available here. Any feedback is appreciated. In the coming days, we are hoping to add a sample app for the reactive binder.

@ThiemoFischer

This is great! We will be absolutely thrilled to check this out

@sobychacko
Contributor Author

@ThiemoFischer Looking forward to any feedback and suggestions for this binder.

@zhenya-yadlovskij

@sobychacko

Thanks for adding it!
It's awesome and does exactly what is needed, but it lacks some cool features.

Feedback:

  • as a developer, I can configure neither the Kafka receiver nor ReceiverOptions :(
    • I think it would be nice to have something like a customizer bean...
  • there are some pretty nice features in Reactor Kafka, but right now I can see only 2 options:
    • to create a custom binder
    • or to skip Spring and use Reactor Kafka directly :(

For example, I would like to listen for partition rebalancing, but currently I have no way to add a listener :(
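Reactor Kafka's `ReceiverOptions` already accepts partition assignment and revocation listeners (`addAssignListener`, `addRevokeListener`); what the feedback asks for is a binder-level hook to reach them. As a stdlib-only sketch of the customizer-bean idea, the code below uses a hypothetical, simplified `ReceiverSettings` class (partitions as plain strings), a hypothetical `ReceiverSettingsCustomizer` interface, and a hypothetical binding name `input-in-0`; none of these are the binder's actual API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for receiver options: immutable, carrying
// listeners that are told which partitions were assigned.
final class ReceiverSettings {
    final List<Consumer<Collection<String>>> assignListeners;

    ReceiverSettings(List<Consumer<Collection<String>>> assignListeners) {
        this.assignListeners = List.copyOf(assignListeners);
    }

    // Returns a new instance so one binding's customization cannot leak into another.
    ReceiverSettings addAssignListener(Consumer<Collection<String>> listener) {
        List<Consumer<Collection<String>>> copy = new ArrayList<>(assignListeners);
        copy.add(listener);
        return new ReceiverSettings(copy);
    }
}

// Hypothetical hook a binder could call once per binding before creating its receiver.
@FunctionalInterface
interface ReceiverSettingsCustomizer {
    ReceiverSettings customize(String bindingName, ReceiverSettings settings);
}

final class CustomizerDemo {
    // Applies every customizer, in order, to the binder's default settings.
    static ReceiverSettings build(String bindingName, List<ReceiverSettingsCustomizer> customizers) {
        ReceiverSettings settings = new ReceiverSettings(List.of());
        for (ReceiverSettingsCustomizer customizer : customizers) {
            settings = customizer.customize(bindingName, settings);
        }
        return settings;
    }
}
```

Passing the binding name lets one customizer bean target a single binding, which is how a rebalance listener could be attached to just the consumer that needs it.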

@garyrussell
Contributor

garyrussell commented Jan 23, 2023

@zhenya-yadlovskij I suggest that you open a new issue to request these features. We were hoping to get feedback like this when it was available in milestone releases before GA.

@zhenya-yadlovskij

@garyrussell Thanks, will do!
Unfortunately, at that time I didn't need it :)

@sobychacko
Contributor Author

Closing this issue now as the reactive Kafka binder is implemented as part of Spring Cloud Stream 4.0.0. Feel free to create new feature requests, bug reports, etc. for the reactive Kafka binder as new issues.
