Skip to content

Commit

Permalink
[pulsar-broker] avoid backpressure by skipping dispatching if consume…
Browse files Browse the repository at this point in the history
…r channel is not writable (apache#6740)

### Motivation
Recently we are seeing broker is crashing with OutOfMemory when it has higher dispatch rate with large size messages. High message rate out saturates network and broker will try to write on the channel which is not writable which buffers the message and eventually broker sees OOM and shutdown with below error:
```
java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
        at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
        at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:335) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
```

### Modification
In order to reduce backpressure, broker should slow down dispatching if consumer cnx-channel is writable. Broker does it for replicator and Exclusive consumer but not doing for shared consumer. So, add similar check for shared subscription to avoid OOM for high dispatch rate. It might be helpful for apache#5896 as well.
  • Loading branch information
rdhabalia authored and Addison Higham committed May 5, 2020
1 parent 91d714a commit e1fbb68
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ public void readMoreEntries() {
if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);

if (!isConsumerWritable()) {
// If the connection is not currently writable, we issue the read request anyway, but for a single
// message. The intent here is to keep use the request as a notification mechanism while avoiding to
// read and dispatch a big batch of messages which will need to wait before getting written to the
// socket.
messagesToRead = 1;
}

// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
Expand Down Expand Up @@ -481,8 +489,13 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}

// round-robin dispatch batch size for this consumer
int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1;
if (log.isDebugEnabled() && !c.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
c);
}
int messagesForC = Math.min(
Math.min(entriesToDispatch, c.getAvailablePermits()),
Math.min(entriesToDispatch, availablePermits),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());

if (messagesForC > 0) {
Expand Down Expand Up @@ -619,6 +632,18 @@ protected boolean isAtleastOneConsumerAvailable() {
return false;
}

private boolean isConsumerWritable() {
for (Consumer consumer : consumerList) {
if (consumer.isWritable()) {
return true;
}
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer is not writable", topic.getName(), name);
}
return false;
}

@Override
public boolean isConsumerAvailable(Consumer consumer) {
return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
return;
}

int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
int availablePermits = consumer.isWritable() ? consumer.getAvailablePermits() : 1;
if (log.isDebugEnabled() && !consumer.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
consumer);
}
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), availablePermits);
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
Expand Down

0 comments on commit e1fbb68

Please sign in to comment.