
KinesisMessageDrivenChannelAdapter consumes nearly all CPU time #70

Closed
volodarsky opened this issue May 31, 2017 · 21 comments
@volodarsky

I've found that KinesisMessageDrivenChannelAdapter's ConsumerDispatcher and ConsumerInvoker consume nearly all CPU time, even without any load. I used KinesisMessageDrivenChannelAdapter with default settings. What can I do to decrease the CPU load?

@artembilan
Member

artembilan commented May 31, 2017

Well, that is the normal behavior of an infinite loop, like this one in the ConsumerDispatcher:

public void run() {
    while (KinesisMessageDrivenChannelAdapter.this.active) {
        ...
        for (Iterator<ShardConsumer> iterator =
                KinesisMessageDrivenChannelAdapter.this.shardConsumers
                        .values()
                        .iterator();
                iterator.hasNext(); ) {
            ShardConsumer shardConsumer = iterator.next();
            shardConsumer.execute();
        }
    }
}

And it really eats the Thread and, therefore, one of the CPU cores.

Do you want to have some sleep between cycles and let other processes use the CPU?
I think in the modern microservice universe, where we distribute our applications in the cloud, we can't afford such deference to a slow CPU... I mean, we are not going to add something like a sleep into these loops.

Hope I am clear.

@volodarsky
Author

Especially in a cloud environment (e.g. AWS), I would like to be able to distinguish whether the server is running under load or not, and I don't want to constantly get CloudWatch alarms about 100% CPU consumption.

@ltv12

ltv12 commented May 31, 2017

I think in the modern microservice universe, where we distribute our applications in the cloud, we can't afford such deference to a slow CPU.

@artembilan Sorry, that is not clear to me. It would be nice if you could give more details.

@artembilan
Member

Well, my idea is that we dedicate a thread (or a whole thread pool) to our application and don't care how it manages them.

As I said, the code is an infinite loop without any timeout between cycles, especially with the ConsumerInvoker. That's why we see that busy CPU.

If you have some ideas on how to resolve that, I'm open to suggestions.

Something like the WaitStrategy from the LMAX Disruptor: https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/WaitStrategy.java ?

My current point is that I don't want to add sleeps into those loops, because we would get some lag between consumer cycles.

@garyrussell, @mbogoevici, WDYT about this concern?
Thanks

@garyrussell
Contributor

If there is no blocking API, I think there is no alternative but to add a (configurable) Thread.sleep() in the loop.

I am not familiar with the code, but the sleep can be made conditional on no data being received.
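One way to picture that suggestion, as a minimal sketch (the boolean return from execute() and the consumerBackoff field are assumptions here, not the adapter's real API):

// Illustrative only: sleep only when a full pass over the shard
// consumers produced no records.
public void run() {
    while (this.active) {
        boolean anyRecords = false;
        for (ShardConsumer shardConsumer : this.shardConsumers.values()) {
            // Assumption: execute() reports whether it processed any records.
            anyRecords |= shardConsumer.execute();
        }
        if (!anyRecords) {
            try {
                Thread.sleep(this.consumerBackoff); // configurable back-off in ms
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}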

@artembilan
Member

Well, I went ahead and decided to fix this anyway: a8d370f.

Thank you, @garyrussell, for the advice.

And special thanks to @volodarsky for catching this problem.

My apologies for not being so thoughtful from the beginning.

It would be great if you could test against the latest 1.1.0.BUILD-SNAPSHOT.

@garyrussell
Contributor

I can't read the code on my phone, but I think the sleep must be configurable, and 1ms is not a suitable default. Maybe 100ms would be better. We should only sleep if no data is received.

@garyrussell reopened this May 31, 2017
@artembilan
Member

Well, 1 ms is similar to what the LMAX Disruptor has: https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java#L82.

The problem is that at that point we don't know whether we should sleep or not. It is a dispatcher thread; the real work is done by the target ShardConsumers. That's why I don't want to sleep for a long time, since one of those consumers might be ready to consume. That's how a configurable sleep might be evil in this case. And 100 ms isn't good either...

But I'm open to suggestions. I've almost convinced myself that there should be some queue of active consumers to loop over and, therefore, block the dispatcher waiting on poll() from that queue (a sketch of this idea follows below). But I don't see the solution yet. And I'm still sure that we should treat the target application as a microservice, exactly how LMAX Exchange promotes their Disruptor solution.

But yeah... That's IMO, and any recommendations are welcome.
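For illustration, a minimal sketch of that queue-based dispatcher, assuming each ShardConsumer could enqueue itself when it has work to do (the readyConsumers field and this wiring are hypothetical, not the adapter's actual code):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical: consumers that become ready enqueue themselves here.
private final BlockingQueue<ShardConsumer> readyConsumers = new LinkedBlockingQueue<>();

public void run() {
    while (this.active) {
        try {
            // Block up to 1 second; no CPU is burned while every consumer is idle.
            ShardConsumer shardConsumer = this.readyConsumers.poll(1, TimeUnit.SECONDS);
            if (shardConsumer != null) {
                shardConsumer.execute();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}

With this shape the dispatcher wakes only when a consumer actually has records to process, so the idle cost drops to the periodic poll() timeout.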

@garyrussell
Contributor

I understand, but cloud billing is usually based on compute time, so spinning a CPU to do no work is just terribly awful.

I am not familiar with the code so I don't have a recommendation; sorry.

@volodarsky
Author

volodarsky commented Jun 1, 2017

@artembilan many thanks for the quick fix; however, I'm observing that the CPU consumption stays at nearly the same level. Even when I rebuilt the snapshot with a 100 ms sleep delay, I saw the same behaviour. (I've attached some screenshots; hope they help.)

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
6600 root 20 0 5850068 378244 16676 S 746,0 2,3 56:46.88 java

[Attached screenshots: "screenshot from 2017-06-01 12-15-34" and "with-100-ms"]

@volodarsky
Author

volodarsky commented Jun 1, 2017

Btw, when I added a sleep delay into the main cycle, the situation improved significantly:
while (KinesisMessageDrivenChannelAdapter.this.active) sleep(50);

@artembilan
Member

Thank you, @volodarsky, for testing and the feedback.

Well, since it doesn't help, I'll think more about some barrier feature.

Just let me know: do we now have the problem only with the ConsumerDispatcher?
Or is the ConsumerInvoker still guilty, too?

@artembilan
Member

Hm, funny.
The KCL Worker has this code:

while (!shouldShutdown()) {
    runProcessLoop();
}
...
void runProcessLoop() {
    try {
        ...
        wlog.info("Sleeping ...");
        Thread.sleep(idleTimeInMilliseconds);
    }
    catch (Exception e) {
        ...
    }
}

So, they sleep after each consuming cycle.
Weird. That significantly decreases the performance of our application, especially when we have recordsLimit = 25 (the default) but our shards contain millions of records.

And that is:

/**
 * Idle time between record reads in milliseconds.
 */
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;

by default...

@artembilan
Member

Well, OK. I see this: http://docs.aws.amazon.com/streams/latest/dev/kinesis-low-latency.html

For most applications, we recommend polling each shard one time per second per application. This enables you to have multiple consumer applications processing a stream concurrently without hitting Amazon Kinesis Streams limits of 5 GetRecords calls per second. Additionally, processing larger batches of data tends to be more efficient at reducing network and other downstream latencies in your application.

And then, at the end:

Because Streams has a limit of 5 GetRecords calls per second, per shard, setting the idleTimeBetweenReadsInMillis property lower than 200ms may result in your application observing the ProvisionedThroughputExceededException exception.

So, I intend to go ahead with a 1000 ms sleep between cycles by default (that is one GetRecords call per second per shard, well within the limit of 5), 10000 records per poll, and let you override consumerBackoff to a smaller value, but not less than 250 ms.

How does this plan sound to you?

@volodarsky
Author

volodarsky commented Jun 1, 2017

@artembilan

Just let me know: do we now have the problem only with the ConsumerDispatcher?

Yep.

So, I intend to go ahead with a 1000 ms sleep between cycles by default.

Seems it will be OK.

10000 records per poll.

A 25-record limit is definitely too small, but I'm not sure about 10k: as I remember, a Kinesis record can be up to 2 MB (remember that AWS bills by the number of records). I'd rather suggest 100-500.

@artembilan
Member

Hm, I can't find where I picked up 25 as the default value. So yes, let's.

10000 is what the limit allows:

/**
 * <p>
 * The maximum number of records to return. Specify a value of up to 10,000. If you specify a value that is greater
 * than 10,000, <a>GetRecords</a> throws <code>InvalidArgumentException</code>.
 * </p>
 */
private Integer limit;

And this is the default for the Kinesis Client Library. It looks like the reply is restricted to 10 MB anyway:

The size of the data returned by get-records varies depending on the utilization of the shard. The maximum size of data that get-records can return is 10 MB. If a call returns this amount of data, subsequent calls made within the next 5 seconds throw ProvisionedThroughputExceededException

http://docs.aws.amazon.com/cli/latest/reference/kinesis/get-records.html

What you're talking about with the price is the number of records in the stream, not how many you would like to poll.
You pay for the storage on AWS. Also yes, we might pay for the requests as well, but for their number, not for the limit argument.

@volodarsky
Author

Hm, I can't find where I picked up 25 as the default value. So yes, let's.

private int recordsLimit = 25;

Also yes, we might pay for the requests as well,

My point was that AWS customers will tend to use big zipped records to reduce their costs, and this could lead to unexpected memory load. But with the 10 MB constraint, it doesn't matter.

@artembilan
Member

OK! I added an idleBetweenPolls option to the KinesisMessageDrivenChannelAdapter and fixed the ConsumerInvoker race condition around the lock barrier. Now it is a Semaphore(0).

Let me know how it works for you now.

Thank you very much for all the feedback!
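For reference, wiring together the values discussed in this thread might look roughly like this (a sketch only; the setter names mirror the option names mentioned above, and amazonKinesis / kinesisChannel are assumed to be existing beans, so verify everything against the released API):

KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, "myStream");
adapter.setRecordsLimit(10_000);   // max records per GetRecords call
adapter.setConsumerBackoff(250);   // back-off when a shard returns no data, in ms
adapter.setIdleBetweenPolls(1000); // idle time between polling cycles, in ms
adapter.setOutputChannel(kinesisChannel);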

@volodarsky
Author

@artembilan Thanks a lot! Now it works fine. Btw, when can I expect this fix to appear as a milestone/release in the Maven repo?

@artembilan
Member

That's great!

Well, look: having started the work on the SCSt Kinesis Binder (https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis), we realized that there are some missing options and features to be implemented here.

OTOH, I understand your expectation and don't mind releasing M2 for this critical bug.
Everything else can go into the next milestone.

We are releasing Spring Integration 4.3.10 this week. So, this one will be upgraded and released after that.

@artembilan
Member

@volodarsky ,

The version 1.1.0.M2 is here: https://repo.spring.io/milestone/org/springframework/integration/spring-integration-aws/1.1.0.M2/

Thank you one more time for your feedback!
