diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java index c46b2fd4..57658254 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeclareTest.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.awaitility.Awaitility; import org.testng.annotations.Test; /** @@ -121,24 +123,47 @@ private void doTestExchangeDeclaredWithEnumerationEquivalent(Channel channel) for (BuiltinExchangeType exchangeType : BuiltinExchangeType.values()) { channel.exchangeDeclare(NAME, exchangeType); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false, false, null); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclare(NAME, exchangeType, false, false, false, null); verifyEquivalent(NAME, exchangeType.getType(), false, false, null); - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); channel.exchangeDeclareNoWait(NAME, exchangeType, false, false, false, null); // no check, this one is asynchronous - channel.exchangeDelete(NAME); + deleteExchangeWithRetry(); } } + + public void deleteExchangeWithRetry() throws IOException { + // the replicator cursor of exchange is created in async way, + // delete the exchange may fail due to non-empty directory error + // + // KeeperErrorCode = Directory not empty for + // /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test + // + // - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test + // - /managed-ledgers/public/vhost1/persistent/__amqp_exchange__exchange_test/__amqp_replicator__exchange_test + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> { + try { + channel.exchangeDelete(NAME); + } catch (Exception e) { + return false; + } + return true; + }); + } + }