Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partition selector strategy does not use the configured partition count for multiple outbounds #2750

Closed
angelbeshirov opened this issue Jun 3, 2023 · 5 comments · Fixed by #2751

Comments

@angelbeshirov
Copy link

angelbeshirov commented Jun 3, 2023

Framework Version:
spring-boot: 3.0.2
spring-cloud: 2022.0.1

Issue:
We have a reactive function, which consumes messages from one topic and sends them to N topics based on some conditions. Additionally, we also have a custom key extractor and partition selector configured for each of the output bindings. However, the number of partitions of the outbound topics is different which does not seem to be reflected in the custom partition selector.

I did some digging and it seems only the binding properties of the first outbound are being used when creating the PartitionAwareFunctionWrapper, which are then passed to the PartitionHandler invoking the overriden selectPartition(Object key, int partitionCount) method with incorrect partition count.

Sending the message to the incorrect partition results in TimeoutException.

Code snippet from org.springframework.cloud.stream.function.FunctionConfiguration

if (!CollectionUtils.isEmpty(outputBindingNames)) {
	BindingProperties bindingProperties = this.serviceProperties.getBindings().get(outputBindingNames.iterator().next());
	ProducerProperties producerProperties = bindingProperties == null ? null : bindingProperties.getProducer();
	if (producerProperties != null) {
		function.setSkipOutputConversion(producerProperties.isUseNativeEncoding());
	}
	functionToInvoke = new PartitionAwareFunctionWrapper(function, this.applicationContext, producerProperties);
}

Expected behavior:
I would expect the framework to honor the configured partition count for each output binding.

@sobychacko
Copy link
Contributor

How do you send them to N topics? Could you provide us with a small sample application where we can see the issue?

@angelbeshirov
Copy link
Author

Hi @sobychacko

I have created a sample application showing the issue: https://github.com/angelbeshirov/spring-cloud-partition-selector-issue

The reactive function reads random numbers from one topic test-input and sends the even numbers to output-even and the odd ones to output-odd. The output-even topic has 10 partitions and output-odd has 5 partitions.

Publishing a message to output-odd fails with:

Exception thrown when sending a message with key='null' and payload='byte[30]' to topic output-odd and partition 9:
org.apache.kafka.common.errors.TimeoutException: Topic output-odd not present in metadata after 60000 ms.

sobychacko added a commit to sobychacko/spring-cloud-stream that referenced this issue Jun 12, 2023
 - When using reactive functions, partition selector strategy does not
   use the configured partition count for multiple outbounds. This is
   because we take the first configured output binding and apply it's
   partition counts on all the outbound reactive streams (Tuples).
   Addressing this issue by properly applying the correct partition handling
   per output binding.

Resolves spring-cloud#2750
olegz pushed a commit that referenced this issue Jun 14, 2023
 - When using reactive functions, partition selector strategy does not
   use the configured partition count for multiple outbounds. This is
   because we take the first configured output binding and apply it's
   partition counts on all the outbound reactive streams (Tuples).
   Addressing this issue by properly applying the correct partition handling
   per output binding.

Resolves #2750
@sobychacko
Copy link
Contributor

@angelbeshirov The issue is now resolved via the commit mentioned ^^. You can try the fix on the latest snapshots.

@angelbeshirov
Copy link
Author

Thanks, @sobychacko, I can confirm the issue is resolved in 4.1.0-SNAPSHOT.

@sobychacko
Copy link
Contributor

Thanks @angelbeshirov for confirming.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants