Why doesn't KStreamBinder have a lifecycle for DefaultBinding? #850

Closed
jaehun2841 opened this issue Feb 19, 2020 · 5 comments · Fixed by #1042

@jaehun2841

jaehun2841 commented Feb 19, 2020

I previously asked this question on Stack Overflow.
(https://stackoverflow.com/questions/60282225/why-doesnt-kstreambinder-have-a-lifecycle-for-defaultbinding)

I'm using a KStream parameter in a @StreamListener method.
This creates a DefaultBinding through KStreamBinder.
My requirement is to use binding visualization and control.

However, KStreamBinder.java creates the DefaultBinding as follows:

return new DefaultBinding<>(name, null, outboundBindTarget, null);

Because the lifecycle is null, the state cannot be controlled through the Spring Boot /actuator/bindings endpoint.
(POST /actuator/bindings/{bindings-name} {"state":"PAUSED"})
How can I control the state of a Binding?

Is it intentional that flow control is not provided for Kafka Streams?

The versions I am using are:

  • org.springframework.cloud:spring-cloud-stream:3.0.1.RELEASE
  • org.springframework.cloud:spring-cloud-stream-binder-kafka-core:3.0.1.RELEASE
  • org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.0.1.RELEASE
  • org.apache.kafka:kafka-streams:2.3.1

Please answer my question.

Here is my code:

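import org.apache.kafka.streams.kstream.KStream
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Input
import org.springframework.cloud.stream.annotation.StreamListener

// Message is the application's own payload type.
@EnableBinding(Dispatcher::class)
class MessageDispatcher {
  @StreamListener(Dispatcher.INBOX)
  fun dispatch(input: KStream<String, Message>) {
    // Kafka Streams processing of the input goes here
  }
}

interface Dispatcher {
  companion object {
    const val INBOX = "inbox"
  }

  @Input(INBOX)
  fun input(): KStream<String, Message>
}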

Please discuss.
Thank you.

@jaehun2841
Author

Please review @sobychacko

@sobychacko
Contributor

@jaehun2841 Sorry for the delay in responding to the issue. It is intentional that we don't support this feature in Kafka Streams binder in the same way it is supported in the regular Kafka binder. In the regular binder, we are delegating to Spring Kafka and there we are pausing/resuming the poller. In Kafka Streams, this consumer is not exposed. See this comment. What exactly is your use case? Are you trying to pause and then resume a live stream processing system?

If you are concerned about throughput and back pressure, Kafka Streams is designed to handle those things. You can still achieve the pause/resume capability if you are willing to introduce another regular processor in your application that receives the messages first and passes them on to the Kafka Streams processor. You then control the flow in that first processor, which is based on the regular binder, so you can invoke the pause and resume endpoints there. Keep in mind, though, that this introduces another Kafka topic. Here is the blueprint for such a flow.

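@EnableBinding(Dispatcher.class)
class MessageDispatcher {

  // Regular (message channel) binder: this binding can be paused and resumed.
  // It needs an output destination (for example via @SendTo) that feeds REAL_INBOX.
  @StreamListener(Dispatcher.INBOX)
  public Message dispatch(Message input) {
    // simply pass it down to the Kafka Streams processor
    return input;
  }

  // Kafka Streams binder: consumes the topic written by the first processor.
  @StreamListener(Dispatcher.REAL_INBOX)
  public void dispatch(KStream<String, Message> input) {
    // Kafka Streams processing of the input goes here
  }
}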

Then if you want to pause the processing, you can do so at the first processor. What do you think about that solution? If you have other ideas, please chime in.
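
For readers following along, pausing the first processor goes through the actuator bindings endpoint quoted in the original question. Below is a minimal sketch of building that request with the JDK's `java.net.http` API. The base URL `http://localhost:8080`, the class name `BindingStateRequest`, and the helper `build` are assumptions made for illustration; the binding name `inbox` and the `{"state":"PAUSED"}` body come from the question above. The bindings endpoint must also be exposed in the application's actuator configuration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class BindingStateRequest {

    // Hypothetical helper: builds POST {baseUrl}/actuator/bindings/{bindingName}
    // with a JSON body of the form {"state":"<state>"}.
    static HttpRequest build(String baseUrl, String bindingName, String state) {
        String body = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Assumed host and port; "inbox" is the regular-binder binding from the blueprint.
        HttpRequest pause = build("http://localhost:8080", "inbox", "PAUSED");
        System.out.println(pause.method() + " " + pause.uri());
        // To actually send it against a running application:
        // java.net.http.HttpClient.newHttpClient()
        //     .send(pause, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```

Sending the same request with `RESUMED` as the state would resume the binding, assuming it is a regular message channel binding.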

@sobychacko
Contributor

@jaehun2841 Did you get a chance to consider my comments above? Here is another thread in which you can find more information on pausing stream processing in Kafka Streams.

@sabbyanandan
Contributor

Closing due to no activity.

sobychacko added a commit to sobychacko/spring-cloud-stream-binder-kafka that referenced this issue Mar 12, 2021
Introduce the ability for Kafka Streams application's lifecycle
management through actuator binding endpoints. Kafka Streams
only supports STOP and START operations. PAUSE/RESUME operations
that is available in regular message channel based binders
are not available in Kafka Streams binder.

Adding tests and docs.

Resolves spring-cloud#1038
Resolves spring-cloud#850

https://stackoverflow.com/questions/60282225/why-doesnt-kstreambinder-have-a-lifecycle-for-defaultbinding
garyrussell pushed a commit that referenced this issue Mar 15, 2021
* Support KStream lifecycle through binding endpoint

Introduce the ability for Kafka Streams application's lifecycle
management through actuator binding endpoints. Kafka Streams
only supports STOP and START operations. PAUSE/RESUME operations
that is available in regular message channel based binders
are not available in Kafka Streams binder.

Adding tests and docs.

Resolves #1038
Resolves #850

https://stackoverflow.com/questions/60282225/why-doesnt-kstreambinder-have-a-lifecycle-for-defaultbinding

* Addressing PR review comments

* Addressing PR review

* cleanup unused code
@sobychacko
Contributor

This feature is now available in Kafka Streams binder. See the commit above for details.
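
As the commit message above states, Kafka Streams bindings support only STOP and START, while regular message channel binders also support PAUSE and RESUME. The sketch below encodes that rule as a small lookup a client could consult before calling the bindings endpoint. Everything in it is hypothetical and for illustration only: the class `BindingOperations`, the enums `State` and `BinderKind`, and the method `supported` are not part of Spring Cloud Stream, and the state names are assumed to match the values the actuator endpoint accepts.

```java
import java.util.EnumSet;
import java.util.Set;

public class BindingOperations {

    // Assumed to mirror the state values accepted by the bindings endpoint.
    enum State { STARTED, STOPPED, PAUSED, RESUMED }

    // Hypothetical classification of the binder behind a binding.
    enum BinderKind { MESSAGE_CHANNEL, KAFKA_STREAMS }

    // Returns the states a client may request for a binding of the given kind.
    static Set<State> supported(BinderKind kind) {
        return kind == BinderKind.KAFKA_STREAMS
                ? EnumSet.of(State.STARTED, State.STOPPED) // no pause/resume
                : EnumSet.allOf(State.class);
    }

    public static void main(String[] args) {
        for (BinderKind kind : BinderKind.values()) {
            System.out.println(kind + " -> " + supported(kind));
        }
    }
}
```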
