Description
I'm building a Spring Integration application (using spring-integration-ip:4.3.7.RELEASE) that uses a TcpNioServerConnectionFactory. Unfortunately, during load tests the application stops processing incoming messages. Here is my analysis.
This is my connection factory:
    @Bean
    public TcpNioServerConnectionFactory tcpServerConnectionFactory() {
        // One serializer instance handles both directions (length-header framing).
        ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
        serializer.setMaxMessageSize(100 * 1024);
        serializer.setApplicationEventPublisher(applicationEventPublisher);

        TcpNioServerConnectionFactory connectionFactory =
                new TcpNioServerConnectionFactory(serverProperties.getPort());
        connectionFactory.setDeserializer(serializer);
        connectionFactory.setSerializer(serializer);
        connectionFactory.getMapper().setApplySequence(true);

        // Core size 4, maximum 8, bounded work queue, named threads.
        AtomicInteger threadCount = new AtomicInteger();
        connectionFactory.setTaskExecutor(new ThreadPoolExecutor(
                4, 8, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(serverProperties.getMessaging().getLegacyAdapterInboundQueueSize()),
                (Runnable r) -> new Thread(r, "legacy-tcp-" + threadCount.incrementAndGet())
        ));
        return connectionFactory;
    }
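A side note on the pool size, in case it matters: as far as I understand java.util.concurrent.ThreadPoolExecutor, a pool backed by a bounded LinkedBlockingQueue only grows beyond its core size once the queue is full, so with a reasonably large queue the maximum of 8 is rarely reached. The sketch below is a standalone illustration of that rule, not part of my application; the class and method names are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the application above.
public class PoolGrowthDemo {

    // Submits `tasks` blocking tasks to a pool with core size 1, maximum size 2
    // and a work queue of `queueCapacity`, then reports how many threads exist.
    public static int poolSizeAfterSubmitting(int tasks, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueCapacity));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // Queue has room for the extra tasks: the pool stays at its core size.
        System.out.println(poolSizeAfterSubmitting(3, 10)); // prints 1
        // Queue fills up after one waiting task: a second thread is created.
        System.out.println(poolSizeAfterSubmitting(3, 1));  // prints 2
    }
}
```

This would explain why only legacy-tcp-1 to legacy-tcp-4 show up in the dumps below.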
As you can see, the executor has a core pool of 4 threads (maximum 8) to process incoming messages. Let's have a look at the thread dumps at the time of the lock-up:
"legacy-tcp-4" #43 prio=5 os_prio=0 tid=0x000000001e909000 nid=0x141a0 waiting on condition [0x000000002753e000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec547d18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:385)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection$ChannelInputStream.write(TcpNioConnection.java:734)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.sendToPipe(TcpNioConnection.java:442)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.doRead(TcpNioConnection.java:422)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.readPacket(TcpNioConnection.java:480)
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory$1.run(AbstractConnectionFactory.java:659)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"legacy-tcp-3" #42 prio=5 os_prio=0 tid=0x000000001e907800 nid=0x16628 waiting on condition [0x000000002743f000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec548a18> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:385)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection$ChannelInputStream.write(TcpNioConnection.java:734)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.sendToPipe(TcpNioConnection.java:442)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.doRead(TcpNioConnection.java:422)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.readPacket(TcpNioConnection.java:480)
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory$1.run(AbstractConnectionFactory.java:659)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"legacy-tcp-2" #41 prio=5 os_prio=0 tid=0x000000001eba9800 nid=0xa83c waiting on condition [0x000000002733e000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000ec4d1960> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:385)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection$ChannelInputStream.write(TcpNioConnection.java:734)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.sendToPipe(TcpNioConnection.java:442)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.doRead(TcpNioConnection.java:422)
at org.springframework.integration.ip.tcp.connection.TcpNioConnection.readPacket(TcpNioConnection.java:480)
at org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory$1.run(AbstractConnectionFactory.java:659)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
"legacy-tcp-1" #29 prio=5 os_prio=0 tid=0x000000001eba0800 nid=0xfab4 runnable [0x000000002673e000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll0(Native Method)
at sun.nio.ch.WindowsSelectorImpl$SubSelector.poll(WindowsSelectorImpl.java:296)
at sun.nio.ch.WindowsSelectorImpl$SubSelector.access$400(WindowsSelectorImpl.java:278)
at sun.nio.ch.WindowsSelectorImpl.doSelect(WindowsSelectorImpl.java:159)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000000ec01d3e0> (a sun.nio.ch.Util$3)
- locked <0x00000000ec01d3f0> (a java.util.Collections$UnmodifiableSet)
- locked <0x00000000ec01d360> (a sun.nio.ch.WindowsSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.doSelect(TcpNioServerConnectionFactory.java:177)
at org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory.run(TcpNioServerConnectionFactory.java:138)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Three of the four threads have data in their buffers and are trying to put it into the buffer queue here:
    if (!this.buffers.offer(buffer, TcpNioConnection.this.pipeTimeout, TimeUnit.MILLISECONDS)) {
        throw new IOException("Timed out waiting for buffer space");
    }
However, the buffer queue is full so the threads are blocked until the offer times out.
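The TIMED_WAITING (parking) state in the dumps is what a timed offer on a full LinkedBlockingQueue looks like from the outside. A minimal standalone sketch, using only java.util.concurrent (the class name, thread name and buffer sizes are made up for illustration):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it does not use any Spring Integration code.
public class BlockedOfferDemo {

    // Returns the state observed for a thread stuck in a timed offer() on a full queue.
    public static Thread.State stateWhileOffering() {
        LinkedBlockingQueue<byte[]> buffers = new LinkedBlockingQueue<>(1);
        buffers.add(new byte[16]); // fill the queue so the next offer has to wait

        Thread writer = new Thread(() -> {
            try {
                buffers.offer(new byte[16], 5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "demo-writer");
        writer.start();

        Thread.State observed = writer.getState();
        try {
            // Poll for up to 2 seconds until the writer has parked inside offer().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (observed != Thread.State.TIMED_WAITING && System.nanoTime() < deadline) {
                Thread.sleep(10);
                observed = writer.getState();
            }
            buffers.poll(); // free a slot so the writer can complete its offer
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(stateWhileOffering()); // prints TIMED_WAITING
    }
}
```

The writer only gets unstuck because some other thread polls the queue, which is exactly what seems to be missing in my case.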
Wondering who is responsible for emptying the buffer queue, I set a breakpoint where the queue is polled:
buffer = this.buffers.poll(1, TimeUnit.SECONDS);
It turns out that the very same threads that write into the queue are also responsible for emptying it. It may well be that I don't understand something, but to me it makes no sense for the producers and the consumers to share the same threads because, well, the application can lock up :-)
If I increase the number of threads to 16, the problem does not occur. However, I would prefer to use as few threads as possible, and whatever number I pick seems to be a gamble.
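To illustrate what I think is going on, here is a toy model of the situation. It is a sketch under my own assumptions and not the framework's code: one full "pipe", one producer task and one consumer task, both submitted to the same fixed-size pool. The class name, method name and the timeout parameter are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model: producer and consumer share one thread pool.
public class SharedPoolStarvationDemo {

    // Returns true if the producer managed to put its buffer into the pipe
    // before the timeout, false if it timed out waiting for buffer space.
    public static boolean producerSucceeds(int poolThreads, long pipeTimeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolThreads);
        BlockingQueue<byte[]> pipe = new LinkedBlockingQueue<>(1);
        pipe.add(new byte[16]); // the pipe is already full

        // Producer: blocks in offer() until space is free or the timeout expires.
        Callable<Boolean> producer =
                () -> pipe.offer(new byte[16], pipeTimeoutMillis, TimeUnit.MILLISECONDS);
        // Consumer: would free a slot, but only if it gets a thread to run on.
        Runnable consumer = () -> pipe.poll();

        try {
            Future<Boolean> result = pool.submit(producer);
            pool.submit(consumer); // queued behind the producer if no thread is free
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One thread: the producer occupies it, the consumer never runs in time.
        System.out.println(producerSucceeds(1, 200));  // prints false
        // Two threads: the consumer drains the pipe and the producer gets through.
        System.out.println(producerSucceeds(2, 5000)); // prints true
    }
}
```

With more connections than spare threads the same thing happens at a larger scale, which is why picking a bigger pool only moves the threshold.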
Please enlighten me if I'm doing something wrong; otherwise, a fix would be highly appreciated :)