Add multiplex support to Kafka reactive binder.  #2691

Description

@kitkars

Issue:

Consumer multiplexing does not work with the Kafka reactive binder.

Steps to reproduce:

Use this binder with Spring version 3.0.5.

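<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>

import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

@SpringBootApplication
public class MyApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyApplication.class, args);
	}

	// Reactive consumer bound to input-in-0; prints every record it receives.
	@Bean
	Consumer<Flux<String>> input() {
		return f -> f.doOnNext(System.out::println).subscribe();
	}

}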

spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=topic1,topic2
spring.cloud.stream.bindings.input-in-0.consumer.multiplex=true

We get this error:

org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]

The issue is here in the reactive binder.

The non-reactive version seems to handle this properly here.
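To illustrate what is missing, here is a minimal sketch of the splitting step. The error above suggests the reactive binder passes the whole destination value "topic1,topic2" through as a single topic name. The class and method names below (DestinationSplitter, splitDestinations) are hypothetical and are not taken from either binder's code; the sketch only shows one way a comma-delimited destination could be turned into individual topic names before subscribing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DestinationSplitter {

    // Hypothetical helper, not part of the binder's API:
    // splits a multiplexed destination such as "topic1,topic2"
    // into the individual topic names, trimming whitespace and
    // dropping empty entries.
    static List<String> splitDestinations(String destination) {
        return Arrays.stream(destination.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [topic1, topic2]
        System.out.println(splitDestinations("topic1,topic2"));
    }
}
```

With the topics split like this, the consumer could subscribe to the resulting collection instead of the raw destination string, which Kafka rejects because a comma is not a legal character in a topic name.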