diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java index cdacbd89..3092d866 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java @@ -37,21 +37,22 @@ public class AmqpBrokerService { @Getter private PulsarService pulsarService; - public AmqpBrokerService(PulsarService pulsarService) { + public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration serviceConfig) { this.pulsarService = pulsarService; this.amqpTopicManager = new AmqpTopicManager(pulsarService); this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService); - this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer); + this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, serviceConfig); this.exchangeService = new ExchangeServiceImpl(exchangeContainer); this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer); this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer); } - public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) { + public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer, + AmqpServiceConfiguration serviceConfig) { this.pulsarService = pulsarService; this.amqpTopicManager = new AmqpTopicManager(pulsarService); this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService); - this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer); + this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, serviceConfig); this.exchangeService = new ExchangeServiceImpl(exchangeContainer); this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer); this.connectionContainer = connectionContainer; diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java index 687c8de3..d38a36cd 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.amqp; +import static io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue.DEFAULT_SUBSCRIPTION; import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.getExchangeType; import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.isBuildInExchange; import static org.apache.qpid.server.protocol.ErrorCodes.INTERNAL_ERROR; @@ -108,7 +109,6 @@ public class AmqpChannel implements ServerChannelMethodProcessor { */ private IncomingMessage currentMessage; - private final String defaultSubscription = "defaultSubscription"; public static final AMQShortString EMPTY_STRING = AMQShortString.createAMQShortString((String) null); /** * ConsumerTag prefix, the tag is unique per subscription to a queue. @@ -428,7 +428,7 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic } CompletableFuture subscriptionFuture = topic.createSubscription( - defaultSubscription, CommandSubscribe.InitialPosition.Earliest, false, null); + DEFAULT_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null); subscriptionFuture.thenAccept(subscription -> { AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription, exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared, @@ -544,12 +544,11 @@ public void receiveBasicGet(AMQShortString queue, boolean noAck) { } else { Topic topic = amqpQueue.getTopic(); AmqpConsumer amqpConsumer = fetchConsumerMap.computeIfAbsent(queueName, value -> { - - Subscription subscription = topic.getSubscription(defaultSubscription); + Subscription subscription = topic.getSubscription(DEFAULT_SUBSCRIPTION); AmqpConsumer consumer; try { if (subscription == null) { - subscription = topic.createSubscription(defaultSubscription, + subscription = topic.createSubscription(DEFAULT_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null).get(); } consumer = new AmqpPullConsumer(queueContainer, subscription, diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java index 1908d0ff..c9a8672f 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java @@ -21,14 +21,10 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.SortedMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; @@ -37,7 +33,6 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.Subscription; -import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -61,10 +56,7 @@ public class AmqpConsumer extends Consumer { private final String consumerTag; private final String queueName; - /** - * map(exchangeName,treeMap(indexPosition,msgPosition)) . - */ - private final Map> unAckMessages; + private static final AtomicIntegerFieldUpdater MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AmqpConsumer.class, "availablePermits"); private volatile int availablePermits; @@ -88,7 +80,6 @@ public AmqpConsumer(QueueContainer queueContainer, Subscription subscription, this.autoAck = autoAck; this.consumerTag = consumerTag; this.queueName = queueName; - this.unAckMessages = new ConcurrentHashMap<>(); } @Override @@ -158,8 +149,6 @@ private CompletableFuture sendMessage(Entry index) { try { long deliveryTag = channel.getNextDeliveryTag(); - addUnAckMessages(indexMessage.getExchangeName(), (PositionImpl) index.getPosition(), - (PositionImpl) msg.getPosition()); if (!autoAck) { channel.getUnacknowledgedMessageMap().add(deliveryTag, index.getPosition(), this, msg.getLength()); @@ -199,28 +188,7 @@ private CompletableFuture sendMessage(Entry index) { public void messagesAck(List position) { incrementPermits(position.size()); - ManagedCursor cursor = ((PersistentSubscription) getSubscription()).getCursor(); - Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); getSubscription().acknowledgeMessage(position, CommandAck.AckType.Individual, Collections.EMPTY_MAP); - if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) { - asyncGetQueue().whenComplete((amqpQueue, throwable) -> { - if (throwable != null) { - log.error("Failed to get queue from queue container", throwable); - } else { - synchronized (this) { - PositionImpl newDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - unAckMessages.forEach((key, value) -> { - SortedMap ackMap = value.headMap(newDeletePosition, true); - if (ackMap.size() > 0) { - PositionImpl lastValue = ackMap.get(ackMap.lastKey()); - amqpQueue.acknowledgeAsync(key, lastValue.getLedgerId(), lastValue.getEntryId()); - } - ackMap.clear(); - }); - } - } - }); - } } public void messagesAck(Position position) { @@ -263,12 +231,6 @@ public boolean isWritable() { return channel.getConnection().ctx.channel().isWritable(); } - void addUnAckMessages(String exchangeName, PositionImpl index, PositionImpl message) { - ConcurrentSkipListMap map = unAckMessages.computeIfAbsent(exchangeName, - treeMap -> new ConcurrentSkipListMap<>()); - map.put(index, message); - } - public String getConsumerTag() { return consumerTag; } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java index 1bbfc3ba..4742c231 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java @@ -93,7 +93,7 @@ public String getProtocolDataToAdvertise() { @Override public void start(BrokerService service) { brokerService = service; - amqpBrokerService = new AmqpBrokerService(service.getPulsar()); + amqpBrokerService = new AmqpBrokerService(service.getPulsar(), amqpConfig); if (amqpConfig.isAmqpProxyEnable()) { ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAmqpTenant(amqpConfig.getAmqpTenant()); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java index 36f54866..3302b35e 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java @@ -112,4 +112,11 @@ public class AmqpServiceConfiguration extends ServiceConfiguration { ) private int amqpExplicitFlushAfterFlushes = 1000; + @FieldContext( + category = CATEGORY_AMQP, + required = false, + doc = "Exchange clear task interval in milliseconds." + ) + private int amqpExchangeClearTaskInterval = 5000; + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/MessageFetchContext.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/MessageFetchContext.java index 788f80c7..d564d5a6 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/MessageFetchContext.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/MessageFetchContext.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody; @@ -111,8 +110,6 @@ public void readEntriesComplete(List list, Object o) { } catch (UnsupportedEncodingException e) { log.error("Failed to convert entry to AMQP body", e); } - consumer.addUnAckMessages(indexMessage.getExchangeName(), - (PositionImpl) index.getPosition(), (PositionImpl) msg.getPosition()); } else { message.complete(Pair.of(index.getPosition(), null)); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java index 462cccea..0fc6ccae 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java @@ -36,12 +36,15 @@ public class QueueContainer { private AmqpTopicManager amqpTopicManager; private PulsarService pulsarService; private ExchangeContainer exchangeContainer; + @Getter + private final AmqpServiceConfiguration serviceConfig; protected QueueContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService, - ExchangeContainer exchangeContainer) { + ExchangeContainer exchangeContainer, AmqpServiceConfiguration serviceConfig) { this.amqpTopicManager = amqpTopicManager; this.pulsarService = pulsarService; this.exchangeContainer = exchangeContainer; + this.serviceConfig = serviceConfig; } @Getter @@ -96,7 +99,8 @@ public CompletableFuture asyncGetQueue(NamespaceName namespaceName, S // TODO: reset connectionId, exclusive and autoDelete PersistentQueue amqpQueue = new PersistentQueue(queueName, persistentTopic, - 0, false, false); + 0, false, false, + serviceConfig.getAmqpExchangeClearTaskInterval()); try { amqpQueue.recoverRoutersFromQueueProperties(properties, exchangeContainer, namespaceName); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentQueue.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentQueue.java index 3d9fdf9f..a3fd5850 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentQueue.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentQueue.java @@ -29,14 +29,22 @@ import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils; import io.streamnative.pulsar.handlers.amqp.utils.PulsarTopicMetadataUtils; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.MessageImpl; @@ -53,6 +61,7 @@ public class PersistentQueue extends AbstractAmqpQueue { public static final String QUEUE = "QUEUE"; public static final String ROUTERS = "ROUTERS"; public static final String TOPIC_PREFIX = "__amqp_queue__"; + public static final String DEFAULT_SUBSCRIPTION = "defaultSubscription"; @Getter private PersistentTopic indexTopic; @@ -61,14 +70,21 @@ public class PersistentQueue extends AbstractAmqpQueue { private AmqpEntryWriter amqpEntryWriter; + private final long exchangeClearTaskInterval; + private PositionImpl checkpointLac; + private final Map checkpointExchangeRoutePos = new ConcurrentHashMap<>(); + public PersistentQueue(String queueName, PersistentTopic indexTopic, long connectionId, - boolean exclusive, boolean autoDelete) { + boolean exclusive, boolean autoDelete, + long exchangeClearTaskInterval) { super(queueName, true, connectionId, exclusive, autoDelete); this.indexTopic = indexTopic; topicNameValidate(); this.jsonMapper = new ObjectMapper(); this.amqpEntryWriter = new AmqpEntryWriter(indexTopic); + this.exchangeClearTaskInterval = exchangeClearTaskInterval; + scheduleExchangeClearTask(); } @Override @@ -92,7 +108,7 @@ public CompletableFuture readEntryAsync(String exchangeName, long ledgerI @Override public CompletableFuture acknowledgeAsync(String exchangeName, long ledgerId, long entryId) { - return getRouter(exchangeName).getExchange().markDeleteAsync(getName(), ledgerId, entryId); + return CompletableFuture.completedFuture(null); } @Override @@ -178,7 +194,6 @@ private List getQueueProperties(Map> futures = new ArrayList<>(); + for (AmqpMessageRouter router : routers.values()) { + CompletableFuture future = new CompletableFuture<>(); + ((PersistentTopic) router.getExchange().getTopic()).getManagedLedger() + .asyncOpenCursor("__amqp_replicator__" + router.getExchange().getName(), + new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + PositionImpl pos = (PositionImpl) cursor.getMarkDeletedPosition(); + checkpointExchangeRoutePos.put(router.getExchange().getName(), pos); + future.complete(null); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Failed to get exchange route position for queue {}.", queueName, exception); + future.completeExceptionally(exception); + } + }, null); + futures.add(future); + } + FutureUtil.waitForAll(futures).whenComplete((__, t) -> { + if (t != null) { + scheduleExchangeClearTask(); + return; + } + this.checkpointLac = (PositionImpl) indexTopic.getLastPosition(); + exchangeCleanup(); + }); + } + + private void exchangeCleanup() { + PositionImpl indexMarkDeletePos = (PositionImpl) indexTopic.getSubscription(DEFAULT_SUBSCRIPTION) + .getCursor().getMarkDeletedPosition(); + Collection> futures = new ArrayList<>(); + if (indexMarkDeletePos.compareTo(this.checkpointLac) >= 0) { + for (AmqpMessageRouter router : routers.values()) { + Position position = checkpointExchangeRoutePos.get(router.getExchange().getName()); + if (position != null) { + futures.add(router.getExchange().markDeleteAsync( + queueName, position.getLedgerId(), position.getEntryId())); + } + } + FutureUtil.waitForAll(futures).whenComplete((__, t) -> { + // no matter previous task is success or failed, schedule a new clear task. + scheduleExchangeClearTask(); + }); + } else { + this.indexTopic.getBrokerService().getPulsar().getExecutor().schedule( + this::exchangeCleanup, exchangeClearTaskInterval, TimeUnit.MILLISECONDS); + } + } + } diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java index fba75d9f..615dcdeb 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java @@ -34,6 +34,7 @@ import java.net.SocketAddress; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.extern.log4j.Log4j2; import org.apache.bookkeeper.common.util.OrderedExecutor; @@ -99,7 +100,7 @@ public void setup() throws Exception { mockPulsarService(); mockBrokerService(); ConnectionContainer connectionContainer = mock(ConnectionContainer.class); - amqpBrokerService = new AmqpBrokerService(pulsarService, connectionContainer); + amqpBrokerService = new AmqpBrokerService(pulsarService, connectionContainer, new AmqpServiceConfiguration()); amqpTopicManager = amqpBrokerService.getAmqpTopicManager(); // 1.Init AMQP connection for connection methods and channel methods tests. @@ -150,6 +151,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Mockito.when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration); Mockito.when(pulsarService.getOrderedExecutor()).thenReturn( OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build()); + Mockito.when(pulsarService.getExecutor()).thenReturn(Executors.newScheduledThreadPool(1)); Mockito.when(serviceConfiguration.getNumIOThreads()).thenReturn(2 * Runtime.getRuntime().availableProcessors()); NamespaceResources namespaceResources = mock(NamespaceResources.class); diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java index 8d58b0ab..b269cf55 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.amqp.test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.fail; @@ -21,12 +22,14 @@ import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; import io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.mockito.Mockito; import org.testng.annotations.Test; /** @@ -39,15 +42,15 @@ public class TopicNameTest { public void exchangeTopicNameValidate() { String exchangeName = "ex-test"; AbstractAmqpExchange.Type exchangeType = AbstractAmqpExchange.Type.Direct; - ManagedLedgerImpl managedLedger = Mockito.mock(ManagedLedgerImpl.class); + ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class); - BrokerService brokerService = Mockito.mock(BrokerService.class); - Mockito.when(brokerService.executor()).thenReturn(mock(EventLoopGroup.class)); - PersistentTopic exchangeTopic1 = Mockito.mock(PersistentTopic.class); - Mockito.when(exchangeTopic1.getName()).thenReturn(PersistentExchange.TOPIC_PREFIX + exchangeName); - Mockito.when(exchangeTopic1.getManagedLedger()).thenReturn(managedLedger); - Mockito.when(exchangeTopic1.getBrokerService()).thenReturn(brokerService); - Mockito.when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer()); + BrokerService brokerService = mock(BrokerService.class); + when(brokerService.executor()).thenReturn(mock(EventLoopGroup.class)); + PersistentTopic exchangeTopic1 = mock(PersistentTopic.class); + when(exchangeTopic1.getName()).thenReturn(PersistentExchange.TOPIC_PREFIX + exchangeName); + when(exchangeTopic1.getManagedLedger()).thenReturn(managedLedger); + when(exchangeTopic1.getBrokerService()).thenReturn(brokerService); + when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer()); try { new PersistentExchange( exchangeName, exchangeType, exchangeTopic1, false); @@ -55,9 +58,9 @@ public void exchangeTopicNameValidate() { fail("Failed to new PersistentExchange. errorMsg: " + e.getMessage()); } - PersistentTopic exchangeTopic2 = Mockito.mock(PersistentTopic.class); - Mockito.when(exchangeTopic2.getName()).thenReturn(PersistentExchange.TOPIC_PREFIX + "_" + exchangeName); - Mockito.when(exchangeTopic2.getManagedLedger()).thenReturn(managedLedger); + PersistentTopic exchangeTopic2 = mock(PersistentTopic.class); + when(exchangeTopic2.getName()).thenReturn(PersistentExchange.TOPIC_PREFIX + "_" + exchangeName); + when(exchangeTopic2.getManagedLedger()).thenReturn(managedLedger); try { new PersistentExchange( exchangeName, exchangeType, exchangeTopic2, false); @@ -69,25 +72,33 @@ public void exchangeTopicNameValidate() { @Test public void queueTopicNameValidate() { + BrokerService brokerService = mock(BrokerService.class); + PulsarService pulsarService = mock(PulsarService.class); + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + when(pulsarService.getExecutor()).thenReturn(executorService); + when(brokerService.getPulsar()).thenReturn(pulsarService); + String queueName = "ex-test"; - ManagedLedgerImpl managedLedger = Mockito.mock(ManagedLedgerImpl.class); + ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class); - PersistentTopic queueTopic1 = Mockito.mock(PersistentTopic.class); - Mockito.when(queueTopic1.getName()).thenReturn(PersistentQueue.TOPIC_PREFIX + queueName); - Mockito.when(queueTopic1.getManagedLedger()).thenReturn(managedLedger); + PersistentTopic queueTopic1 = mock(PersistentTopic.class); + when(queueTopic1.getName()).thenReturn(PersistentQueue.TOPIC_PREFIX + queueName); + when(queueTopic1.getManagedLedger()).thenReturn(managedLedger); + when(queueTopic1.getBrokerService()).thenReturn(brokerService); try { new PersistentQueue( - queueName, queueTopic1, 0, false, false); + queueName, queueTopic1, 0, false, false, 5000); } catch (IllegalArgumentException e) { fail("Failed to new PersistentExchange. errorMsg: " + e.getMessage()); } - PersistentTopic queueTopic2 = Mockito.mock(PersistentTopic.class); - Mockito.when(queueTopic2.getName()).thenReturn(PersistentQueue.TOPIC_PREFIX + "_" + queueName); - Mockito.when(queueTopic2.getManagedLedger()).thenReturn(managedLedger); + PersistentTopic queueTopic2 = mock(PersistentTopic.class); + when(queueTopic2.getName()).thenReturn(PersistentQueue.TOPIC_PREFIX + "_" + queueName); + when(queueTopic2.getManagedLedger()).thenReturn(managedLedger); + when(queueTopic2.getBrokerService()).thenReturn(brokerService); try { new PersistentQueue( - queueName, queueTopic2, 0, false, false); + queueName, queueTopic2, 0, false, false, 5000); } catch (IllegalArgumentException e) { assertNotNull(e); log.info("This is expected behavior."); diff --git a/tests/pom.xml b/tests/pom.xml index 95dfbac9..04d2e801 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -25,7 +25,6 @@ 2.11.0-SNAPSHOT - io.streamnative.pulsar.handlers pulsar-protocol-handler-amqp-tests StreamNative :: Pulsar Protocol Handler :: AoP Tests Tests for AMQP on Pulsar @@ -107,6 +106,11 @@ test + + org.awaitility + awaitility + + diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 56c162e0..a6abaf88 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -124,6 +124,7 @@ protected void resetConfig() { amqpConfig.setBrokerEntryMetadataInterceptors( Sets.newHashSet("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor")); amqpConfig.setBrokerShutdownTimeoutMs(0L); + amqpConfig.setAmqpExchangeClearTaskInterval(500); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java index 48e66ebf..ef4511f8 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java @@ -24,14 +24,18 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; @@ -155,8 +159,39 @@ public void handleDelivery(String consumerTag, .pollInterval(100, TimeUnit.MILLISECONDS) .atMost(5, TimeUnit.SECONDS) .until(messageSet::isEmpty); + verifyBacklog(vhost, exchangeName); channel.close(); conn.close(); } + private void verifyBacklog(String vhost, String exchangeName) { + Awaitility.await() + .pollInterval(1, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS) + .until(() -> checkBacklog(vhost, exchangeName)); + } + + private boolean checkBacklog(String vhost, String exchangeName) throws PulsarAdminException { + String topic = "public/" + vhost + "/__amqp_exchange__" + exchangeName; + PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic); + Map map = stats.cursors; + if (map.isEmpty()) { + return false; + } + for (Map.Entry entry : map.entrySet()) { + if (entry.getKey().startsWith("__amqp_replicator") || entry.getKey().equals("pulsar.dedup")) { + continue; + } + log.debug("Stats exchange: {}, cursor: {}, lac: {}, markDelete: {}", + exchangeName, + entry.getKey(), + stats.lastConfirmedEntry, + entry.getValue().markDeletePosition); + if (!stats.lastConfirmedEntry.equals(entry.getValue().markDeletePosition)) { + return false; + } + } + return true; + } + }