Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,22 @@ public class AmqpBrokerService {
@Getter
private PulsarService pulsarService;

public AmqpBrokerService(PulsarService pulsarService) {
public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration serviceConfig) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, serviceConfig);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
}

public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) {
public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer,
AmqpServiceConfiguration serviceConfig) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, serviceConfig);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = connectionContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.amqp;

import static io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue.DEFAULT_SUBSCRIPTION;
import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.getExchangeType;
import static io.streamnative.pulsar.handlers.amqp.utils.ExchangeUtil.isBuildInExchange;
import static org.apache.qpid.server.protocol.ErrorCodes.INTERNAL_ERROR;
Expand Down Expand Up @@ -108,7 +109,6 @@ public class AmqpChannel implements ServerChannelMethodProcessor {
*/
private IncomingMessage currentMessage;

private final String defaultSubscription = "defaultSubscription";
public static final AMQShortString EMPTY_STRING = AMQShortString.createAMQShortString((String) null);
/**
* ConsumerTag prefix, the tag is unique per subscription to a queue.
Expand Down Expand Up @@ -428,7 +428,7 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic
}

CompletableFuture<Subscription> subscriptionFuture = topic.createSubscription(
defaultSubscription, CommandSubscribe.InitialPosition.Earliest, false, null);
DEFAULT_SUBSCRIPTION, CommandSubscribe.InitialPosition.Earliest, false, null);
subscriptionFuture.thenAccept(subscription -> {
AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription,
exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared,
Expand Down Expand Up @@ -544,12 +544,11 @@ public void receiveBasicGet(AMQShortString queue, boolean noAck) {
} else {
Topic topic = amqpQueue.getTopic();
AmqpConsumer amqpConsumer = fetchConsumerMap.computeIfAbsent(queueName, value -> {

Subscription subscription = topic.getSubscription(defaultSubscription);
Subscription subscription = topic.getSubscription(DEFAULT_SUBSCRIPTION);
AmqpConsumer consumer;
try {
if (subscription == null) {
subscription = topic.createSubscription(defaultSubscription,
subscription = topic.createSubscription(DEFAULT_SUBSCRIPTION,
CommandSubscribe.InitialPosition.Earliest, false, null).get();
}
consumer = new AmqpPullConsumer(queueContainer, subscription,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
Expand All @@ -37,7 +33,6 @@
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand All @@ -61,10 +56,7 @@ public class AmqpConsumer extends Consumer {
private final String consumerTag;

private final String queueName;
/**
* map(exchangeName,treeMap(indexPosition,msgPosition)) .
*/
private final Map<String, ConcurrentSkipListMap<PositionImpl, PositionImpl>> unAckMessages;

private static final AtomicIntegerFieldUpdater<AmqpConsumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AmqpConsumer.class, "availablePermits");
private volatile int availablePermits;
Expand All @@ -88,7 +80,6 @@ public AmqpConsumer(QueueContainer queueContainer, Subscription subscription,
this.autoAck = autoAck;
this.consumerTag = consumerTag;
this.queueName = queueName;
this.unAckMessages = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -158,8 +149,6 @@ private CompletableFuture<Void> sendMessage(Entry index) {
try {
long deliveryTag = channel.getNextDeliveryTag();

addUnAckMessages(indexMessage.getExchangeName(), (PositionImpl) index.getPosition(),
(PositionImpl) msg.getPosition());
if (!autoAck) {
channel.getUnacknowledgedMessageMap().add(deliveryTag,
index.getPosition(), this, msg.getLength());
Expand Down Expand Up @@ -199,28 +188,7 @@ private CompletableFuture<Void> sendMessage(Entry index) {

public void messagesAck(List<Position> position) {
incrementPermits(position.size());
ManagedCursor cursor = ((PersistentSubscription) getSubscription()).getCursor();
Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
getSubscription().acknowledgeMessage(position, CommandAck.AckType.Individual, Collections.EMPTY_MAP);
if (!cursor.getMarkDeletedPosition().equals(previousMarkDeletePosition)) {
asyncGetQueue().whenComplete((amqpQueue, throwable) -> {
if (throwable != null) {
log.error("Failed to get queue from queue container", throwable);
} else {
synchronized (this) {
PositionImpl newDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
unAckMessages.forEach((key, value) -> {
SortedMap<PositionImpl, PositionImpl> ackMap = value.headMap(newDeletePosition, true);
if (ackMap.size() > 0) {
PositionImpl lastValue = ackMap.get(ackMap.lastKey());
amqpQueue.acknowledgeAsync(key, lastValue.getLedgerId(), lastValue.getEntryId());
}
ackMap.clear();
});
}
}
});
}
}

public void messagesAck(Position position) {
Expand Down Expand Up @@ -263,12 +231,6 @@ public boolean isWritable() {
return channel.getConnection().ctx.channel().isWritable();
}

void addUnAckMessages(String exchangeName, PositionImpl index, PositionImpl message) {
ConcurrentSkipListMap<PositionImpl, PositionImpl> map = unAckMessages.computeIfAbsent(exchangeName,
treeMap -> new ConcurrentSkipListMap<>());
map.put(index, message);
}

public String getConsumerTag() {
return consumerTag;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public String getProtocolDataToAdvertise() {
@Override
public void start(BrokerService service) {
brokerService = service;
amqpBrokerService = new AmqpBrokerService(service.getPulsar());
amqpBrokerService = new AmqpBrokerService(service.getPulsar(), amqpConfig);
if (amqpConfig.isAmqpProxyEnable()) {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAmqpTenant(amqpConfig.getAmqpTenant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ public class AmqpServiceConfiguration extends ServiceConfiguration {
)
private int amqpExplicitFlushAfterFlushes = 1000;

@FieldContext(
category = CATEGORY_AMQP,
required = false,
doc = "Exchange clear task interval in milliseconds."
)
private int amqpExchangeClearTaskInterval = 5000;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody;
Expand Down Expand Up @@ -111,8 +110,6 @@ public void readEntriesComplete(List<Entry> list, Object o) {
} catch (UnsupportedEncodingException e) {
log.error("Failed to convert entry to AMQP body", e);
}
consumer.addUnAckMessages(indexMessage.getExchangeName(),
(PositionImpl) index.getPosition(), (PositionImpl) msg.getPosition());
} else {
message.complete(Pair.of(index.getPosition(), null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ public class QueueContainer {
private AmqpTopicManager amqpTopicManager;
private PulsarService pulsarService;
private ExchangeContainer exchangeContainer;
@Getter
private final AmqpServiceConfiguration serviceConfig;

protected QueueContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService,
ExchangeContainer exchangeContainer) {
ExchangeContainer exchangeContainer, AmqpServiceConfiguration serviceConfig) {
this.amqpTopicManager = amqpTopicManager;
this.pulsarService = pulsarService;
this.exchangeContainer = exchangeContainer;
this.serviceConfig = serviceConfig;
}

@Getter
Expand Down Expand Up @@ -96,7 +99,8 @@ public CompletableFuture<AmqpQueue> asyncGetQueue(NamespaceName namespaceName, S

// TODO: reset connectionId, exclusive and autoDelete
PersistentQueue amqpQueue = new PersistentQueue(queueName, persistentTopic,
0, false, false);
0, false, false,
serviceConfig.getAmqpExchangeClearTaskInterval());
try {
amqpQueue.recoverRoutersFromQueueProperties(properties, exchangeContainer,
namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,22 @@
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
import io.streamnative.pulsar.handlers.amqp.utils.PulsarTopicMetadataUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.MessageImpl;
Expand All @@ -53,6 +61,7 @@ public class PersistentQueue extends AbstractAmqpQueue {
public static final String QUEUE = "QUEUE";
public static final String ROUTERS = "ROUTERS";
public static final String TOPIC_PREFIX = "__amqp_queue__";
public static final String DEFAULT_SUBSCRIPTION = "defaultSubscription";

@Getter
private PersistentTopic indexTopic;
Expand All @@ -61,14 +70,21 @@ public class PersistentQueue extends AbstractAmqpQueue {

private AmqpEntryWriter amqpEntryWriter;

private final long exchangeClearTaskInterval;
private PositionImpl checkpointLac;
private final Map<String, Position> checkpointExchangeRoutePos = new ConcurrentHashMap<>();

public PersistentQueue(String queueName, PersistentTopic indexTopic,
long connectionId,
boolean exclusive, boolean autoDelete) {
boolean exclusive, boolean autoDelete,
long exchangeClearTaskInterval) {
super(queueName, true, connectionId, exclusive, autoDelete);
this.indexTopic = indexTopic;
topicNameValidate();
this.jsonMapper = new ObjectMapper();
this.amqpEntryWriter = new AmqpEntryWriter(indexTopic);
this.exchangeClearTaskInterval = exchangeClearTaskInterval;
scheduleExchangeClearTask();
}

@Override
Expand All @@ -92,7 +108,7 @@ public CompletableFuture<Entry> readEntryAsync(String exchangeName, long ledgerI

@Override
public CompletableFuture<Void> acknowledgeAsync(String exchangeName, long ledgerId, long entryId) {
return getRouter(exchangeName).getExchange().markDeleteAsync(getName(), ledgerId, entryId);
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down Expand Up @@ -178,12 +194,71 @@ private List<AmqpQueueProperties> getQueueProperties(Map<String, AmqpMessageRout
return propertiesList;
}


private void topicNameValidate() {
String[] nameArr = this.indexTopic.getName().split("/");
checkArgument(nameArr[nameArr.length - 1].equals(TOPIC_PREFIX + queueName),
"The queue topic name does not conform to the rules(%s%s).",
TOPIC_PREFIX, "exchangeName");
}

private void scheduleExchangeClearTask() {
this.indexTopic.getBrokerService().getPulsar().getExecutor()
.schedule(this::exchangeClearCheckpointTask, exchangeClearTaskInterval, TimeUnit.MILLISECONDS);
}

private synchronized void exchangeClearCheckpointTask() {
this.checkpointLac = null;
Collection<CompletableFuture<Void>> futures = new ArrayList<>();
for (AmqpMessageRouter router : routers.values()) {
CompletableFuture<Void> future = new CompletableFuture<>();
((PersistentTopic) router.getExchange().getTopic()).getManagedLedger()
.asyncOpenCursor("__amqp_replicator__" + router.getExchange().getName(),
new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
PositionImpl pos = (PositionImpl) cursor.getMarkDeletedPosition();
checkpointExchangeRoutePos.put(router.getExchange().getName(), pos);
future.complete(null);
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Failed to get exchange route position for queue {}.", queueName, exception);
future.completeExceptionally(exception);
}
}, null);
futures.add(future);
}
FutureUtil.waitForAll(futures).whenComplete((__, t) -> {
if (t != null) {
scheduleExchangeClearTask();
return;
}
this.checkpointLac = (PositionImpl) indexTopic.getLastPosition();
exchangeCleanup();
});
}

private void exchangeCleanup() {
PositionImpl indexMarkDeletePos = (PositionImpl) indexTopic.getSubscription(DEFAULT_SUBSCRIPTION)
.getCursor().getMarkDeletedPosition();
Collection<CompletableFuture<Void>> futures = new ArrayList<>();
if (indexMarkDeletePos.compareTo(this.checkpointLac) >= 0) {
for (AmqpMessageRouter router : routers.values()) {
Position position = checkpointExchangeRoutePos.get(router.getExchange().getName());
if (position != null) {
futures.add(router.getExchange().markDeleteAsync(
queueName, position.getLedgerId(), position.getEntryId()));
}
}
FutureUtil.waitForAll(futures).whenComplete((__, t) -> {
// no matter previous task is success or failed, schedule a new clear task.
scheduleExchangeClearTask();
});
} else {
this.indexTopic.getBrokerService().getPulsar().getExecutor().schedule(
this::exchangeCleanup, exchangeClearTaskInterval, TimeUnit.MILLISECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.extern.log4j.Log4j2;
import org.apache.bookkeeper.common.util.OrderedExecutor;
Expand Down Expand Up @@ -99,7 +100,7 @@ public void setup() throws Exception {
mockPulsarService();
mockBrokerService();
ConnectionContainer connectionContainer = mock(ConnectionContainer.class);
amqpBrokerService = new AmqpBrokerService(pulsarService, connectionContainer);
amqpBrokerService = new AmqpBrokerService(pulsarService, connectionContainer, new AmqpServiceConfiguration());
amqpTopicManager = amqpBrokerService.getAmqpTopicManager();

// 1.Init AMQP connection for connection methods and channel methods tests.
Expand Down Expand Up @@ -150,6 +151,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
Mockito.when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
Mockito.when(pulsarService.getOrderedExecutor()).thenReturn(
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered").build());
Mockito.when(pulsarService.getExecutor()).thenReturn(Executors.newScheduledThreadPool(1));
Mockito.when(serviceConfiguration.getNumIOThreads()).thenReturn(2 * Runtime.getRuntime().availableProcessors());

NamespaceResources namespaceResources = mock(NamespaceResources.class);
Expand Down
Loading