Add Option to Not Create an Output Binding for the RoutingFunction #2168

Closed
garyrussell opened this issue May 6, 2021 · 6 comments


garyrussell commented May 6, 2021

See https://stackoverflow.com/questions/67419839/spring-cloud-stream-functionrouter-output-attempts-to-bind-to-kafka-topic

Verified when the destination functions are all Consumers: the functionRouter-out-0 topic is still created.

@SpringBootApplication
public class So67419839Application {

	public static void main(String[] args) {
		SpringApplication.run(So67419839Application.class, args);
	}

	@Bean
	MessageRoutingCallback route() {
		return msg -> new String((byte[]) msg.getPayload()).equals("foo") ? "foo" : "bar";
	}

	@Bean
	Consumer<Message<?>> foo() {
		return msg -> System.out.println("foo: " + msg);
	}

	@Bean
	Consumer<Message<?>> bar() {
		return msg -> System.out.println("bar: " + msg);
	}

}

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

$ kafka-topics --bootstrap-server localhost:9092 --list
__consumer_offsets
functionRouter-out-0
so67419839

garyrussell commented May 6, 2021

But I see that it is needed if at least one of the delegates is a function:

	@Bean
	Function<Message<?>, String> foo() {
		return msg -> {
			System.out.println("foo: " + msg);
			return "FOO";
		};
	}

So we probably need a new property to suppress it; I don't see how we could reliably determine that all delegates are Consumers.

@garyrussell garyrussell changed the title Don't Create an Output Binding for the RoutingFunction Add Option to Don't Create an Output Binding for the RoutingFunction May 6, 2021
@garyrussell garyrussell changed the title Add Option to Don't Create an Output Binding for the RoutingFunction Add Option to Not Create an Output Binding for the RoutingFunction May 6, 2021

garyrussell commented May 6, 2021

As a workaround, simply point the output binding to the same destination as the input:

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

Since the delegates are all Consumers, we'll never actually send anything.

@olegz olegz self-assigned this May 6, 2021
@olegz olegz added the bug label May 6, 2021
@olegz olegz added this to the 3.1.3 milestone May 6, 2021

olegz commented May 6, 2021

So, if I understand correctly...
First, I just added a test to validate the routing itself, and it works:

@Test
public void testRoutingToConsumers() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					RoutingConsumerConfiguration.class))
			.web(WebApplicationType.NONE)
			.run("--spring.jmx.enabled=false",
					"--spring.cloud.function.routing-expression=headers['func_name']")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		Message<byte[]> inputMessage = MessageBuilder
				.withPayload("foo".getBytes())
				.setHeader("func_name", "foo")
				.build();
		inputDestination.send(inputMessage);
	}
}

@EnableAutoConfiguration
public static class RoutingConsumerConfiguration {
	@Bean
	public Consumer<String> foo() {
		return System.out::println;
	}

	@Bean
	public Consumer<String> bar() {
		return System.out::println;
	}
}

However, what I believe you're saying is that because RoutingFunction is a Function, an output binding is automatically created for it even though there will never be any output.
Correct?
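
To illustrate the point above, here is a minimal sketch of how binding names follow from a bean's functional type under the functionName-in-0 / functionName-out-0 convention seen in this issue. The BindingNames class and its forBean method are hypothetical names made up for illustration; this is a simplified model, not the framework's actual binding code.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring Cloud Stream: a simplified model
// of which bindings a functional bean ends up with, based on its type only.
class BindingNames {

	static List<String> forBean(String name, Object bean) {
		if (bean instanceof Function) {
			// A Function always gets both bindings, even if it never emits output.
			return List.of(name + "-in-0", name + "-out-0");
		}
		if (bean instanceof Consumer) {
			return List.of(name + "-in-0");
		}
		if (bean instanceof Supplier) {
			return List.of(name + "-out-0");
		}
		throw new IllegalArgumentException("Not a functional bean: " + name);
	}
}
```

Because the router is declared as a Function, a type-based decision like this one yields functionRouter-out-0 regardless of whether the delegates it routes to are Consumers.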

@garyrussell

Exactly, yes.

I don't see how we can programmatically detect that the function never routes to another Function (only to Consumers). I think we need a property, something like:

spring.cloud.stream.function.routing.consume-only=true

By the way, the docs talk about spring.cloud.stream.function.routing.enabled=true, which doesn't seem to be needed.


olegz commented May 6, 2021

I see... I need to think. I really don't want to introduce another property, precisely for the reason you just mentioned: it may create confusion.
For example, if you don't provide the property spring.cloud.function.definition=functionRouter, then spring.cloud.stream.function.routing.enabled=true will do the same thing. So it's one or the other, but I already regret doing that.
Perhaps I can look at dynamic binding as a special case for RoutingFunction: basically, never create the output binding until the first non-null output is produced.


olegz commented May 6, 2021

OK, it appears that this can work. I just spiked it and all tests are passing.

The idea is to never create an output binding for RoutingFunction, only an input binding. If the routing is to a Consumer, we're good; if it is to a Function, StreamBridge will be used to effectively create an output binding on demand.

Will sleep on it and merge tomorrow.
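
The on-demand idea described above can be sketched in plain Java. This is only a model under stated assumptions: LazyOutputRouter and all of its methods are hypothetical names, a simple list stands in for the output destination, and nothing here reflects how the actual fix or StreamBridge is implemented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model, not Spring Cloud Stream code: the output "binding" is
// created only when a routed delegate returns a non-null result.
class LazyOutputRouter {

	// Delegates by name; a Consumer is wrapped as a Function that returns null.
	private final Map<String, Function<String, Object>> delegates = new HashMap<>();

	// Stands in for the output destination; stays null until first needed.
	private List<Object> outputBinding;

	void registerConsumer(String name, Consumer<String> consumer) {
		delegates.put(name, payload -> {
			consumer.accept(payload);
			return null;
		});
	}

	void registerFunction(String name, Function<String, Object> function) {
		delegates.put(name, function);
	}

	void route(String functionName, String payload) {
		Function<String, Object> delegate = delegates.get(functionName);
		if (delegate == null) {
			throw new IllegalArgumentException("No delegate named " + functionName);
		}
		Object result = delegate.apply(payload);
		if (result != null) {
			if (outputBinding == null) {
				outputBinding = new ArrayList<>(); // created on demand
			}
			outputBinding.add(result);
		}
	}

	boolean outputBindingCreated() {
		return outputBinding != null;
	}

	List<Object> sent() {
		return outputBinding == null ? List.of() : List.copyOf(outputBinding);
	}
}
```

With only Consumers registered, outputBindingCreated() stays false no matter how many messages are routed, which is the behavior this issue asks for. The first Function result triggers creation of the output side.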
