
Scaling In After a Scale Out With Kafka Binder Causes Uneven Work Distribution #4896

Closed
fiidim opened this issue May 8, 2022 · 7 comments
Labels
status/need-investigation Oh need to look under a hood type/bug Is a bug report

Comments

@fiidim

fiidim commented May 8, 2022

Description:
Discussion on the topic here. The high-level issue may be documentation related, or the deployer.count property may need to do additional work.

Consider a stream that consists of source | processor | sink. For scaling out, the documentation recommends stream.properties resembling:

deployer.sink.count=<count>
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=<count>
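
For example, with <count> = 10 (the concrete number used in the rest of this report), that works out to:

deployer.sink.count=10
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=10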

This results in Kafka provisioning the partitions, and everything is OK. If you then scale in from this point as follows:

deployer.sink.count=<count/2>
app.processor.producer.partitionKeyExpression=headers['X-PartitionKey']?:''
app.sink.spring.cloud.stream.kafka.binder.autoAddPartitions=true
app.sink.spring.cloud.stream.kafka.binder.minPartitionCount=<count/2>

This results in Kafka keeping the original number of partitions (since partitions aren't typically removed), while Spring Cloud Data Flow only assigns messages to <count/2> partitions via the default strategy. The sink instances, however, are assigned across all 10 real partitions, so some instances may be assigned only to unused partitions.

For example, going from 10 instances to 5 can result in a partition assignment like the following (note that instances 3 and 5 would be completely idle, and instance 4 partially idle; see the sketch after this list):

1: 3,2
2: 0,1
3: 9,8
4: 5,4
5: 7,6
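
A minimal sketch of the arithmetic (not the actual binder or selector code, just assuming the default behavior of hashing the partition key and taking it modulo the producer's partition count) shows why only partitions 0-4 ever receive messages after scaling in:

import java.util.Map;
import java.util.TreeMap;

// Illustrative only: models "hash of partition key % partition count".
public class ScaleInSketch {

    public static void main(String[] args) {
        int physicalPartitions = 10;    // created during scale-out; Kafka never removes them
        int producerPartitionCount = 5; // what the producer works with after scaling the sink in to 5

        Map<Integer, Integer> messagesPerPartition = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            String partitionKey = "key-" + i; // stands in for headers['X-PartitionKey']
            int partition = Math.floorMod(partitionKey.hashCode(), producerPartitionCount);
            messagesPerPartition.merge(partition, 1, Integer::sum);
        }

        // Partitions 5-9 exist on the topic but never show up here, so a sink instance
        // assigned only partitions >= 5 (instances 3 and 5 above) sits completely idle.
        for (int p = 0; p < physicalPartitions; p++) {
            System.out.printf("partition %d -> %d messages%n", p, messagesPerPartition.getOrDefault(p, 0));
        }
    }
}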

The only workaround that has produced correct results is to tell the processor how many physical partitions Kafka has (i.e., the old number), which allows messages to be assigned to ALL partitions while still having a lower deployed instance count. Example:

app.processor.spring.cloud.stream.bindings.output.producer.partition-count=10

Feels kind of clunky. I'm open to suggestions. If there were a way to tell the sink app to listen only on the first partitions, that would also work. Otherwise, there should be a setting for the processor application to detect the number of physical partitions, or a bit more documentation on the Scaling In part.

Release versions:
All versions

@github-actions github-actions bot added the status/need-triage Team needs to triage and take a first look label May 8, 2022
@fiidim
Author

fiidim commented Jul 4, 2022

If I had to re-phrase it more clearly: consumers are being assigned to partitions that will never be assigned messages. It's possible that a consumer is assigned multiple partitions, none of which will ever receive messages.

This happens because when you scale out, partitions are added, but when you scale in they are not removed. The producer assigns partitions based on deployer count, but the consumer isn't assigned partitions based on deployer count.

@markpollack markpollack added this to the 2.10.0-M2 milestone Jul 20, 2022
@onobc onobc added status/need-investigation Oh need to look under a hood and removed status/need-triage Team needs to triage and take a first look labels Jul 22, 2022
@markpollack markpollack modified the milestones: 2.10.0-M2, 2.10.0-M3 Aug 17, 2022
@markpollack markpollack modified the milestones: 2.10.0-M3, 2.10.0-RC1 Oct 7, 2022
@sobychacko
Contributor

@fiidim The Kafka binder provisions ten partitions when you scale your app out to 10 instances. You cannot decrease the number of partitions once they have been added, so when scaling down to 5 instances, as you correctly noted, you are still dealing with ten partitions. Your five sink instances will evenly distribute the available partitions from the Kafka topic. On the processor side, the default partition selector takes the hash of whatever value partitionKeyExpression resolves to and does a modulo operation over the available number of partitions (hash of partitionKeyExpression % number of partitions). The available number of partitions is 10 in this case. Is there a reason why that partitionKeyExpression value resolves to a hash that only produces values in the range [0-4]? How do you populate the header X-PartitionKey? Are you using the instance index as the header value?

Spring Cloud Data Flow only assigning to <count/2> partitions via the default strategy

I don't see how partitions >=5 are ignored in this case unless the partitionKeyExpression limits itself to a corresponding hash range of [0-4]. We need to identify why the processor only writes to 5 partitions when you scale down. Is this a custom processor you built? From the processor's perspective, it does not even see the value of 5 instances since that only applies to the sink app.

This is the standard Spring Cloud Stream behavior with partitions and is only applicable at the Spring Cloud Stream level. We will explore if SCDF has some particular logic to send to only those five partitions on the processor side.

@fiidim
Author

fiidim commented Oct 11, 2022

@sobychacko When deployer.sink.count is greater than 1, there must be some additional logic communicated to the Kafka binders (that I can't see). I assume that both the producer and the consumer need to be aware of the actual number of partitions, and not the number of deployed consumers (i.e., deployer.*.count).

It behaves as if the algorithm is being applied as hash of partitionKeyExpression % deployed consumer count instead of hash of partitionKeyExpression % number of partitions as you noted. This causes the producer (processor) to use at most <deployed consumer count> partitions, while the consumers attach to all physical partitions (hence the imbalance).

This would explain why explicitly telling (or perhaps overriding) the value the producer uses works correctly:
app.processor.spring.cloud.stream.bindings.output.producer.partition-count=10

I'm not familiar enough with the SCDF code base to see what happens when you set deployer.<sink>.count, but the answer might be in there (and it may be influencing the producer partition property).
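
If that hypothesis is right, deployer.sink.count=5 effectively leaves the producer configured as if we had set:

app.processor.spring.cloud.stream.bindings.output.producer.partition-count=5

which is exactly the value the explicit partition-count=10 override above would be correcting.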

@sobychacko
Contributor

@fiidim Thanks for the feedback. We will look into the SCDF code to see if the sink count has any influence on the partition decision. cc @markpollack

@markpollack
Contributor

Somewhere around here is where we should investigate.

@markpollack markpollack added the type/bug Is a bug report label Oct 13, 2022
@onobc onobc modified the milestones: 2.10.0-RC1, 2.10.0-RC2 Oct 21, 2022
@markpollack markpollack modified the milestones: 2.10.0-RC2, 2.10.1 Nov 10, 2022
@markpollack markpollack modified the milestones: 2.10.1, Priority Backlog Jan 5, 2023
@onobc onobc modified the milestones: 2.10.1, Priority Backlog Feb 9, 2023
@sobychacko
Contributor

As described above, one workaround is to set app.processor.spring.cloud.stream.bindings.output.producer.partition-count to the updated partition count. However, if you are using Spring Cloud Stream 4.0.3 or later, we recommend instead setting app.processor.spring.cloud.stream.bindings.output.producer.dynamicPartitionUpdatesEnabled to true so that the partition count is detected dynamically at runtime.
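
For the stream in the original report, those two options translate to something like:

# Option 1: pin the producer to the physical partition count
app.processor.spring.cloud.stream.bindings.output.producer.partition-count=10

# Option 2 (Spring Cloud Stream 4.0.3+): detect partition count changes at runtime
app.processor.spring.cloud.stream.bindings.output.producer.dynamicPartitionUpdatesEnabled=true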

Generally speaking, with Apache Kafka it is better to leave partitioning to the middleware itself rather than using the partitioning support provided by the binder. See the important section under this part of the reference doc for more details.

@markpollack
Contributor

Closing this issue as there are provided workarounds; the follow-up issues are #5375 and #5376.
