Skip to content

Commit

Permalink
Make AMQConnection#doFinalShutdown idempotent
Browse files Browse the repository at this point in the history
Now recovery can be triggered from write operations,
late connection failure discoveries can re-trigger the shutdown
and emit spurious exception.

References #341
  • Loading branch information
acogoluegnes committed Feb 6, 2018
1 parent 98d32d4 commit f312fa5
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/main/java/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

final class Copyright {
final static String COPYRIGHT="Copyright (c) 2007-2017 Pivotal Software, Inc.";
Expand Down Expand Up @@ -63,6 +64,8 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti

private final ErrorOnWriteListener errorOnWriteListener;

private final AtomicBoolean finalShutdownStarted = new AtomicBoolean(false);

/**
* Retrieve a copy of the default table of client properties that
* will be sent to the server during connection startup. This
Expand Down Expand Up @@ -700,14 +703,16 @@ private void handleFailure(Throwable ex) {

/** private API */
public void doFinalShutdown() {
_frameHandler.close();
_appContinuation.set(null);
notifyListeners();
// assuming that shutdown listeners do not do anything
// asynchronously, e.g. start new threads, this effectively
// guarantees that we only begin recovery when all shutdown
// listeners have executed
notifyRecoveryCanBeginListeners();
if (finalShutdownStarted.compareAndSet(false, true)) {
_frameHandler.close();
_appContinuation.set(null);
notifyListeners();
// assuming that shutdown listeners do not do anything
// asynchronously, e.g. start new threads, this effectively
// guarantees that we only begin recovery when all shutdown
// listeners have executed
notifyRecoveryCanBeginListeners();
}
}

private void notifyRecoveryCanBeginListeners() {
Expand Down

0 comments on commit f312fa5

Please sign in to comment.