Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENTMQBR-2029] [ARTEMIS-2105] Discovery group connectors can delay broker shutdown #274

Open
wants to merge 1 commit into
base: 2.6.3.jbossorg-x
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -123,6 +123,8 @@ public class ActiveMQActivation {

private boolean lastReceived = false;

private final Object teardownLock = new Object();

// Whether we are in the failure recovery loop
private final AtomicBoolean inReconnect = new AtomicBoolean(false);
private XARecoveryConfig resourceRecovery;
Expand Down Expand Up @@ -352,30 +354,33 @@ protected synchronized void setup() throws Exception {
/**
* Teardown the activation
*/
protected synchronized void teardown(boolean useInterrupt) {
logger.debug("Tearing down " + spec);
protected void teardown(boolean useInterrupt) {

long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();
synchronized (teardownLock) {

if (resourceRecovery != null) {
ra.getRecoveryManager().unRegister(resourceRecovery);
}
logger.debug("Tearing down " + spec);

final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];
long timeout = factory == null ? ActiveMQClient.DEFAULT_CALL_TIMEOUT : factory.getCallTimeout();

// We need to do from last to first as any temporary queue will have been created on the first handler
// So we invert the handlers here
for (int i = 0; i < handlers.size(); i++) {
// The index here is the complimentary so it's inverting the array
handlersCopy[i] = handlers.get(handlers.size() - i - 1);
}
if (resourceRecovery != null) {
ra.getRecoveryManager().unRegister(resourceRecovery);
}

handlers.clear();
final ActiveMQMessageHandler[] handlersCopy = new ActiveMQMessageHandler[handlers.size()];

FutureLatch future = new FutureLatch(handlersCopy.length);
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.interruptConsumer(future);
}
// We need to do from last to first as any temporary queue will have been created on the first handler
// So we invert the handlers here
for (int i = 0; i < handlers.size(); i++) {
// The index here is the complimentary so it's inverting the array
handlersCopy[i] = handlers.get(handlers.size() - i - 1);
}

handlers.clear();

FutureLatch future = new FutureLatch(handlersCopy.length);
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.interruptConsumer(future);
}

//wait for all the consumers to complete any onmessage calls
boolean stuckThreads = !future.await(timeout);
Expand All @@ -393,52 +398,53 @@ protected synchronized void teardown(boolean useInterrupt) {
}
}

Runnable runTearDown = new Runnable() {
@Override
public void run() {
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.teardown();
Runnable runTearDown = new Runnable() {
@Override
public void run() {
for (ActiveMQMessageHandler handler : handlersCopy) {
handler.teardown();
}
}
}
};

Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);
};

try {
threadTearDown.join(timeout);
} catch (InterruptedException e) {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}
Thread threadTearDown = startThread("TearDown/HornetQActivation", runTearDown);

if (factory != null) {
try {
// closing the factory will help making sure pending threads are closed
factory.close();
} catch (Throwable e) {
ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
threadTearDown.join(timeout);
} catch (InterruptedException e) {
// nothing to be done on this context.. we will just keep going as we need to send an interrupt to threadTearDown and give up
}

factory = null;
}

if (threadTearDown.isAlive()) {
threadTearDown.interrupt();
if (factory != null) {
try {
// closing the factory will help making sure pending threads are closed
factory.close();
} catch (Throwable e) {
ActiveMQRALogger.LOGGER.unableToCloseFactory(e);
}

try {
threadTearDown.join(5000);
} catch (InterruptedException e) {
// nothing to be done here.. we are going down anyways
factory = null;
}

if (threadTearDown.isAlive()) {
ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
threadTearDown.interrupt();

try {
threadTearDown.join(5000);
} catch (InterruptedException e) {
// nothing to be done here.. we are going down anyways
}

if (threadTearDown.isAlive()) {
ActiveMQRALogger.LOGGER.threadCouldNotFinish(threadTearDown.toString());
}
}
}

nodes.clear();
lastReceived = false;
nodes.clear();
lastReceived = false;

logger.debug("Tearing down complete " + this);
logger.debug("Tearing down complete " + this);
}
}

protected void setupCF() throws Exception {
Expand All @@ -462,7 +468,6 @@ protected void setupCF() throws Exception {
} else {
factory = ra.newConnectionFactory(spec);
}

}

/**
Expand Down