Skip to content

Commit

Permalink
[fea] New proxy to use multi bundles
Browse files Browse the repository at this point in the history
Add a new proxy to use multi bundles feature.

- support exchange declare
- support queue declare
- support queue bind
- support queue unbind
- support basic publish
- support basic consume
  • Loading branch information
gaoran10 committed Nov 14, 2022
1 parent 8831494 commit e898266
Show file tree
Hide file tree
Showing 31 changed files with 2,180 additions and 519 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,24 @@ public class AmqpBrokerService {
private ConnectionContainer connectionContainer;
@Getter
private PulsarService pulsarService;
@Getter
private AmqpServiceConfiguration configuration;

public AmqpBrokerService(PulsarService pulsarService) {
public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration configuration) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, configuration);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
this.configuration = configuration;
}

public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, configuration);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,17 +272,32 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS
log.debug("RECV[{}] QueueBind[ queue: {}, exchange: {}, bindingKey:{}, nowait:{}, arguments:{} ]",
channelId, queue, exchange, bindingKey, nowait, argumentsTable);
}
queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(),

if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
exchangeService.queueBind(connection.getNamespaceName(), exchange.toString(), getQueueName(queue),
bindingKey != null ? bindingKey.toString() : null, FieldTable.convertToMap(argumentsTable))
.thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {} v2.", queue, exchange, t);
handleAoPException(t);
return null;
});
} else {
queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(),
bindingKey != null ? bindingKey.toString() : null, nowait, argumentsTable,
connection.getConnectionId()).thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
}
}

@Override
Expand Down Expand Up @@ -343,16 +358,31 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM
log.debug("RECV[{}] QueueUnbind[ queue: {}, exchange:{}, bindingKey:{}, arguments:{} ]", channelId, queue,
exchange, bindingKey, arguments);
}
queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(),
bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> {
AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} with exchange {} in vhost {}",
queue, exchange, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});

if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
exchangeService.queueUnBind(connection.getNamespaceName(), exchange.toString(), queue.toString(),
bindingKey.toString(), FieldTable.convertToMap(arguments))
.thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
} else {
queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(),
bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> {
AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} with exchange {} in vhost {}",
queue, exchange, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});
}
}

@Override
Expand Down Expand Up @@ -432,12 +462,21 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic
subscriptionFuture.thenAccept(subscription -> {
AmqpConsumer consumer;
try {
consumer = new AmqpConsumer(queueContainer, subscription, exclusive
? CommandSubscribe.SubType.Exclusive :
CommandSubscribe.SubType.Shared, topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
consumer = new AmqpConsumerOriginal(queueContainer, subscription, exclusive
? CommandSubscribe.SubType.Exclusive :
CommandSubscribe.SubType.Shared, topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
} else {
consumer = new AmqpConsumer(queueContainer, subscription, exclusive
? CommandSubscribe.SubType.Exclusive :
CommandSubscribe.SubType.Shared, topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
}
} catch (BrokerServiceException e) {
exceptionFuture.completeExceptionally(e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.nio.ByteBuffer;
import java.util.Objects;

import io.netty.channel.Channel;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.AMQDecoder;
Expand Down Expand Up @@ -51,7 +53,7 @@
* Client decoder for client-server interaction tests.
* Copied from Qpid tests lib.
*/
public class AmqpClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
public class AmqpClientDecoder extends AmqpDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
{
private QpidByteBuffer _incompleteBuffer;

Expand All @@ -62,7 +64,13 @@ public class AmqpClientDecoder extends AMQDecoder<ClientMethodProcessor<? extend
*/
public AmqpClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
{
super(false, methodProcessor);
super(false, methodProcessor, null);
}

public AmqpClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor,
Channel clientChannel)
{
super(false, methodProcessor, clientChannel);
}

public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,13 @@ public synchronized void writeFrame(AMQDataBlock frame) {
getCtx().writeAndFlush(frame);
}

public synchronized void writeData(Object obj) {
if (log.isDebugEnabled()) {
log.debug("write data to client: " + obj);
}
getCtx().channel().writeAndFlush(obj);
}

public MethodRegistry getMethodRegistry() {
return methodRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,20 @@
@Slf4j
public class AmqpConsumer extends Consumer {

private final AmqpChannel channel;
protected final AmqpChannel channel;

private QueueContainer queueContainer;

private final boolean autoAck;
protected final boolean autoAck;

private final String consumerTag;
protected final String consumerTag;

private final String queueName;
protected final String queueName;
/**
* map(exchangeName,treeMap(indexPosition,msgPosition)) .
*/
private final Map<String, ConcurrentSkipListMap<PositionImpl, PositionImpl>> unAckMessages;
private static final AtomicIntegerFieldUpdater<AmqpConsumer> MESSAGE_PERMITS_UPDATER =
protected static final AtomicIntegerFieldUpdater<AmqpConsumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AmqpConsumer.class, "availablePermits");
private volatile int availablePermits;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.amqp;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;


/**
* Amqp consumer Used to forward messages.
*/
@Slf4j
public class AmqpConsumerOriginal extends AmqpConsumer {

public AmqpConsumerOriginal(QueueContainer queueContainer, Subscription subscription,
CommandSubscribe.SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName, boolean isDurable, ServerCnx cnx,
String appId, Map<String, String> metadata, boolean readCompacted,
MessageId messageId,
KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
boolean autoAck) throws BrokerServiceException {
super(queueContainer, subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable,
cnx, appId, metadata, readCompacted, messageId, keySharedMeta, channel, consumerTag,
queueName, autoAck);
}

@Override
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes,
long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
totalChunkedMessages, redeliveryTracker, Commands.DEFAULT_CONSUMER_EPOCH);
}

@Override
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes,
long totalChunkedMessages, RedeliveryTracker redeliveryTracker, long epoch) {
ChannelPromise writePromise = this.channel.getConnection().getCtx().newPromise();
if (entries.isEmpty() || totalMessages == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}]({}) List of messages is empty, trigger write future immediately for consumerId {}",
queueName, consumerTag, consumerId());
}
writePromise.setSuccess(null);
return writePromise;
}
if (!autoAck) {
channel.getCreditManager().useCreditForMessages(totalMessages, 0);
if (!channel.getCreditManager().hasCredit()) {
channel.setBlockedOnCredit();
}
}
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
final AmqpConnection connection = channel.getConnection();
connection.ctx.channel().eventLoop().execute(() -> {
double count = 0;
for (Entry entry : entries) {
if (entry == null) {
// Entry was filtered out
continue;
}
count++;
sendMessage(entry);
}
connection.getCtx().writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
batchSizes.recyle();
});
return writePromise;
}

private void sendMessage(Entry entry) {
try {
long deliveryTag = channel.getNextDeliveryTag();
if (!autoAck) {
channel.getUnacknowledgedMessageMap().add(deliveryTag,
entry.getPosition(), this, entry.getLength());
}

channel.getConnection().getAmqpOutputConverter().writeDeliver(
MessageConvertUtils.entryToAmqpBody(entry),
channel.getChannelId(),
getRedeliveryTracker().contains(entry.getPosition()),
deliveryTag,
AMQShortString.createAMQShortString(consumerTag));

if (autoAck) {
messagesAck(entry.getPosition());
}
} catch (Exception e) {
log.error("[{}]({}) Failed to send message to consumer.", queueName, consumerTag, e);
} finally {
entry.release();
}
}

}

0 comments on commit e898266

Please sign in to comment.