spring.cloud.stream.instanceCount and Autoscaling #1342

Closed
steve-roy opened this issue Apr 3, 2018 · 14 comments

@steve-roy

Hi,

Are these properties required for proper function?
spring.cloud.stream.instanceCount
spring.cloud.stream.instanceIndex

If so, how are these expected to be set in an autoscaling environment?

  • steve
@olegz
Contributor

olegz commented Apr 3, 2018

@steve-roy can you please be a bit more specific with regard to what you're trying to achieve?

@steve-roy
Author

For example, we will be running our Kafka Consumer / Producer in an Amazon Autoscaling environment. Let's call our Spring Boot / Cloud Stream application CP. When load is light we might be running one instance of CP.

In this case, if we understand the use of these settings correctly, they would be:
spring.cloud.stream.instanceCount=1
spring.cloud.stream.instanceIndex=0

During heavy activity, we might ask Amazon to increase the application instance count of CP based on rules we define. Let's say Amazon increases the instances to 3.

In this case, the settings would be:

CP1
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0

CP2
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=1

CP3
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=2

Because the code is a pre-built jar that is already deployed, we can't update the properties inside the jar.
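
(We realize Spring Boot supports externalized configuration, so these values would not have to live inside the jar; they could be supplied per instance at launch, for example (the jar name here is just a placeholder):

java -jar cp.jar --spring.cloud.stream.instanceCount=3 --spring.cloud.stream.instanceIndex=0

or as environment variables such as SPRING_CLOUD_STREAM_INSTANCECOUNT / SPRING_CLOUD_STREAM_INSTANCEINDEX. The open question is how the correct index would be assigned per instance when Amazon scales.)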

Questions:

  • Are these settings required?

  • What is their purpose?

  • How does the Spring Cloud Stream team envision these properties being updated dynamically in an autoscaling environment if they are needed?

Does this help?

  • steve

@steve-roy
Author

Also, it looks like this issue was raised and closed before, but the resolution is not clear.

#1151

@garyrussell
Contributor

garyrussell commented Apr 4, 2018

They are used for binder-based partitioning (when using a binder which, unlike Kafka, doesn't support partitioning natively).

With Kafka, the instanceCount is also used (along with concurrency) to check whether the topic has enough partitions (instanceCount * concurrency) and to adjust the partition count upward, if so configured (autoAddPartitions).
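
For illustration (the binding name input and the values here are just examples), the relevant settings are along these lines:

spring.cloud.stream.instanceCount=3
spring.cloud.stream.bindings.input.consumer.concurrency=2
# allow the binder to raise the topic's partition count to cover instanceCount * concurrency
spring.cloud.stream.kafka.binder.autoAddPartitions=true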

Similarly, the instance index is used for partitioned consumers; again, not necessary with Kafka as long as Kafka group management is used, so that Kafka allocates partitions to instances.

Some environments (PCF, k8s - with the SCDF deployer) allocate an instance index automatically.

@steve-roy
Author

OK - to confirm - if we are using Kafka with consumer groups, these two properties are not required:

spring.cloud.stream.instanceCount=
spring.cloud.stream.instanceIndex=

@garyrussell
Contributor

Correct. But you need to make sure you have sufficient partitions for your topics to support the scale-out you desire, because the binder won't attempt to increase them.
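
For example, partitions can be added up front with the standard Kafka CLI (topic name and count are placeholders; older broker versions use --zookeeper instead of --bootstrap-server):

kafka-topics.sh --alter --topic my-topic --partitions 12 --bootstrap-server localhost:9092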

@steve-roy
Author

understood - thank you.

Are you able to comment on the Kinesis Spring Cloud Stream binder? Are the properties required there as well?

https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis

@artembilan
Contributor

@steve-roy ,

No, those properties are not required for Kinesis either.
The binder has logic like this:

if (properties.getInstanceCount() > 1) {
	shardOffsets = new HashSet<>();
	KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
	List<Shard> shards = kinesisConsumerDestination.getShards();
	for (int i = 0; i < shards.size(); i++) {
		// divide shards across instances: this instance only takes the shards
		// whose position in the list matches its instanceIndex
		if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
			// kinesisShardOffset is the starting offset derived from the consumer
			// properties earlier in the binder code
			KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset);
			shardOffset.setStream(destination.getName());
			shardOffset.setShard(shards.get(i).getShardId());
			shardOffsets.add(shardOffset);
		}
	}
}

So, if you specify them, each instanceIndex will get its own set of shards from the stream.
Otherwise, all instances will consume all the shards, but each record from a shard will be processed by only one instance, thanks to consumer group management via the checkpointer backed by the DynamoDbMetaDataStore.
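
For example, with a 4-shard stream and two instances configured like this (a sketch; the values are illustrative):

# instance 1
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

# instance 2
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=1

the i % instanceCount == instanceIndex check above gives instance 0 the shards at positions 0 and 2, and instance 1 the shards at positions 1 and 3.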

Does it make sense to you?

@steve-roy
Author

ok - thank you both for confirming.

@Venkat2811

Venkat2811 commented Oct 25, 2018

Hey @artembilan,

I was looking for exactly this. So, from your explanation, with consumer group management via the checkpointer, these properties do not need to be set for the Kinesis binder:

spring.cloud.stream.instanceCount=
spring.cloud.stream.instanceIndex=

However, I'm still not clear on this:
Say I have a consumer group with 2 instances, listening on a single Kinesis stream with 4 shards.
If I have this configuration on both of my instances:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my_stream_with_4_shards
          group: my_consumer_group
          consumer:
            concurrency: 2
            partitioned: true
Will instance1 and instance2 each process 2 shards?

Also, what happens if I change concurrency to 4?

Thanks,
Venkat

@artembilan
Contributor

@Venkat2811 ,

Unfortunately, even distribution still doesn't work for the Kinesis binder: spring-projects/spring-integration-aws#99.
We don't have a rebalancing implementation there, so there is a real chance that one instance of your app will pick up all the shards for consuming.

However, if you use instanceCount and instanceIndex, you'll end up with a static distribution:

if (properties.getInstanceCount() > 1) {
	shardOffsets = new HashSet<>();
	KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
	List<Shard> shards = kinesisConsumerDestination.getShards();
	for (int i = 0; i < shards.size(); i++) {
		// divide shards across instances
		if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
			KinesisShardOffset shardOffset = new KinesisShardOffset(kinesisShardOffset);
			shardOffset.setStream(destination.getName());
			shardOffset.setShard(shards.get(i).getShardId());
			shardOffsets.add(shardOffset);
		}
	}
}

Please consider asking questions on Stack Overflow in the future rather than commenting on closed issues; otherwise there is a good chance they will get lost.

@mteodori

Are these properties required for the RabbitMQ binder?

@garyrussell
Contributor

Yes; the rabbit binder does not support auto-scaling with partitioned queues.
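
Roughly, a partitioned setup with the Rabbit binder is wired explicitly like this (the binding names, key expression and counts are illustrative, and each consumer instance needs its own index):

# producer side
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=headers['partitionKey']
spring.cloud.stream.bindings.output.producer.partitionCount=3

# each consumer instance
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceCount=3
spring.cloud.stream.instanceIndex=0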

@mteodori

Thanks. So I guess that in k8s I should move from a Deployment to a StatefulSet and compute the index and count in an init container before I can try to set up HPA.
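
Something along these lines, assuming the pod ordinal can be parsed from the StatefulSet hostname (cp-0, cp-1, ...); where the total replica count comes from is still open, so REPLICA_COUNT below is just a placeholder:

# entrypoint sketch
export SPRING_CLOUD_STREAM_INSTANCEINDEX=${HOSTNAME##*-}
export SPRING_CLOUD_STREAM_INSTANCECOUNT=${REPLICA_COUNT}
exec java -jar /app.jar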
