Skip to content

Commit

Permalink
Using an own executor for flow control
Browse files Browse the repository at this point in the history
Added test for slow consumer handling.
  • Loading branch information
MrEasy committed Nov 24, 2023
1 parent 8195394 commit 5b94560
Show file tree
Hide file tree
Showing 2 changed files with 253 additions and 22 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -41,8 +42,8 @@
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
Expand Down Expand Up @@ -84,9 +85,6 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
// Which is an OrderedExecutor
private final Executor flowControlExecutor;

// Number of pending calls on flow control
private final ReusableLatch pendingFlowControl = new ReusableLatch(0);

private final int initialWindow;

private final int clientWindowSize;
Expand Down Expand Up @@ -179,7 +177,8 @@ public ClientConsumerImpl(final ClientSessionInternal session,

this.contextClassLoader = contextClassLoader;

this.flowControlExecutor = flowControlExecutor;
// TODO probably no need even for OrderedExecutor, since single-threaded anyhow
this.flowControlExecutor = new OrderedExecutor(Executors.newSingleThreadExecutor());

if (logger.isTraceEnabled()) {
logger.trace("{}:: being created at", this, new Exception("trace"));
Expand Down Expand Up @@ -822,7 +821,7 @@ public void flowControl(final int messageBytes, final boolean discountSlowConsum
creditsToSend = 0;

if (credits > 0) {
sendCredits(credits);
sendCredits(credits, null);
}
} else {
if (logger.isDebugEnabled()) {
Expand All @@ -834,7 +833,7 @@ public void flowControl(final int messageBytes, final boolean discountSlowConsum
creditsToSend = 0;

if (credits > 0) {
sendCredits(credits);
sendCredits(credits, null);
}
}
}
Expand All @@ -845,13 +844,26 @@ public void flowControl(final int messageBytes, final boolean discountSlowConsum
* Sending an initial credit for slow consumers
*/
private void startSlowConsumer() {
logger.trace("{}::Sending 1 credit to start delivering of one message to slow consumer", this);
sendCredits(1);
if (logger.isTraceEnabled()) {
logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
}
final CountDownLatch latch = new CountDownLatch(1);
sendCredits(1, latch);
try {
// We use an executor here to guarantee the messages will arrive in order.
// However when starting a slow consumer, we have to guarantee the credit was sent before we can perform any
// operations like forceDelivery
pendingFlowControl.await(10, TimeUnit.SECONDS);
long currentTime = System.currentTimeMillis();
boolean timedOut = !latch.await(10, TimeUnit.SECONDS);
if (timedOut) {
// TODO debug error logs
logger.error("Timed out on startSlowConsumer", new Throwable());
} else {
long end = System.currentTimeMillis();
if (end - currentTime > 1000) {
logger.error("startSlowConsumer took " + (end - currentTime));
}
}
} catch (InterruptedException e) {
// will just ignore and forward the ignored
Thread.currentThread().interrupt();
Expand All @@ -861,19 +873,24 @@ private void startSlowConsumer() {
@Override
public void resetIfSlowConsumer() {
if (clientWindowSize == 0) {
sendCredits(0);

// If resetting a slow consumer, we need to wait the execution
final CountDownLatch latch = new CountDownLatch(1);
flowControlExecutor.execute(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});

sendCredits(0, latch);

try {
latch.await(10, TimeUnit.SECONDS);
long currentTime = System.currentTimeMillis();
boolean timedOut = !latch.await(10, TimeUnit.SECONDS);
if (timedOut) {
// TODO debug error logs
logger.error("Timed out on resetIfSlowConsumer", new Throwable());
}
else {
long end = System.currentTimeMillis();
if (end - currentTime > 1000) {
logger.error("resetIfSlowConsumer took " + (end - currentTime));
}
}
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
}
Expand All @@ -894,15 +911,18 @@ private void queueExecutor() {
/**
* @param credits
*/
private void sendCredits(final int credits) {
pendingFlowControl.countUp();
private void sendCredits(final int credits, CountDownLatch latch) {
//pendingFlowControl.countUp();
flowControlExecutor.execute(new Runnable() {
@Override
public void run() {
try {
sessionContext.sendConsumerCredits(ClientConsumerImpl.this, credits);
} finally {
pendingFlowControl.countDown();
if (latch != null) {
latch.countDown();
}
//pendingFlowControl.countDown();
}
}
});
Expand Down
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.*;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerSlowConsumerTest extends ActiveMQTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");

private ServerLocator locator;

private static final int WINDOW_SIZE = 0;

/*
Test is easily reproducing the issue with settings of 2000 messages, 100 consumers and 50 client-threads on an 8 core machine.
Fewer messages and fewer consumers make the probability lower for the issue to appear.
More client-threads make the issue less likely to appear.
Theory: THREAD_POOL_SIZE_CLIENT being larger than CONSUMERS will make the issue impossible to appear.
With 2000 messages, the test usually runs into the issue of ClientConsumerImpl#startSlowConsumer reaching its 10 second timeout (or taking significant long between 1 and 10 seconds)
on pendingFlowControl#await after about 1200-1500 messages.
Visible in log as 10 second pause before next bulk of messages get processed.
*/
private static final int NUM_OF_MESSAGES = 20_000;
private static final int CONSUMER_COUNT = 500;
private static final int THREAD_POOL_SIZE_CLIENT = 5;

private volatile boolean stopped = false;

private final Set<ClientConsumerImpl> consumers = ConcurrentHashMap.newKeySet();
private final Set<ClientSession> sessions = ConcurrentHashMap.newKeySet();


protected boolean isNetty() {
return false;
}


@Override
@Before
public void setUp() throws Exception {
super.setUp();

locator = createFactory(isNetty());
locator.setConsumerWindowSize(WINDOW_SIZE);
locator.setUseGlobalPools(false);
locator.setThreadPoolMaxSize(THREAD_POOL_SIZE_CLIENT);
}


@Test
public void testSlowConsumer() throws Exception {
ActiveMQServer messagingService = createServer(false, isNetty());

messagingService.start();
messagingService.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));

ClientSessionFactory cf = createSessionFactory(locator);

AtomicInteger sentMessages = new AtomicInteger(0);
sendMessages(cf, sentMessages);

AtomicInteger receivedMessages = new AtomicInteger(0);
createConsumers(cf, receivedMessages);

final long startTime = System.currentTimeMillis();
// allow for duration of 5ms per message (neglecting concurrency, which makes it even a lot faster)
// typical runtime for 1000 messages 100 consumers and 50 threads without issues is 160ms, so deadline is very generous
final long deadLine = System.currentTimeMillis() + sentMessages.get() * 5L;
int counter = 0;
while (receivedMessages.get() < sentMessages.get() && System.currentTimeMillis() < deadLine) {
Thread.sleep(5);
counter++;
if (counter % 1000 == 0) {
logger.info("Waiting for " + (sentMessages.get() - receivedMessages.get()) + " more messages...");
}
}
final long endTime = System.currentTimeMillis();
stopped = true; // signal stop to potentially still running consumer-creation thread

// check amount of sent and received messages
if (receivedMessages.get() < sentMessages.get()) {
logger.error("Received only " + receivedMessages.get() + " messages out of " + sentMessages.get());
}
else {
logger.info("Received all " + receivedMessages.get() + " messages");
}
assertEquals(sentMessages.get(), receivedMessages.get());

final long duration = endTime - startTime;
logger.info("Test took " + duration + " ms");

long expectedDuration = NUM_OF_MESSAGES * 10;

assertTrue("Test took " + duration + " ms, expected " + expectedDuration + " ms", duration < expectedDuration);

cleanup(cf, messagingService);
}

private void cleanup(ClientSessionFactory cf, ActiveMQServer messagingService) throws Exception {
consumers.parallelStream().forEach(c -> {
try {
c.close();
} catch (ActiveMQException e) {
//ignore
}
});
logger.info("Closed " + consumers.size() + " consumers");
sessions.parallelStream().forEach(s -> {
try {
s.close();
} catch (ActiveMQException e) {
//ignore
}
});
logger.info("Closed " + sessions.size() + " sessions");

cf.close();
messagingService.stop();

logger.info("Cleaned up.");
}

private void sendMessages(ClientSessionFactory cf, AtomicInteger sentMessages) throws ActiveMQException {
logger.info("Creating " + NUM_OF_MESSAGES + " messages...");

try (ClientSession sendingSession = cf.createSession(false, true, true);
ClientProducer producer = sendingSession.createProducer(QUEUE)) {
for (int i = 0; i < NUM_OF_MESSAGES; i++) {
ClientMessage message = createTextMessage(sendingSession, "m" + i);
producer.send(message);
sentMessages.incrementAndGet();
}
logger.info("Created " + NUM_OF_MESSAGES + " messages");
}
}


private void createConsumers(ClientSessionFactory cf, AtomicInteger receivedMessages) throws ActiveMQException {
Thread consumerCreator = new Thread() {
@Override
public void run() {
logger.info("Creating " + CONSUMER_COUNT + " consumers...");
try {
for (int i = 0; i < CONSUMER_COUNT; i++) {
if (stopped) {
logger.info("Stopping consumer creation, since test has ended already. Created " + i + " out of " + CONSUMER_COUNT + " consumers so far.");
return;
}

ClientSession session = cf.createSession(false, true, true);
ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(QUEUE);
sessions.add(session);
consumers.add(consumer);

assertEquals(WINDOW_SIZE == 0 ? 0 : WINDOW_SIZE / 2, consumer.getClientWindowSize());
String consumerId = "[" + i + "]";
consumer.setMessageHandler(message -> {
Thread.yield(); // simulate processing and yield to other threads
receivedMessages.incrementAndGet();
//instanceLog.info(consumerId + "\t- Received message: " + message.getMessageID());
});

session.start();
}

logger.info("Created all " + CONSUMER_COUNT + " consumers.");
}
catch (Exception ex) {
logger.error("Error creating consumers!", ex);
//fail("Error creating consumers!");
}
}
};

consumerCreator.start();
}

}

0 comments on commit 5b94560

Please sign in to comment.