Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][feature] New proxy to use the multi bundles feature (part1) #716

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class AmqpBrokerService {
public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration config) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, initRouteExecutor(config),
config.getAmqpExchangeRouteQueueSize());
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService,
initRouteExecutor(config), config);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, config);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,17 +273,32 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS
log.debug("RECV[{}] QueueBind[ queue: {}, exchange: {}, bindingKey:{}, nowait:{}, arguments:{} ]",
channelId, queue, exchange, bindingKey, nowait, argumentsTable);
}
queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(),

if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
exchangeService.queueBind(connection.getNamespaceName(), exchange.toString(), getQueueName(queue),
bindingKey != null ? bindingKey.toString() : null, FieldTable.convertToMap(argumentsTable))
.thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {} v2.", queue, exchange, t);
handleAoPException(t);
return null;
});
} else {
queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(),
bindingKey != null ? bindingKey.toString() : null, nowait, argumentsTable,
connection.getConnectionId()).thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
}
}

@Override
Expand Down Expand Up @@ -344,16 +359,31 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM
log.debug("RECV[{}] QueueUnbind[ queue: {}, exchange:{}, bindingKey:{}, arguments:{} ]", channelId, queue,
exchange, bindingKey, arguments);
}
queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(),
bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> {
AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} with exchange {} in vhost {}",
queue, exchange, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});

if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
exchangeService.queueUnBind(connection.getNamespaceName(), exchange.toString(), queue.toString(),
bindingKey.toString(), FieldTable.convertToMap(arguments))
.thenAccept(__ -> {
MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} to exchange {}.", queue, exchange, t);
handleAoPException(t);
return null;
});
} else {
queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(),
bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> {
AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
}).exceptionally(t -> {
log.error("Failed to unbind queue {} with exchange {} in vhost {}",
queue, exchange, connection.getNamespaceName(), t);
handleAoPException(t);
return null;
});
}
}

@Override
Expand Down Expand Up @@ -431,12 +461,22 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic
CompletableFuture<Subscription> subscriptionFuture = topic.createSubscription(
defaultSubscription, CommandSubscribe.InitialPosition.Earliest, false, null);
subscriptionFuture.thenAccept(subscription -> {
AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription,
exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared,
topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
AmqpConsumer consumer;
if (connection.getAmqpConfig().isAmqpProxyV2Enable()) {
consumer = new AmqpConsumerOriginal(queueContainer, subscription,
exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared,
topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
} else {
consumer = new AmqpConsumer(queueContainer, subscription, exclusive
? CommandSubscribe.SubType.Exclusive :
CommandSubscribe.SubType.Shared, topic.getName(), CONSUMER_ID.incrementAndGet(), 0,
consumerTag, true, connection.getServerCnx(), "", null,
false, MessageId.latest,
null, this, consumerTag, queueName, ack);
}
subscription.addConsumer(consumer).thenAccept(__ -> {
consumer.handleFlow(DEFAULT_CONSUMER_PERMIT);
tag2ConsumersMap.put(consumerTag, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.nio.ByteBuffer;
import java.util.Objects;

import io.netty.channel.Channel;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.AMQDecoder;
Expand Down Expand Up @@ -51,7 +53,7 @@
* Client decoder for client-server interaction tests.
* Copied from Qpid tests lib.
*/
public class AmqpClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
public class AmqpClientDecoder extends AmqpDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
{
private QpidByteBuffer _incompleteBuffer;

Expand All @@ -62,7 +64,13 @@ public class AmqpClientDecoder extends AMQDecoder<ClientMethodProcessor<? extend
*/
public AmqpClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
{
super(false, methodProcessor);
super(false, methodProcessor, null);
}

public AmqpClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor,
Channel clientChannel)
{
super(false, methodProcessor, clientChannel);
}

public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,13 @@ public synchronized void writeFrame(AMQDataBlock frame) {
getCtx().writeAndFlush(frame);
}

public synchronized void writeData(Object obj) {
if (log.isDebugEnabled()) {
log.debug("write data to client: " + obj);
}
getCtx().channel().writeAndFlush(obj);
}

public MethodRegistry getMethodRegistry() {
return methodRegistry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,20 @@
@Slf4j
public class AmqpConsumer extends Consumer {

private final AmqpChannel channel;
protected final AmqpChannel channel;

private QueueContainer queueContainer;

private final boolean autoAck;
protected final boolean autoAck;

private final String consumerTag;
protected final String consumerTag;

private final String queueName;
protected final String queueName;
/**
* map(exchangeName,treeMap(indexPosition,msgPosition)) .
*/
private final Map<String, ConcurrentSkipListMap<PositionImpl, PositionImpl>> unAckMessages;
private static final AtomicIntegerFieldUpdater<AmqpConsumer> MESSAGE_PERMITS_UPDATER =
protected static final AtomicIntegerFieldUpdater<AmqpConsumer> MESSAGE_PERMITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AmqpConsumer.class, "availablePermits");
private volatile int availablePermits;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.amqp;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;

/**
* AMQP consumer original is used to forward original messages, not index messages.
*/
@Slf4j
public class AmqpConsumerOriginal extends AmqpConsumer {

public AmqpConsumerOriginal(QueueContainer queueContainer, Subscription subscription,
CommandSubscribe.SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName, boolean isDurable, ServerCnx cnx,
String appId, Map<String, String> metadata, boolean readCompacted,
MessageId messageId,
KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName,
boolean autoAck) {
super(queueContainer, subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable,
cnx, appId, metadata, readCompacted, messageId, keySharedMeta, channel, consumerTag,
queueName, autoAck);
}

@Override
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes,
long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
totalChunkedMessages, redeliveryTracker, Commands.DEFAULT_CONSUMER_EPOCH);
}

@Override
public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes,
long totalChunkedMessages, RedeliveryTracker redeliveryTracker, long epoch) {
ChannelPromise writePromise = this.channel.getConnection().getCtx().newPromise();
if (entries.isEmpty() || totalMessages == 0) {
if (log.isDebugEnabled()) {
log.debug("[{}]({}) List of messages is empty, trigger write future immediately for consumerId {}",
queueName, consumerTag, consumerId());
}
writePromise.setSuccess(null);
return writePromise;
}
if (!autoAck) {
channel.getCreditManager().useCreditForMessages(totalMessages, 0);
if (!channel.getCreditManager().hasCredit()) {
channel.setBlockedOnCredit();
}
}
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
final AmqpConnection connection = channel.getConnection();
connection.ctx.channel().eventLoop().execute(() -> {
for (Entry entry : entries) {
if (entry == null) {
// Entry was filtered out
continue;
}
sendMessage(entry);
}
connection.getCtx().writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
batchSizes.recyle();
});
return writePromise;
}

private void sendMessage(Entry entry) {
try {
long deliveryTag = channel.getNextDeliveryTag();
if (!autoAck) {
channel.getUnacknowledgedMessageMap().add(deliveryTag,
entry.getPosition(), this, entry.getLength());
}

channel.getConnection().getAmqpOutputConverter().writeDeliver(
MessageConvertUtils.entryToAmqpBody(entry),
channel.getChannelId(),
getRedeliveryTracker().contains(entry.getPosition()),
deliveryTag,
AMQShortString.createAMQShortString(consumerTag));

if (autoAck) {
messagesAck(entry.getPosition());
}
} catch (Exception e) {
log.error("[{}]({}) Failed to send message to consumer.", queueName, consumerTag, e);
} finally {
entry.release();
}
}

}