diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java index 4c7ddf36..c87e44a1 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java @@ -43,9 +43,9 @@ public class AmqpBrokerService { public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration config) { this.pulsarService = pulsarService; this.amqpTopicManager = new AmqpTopicManager(pulsarService); - this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, initRouteExecutor(config), - config.getAmqpExchangeRouteQueueSize()); - this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer); + this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, + initRouteExecutor(config), config); + this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, config); this.exchangeService = new ExchangeServiceImpl(exchangeContainer); this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer); this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java index b81bb987..b335650a 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java @@ -273,17 +273,32 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS log.debug("RECV[{}] QueueBind[ queue: {}, exchange: {}, bindingKey:{}, nowait:{}, arguments:{} ]", channelId, queue, exchange, bindingKey, nowait, argumentsTable); } - queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(), + + if (connection.getAmqpConfig().isAmqpProxyV2Enable()) { + exchangeService.queueBind(connection.getNamespaceName(), exchange.toString(), getQueueName(queue), + bindingKey != null ? bindingKey.toString() : null, FieldTable.convertToMap(argumentsTable)) + .thenAccept(__ -> { + MethodRegistry methodRegistry = connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); + connection.writeFrame(responseBody.generateFrame(channelId)); + }).exceptionally(t -> { + log.error("Failed to bind queue {} to exchange {} v2.", queue, exchange, t); + handleAoPException(t); + return null; + }); + } else { + queueService.queueBind(connection.getNamespaceName(), getQueueName(queue), exchange.toString(), bindingKey != null ? bindingKey.toString() : null, nowait, argumentsTable, connection.getConnectionId()).thenAccept(__ -> { - MethodRegistry methodRegistry = connection.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - }).exceptionally(t -> { - log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t); - handleAoPException(t); - return null; - }); + MethodRegistry methodRegistry = connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody(); + connection.writeFrame(responseBody.generateFrame(channelId)); + }).exceptionally(t -> { + log.error("Failed to bind queue {} to exchange {}.", queue, exchange, t); + handleAoPException(t); + return null; + }); + } } @Override @@ -344,16 +359,31 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM log.debug("RECV[{}] QueueUnbind[ queue: {}, exchange:{}, bindingKey:{}, arguments:{} ]", channelId, queue, exchange, bindingKey, arguments); } - queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(), - bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> { - AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); - connection.writeFrame(responseBody.generateFrame(channelId)); - }).exceptionally(t -> { - log.error("Failed to unbind queue {} with exchange {} in vhost {}", - queue, exchange, connection.getNamespaceName(), t); - handleAoPException(t); - return null; - }); + + if (connection.getAmqpConfig().isAmqpProxyV2Enable()) { + exchangeService.queueUnBind(connection.getNamespaceName(), exchange.toString(), queue.toString(), + bindingKey.toString(), FieldTable.convertToMap(arguments)) + .thenAccept(__ -> { + MethodRegistry methodRegistry = connection.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody(); + connection.writeFrame(responseBody.generateFrame(channelId)); + }).exceptionally(t -> { + log.error("Failed to unbind queue {} to exchange {}.", queue, exchange, t); + handleAoPException(t); + return null; + }); + } else { + queueService.queueUnbind(connection.getNamespaceName(), queue.toString(), exchange.toString(), + bindingKey.toString(), arguments, connection.getConnectionId()).thenAccept(__ -> { + AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); + connection.writeFrame(responseBody.generateFrame(channelId)); + }).exceptionally(t -> { + log.error("Failed to unbind queue {} with exchange {} in vhost {}", + queue, exchange, connection.getNamespaceName(), t); + handleAoPException(t); + return null; + }); + } } @Override @@ -431,12 +461,22 @@ private synchronized void subscribe(String consumerTag, String queueName, Topic CompletableFuture subscriptionFuture = topic.createSubscription( defaultSubscription, CommandSubscribe.InitialPosition.Earliest, false, null); subscriptionFuture.thenAccept(subscription -> { - AmqpConsumer consumer = new AmqpConsumer(queueContainer, subscription, - exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared, - topic.getName(), CONSUMER_ID.incrementAndGet(), 0, - consumerTag, true, connection.getServerCnx(), "", null, - false, MessageId.latest, - null, this, consumerTag, queueName, ack); + AmqpConsumer consumer; + if (connection.getAmqpConfig().isAmqpProxyV2Enable()) { + consumer = new AmqpConsumerOriginal(queueContainer, subscription, + exclusive ? CommandSubscribe.SubType.Exclusive : CommandSubscribe.SubType.Shared, + topic.getName(), CONSUMER_ID.incrementAndGet(), 0, + consumerTag, true, connection.getServerCnx(), "", null, + false, MessageId.latest, + null, this, consumerTag, queueName, ack); + } else { + consumer = new AmqpConsumer(queueContainer, subscription, exclusive + ? CommandSubscribe.SubType.Exclusive : + CommandSubscribe.SubType.Shared, topic.getName(), CONSUMER_ID.incrementAndGet(), 0, + consumerTag, true, connection.getServerCnx(), "", null, + false, MessageId.latest, + null, this, consumerTag, queueName, ack); + } subscription.addConsumer(consumer).thenAccept(__ -> { consumer.handleFlow(DEFAULT_CONSUMER_PERMIT); tag2ConsumersMap.put(consumerTag, consumer); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpClientDecoder.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpClientDecoder.java index fd2d65c8..59536497 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpClientDecoder.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpClientDecoder.java @@ -15,6 +15,8 @@ import java.nio.ByteBuffer; import java.util.Objects; + +import io.netty.channel.Channel; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.AMQDecoder; @@ -51,7 +53,7 @@ * Client decoder for client-server interaction tests. * Copied from Qpid tests lib. */ -public class AmqpClientDecoder extends AMQDecoder> +public class AmqpClientDecoder extends AmqpDecoder> { private QpidByteBuffer _incompleteBuffer; @@ -62,7 +64,13 @@ public class AmqpClientDecoder extends AMQDecoder methodProcessor) { - super(false, methodProcessor); + super(false, methodProcessor, null); + } + + public AmqpClientDecoder(final ClientMethodProcessor methodProcessor, + Channel clientChannel) + { + super(false, methodProcessor, clientChannel); } public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java index 3c971e60..b5cdd87a 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java @@ -561,6 +561,13 @@ public synchronized void writeFrame(AMQDataBlock frame) { getCtx().writeAndFlush(frame); } + public synchronized void writeData(Object obj) { + if (log.isDebugEnabled()) { + log.debug("write data to client: " + obj); + } + getCtx().channel().writeAndFlush(obj); + } + public MethodRegistry getMethodRegistry() { return methodRegistry; } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java index 688850cc..fca34d37 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumer.java @@ -52,20 +52,20 @@ @Slf4j public class AmqpConsumer extends Consumer { - private final AmqpChannel channel; + protected final AmqpChannel channel; private QueueContainer queueContainer; - private final boolean autoAck; + protected final boolean autoAck; - private final String consumerTag; + protected final String consumerTag; - private final String queueName; + protected final String queueName; /** * map(exchangeName,treeMap(indexPosition,msgPosition)) . */ private final Map> unAckMessages; - private static final AtomicIntegerFieldUpdater MESSAGE_PERMITS_UPDATER = + protected static final AtomicIntegerFieldUpdater MESSAGE_PERMITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AmqpConsumer.class, "availablePermits"); private volatile int availablePermits; diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumerOriginal.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumerOriginal.java new file mode 100644 index 00000000..62287d0f --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConsumerOriginal.java @@ -0,0 +1,121 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; +import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; +import org.apache.pulsar.broker.service.EntryBatchSizes; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; + +/** + * AMQP consumer original is used to forward original messages, not index messages. + */ +@Slf4j +public class AmqpConsumerOriginal extends AmqpConsumer { + + public AmqpConsumerOriginal(QueueContainer queueContainer, Subscription subscription, + CommandSubscribe.SubType subType, String topicName, long consumerId, + int priorityLevel, String consumerName, boolean isDurable, ServerCnx cnx, + String appId, Map metadata, boolean readCompacted, + MessageId messageId, + KeySharedMeta keySharedMeta, AmqpChannel channel, String consumerTag, String queueName, + boolean autoAck) { + super(queueContainer, subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, + cnx, appId, metadata, readCompacted, messageId, keySharedMeta, channel, consumerTag, + queueName, autoAck); + } + + @Override + public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, + long totalChunkedMessages, RedeliveryTracker redeliveryTracker) { + return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes, + totalChunkedMessages, redeliveryTracker, Commands.DEFAULT_CONSUMER_EPOCH); + } + + @Override + public Future sendMessages(final List entries, EntryBatchSizes batchSizes, + EntryBatchIndexesAcks batchIndexesAcks, int totalMessages, long totalBytes, + long totalChunkedMessages, RedeliveryTracker redeliveryTracker, long epoch) { + ChannelPromise writePromise = this.channel.getConnection().getCtx().newPromise(); + if (entries.isEmpty() || totalMessages == 0) { + if (log.isDebugEnabled()) { + log.debug("[{}]({}) List of messages is empty, trigger write future immediately for consumerId {}", + queueName, consumerTag, consumerId()); + } + writePromise.setSuccess(null); + return writePromise; + } + if (!autoAck) { + channel.getCreditManager().useCreditForMessages(totalMessages, 0); + if (!channel.getCreditManager().hasCredit()) { + channel.setBlockedOnCredit(); + } + } + MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages); + final AmqpConnection connection = channel.getConnection(); + connection.ctx.channel().eventLoop().execute(() -> { + for (Entry entry : entries) { + if (entry == null) { + // Entry was filtered out + continue; + } + sendMessage(entry); + } + connection.getCtx().writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); + batchSizes.recyle(); + }); + return writePromise; + } + + private void sendMessage(Entry entry) { + try { + long deliveryTag = channel.getNextDeliveryTag(); + if (!autoAck) { + channel.getUnacknowledgedMessageMap().add(deliveryTag, + entry.getPosition(), this, entry.getLength()); + } + + channel.getConnection().getAmqpOutputConverter().writeDeliver( + MessageConvertUtils.entryToAmqpBody(entry), + channel.getChannelId(), + getRedeliveryTracker().contains(entry.getPosition()), + deliveryTag, + AMQShortString.createAMQShortString(consumerTag)); + + if (autoAck) { + messagesAck(entry.getPosition()); + } + } catch (Exception e) { + log.error("[{}]({}) Failed to send message to consumer.", queueName, consumerTag, e); + } finally { + entry.release(); + } + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpDecoder.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpDecoder.java new file mode 100644 index 00000000..73a5f065 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpDecoder.java @@ -0,0 +1,200 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp; + +import static io.streamnative.pulsar.handlers.amqp.AmqpOutputConverter.PROXY_V2_DIRECT_TYPE; + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.ErrorCodes; +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException; +import org.apache.qpid.server.protocol.v0_8.transport.AMQProtocolVersionException; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.MethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; + +@Slf4j +public abstract class AmqpDecoder { + + public static final int FRAME_HEADER_SIZE = 7; + private final T methodProcessor; + + /** Holds the protocol initiation decoder. */ + private ProtocolInitiation.Decoder piDecoder = new ProtocolInitiation.Decoder(); + + /** Flag to indicate whether this decoder needs to handle protocol initiation. */ + private boolean expectProtocolInitiation; + + + private boolean firstRead = true; + + public static final int FRAME_MIN_SIZE = 4096; + private int maxFrameSize = FRAME_MIN_SIZE; + + private final Channel clientChannel; + + /** + * Creates a new AMQP decoder. + * @param expectProtocolInitiation true if this decoder needs to handle protocol initiation. + * @param methodProcessor method processor + */ + protected AmqpDecoder(boolean expectProtocolInitiation, T methodProcessor, Channel clientChannel) { + this.expectProtocolInitiation = expectProtocolInitiation; + this.methodProcessor = methodProcessor; + this.clientChannel = clientChannel; + } + + /** + * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol + * initation decoder. This method is expected to be called with false once protocol initation completes. + * + * @param expectProtocolInitiation true to use the protocol initiation decoder, false to use the + * data decoder. + */ + public void setExpectProtocolInitiation(boolean expectProtocolInitiation) { + this.expectProtocolInitiation = expectProtocolInitiation; + } + + public void setMaxFrameSize(final int frameMax) { + maxFrameSize = frameMax; + } + + public T getMethodProcessor() { + return methodProcessor; + } + + protected final int decode(final QpidByteBuffer buf) throws AMQFrameDecodingException { + // If this is the first read then we may be getting a protocol initiation back if we tried to negotiate + // an unsupported version + if (firstRead && buf.hasRemaining()) { + firstRead = false; + if (!expectProtocolInitiation && (((int) buf.get(buf.position())) & 0xff) > 8) { + expectProtocolInitiation = true; + } + } + + int required = 0; + while (required == 0) { + if (!expectProtocolInitiation) { + required = processAMQPFrames(buf); + } else { + required = piDecoder.decodable(buf); + if (required == 0) { + methodProcessor.receiveProtocolHeader(new ProtocolInitiation(buf)); + } + } + } + return buf.hasRemaining() ? required : 0; + } + + protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException { + final int required = decodable(buf); + if (required == 0) { + processInput(buf); + } + return required; + } + + protected int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException { + final int remainingAfterAttributes = in.remaining() - FRAME_HEADER_SIZE; + // type, channel, body length and end byte + if (remainingAfterAttributes < 0) { + return -remainingAfterAttributes; + } + + + // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() + final long bodySize = ((long) in.getInt(in.position() + 3)) & 0xffffffffL; + if (bodySize > maxFrameSize) { + throw new AMQFrameDecodingException( + "Incoming frame size of " + + bodySize + + " is larger than negotiated maximum of " + + maxFrameSize); + } + + long required = (1L + bodySize) - remainingAfterAttributes; + return required > 0 ? (int) required : 0; + + } + + protected void processInput(final QpidByteBuffer in) + throws AMQFrameDecodingException, AMQProtocolVersionException { + final byte type = in.get(); + + final int channel = in.getUnsignedShort(); + final long bodySize = in.getUnsignedInt(); + + // bodySize can be zero + if ((channel < 0) || (bodySize < 0)) { + throw new AMQFrameDecodingException( + "Undecodable frame: type = " + type + " channel = " + channel + + " bodySize = " + bodySize); + } + + processFrame(channel, type, bodySize, in); + + byte marker = in.get(); + if ((marker & 0xFF) != 0xCE) { + throw new AMQFrameDecodingException( + "End of frame marker not found. Read " + marker + " length=" + bodySize + + " type=" + type); + } + + } + + protected void processFrame(final int channel, final byte type, final long bodySize, final QpidByteBuffer in) + throws AMQFrameDecodingException { + switch (type) { + case 1 -> processMethod(channel, in); + case 2 -> ContentHeaderBody.process(in, methodProcessor.getChannelMethodProcessor(channel), bodySize); + case 3 -> ContentBody.process(in, methodProcessor.getChannelMethodProcessor(channel), bodySize); + case 8 -> +// HeartbeatBody.process(channel, in, _methodProcessor, bodySize); + redirectData(in, bodySize, (byte) 8); + case PROXY_V2_DIRECT_TYPE -> redirectData(in, bodySize, PROXY_V2_DIRECT_TYPE); + default -> throw new AMQFrameDecodingException("Unsupported frame type: " + type); + } + } + + protected abstract void processMethod(int channelId, QpidByteBuffer in) throws AMQFrameDecodingException; + + protected AMQFrameDecodingException newUnknownMethodException(final int classId, + final int methodId, + ProtocolVersion protocolVersion) { + return new AMQFrameDecodingException(ErrorCodes.COMMAND_INVALID, + "Method " + + methodId + + " unknown in AMQP version " + + protocolVersion + + " (while trying to decode class " + + classId + + " method " + + methodId + + ".", null); + } + + protected void redirectData(QpidByteBuffer in, long messageSize, byte type) { + if (log.isDebugEnabled()) { + log.debug("redirect data type: {}, messageSize: {}", type, messageSize); + } + QpidByteBuffer buffer = in.view(0, (int) messageSize); + in.position((int) (in.position() + messageSize)); + clientChannel.writeAndFlush(buffer); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java index 1ba8816b..39242817 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java @@ -13,12 +13,14 @@ */ package io.streamnative.pulsar.handlers.amqp; +import io.streamnative.pulsar.handlers.amqp.common.exception.NotSupportedOperationException; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.util.FutureUtil; /** * Interface of the AMQP exchange. @@ -153,4 +155,14 @@ public static Type value(String type) { int getQueueSize(); + default CompletableFuture queueBind(String queue, String routingKey, Map arguments) { + return FutureUtil.failedFuture( + new NotSupportedOperationException("Amqp exchange queue bind operation is not supported.")); + } + + default CompletableFuture queueUnBind(String queue, String routingKey, Map arguments) { + return FutureUtil.failedFuture( + new NotSupportedOperationException("Amqp exchange queue unbind operation is not supported.")); + } + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpOutputConverter.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpOutputConverter.java index 74931042..d83bfcbd 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpOutputConverter.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpOutputConverter.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.amqp; +import static org.apache.qpid.server.protocol.v0_8.transport.AMQFrame.FRAME_END_BYTE; + import java.io.IOException; import lombok.extern.log4j.Log4j2; import org.apache.qpid.server.QpidException; @@ -32,6 +34,7 @@ import org.apache.qpid.server.transport.ByteBufferSender; import org.apache.qpid.server.util.GZIPUtils; + /** * Used to process command output. */ @@ -42,6 +45,16 @@ public class AmqpOutputConverter { private final AmqpConnection connection; private static final AMQShortString GZIP_ENCODING = AMQShortString.valueOf(GZIPUtils.GZIP_CONTENT_ENCODING); + // data with this type will not be decoded by proxy v2. + public static final byte PROXY_V2_DIRECT_TYPE = 9; + private static final int PROXY_V2_DIRECT_EXTENDS_SIZE = 8; + private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1); + + static { + FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE); + FRAME_END_BYTE_BUFFER.flip(); + } + public AmqpOutputConverter(AmqpConnection connection) { this.connection = connection; } @@ -148,7 +161,7 @@ private void writeMessageDeliveryUnchanged(MessageContentSource content, writeFrame(new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, - new MessageContentSourceBody(chunk))); + new MessageContentSourceBody(chunk), connection.getAmqpConfig().isAmqpProxyV2Enable())); int writtenSize = contentChunkSize; while (writtenSize < bodySize) { @@ -346,27 +359,48 @@ public static final class CompositeAMQBodyBlock extends AMQDataBlock { private final AMQBody headerBody; private final AMQBody contentBody; private final int channel; + private final boolean amqpProxyV2Enable; - public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) { + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody, + boolean amqpProxyV2Enable) { this.channel = channel; this.methodBody = methodBody; this.headerBody = headerBody; this.contentBody = contentBody; + this.amqpProxyV2Enable = amqpProxyV2Enable; } @Override public long getSize() { - return OVERHEAD + (long) methodBody.getSize() + (long) headerBody.getSize() + (long) contentBody.getSize(); + return (amqpProxyV2Enable ? PROXY_V2_DIRECT_EXTENDS_SIZE : 0) + OVERHEAD + (long) methodBody.getSize() + + (long) headerBody.getSize() + (long) contentBody.getSize(); } @Override public long writePayload(final ByteBufferSender sender) { + if (amqpProxyV2Enable) { + // wrap the delivery message data with a special type 9 to skip data decode in proxy + QpidByteBuffer buffer = QpidByteBuffer.allocate(PROXY_V2_DIRECT_EXTENDS_SIZE - 1); + buffer.put(PROXY_V2_DIRECT_TYPE); + buffer.putUnsignedShort(0); + buffer.putUnsignedInt(getSize() - 8); + buffer.flip(); + sender.send(buffer); + } + long size = (new AMQFrame(channel, methodBody)).writePayload(sender); size += (new AMQFrame(channel, headerBody)).writePayload(sender); size += (new AMQFrame(channel, contentBody)).writePayload(sender); + if (amqpProxyV2Enable) { + // wrap the delivery message data to skip data decode in proxy + try (QpidByteBuffer endFrame = FRAME_END_BYTE_BUFFER.duplicate()) { + sender.send(endFrame); + } + size += PROXY_V2_DIRECT_EXTENDS_SIZE; + } return size; } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java index 4742c231..24b0bad2 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.socket.SocketChannel; import io.streamnative.pulsar.handlers.amqp.proxy.ProxyConfiguration; import io.streamnative.pulsar.handlers.amqp.proxy.ProxyService; +import io.streamnative.pulsar.handlers.amqp.proxy.v2.ProxyServiceV2; import io.streamnative.pulsar.handlers.amqp.utils.ConfigurationUtils; import java.net.InetSocketAddress; import java.util.Map; @@ -108,9 +109,14 @@ public void start(BrokerService service) { "plaintext must be configured on internal listener"); proxyConfig.setBrokerServiceURL(internalListener.getBrokerServiceUrl().toString()); - ProxyService proxyService = new ProxyService(proxyConfig, service.getPulsar()); try { - proxyService.start(); + if (amqpConfig.isAmqpProxyV2Enable()) { + ProxyServiceV2 proxyServer = new ProxyServiceV2(proxyConfig, service.getPulsar()); + proxyServer.start(); + } else { + ProxyService proxyService = new ProxyService(proxyConfig, service.getPulsar()); + proxyService.start(); + } log.info("Start amqp proxy service at port: {}", proxyConfig.getAmqpProxyPort()); } catch (Exception e) { log.error("Failed to start amqp proxy service."); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProxyDirectHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProxyDirectHandler.java new file mode 100644 index 00000000..288c4289 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProxyDirectHandler.java @@ -0,0 +1,33 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import lombok.extern.log4j.Log4j2; +import org.apache.qpid.server.bytebuffer.SingleQpidByteBuffer; + +/** + * amqp data direct handler. + */ +@Log4j2 +public class AmqpProxyDirectHandler extends MessageToByteEncoder { + + @Override + public void encode(ChannelHandlerContext ctx, SingleQpidByteBuffer buffer, ByteBuf out) { + out.writeBytes(buffer.getUnderlyingBuffer()); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java index cf3d2912..ac1e5714 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpPulsarServerCnx.java @@ -15,6 +15,7 @@ import io.netty.channel.ChannelHandlerContext; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.ServerCnx; /** @@ -28,4 +29,8 @@ public AmqpPulsarServerCnx(PulsarService pulsar, ChannelHandlerContext ctx) { this.remoteAddress = ctx.channel().remoteAddress(); } + @Override + public void closeConsumer(Consumer consumer) { + // avoid close the connection when closing the consumer + } } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java index 39b73e31..8be665cd 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpServiceConfiguration.java @@ -125,4 +125,10 @@ public class AmqpServiceConfiguration extends ServiceConfiguration { ) private int amqpExchangeRouteExecutorThreads = Runtime.getRuntime().availableProcessors(); + @FieldContext( + category = CATEGORY_AMQP, + required = false + ) + private boolean amqpProxyV2Enable = false; + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java index 37672280..807d2d9c 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java @@ -48,14 +48,14 @@ public class ExchangeContainer { private AmqpTopicManager amqpTopicManager; private PulsarService pulsarService; private final Executor routeExecutor; - private final int routeQueueSize; + private final AmqpServiceConfiguration config; protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService, - Executor routeExecutor, int routeQueueSize) { + Executor routeExecutor, AmqpServiceConfiguration config) { this.amqpTopicManager = amqpTopicManager; this.pulsarService = pulsarService; this.routeExecutor = routeExecutor; - this.routeQueueSize = routeQueueSize; + this.config = config; } @Getter @@ -162,9 +162,9 @@ public CompletableFuture asyncGetExchange(NamespaceName namespaceN boolean currentInternal = Boolean.parseBoolean( properties.getOrDefault(INTERNAL, "false")); amqpExchange = new PersistentExchange(exchangeName, - AmqpExchange.Type.value(currentType), - persistentTopic, currentDurable, currentAutoDelete, currentInternal, - currentArguments, routeExecutor, routeQueueSize); + AmqpExchange.Type.value(currentType), persistentTopic, currentDurable, + currentAutoDelete, currentInternal, currentArguments, routeExecutor, + config.getAmqpExchangeRouteQueueSize(), config.isAmqpProxyV2Enable()); } catch (Exception e) { log.error("Failed to init exchange {} in vhost {}.", exchangeName, namespaceName.getLocalName(), e); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java new file mode 100644 index 00000000..144674d9 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeMessageRouter.java @@ -0,0 +1,432 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp; + +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE; +import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE; +import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; + +import com.google.common.collect.Sets; +import io.streamnative.pulsar.handlers.amqp.impl.HeadersMessageRouter; +import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; +import io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue; +import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.api.proto.KeyValue; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.qpid.server.exchange.topic.TopicParser; + +@Slf4j +public abstract class ExchangeMessageRouter { + + private final PersistentExchange exchange; + + private ManagedCursorImpl cursor; + + private final Map>> producerMap = new ConcurrentHashMap<>(); + + private static final int defaultReadMaxSizeBytes = 4 * 1024 * 1024; + private static final int replicatorQueueSize = 1000; + private volatile int pendingQueueSize = 0; + + private static final AtomicIntegerFieldUpdater PENDING_SIZE_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ExchangeMessageRouter.class, "pendingQueueSize"); + + private volatile int havePendingRead = FALSE; + private static final AtomicIntegerFieldUpdater HAVE_PENDING_READ_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ExchangeMessageRouter.class, "havePendingRead"); + + @AllArgsConstructor + @EqualsAndHashCode + private static class Destination { + String name; + String type; + } + + public ExchangeMessageRouter(PersistentExchange exchange) { + this.exchange = exchange; + } + + public abstract void addBinding(String des, String desType, String routingKey, Map arguments); + + public abstract void removeBinding(String des, String desType, String routingKey, Map arguments); + + abstract Set getDestinations(String routingKey, Map headers); + + public void start() { + start0((ManagedLedgerImpl) ((PersistentTopic) exchange.getTopic()).getManagedLedger()); + } + + private void start0(ManagedLedgerImpl managedLedger) { + managedLedger.asyncOpenCursor("amqp-router", CommandSubscribe.InitialPosition.Earliest, + new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + log.info("Start to route messages for exchange {}", exchange.getName()); + ExchangeMessageRouter.this.cursor = (ManagedCursorImpl) cursor; + readMoreEntries(); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + log.error("Failed to open cursor for exchange topic {}, retry", exchange.getName(), exception); + start0(managedLedger); + } + }, null); + } + + private void readMoreEntries() { + int availablePermits = getAvailablePermits(); + if (availablePermits > 0) { + if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { + if (log.isDebugEnabled()) { + log.debug("{} Schedule read of {} messages.", exchange.getName(), availablePermits); + } + cursor.asyncReadEntriesOrWait(100, defaultReadMaxSizeBytes, + new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + HAVE_PENDING_READ_UPDATER.set(ExchangeMessageRouter.this, FALSE); + processMessages(entries); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + HAVE_PENDING_READ_UPDATER.set(ExchangeMessageRouter.this, FALSE); + log.error("Failed to read entries from exchange {}", exchange.getName(), exception); + } + }, null, null); + } else { + if (log.isDebugEnabled()) { + log.debug("{} Not schedule read due to pending read. Messages to read {}.", + exchange.getName(), availablePermits); + } + } + } else { + // no permits from rate limit + exchange.getTopic().getBrokerService().getPulsar().getExecutor() + .schedule(this::readMoreEntries, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); + } + } + + private int getAvailablePermits() { + int availablePermits = replicatorQueueSize - PENDING_SIZE_UPDATER.get(this); + if (availablePermits <= 0) { + if (log.isDebugEnabled()) { + log.debug("{} Replicator queue is full, availablePermits: {}, pause route.", + exchange.getName(), availablePermits); + } + return 0; + } + return availablePermits; + } + + private void processMessages(List entries) { + for (Entry entry : entries) { + PENDING_SIZE_UPDATER.incrementAndGet(this); + Map props; + MessageImpl message; + try { + message = MessageImpl.deserialize(entry.getDataBuffer()); + props = message.getMessageBuilder().getPropertiesList().stream() + .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); + } catch (IOException e) { + log.error("Deserialize entry dataBuffer failed for exchange {}, skip it first.", + exchange.getName(), e); + PENDING_SIZE_UPDATER.decrementAndGet(this); + entry.release(); + continue; + } + String routingKey = props.getOrDefault(MessageConvertUtils.PROP_ROUTING_KEY, "").toString(); + Set destinations = getDestinations(routingKey, getMessageHeaders()); + + final Position position = entry.getPosition(); + + List> futures = new ArrayList<>(); + if (!destinations.isEmpty()) { + final int readerIndex = message.getDataBuffer().readerIndex(); + for (Destination des : destinations) { + futures.add(sendMessage(message, des, readerIndex)); + } + } + entry.release(); + FutureUtil.waitForAll(futures).whenComplete((__, t) -> { + if (t != null) { + log.error("Failed to route message {}", position, t); + cursor.rewind(); + sendComplete(); + return; + } + sendComplete(); + cursor.asyncDelete(position, new AsyncCallbacks.DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + if (log.isDebugEnabled()) { + log.debug("{} Deleted message at {}", exchange.getName(), ctx); + } + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + log.error("{} Failed to delete message at {}", exchange.getName(), ctx, exception); + } + }, entry.getPosition()); + }); + } + } + + private void sendComplete() { + int pending = PENDING_SIZE_UPDATER.decrementAndGet(this); + if (pending == 0 && HAVE_PENDING_READ_UPDATER.get(this) == FALSE) { + this.readMoreEntries(); + } + } + + private CompletableFuture sendMessage(MessageImpl msg, Destination des, int readerIndex) { + return getProducer(des.name, des.type) + .thenCompose(producer -> { + msg.getDataBuffer().retain(); + msg.getMessageBuilder().clearProducerName(); + msg.getMessageBuilder().clearPublishTime(); + msg.getDataBuffer().readerIndex(readerIndex); + return ((ProducerImpl) producer).sendAsync(msg); + }) + .thenApply(__ -> null); + } + + private CompletableFuture> getProducer(String des, String desType) { + PulsarClient pulsarClient; + try { + pulsarClient = exchange.getTopic().getBrokerService().pulsar().getClient(); + } catch (PulsarServerException e) { + log.error("Failed to get pulsar client", e); + return FutureUtil.failedFuture(e); + } + NamespaceName namespaceName = TopicName.get(exchange.getTopic().getName()).getNamespaceObject(); + String prefix = desType.equals("queue") ? PersistentQueue.TOPIC_PREFIX : PersistentExchange.TOPIC_PREFIX; + return producerMap.computeIfAbsent(des, k -> pulsarClient.newProducer() + .topic(TopicName.get(TopicDomain.persistent.toString(), namespaceName, prefix + des).toString()) + .enableBatching(false) + .createAsync()); + } + + protected Map getMessageHeaders() { + return null; + } + + public static ExchangeMessageRouter getInstance(PersistentExchange exchange) { + return switch (exchange.getType()) { + case Fanout -> new FanoutExchangeMessageRouter(exchange); + case Direct -> new DirectExchangeMessageRouter(exchange); + case Topic -> new TopicExchangeMessageRouter(exchange); + case Headers -> new HeadersExchangeMessageRouter(exchange); + }; + } + + static class FanoutExchangeMessageRouter extends ExchangeMessageRouter { + + private final Set destinationSet; + + public FanoutExchangeMessageRouter(PersistentExchange exchange) { + super(exchange); + destinationSet = Sets.newConcurrentHashSet(); + } + + @Override + public synchronized void addBinding(String des, String desType, String routingKey, + Map arguments) { + destinationSet.add(new Destination(des, desType)); + } + + @Override + public synchronized void removeBinding(String des, String desType, String routingKey, + Map arguments) { + destinationSet.remove(new Destination(des, desType)); + } + + @Override + Set getDestinations(String routingKey, Map headers) { + return destinationSet; + } + + } + + static class DirectExchangeMessageRouter extends ExchangeMessageRouter { + + private final Map> destinationMap; + + public DirectExchangeMessageRouter(PersistentExchange exchange) { + super(exchange); + destinationMap = new ConcurrentHashMap<>(); + } + + @Override + public synchronized void addBinding(String des, String desType, String routingKey, + Map arguments) { + destinationMap.computeIfAbsent(routingKey, k -> Sets.newConcurrentHashSet()) + .add(new Destination(des, desType)); + } + + @Override + public synchronized void removeBinding(String des, String desType, String routingKey, + Map arguments) { + destinationMap.computeIfPresent(routingKey, (k, v) -> { + v.remove(new Destination(des, desType)); + if (v.isEmpty()) { + return null; + } + return v; + }); + } + + @Override + Set getDestinations(String routingKey, Map headers) { + return destinationMap.get(routingKey); + } + } + + static class TopicExchangeMessageRouter extends ExchangeMessageRouter { + + private final Map destinationMap; + + public TopicExchangeMessageRouter(PersistentExchange exchange) { + super(exchange); + destinationMap = new ConcurrentHashMap<>(); + } + + static class TopicRoutingKeyParser { + + Set bindingKeys; + TopicParser topicParser; + + void addBinding(String routingKey) { + if (bindingKeys.add(routingKey)) { + topicParser = new TopicParser(); + topicParser.addBinding(routingKey, null); + } + } + + void unbind(String routingKey) { + bindingKeys.remove(routingKey); + topicParser = new TopicParser(); + for (String bindingKey : bindingKeys) { + topicParser.addBinding(bindingKey, null); + } + } + + } + + + @Override + public synchronized void addBinding(String des, String desType, String routingKey, + Map arguments) { + destinationMap.computeIfAbsent(new Destination(des, desType), k -> new TopicRoutingKeyParser()) + .addBinding(routingKey); + } + + @Override + public synchronized void removeBinding(String des, String desType, String routingKey, + Map arguments) { + destinationMap.computeIfPresent(new Destination(des, desType), (k, v) -> { + v.unbind(routingKey); + if (v.bindingKeys.isEmpty()) { + return null; + } + return v; + }); + } + + @Override + Set getDestinations(String routingKey, Map headers) { + Set destinations = new HashSet<>(); + for (Map.Entry entry : destinationMap.entrySet()) { + if (!entry.getValue().topicParser.parse(routingKey).isEmpty()) { + destinations.add(entry.getKey()); + } + } + return destinations; + } + } + + static class HeadersExchangeMessageRouter extends ExchangeMessageRouter { + + private final Map messageRouterMap; + + public HeadersExchangeMessageRouter(PersistentExchange exchange) { + super(exchange); + messageRouterMap = new ConcurrentHashMap<>(); + } + + @Override + public synchronized void addBinding(String des, String desType, String routingKey, + Map arguments) { + messageRouterMap.computeIfAbsent(new Destination(des, desType), k -> new HeadersMessageRouter()) + .getArguments().putAll(arguments); + } + + @Override + public synchronized void removeBinding(String des, String desType, String routingKey, + Map arguments) { + messageRouterMap.computeIfPresent(new Destination(des, desType), (k, v) -> { + v.getArguments().putAll(arguments); + if (v.getArguments().isEmpty()) { + return null; + } + return v; + }); + } + + @Override + Set getDestinations(String routingKey, Map headers) { + Set destinations = new HashSet<>(); + for (Map.Entry entry : messageRouterMap.entrySet()) { + if (entry.getValue().isMatch(headers)) { + destinations.add(entry.getKey()); + } + } + return destinations; + } + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeService.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeService.java index 3c3015db..be97e2f4 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeService.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeService.java @@ -57,4 +57,11 @@ CompletableFuture exchangeDeclare(NamespaceName namespaceName, Str */ CompletableFuture exchangeBound(NamespaceName namespaceName, String exchange, String routingKey, String queueName); + + CompletableFuture queueBind(NamespaceName namespaceName, String exchange, String queue, String routingKey, + Map arguments); + + CompletableFuture queueUnBind(NamespaceName namespaceName, String exchange, String queue, String routingKey, + Map arguments); + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java index 2d9d1cc2..aede9728 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeServiceImpl.java @@ -189,4 +189,17 @@ public CompletableFuture exchangeBound(NamespaceName namespaceName, Str return future; } + @Override + public CompletableFuture queueBind(NamespaceName namespaceName, String exchange, String queue, + String routingKey, Map arguments) { + return exchangeContainer.asyncGetExchange(namespaceName, exchange, false, null) + .thenCompose(amqpExchange -> amqpExchange.queueBind(queue, routingKey, arguments)); + } + + @Override + public CompletableFuture queueUnBind(NamespaceName namespaceName, String exchange, String queue, + String routingKey, Map arguments) { + return exchangeContainer.asyncGetExchange(namespaceName, exchange, false, null) + .thenCompose(amqpExchange -> amqpExchange.queueUnBind(queue, routingKey, arguments)); + } } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java index bbe47bec..3521a2e7 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/QueueContainer.java @@ -36,12 +36,14 @@ public class QueueContainer { private AmqpTopicManager amqpTopicManager; private PulsarService pulsarService; private ExchangeContainer exchangeContainer; + private AmqpServiceConfiguration config; protected QueueContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService, - ExchangeContainer exchangeContainer) { + ExchangeContainer exchangeContainer, AmqpServiceConfiguration config) { this.amqpTopicManager = amqpTopicManager; this.pulsarService = pulsarService; this.exchangeContainer = exchangeContainer; + this.config = config; } @Getter @@ -97,15 +99,17 @@ public CompletableFuture asyncGetQueue(NamespaceName namespaceName, S // TODO: reset connectionId, exclusive and autoDelete PersistentQueue amqpQueue = new PersistentQueue(queueName, persistentTopic, 0, false, false); - try { - amqpQueue.recoverRoutersFromQueueProperties(properties, exchangeContainer, - namespaceName); - } catch (Exception e) { - log.error("[{}][{}] Failed to recover routers for queue from properties.", - namespaceName, queueName, e); - queueCompletableFuture.completeExceptionally(e); - removeQueueFuture(namespaceName, queueName); - return; + if (!config.isAmqpProxyV2Enable()) { + try { + amqpQueue.recoverRoutersFromQueueProperties(properties, exchangeContainer, + namespaceName); + } catch (Exception e) { + log.error("[{}][{}] Failed to recover routers for queue from properties.", + namespaceName, queueName, e); + queueCompletableFuture.completeExceptionally(e); + removeQueueFuture(namespaceName, queueName); + return; + } } queueCompletableFuture.complete(amqpQueue); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/common/exception/NotSupportedOperationException.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/common/exception/NotSupportedOperationException.java new file mode 100644 index 00000000..1a5a15b7 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/common/exception/NotSupportedOperationException.java @@ -0,0 +1,25 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.common.exception; + +/** + * Not supported operation exception. + */ +public class NotSupportedOperationException extends RuntimeException{ + + public NotSupportedOperationException(String message) { + super(message); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java index 97659c6b..9c3e44e3 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java @@ -17,14 +17,17 @@ import static org.apache.curator.shaded.com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; import io.streamnative.pulsar.handlers.amqp.AmqpEntryWriter; import io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator; import io.streamnative.pulsar.handlers.amqp.AmqpQueue; +import io.streamnative.pulsar.handlers.amqp.ExchangeMessageRouter; import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils; import io.streamnative.pulsar.handlers.amqp.utils.PulsarTopicMetadataUtils; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -33,6 +36,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -66,15 +73,31 @@ public class PersistentExchange extends AbstractAmqpExchange { public static final String INTERNAL = "INTERNAL"; public static final String ARGUMENTS = "ARGUMENTS"; public static final String TOPIC_PREFIX = "__amqp_exchange__"; + private static final String BINDINGS = "BINDINGS"; private PersistentTopic persistentTopic; private final ConcurrentOpenHashMap> cursors; private AmqpExchangeReplicator messageReplicator; private AmqpEntryWriter amqpEntryWriter; + private ExchangeMessageRouter exchangeMessageRouter; + private Set bindings; + + @NoArgsConstructor + @AllArgsConstructor + @EqualsAndHashCode + @Data + static class Binding implements Serializable { + private String des; + private String desType; + private String key; + private Map arguments; + } + public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean durable, boolean autoDelete, boolean internal, Map arguments, - Executor routeExecutor, int routeQueueSize) { + Executor routeExecutor, int routeQueueSize, boolean proxyV2Enable) + throws JsonProcessingException { super(exchangeName, type, Sets.newConcurrentHashSet(), durable, autoDelete, internal, arguments); this.persistentTopic = persistentTopic; topicNameValidate(); @@ -85,54 +108,66 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis cursor.setInactive(); } - if (messageReplicator == null) { - messageReplicator = new AmqpExchangeReplicator(this, routeExecutor, routeQueueSize) { - @Override - public CompletableFuture readProcess(ByteBuf data, Position position) { - Map props; - try { - MessageImpl message = MessageImpl.deserialize(data); - props = message.getMessageBuilder().getPropertiesList().stream() - .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); - } catch (Exception e) { - log.error("Failed to deserialize entry dataBuffer. exchangeName: {}", exchangeName, e); - return FutureUtil.failedFuture(e); - } + if (proxyV2Enable) { + bindings = Sets.newConcurrentHashSet(); + if (persistentTopic.getManagedLedger().getProperties().containsKey(BINDINGS)) { + List amqpQueueProperties = JSON_MAPPER.readValue( + persistentTopic.getManagedLedger().getProperties().get(BINDINGS), new TypeReference<>() {}); + this.bindings.addAll(amqpQueueProperties); + } + this.exchangeMessageRouter = ExchangeMessageRouter.getInstance(this); + for (Binding binding : this.bindings) { + this.exchangeMessageRouter.addBinding(binding.des, binding.desType, binding.key, binding.arguments); + } + this.exchangeMessageRouter.start(); + } else { + if (messageReplicator == null) { + messageReplicator = new AmqpExchangeReplicator(this, routeExecutor, routeQueueSize) { + @Override + public CompletableFuture readProcess(ByteBuf data, Position position) { + Map props; + try { + MessageImpl message = MessageImpl.deserialize(data); + props = message.getMessageBuilder().getPropertiesList().stream() + .collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)); + } catch (Exception e) { + log.error("Failed to deserialize entry dataBuffer. exchangeName: {}", exchangeName, e); + return FutureUtil.failedFuture(e); + } - List> routeFutureList = new ArrayList<>(); - if (exchangeType == Type.Direct) { - String bindingKey = props.getOrDefault(MessageConvertUtils.PROP_ROUTING_KEY, "").toString(); - Set queueSet = bindingKeyQueueMap.get(bindingKey); - if (queueSet == null) { - if (log.isDebugEnabled()) { - log.debug("The queue set of the bindingKey {} is not exist.", bindingKey); + List> routeFutureList = new ArrayList<>(); + if (exchangeType == Type.Direct) { + String bindingKey = props.getOrDefault(MessageConvertUtils.PROP_ROUTING_KEY, "").toString(); + Set queueSet = bindingKeyQueueMap.get(bindingKey); + if (queueSet == null) { + if (log.isDebugEnabled()) { + log.debug("The queue set of the bindingKey {} is not exist.", bindingKey); + } + } else { + for (AmqpQueue queue : queueSet) { + routeFutureList.add(queue.writeIndexMessageAsync( + exchangeName, position.getLedgerId(), position.getEntryId(), props)); + } + } + } else if (exchangeType == Type.Fanout) { + for (AmqpQueue queue : queues) { + routeFutureList.add(queue.writeIndexMessageAsync( + exchangeName, position.getLedgerId(), position.getEntryId(), props)); } } else { - for (AmqpQueue queue : queueSet) { - routeFutureList.add( - queue.writeIndexMessageAsync( - exchangeName, position.getLedgerId(), position.getEntryId(), props)); + for (AmqpQueue queue : queues) { + CompletableFuture routeFuture = queue.getRouter(exchangeName).routingMessage( + position.getLedgerId(), position.getEntryId(), + props.getOrDefault(MessageConvertUtils.PROP_ROUTING_KEY, "").toString(), + props); + routeFutureList.add(routeFuture); } } - } else if (exchangeType == Type.Fanout) { - for (AmqpQueue queue : queues) { - routeFutureList.add( - queue.writeIndexMessageAsync( - exchangeName, position.getLedgerId(), position.getEntryId(), props)); - } - } else { - for (AmqpQueue queue : queues) { - CompletableFuture routeFuture = queue.getRouter(exchangeName).routingMessage( - position.getLedgerId(), position.getEntryId(), - props.getOrDefault(MessageConvertUtils.PROP_ROUTING_KEY, "").toString(), - props); - routeFutureList.add(routeFuture); - } + return FutureUtil.waitForAll(routeFutureList); } - return FutureUtil.waitForAll(routeFutureList); - } - }; - messageReplicator.startReplicate(); + }; + messageReplicator.startReplicate(); + } } this.amqpEntryWriter = new AmqpEntryWriter(persistentTopic); } @@ -357,4 +392,61 @@ public void topicNameValidate() { "The exchange topic name does not conform to the rules(__amqp_exchange__exchangeName)."); } + @Override + public CompletableFuture queueBind(String queue, String routingKey, Map arguments) { + this.bindings.add(new Binding(queue, "queue", routingKey, arguments)); + String bindingsJson; + try { + bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings); + } catch (JsonProcessingException e) { + log.error("Failed to bind queue {} to exchange {}", queue, exchangeName, e); + return FutureUtil.failedFuture(e); + } + CompletableFuture future = new CompletableFuture<>(); + this.persistentTopic.getManagedLedger().asyncSetProperty(BINDINGS, bindingsJson, + new AsyncCallbacks.UpdatePropertiesCallback() { + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + PersistentExchange.this.exchangeMessageRouter.addBinding(queue, "queue", routingKey, arguments); + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + log.error("Failed to save binding metadata for bind operation.", exception); + future.completeExceptionally(exception); + } + }, null); + return future; + } + + @Override + public CompletableFuture queueUnBind(String queue, String routingKey, Map arguments) { + this.bindings.remove(new Binding(queue, "queue", routingKey, arguments)); + String bindingsJson; + try { + bindingsJson = JSON_MAPPER.writeValueAsString(this.bindings); + } catch (JsonProcessingException e) { + log.error("Failed to unbind queue {} to exchange {}", queue, exchangeName, e); + return FutureUtil.failedFuture(e); + } + CompletableFuture future = new CompletableFuture<>(); + this.persistentTopic.getManagedLedger().asyncSetProperty("BINDINGS", bindingsJson, + new AsyncCallbacks.UpdatePropertiesCallback() { + @Override + public void updatePropertiesComplete(Map properties, Object ctx) { + PersistentExchange.this.exchangeMessageRouter.removeBinding(queue, "queue", + routingKey, arguments); + future.complete(null); + } + + @Override + public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) { + log.error("Failed to save binding metadata for unbind operation.", exception); + future.completeExceptionally(exception); + } + }, null); + return future; + } + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/LookupHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/LookupHandler.java index c2a0fce0..96c10097 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/LookupHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/LookupHandler.java @@ -31,6 +31,6 @@ public interface LookupHandler extends Closeable { * @return Pair consist of brokerHost and brokerPort */ CompletableFuture> findBroker(TopicName topicName, - String protocolHandlerName) throws Exception; + String protocolHandlerName); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java index f4eda176..a3bcd300 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/PulsarServiceLookupHandler.java @@ -49,7 +49,7 @@ public PulsarServiceLookupHandler(ProxyConfiguration proxyConfig, PulsarService @Override public CompletableFuture> findBroker(TopicName topicName, - String protocolHandlerName) throws Exception { + String protocolHandlerName) { CompletableFuture> lookupResult = new CompletableFuture<>(); // lookup the broker for the given topic diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyClientChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyClientChannel.java new file mode 100644 index 00000000..59519871 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyClientChannel.java @@ -0,0 +1,220 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ClientChannelMethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody; + + +@Slf4j +public class AmqpProxyClientChannel implements ClientChannelMethodProcessor { + + private final Integer channelId; + private final ProxyBrokerConnection conn; + + public AmqpProxyClientChannel(Integer channelId, ProxyBrokerConnection conn) { + this.channelId = channelId; + this.conn = conn; + } + + @Override + public void receiveChannelOpenOk() { + conn.getChannelState().put(channelId, ProxyBrokerConnection.ChannelState.OPEN); + conn.getProxyConnection().getServerChannelMap().get(channelId).initComplete(); + } + + @Override + public void receiveChannelAlert(int replyCode, AMQShortString replyText, FieldTable details) { + // nothing to do + } + + @Override + public void receiveAccessRequestOk(int ticket) { + // nothing to do + } + + @Override + public void receiveExchangeDeclareOk() { + conn.getClientChannel().writeAndFlush(new ExchangeDeclareOkBody().generateFrame(channelId)); + } + + @Override + public void receiveExchangeDeleteOk() { + // nothing to do + } + + @Override + public void receiveExchangeBoundOk(int replyCode, AMQShortString replyText) { + // nothing to do + } + + @Override + public void receiveQueueBindOk() { + conn.getClientChannel().writeAndFlush(new QueueBindOkBody().generateFrame(channelId)); + } + + @Override + public void receiveQueueUnbindOk() { + conn.getClientChannel().writeAndFlush(new QueueUnbindOkBody().generateFrame(channelId)); + } + + @Override + public void receiveQueueDeclareOk(AMQShortString queue, long messageCount, long consumerCount) { + conn.getClientChannel().writeAndFlush( + new QueueDeclareOkBody(queue, messageCount, consumerCount).generateFrame(channelId)); + } + + @Override + public void receiveQueuePurgeOk(long messageCount) { + // nothing to do + } + + @Override + public void receiveQueueDeleteOk(long messageCount) { + // nothing to do + } + + @Override + public void receiveBasicRecoverSyncOk() { + // nothing to do + } + + @Override + public void receiveBasicQosOk() { + // nothing to do + } + + @Override + public void receiveBasicConsumeOk(AMQShortString consumerTag) { + conn.getClientChannel().writeAndFlush(new BasicConsumeOkBody(consumerTag).generateFrame(channelId)); + } + + @Override + public void receiveBasicCancelOk(AMQShortString consumerTag) { + // nothing to do + } + + @Override + public void receiveBasicReturn(int replyCode, AMQShortString replyText, AMQShortString exchange, + AMQShortString routingKey) { + // nothing to do + } + + @Override + public void receiveBasicDeliver(AMQShortString consumerTag, long deliveryTag, boolean redelivered, + AMQShortString exchange, AMQShortString routingKey) { + // nothing to do + } + + @Override + public void receiveBasicGetOk(long deliveryTag, boolean redelivered, AMQShortString exchange, + AMQShortString routingKey, long messageCount) { + // nothing to do + } + + @Override + public void receiveBasicGetEmpty() { + // nothing to do + } + + @Override + public void receiveTxSelectOk() { + // nothing to do + } + + @Override + public void receiveTxCommitOk() { + // nothing to do + } + + @Override + public void receiveTxRollbackOk() { + // nothing to do + } + + @Override + public void receiveConfirmSelectOk() { + // nothing to do + } + + @Override + public void receiveChannelFlow(boolean active) { + // nothing to do + } + + @Override + public void receiveChannelFlowOk(boolean active) { + // nothing to do + } + + @Override + public void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId) { + // nothing to do + } + + @Override + public void receiveChannelCloseOk() { + if (log.isDebugEnabled()) { + log.debug("ProxyClientChannel receive channel close ok request."); + } + conn.getChannelState().put(channelId, ProxyBrokerConnection.ChannelState.CLOSE); + AtomicBoolean allClose = new AtomicBoolean(true); + conn.getProxyConnection().getConnectionMap().values().forEach(conn -> { + if (conn.getChannelState().getOrDefault(channelId, null) + == ProxyBrokerConnection.ChannelState.OPEN) { + allClose.set(false); + } + }); + if (allClose.get()) { + conn.getProxyConnection().writeFrame(ChannelCloseOkBody.INSTANCE.generateFrame(channelId)); + } + } + + @Override + public void receiveMessageContent(QpidByteBuffer data) { + // nothing to do + } + + @Override + public void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize) { + // nothing to do + } + + @Override + public boolean ignoreAllButCloseOk() { + return false; + } + + @Override + public void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue) { + // nothing to do + } + + @Override + public void receiveBasicAck(long deliveryTag, boolean multiple) { + // nothing to do + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyServerChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyServerChannel.java new file mode 100644 index 00000000..f4f5308a --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/AmqpProxyServerChannel.java @@ -0,0 +1,443 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +import static org.apache.qpid.server.protocol.ErrorCodes.COMMAND_INVALID; +import static org.apache.qpid.server.protocol.ErrorCodes.FRAME_ERROR; +import static org.apache.qpid.server.protocol.ErrorCodes.INTERNAL_ERROR; +import static org.apache.qpid.server.protocol.ErrorCodes.MESSAGE_TOO_LARGE; +import static org.apache.qpid.server.transport.util.Functions.hex; + +import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; +import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils; +import java.io.UnsupportedEncodingException; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.message.MessageDestination; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.IncomingMessage; +import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentBody; +import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; +import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareBody; +import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.apache.qpid.server.protocol.v0_8.transport.QueueBindBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody; +import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindBody; +import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor; + +@Slf4j +public class AmqpProxyServerChannel implements ServerChannelMethodProcessor { + + private final Integer channelId; + private final ProxyClientConnection proxyConnection; + private boolean isInit; + + private IncomingMessage currentMessage; + + public AmqpProxyServerChannel(Integer channelId, ProxyClientConnection proxyConnection) { + this.channelId = channelId; + this.proxyConnection = proxyConnection; + } + + public CompletableFuture getConn(String topic) { + return proxyConnection.getBrokerConnection(topic); + } + + @Override + public void receiveAccessRequest(AMQShortString realm, boolean exclusive, boolean passive, boolean active, + boolean write, boolean read) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive access request."); + } + } + + @Override + public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type, boolean passive, boolean durable, + boolean autoDelete, boolean internal, boolean nowait, FieldTable arguments) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive exchange declare request, exchange: {}, type: {}, passive: {}," + + "durable: {}, autoDelete: {}, internal: {}, nowait: {}, arguments: {}.", + exchange, type, passive, durable, autoDelete, internal, nowait, arguments); + } + getConn(exchange.toString()).thenAccept(conn -> { + initChannelIfNeeded(conn); + ExchangeDeclareBody exchangeDeclareBody = new ExchangeDeclareBody( + 0, exchange, type, passive, durable, autoDelete, internal, nowait, arguments); + conn.getChannel().writeAndFlush(exchangeDeclareBody.generateFrame(channelId)); + }); + } + + @Override + public void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive exchange delete request, exchange: {}, ifUnused: {}, nowait: {}.", + exchange, ifUnused, nowait); + } + } + + @Override + public void receiveExchangeBound(AMQShortString exchange, AMQShortString routingKey, AMQShortString queue) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive exchange bound request, exchange: {}, routingKey: {}, queue: {}.", + exchange, routingKey, queue); + } + } + + @Override + public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean durable, boolean exclusive, + boolean autoDelete, boolean nowait, FieldTable arguments) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive queue declare request, queue: {}, passive: {}," + + "durable: {}, exclusive: {}, autoDelete: {}, nowait: {}, arguments: {}.", + queue, passive, durable, exclusive, autoDelete, nowait, arguments); + } + getConn(queue.toString()).thenAccept(conn -> { + initChannelIfNeeded(conn); + log.info("[ProxyServerChannel] write receiveExchangeDeclare frame"); + QueueDeclareBody queueDeclareBody = new QueueDeclareBody( + 0, queue, passive, durable, exclusive, autoDelete, nowait, arguments); + conn.getChannel().writeAndFlush(queueDeclareBody.generateFrame(channelId)); + }); + } + + @Override + public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQShortString bindingKey, + boolean nowait, FieldTable arguments) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive queue bind request, queue: {}, exchange: {}, bindingKey: {}, " + + "nowait: {}, arguments: {}.", queue, exchange, bindingKey, nowait, arguments); + } + getConn(exchange.toString()).thenAccept(conn -> { + initChannelIfNeeded(conn); + QueueBindBody queueBindBody = new QueueBindBody(0, queue, exchange, bindingKey, nowait, arguments); + conn.getChannel().writeAndFlush(queueBindBody.generateFrame(channelId)); + }); + } + + @Override + public void receiveQueuePurge(AMQShortString queue, boolean nowait) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive queue purge request, queue: {}, nowait: {}.", queue, nowait); + } + } + + @Override + public void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive queue purge request, queue: {}, ifUnused: {}, nowait: {}.", + queue, ifUnused, nowait); + } + } + + @Override + public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AMQShortString bindingKey, + FieldTable arguments) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive queue unbind request, queue: {}, exchange: {}, bindingKey: {}, " + + "arguments: {}.", queue, exchange, bindingKey, arguments); + } + getConn(exchange.toString()).thenAccept(conn -> { + initChannelIfNeeded(conn); + QueueUnbindBody command = new QueueUnbindBody(0, queue, exchange, bindingKey, arguments); + conn.getChannel().writeAndFlush(command.generateFrame(channelId)); + }); + } + + @Override + public void receiveBasicRecover(boolean requeue, boolean sync) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic recover request, requeue: {}, sync: {}.", requeue, sync); + } + } + + @Override + public void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic recover request, prefetchSize: {}, prefetchCount: {}, " + + "global: {}.", prefetchSize, prefetchCount, global); + } + } + + @Override + public void receiveBasicConsume(AMQShortString queue, AMQShortString consumerTag, boolean noLocal, boolean noAck, + boolean exclusive, boolean nowait, FieldTable arguments) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic consume request, queue: {}, consumerTag: {}, noLocal: {}, " + + "noAck: {}, exclusive: {}, nowait: {}, arguments: {}.", + queue, consumerTag, noLocal, noAck, exclusive, noAck, arguments); + } + getConn(queue.toString()).thenAccept(conn -> { + initChannelIfNeeded(conn); + BasicConsumeBody basicConsumeBody = + new BasicConsumeBody(0, queue, consumerTag, noLocal, noAck, exclusive, nowait, arguments); + conn.getChannel().writeAndFlush(basicConsumeBody.generateFrame(channelId)); + }); + } + + @Override + public void receiveBasicCancel(AMQShortString consumerTag, boolean noWait) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic cancel request, consumerTag: {}, noWait: {}.", + consumerTag, noWait); + } + } + + @Override + public void receiveBasicPublish(AMQShortString exchange, AMQShortString routingKey, boolean mandatory, + boolean immediate) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic publish request, exchange: {}, routingKey: {}, mandatory: {}, " + + "immediate: {}.", exchange, routingKey, mandatory, immediate); + } + MessagePublishInfo info = new MessagePublishInfo(exchange, immediate, + mandatory, routingKey); + setPublishFrame(info, null); + } + + private void setPublishFrame(MessagePublishInfo info, final MessageDestination e) { + currentMessage = new IncomingMessage(info); + currentMessage.setMessageDestination(e); + } + + @Override + public void receiveBasicGet(AMQShortString queue, boolean noAck) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic get request, queue: {}, noAck: {}.", queue, noAck); + } + } + + @Override + public void receiveBasicAck(long deliveryTag, boolean multiple) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic ack request, deliveryTag: {}, multiple: {}.", + deliveryTag, multiple); + } + } + + @Override + public void receiveBasicReject(long deliveryTag, boolean requeue) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic reject request, deliveryTag: {}, requeue: {}.", + deliveryTag, requeue); + } + } + + @Override + public void receiveTxSelect() { + // nothing to do + } + + @Override + public void receiveTxCommit() { + // nothing to do + } + + @Override + public void receiveTxRollback() { + // nothing to do + } + + @Override + public void receiveConfirmSelect(boolean nowait) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive confirm select request, nowait: {}.", nowait); + } + } + + @Override + public void receiveChannelFlow(boolean active) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive channel flow request, active: {}.", active); + } + } + + @Override + public void receiveChannelFlowOk(boolean active) { + // nothing to do + } + + @Override + public void receiveChannelClose(int replyCode, AMQShortString replyText, int classId, int methodId) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive channel close request, replyCode: {}, replyText: {}, classId: {}, " + + "methodId: {}.", replyCode, replyText, classId, methodId); + } + proxyConnection.getConnectionMap().values().forEach(conn -> { + if (conn.getChannelState().getOrDefault(channelId, null) + == ProxyBrokerConnection.ChannelState.OPEN) { + conn.getChannel().writeAndFlush( + new ChannelCloseBody(replyCode, replyText, classId, methodId).generateFrame(channelId)); + } + }); + } + + @Override + public void receiveChannelCloseOk() { + // nothing to do + } + + @Override + public void receiveMessageContent(QpidByteBuffer data) { + if (log.isDebugEnabled()) { + int binaryDataLimit = 2000; + log.debug("RECV[{}] MessageContent[data:{}]", channelId, hex(data, binaryDataLimit)); + } + + if (hasCurrentMessage()) { + publishContentBody(new ContentBody(data)); + } else { + proxyConnection.sendConnectionClose(COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame."); + } + } + + private void publishContentBody(ContentBody contentBody) { + if (log.isDebugEnabled()) { + log.debug("content body received on channel {}", channelId); + } + + try { + long currentSize = currentMessage.addContentBodyFrame(contentBody); + if (currentSize > currentMessage.getSize()) { + proxyConnection.sendConnectionClose(FRAME_ERROR, + "More message data received than content header defined"); + } else { + deliverCurrentMessageIfComplete(); + } + } catch (RuntimeException e) { + // we want to make sure we don't keep a reference to the message in the + // event of an error + currentMessage = null; + throw e; + } + } + + @Override + public void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize) { + if (log.isDebugEnabled()) { + log.debug("RECV[{}] MessageHeader[ properties: {{}} bodySize: {}]", channelId, properties, bodySize); + } + + // TODO - maxMessageSize ? + long maxMessageSize = 1024 * 1024 * 10; + if (hasCurrentMessage()) { + if (bodySize > maxMessageSize) { + properties.dispose(); + proxyConnection.sendChannelClose(channelId, MESSAGE_TOO_LARGE, + "Message size of " + bodySize + " greater than allowed maximum of " + maxMessageSize); + } else { + publishContentHeader(new ContentHeaderBody(properties, bodySize)); + } + } else { + properties.dispose(); + proxyConnection.sendConnectionClose(COMMAND_INVALID, + "Attempt to send a content header without first sending a publish frame"); + } + } + + private boolean hasCurrentMessage() { + return currentMessage != null; + } + + private void publishContentHeader(ContentHeaderBody contentHeaderBody) { + if (log.isDebugEnabled()) { + log.debug("Content header received on channel {}", channelId); + } + + currentMessage.setContentHeaderBody(contentHeaderBody); + + deliverCurrentMessageIfComplete(); + } + + private void deliverCurrentMessageIfComplete() { + if (currentMessage.allContentReceived()) { + MessagePublishInfo info = currentMessage.getMessagePublishInfo(); +// String routingKey = AMQShortString.toString(info.getRoutingKey()); + String exchangeName = AMQShortString.toString(info.getExchange()); + Message message; + try { + message = MessageConvertUtils.toPulsarMessage(currentMessage); + } catch (UnsupportedEncodingException e) { + proxyConnection.sendConnectionClose(INTERNAL_ERROR, "Message encoding fail."); + return; + } +// boolean createIfMissing = false; +// String exchangeType = null; +// if (isDefaultExchange(AMQShortString.valueOf(exchangeName)) +// || isBuildInExchange(exchangeName)) { +// // Auto create default and buildIn exchanges if use. +// createIfMissing = true; +// exchangeType = getExchangeType(exchangeName); +// } + + if (exchangeName == null || exchangeName.length() == 0) { + exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE; + } + + String topic = "persistent://public/" + proxyConnection.getVhost() + "/__amqp_exchange__" + exchangeName; + proxyConnection.getProxyServer() + .getProducer(topic) + .thenCompose(producer -> { + return producer.newMessage() + .value(message.getData()) + .properties(message.getProperties()) + .sendAsync(); + }) + .thenAccept(position -> { + if (log.isDebugEnabled()) { + log.debug("Publish message success, position {}", position.toString()); + } + }) + .exceptionally(t -> { + log.error("Failed to write message to exchange", t); + return null; + }); + } + } + + @Override + public boolean ignoreAllButCloseOk() { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel ignore all but close ok."); + } + return false; + } + + @Override + public void receiveBasicNack(long deliveryTag, boolean multiple, boolean requeue) { + if (log.isDebugEnabled()) { + log.debug("ProxyServerChannel receive basic nack request, deliveryTag: {}, multiple: {}, requeue: {}.", + deliveryTag, multiple, requeue); + } + } + + public void initChannelIfNeeded(ProxyBrokerConnection coon) { + if (!isInit) { + ChannelOpenBody channelOpenBody = new ChannelOpenBody(); + coon.getChannel().writeAndFlush(channelOpenBody.generateFrame(channelId)); + } + } + + public void initComplete() { + isInit = true; + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyBrokerConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyBrokerConnection.java new file mode 100644 index 00000000..26fb8543 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyBrokerConnection.java @@ -0,0 +1,236 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.flush.FlushConsolidationHandler; +import io.streamnative.pulsar.handlers.amqp.AmqpClientDecoder; +import io.streamnative.pulsar.handlers.amqp.AmqpEncoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.ClientChannelMethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ClientMethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; + +/** + * This class is used to transfer data between proxy and broker. + */ +@Slf4j +public class ProxyBrokerConnection { + + @Getter + private Channel channel; + @Getter + private final Channel clientChannel; + private List connectionCommands; + private Map clientProcessorMap; + @Getter + private ProxyClientConnection proxyConnection; + @Getter + private Map channelState = new ConcurrentHashMap<>(); + @Getter + private boolean isClose = false; + + public ProxyBrokerConnection(String host, Integer port, Channel clientChannel, List connectionCommands, + ProxyClientConnection proxyConnection) + throws InterruptedException { + this.connectionCommands = connectionCommands; + this.clientChannel = clientChannel; + EventLoopGroup group = new NioEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + log.info("channel pipeline {}", ch.pipeline()); + ch.pipeline().addLast("consolidation", new FlushConsolidationHandler( + 1000, true)); + ch.pipeline().addLast("frameEncoder", new AmqpEncoder()); + ch.pipeline().addLast("processor", new ProxyBrokerProcessor(clientChannel)); + } + }); + channel = bootstrap.connect(host, port).sync().channel(); + this.clientProcessorMap = new HashMap<>(); + this.proxyConnection = proxyConnection; + } + + protected enum ChannelState { + OPEN, + CLOSE + } + + private class ProxyBrokerProcessor extends ChannelInboundHandlerAdapter + implements ClientMethodProcessor { + + private ChannelHandlerContext ctx; + private final AmqpClientDecoder clientDecoder; + + public ProxyBrokerProcessor(Channel clientChannel) { + clientDecoder = new AmqpClientDecoder(this, clientChannel); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + this.ctx = ctx; + ChannelFuture future = null; + for (ByteBuf command : connectionCommands) { + command.retain(); + future = ctx.channel().writeAndFlush(command); + } + if (future != null) { + future.addListener(future1 -> { + ctx.channel().read(); + }); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { + clientDecoder.decodeBuffer(((ByteBuf) msg).nioBuffer()); + } finally { + ((ByteBuf) msg).release(); + } + } + + public synchronized void writeFrame(AMQDataBlock frame) { + if (log.isDebugEnabled()) { + log.debug("Write data to broker by proxy, frame: {}.", frame); + } + this.ctx.writeAndFlush(frame); + } + + @Override + public void receiveConnectionStart(short versionMajor, short versionMinor, FieldTable serverProperties, + byte[] mechanisms, byte[] locales) { + // nothing to do + } + + @Override + public void receiveConnectionSecure(byte[] challenge) { + // nothing to do + } + + @Override + public void receiveConnectionRedirect(AMQShortString host, AMQShortString knownHosts) { + // nothing to do + } + + @Override + public void receiveConnectionTune(int channelMax, long frameMax, int heartbeat) { + // nothing to do + } + + @Override + public void receiveConnectionOpenOk(AMQShortString knownHosts) { + // nothing to do + } + + @Override + public ProtocolVersion getProtocolVersion() { + return ProtocolVersion.v0_91; + } + + @Override + public ClientChannelMethodProcessor getChannelMethodProcessor(int channelId) { + return clientProcessorMap.computeIfAbsent(channelId, + __ -> new AmqpProxyClientChannel(channelId, ProxyBrokerConnection.this)); + } + + @Override + public void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId) { + for (ProxyBrokerConnection conn : proxyConnection.getConnectionMap().values()) { + conn.getChannel().writeAndFlush( + new ConnectionCloseBody(getProtocolVersion(), replyCode, replyText, classId, methodId) + .generateFrame(0)); + } + } + + @Override + public void receiveConnectionCloseOk() { + isClose = true; + boolean allClose = true; + for (ProxyBrokerConnection conn : proxyConnection.getConnectionMap().values()) { + if (!conn.isClose()) { + allClose = false; + break; + } + } + if (allClose) { + proxyConnection.writeFrame(ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9.generateFrame(0)); + } + } + + @Override + public void receiveHeartbeat() { + // nothing to do + } + + @Override + public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) { + // nothing to do + } + + @Override + public void setCurrentMethod(int classId, int methodId) { + // nothing to do + } + + @Override + public boolean ignoreAllButCloseOk() { + return false; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + isClose = true; + boolean allClose = true; + for (ProxyBrokerConnection conn : proxyConnection.getConnectionMap().values()) { + if (!conn.isClose()) { + allClose = false; + break; + } else { + conn.channel.close(); + } + } + if (allClose) { + proxyConnection.getCtx().close(); + } + } + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyClientConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyClientConnection.java new file mode 100644 index 00000000..0de8a26a --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyClientConnection.java @@ -0,0 +1,321 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +import static java.nio.charset.StandardCharsets.US_ASCII; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; +import io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder; +import io.streamnative.pulsar.handlers.amqp.AmqpProtocolHandler; +import io.streamnative.pulsar.handlers.amqp.proxy.ProxyConfiguration; +import io.streamnative.pulsar.handlers.amqp.proxy.PulsarServiceLookupHandler; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.qpid.server.QpidException; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.ProtocolVersion; +import org.apache.qpid.server.protocol.v0_8.AMQShortString; +import org.apache.qpid.server.protocol.v0_8.FieldTable; +import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.AMQMethodBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; +import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody; +import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry; +import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; +import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor; +import org.apache.qpid.server.protocol.v0_8.transport.ServerMethodProcessor; + +/** + * Proxy connection, it used to manage RabbitMQ client connection. + */ +@Slf4j +public class ProxyClientConnection extends ChannelInboundHandlerAdapter + implements ServerMethodProcessor { + + private final ProxyConfiguration config; + @Getter + private final Map connectionMap = new ConcurrentHashMap<>(); + @Getter + private final Map serverChannelMap = new ConcurrentHashMap<>(); + private final List connectionCommands = new ArrayList<>(); + private final AmqpBrokerDecoder decoder; + private State state; + private final ProtocolVersion protocolVersion; + private final MethodRegistry methodRegistry; + @Getter + private ChannelHandlerContext ctx; + private final PulsarServiceLookupHandler lookupHandler; + @Getter + private String vhost; + @Getter + private final ProxyServiceV2 proxyServer; + @Getter + private int currentClassId; + @Getter + private int currentMethodId; + + enum State { + INIT, + CONNECTED, + FAILED + } + + public ProxyClientConnection(ProxyConfiguration config, + PulsarServiceLookupHandler lookupHandler, + ProxyServiceV2 proxyServer) { + this.config = config; + this.decoder = new AmqpBrokerDecoder(this); + this.state = State.INIT; + protocolVersion = ProtocolVersion.v0_91; + methodRegistry = new MethodRegistry(protocolVersion); + this.lookupHandler = lookupHandler; + this.proxyServer = proxyServer; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection channel active."); + } + this.ctx = ctx; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ByteBuf byteBuf = (ByteBuf) msg; + if (state.equals(State.INIT)) { + byteBuf.retain(); + connectionCommands.add(byteBuf); + } + try { + decoder.decodeBuffer(QpidByteBuffer.wrap(byteBuf.nioBuffer())); + } catch (Exception e) { + this.state = State.FAILED; + log.error("ProxyClientConnection failed to decode requests.", e); + } finally { + byteBuf.release(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("ProxyClientConnection exception caught in channel", cause); + this.state = State.FAILED; + } + + @Override + public void receiveConnectionStartOk(FieldTable clientProperties, AMQShortString mechanism, byte[] response, + AMQShortString locale) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive connection start request, clientProperties: {}, mechanism: {}, " + + "response: {}, locale: {}.", clientProperties, mechanism, response, locale); + } + ConnectionTuneBody tuneBody = + methodRegistry.createConnectionTuneBody(config.getAmqpMaxNoOfChannels(), + config.getAmqpMaxFrameSize(), config.getAmqpHeartBeat()); + writeFrame(tuneBody.generateFrame(0)); + } + + @Override + public void receiveConnectionSecureOk(byte[] response) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive connection secure ok request, response: {}.", response); + } + ConnectionTuneBody tuneBody = + methodRegistry.createConnectionTuneBody(config.getAmqpMaxNoOfChannels(), + config.getAmqpMaxFrameSize(), config.getAmqpHeartBeat()); + writeFrame(tuneBody.generateFrame(0)); + } + + @Override + public void receiveConnectionTuneOk(int channelMax, long frameMax, int heartbeat) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive connection tune ok request, channelMax: {}, frameMax: {}, " + + "heartbeat: {}.", channelMax, frameMax, heartbeat); + } + } + + @Override + public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString capabilities, boolean insist) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive connection open request, virtualHost: {}, capabilities: {}, " + + "insist: {}.", virtualHost, capabilities, insist); + } + this.state = State.CONNECTED; + + String virtualHostStr = AMQShortString.toString(virtualHost); + if ((virtualHostStr != null) && virtualHostStr.charAt(0) == '/') { + virtualHostStr = virtualHostStr.substring(1); + if (StringUtils.isEmpty(virtualHostStr)){ + virtualHostStr = "default"; + } + } + this.vhost = virtualHostStr; + + ConnectionOpenOkBody okBody = + methodRegistry.createConnectionOpenOkBody(virtualHost); + writeFrame(okBody.generateFrame(0)); + } + + @Override + public void receiveChannelOpen(int channelId) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive channel open request, channelId: {}.", channelId); + } + AmqpProxyServerChannel channel = new AmqpProxyServerChannel(channelId, this); + serverChannelMap.put(channelId, channel); + ChannelOpenOkBody okBody = + methodRegistry.createChannelOpenOkBody(); + writeFrame(okBody.generateFrame(channelId)); + } + + @Override + public ProtocolVersion getProtocolVersion() { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection get protocol version."); + } + return ProtocolVersion.v0_91; + } + + @Override + public ServerChannelMethodProcessor getChannelMethodProcessor(int channelId) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection get channel method processor, channelId: {}.", channelId); + } + return serverChannelMap.get(channelId); + } + + @Override + public void receiveConnectionClose(int replyCode, AMQShortString replyText, int classId, int methodId) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive connection close request, replyCode: {}, replyText: {}, " + + "classId: {}, methodId: {}.", replyCode, replyText, classId, methodId); + } + for (ProxyBrokerConnection conn : getConnectionMap().values()) { + conn.getChannel().writeAndFlush( + new ConnectionCloseBody( + getProtocolVersion(), replyCode, replyText, classId, methodId).generateFrame(0)); + } + } + + @Override + public void receiveConnectionCloseOk() { + // nothing to do + } + + @Override + public void receiveHeartbeat() { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive heart beat request."); + } + for (ProxyBrokerConnection conn : getConnectionMap().values()) { + conn.getChannel().writeAndFlush(new HeartbeatBody()); + } + } + + @Override + public void receiveProtocolHeader(ProtocolInitiation protocolInitiation) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection receive protocol header request, protocolInitiation: {}", + protocolInitiation); + } + decoder.setExpectProtocolInitiation(false); + try { + ProtocolVersion pv = protocolInitiation.checkVersion(); // Fails if not correct + AMQMethodBody responseBody = this.methodRegistry.createConnectionStartBody( + protocolVersion.getMajorVersion(), + pv.getActualMinorVersion(), + null, + "PLAIN token".getBytes(US_ASCII), + "en_US".getBytes(US_ASCII)); + writeFrame(responseBody.generateFrame(0)); + } catch (QpidException e) { + log.error("Received unsupported protocol initiation for protocol version: {} ", getProtocolVersion(), e); + writeFrame(new ProtocolInitiation(ProtocolVersion.v0_91)); + throw new ProxyServiceV2Exception(e); + } + } + + @Override + public void setCurrentMethod(int classId, int methodId) { + if (classId != 0 && methodId != 0) { + this.currentClassId = classId; + this.currentMethodId = methodId; + } + } + + @Override + public boolean ignoreAllButCloseOk() { + return false; + } + + public synchronized void writeFrame(AMQDataBlock frame) { + if (log.isDebugEnabled()) { + log.debug("ProxyClientConnection write frame: {}", frame); + } + this.ctx.writeAndFlush(frame); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + for (ByteBuf command : connectionCommands) { + ReferenceCountUtil.safeRelease(command); + } + } + + public void sendChannelClose(int channelId, int replyCode, final String replyText) { + this.ctx.writeAndFlush(new ChannelCloseBody(replyCode, AMQShortString.createAMQShortString(replyText), + currentClassId, currentMethodId).generateFrame(channelId)); + } + + public void sendConnectionClose(int replyCode, String replyText) { + this.ctx.writeAndFlush(new ConnectionCloseBody(getProtocolVersion(), replyCode, + AMQShortString.createAMQShortString(replyText), currentClassId, currentMethodId).generateFrame(0)); + } + + public CompletableFuture getBrokerConnection(String topic) { + CompletableFuture> future = + lookupHandler.findBroker( + TopicName.get("persistent://public/" + vhost + "/" + topic), + AmqpProtocolHandler.PROTOCOL_NAME); + return future.thenApply(pair -> connectionMap.computeIfAbsent( + pair.getLeft() + ":" + pair.getRight(), __ -> { + try { + return new ProxyBrokerConnection( + pair.getLeft(), pair.getRight(), ctx.channel(), connectionCommands, ProxyClientConnection.this); + } catch (InterruptedException e) { + log.error("Failed to create proxy broker connection with address {}:{} for topic {}", + pair.getLeft(), pair.getRight(), topic, e); + return null; + } + })); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2.java new file mode 100644 index 00000000..148e665c --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2.java @@ -0,0 +1,99 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.flush.FlushConsolidationHandler; +import io.streamnative.pulsar.handlers.amqp.AmqpEncoder; +import io.streamnative.pulsar.handlers.amqp.AmqpProxyDirectHandler; +import io.streamnative.pulsar.handlers.amqp.proxy.ProxyConfiguration; +import io.streamnative.pulsar.handlers.amqp.proxy.PulsarServiceLookupHandler; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.util.FutureUtil; + +/** + * Proxy server, the proxy server should be an individual service, it could be scale up. + */ +@Slf4j +public class ProxyServiceV2 { + + private final ProxyConfiguration config; + private final PulsarService pulsar; + private final Map>> producerMap; + private PulsarServiceLookupHandler lookupHandler; + + public ProxyServiceV2(ProxyConfiguration config, PulsarService pulsarService) { + this.config = config; + this.pulsar = pulsarService; + this.producerMap = new ConcurrentHashMap<>(); + } + + public void start() throws Exception { + this.lookupHandler = new PulsarServiceLookupHandler(config, pulsar); + // listen to the proxy port to receive amqp commands + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast("frameEncoder", new AmqpEncoder()); + ch.pipeline().addLast("consolidation", new FlushConsolidationHandler( + config.getAmqpExplicitFlushAfterFlushes(), true)); + ch.pipeline().addLast("handler", + new ProxyClientConnection(config, lookupHandler, ProxyServiceV2.this)); + ch.pipeline().addLast("directHandler", new AmqpProxyDirectHandler()); + } + }); + bootstrap.bind(config.getAmqpProxyPort()).sync(); + } + + public CompletableFuture> getProducer(String topic) { + PulsarClient client; + try { + client = pulsar.getClient(); + } catch (PulsarServerException e) { + return FutureUtil.failedFuture(e); + } + return producerMap.computeIfAbsent(topic, k -> { + CompletableFuture> producerFuture = new CompletableFuture<>(); + client.newProducer() + .topic(topic) + .enableBatching(false) + .createAsync() + .thenAccept(producerFuture::complete) + .exceptionally(t -> { + producerFuture.completeExceptionally(t); + producerMap.remove(topic, producerFuture); + return null; + }); + return producerFuture; + }); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2Exception.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2Exception.java new file mode 100644 index 00000000..b5e73244 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/ProxyServiceV2Exception.java @@ -0,0 +1,22 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; + +public class ProxyServiceV2Exception extends RuntimeException{ + + public ProxyServiceV2Exception(Exception e) { + super(e); + } + +} diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/package-info.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/package-info.java new file mode 100644 index 00000000..370b4bb1 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/v2/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package info. + */ +package io.streamnative.pulsar.handlers.amqp.proxy.v2; \ No newline at end of file diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java index 6af83ccf..c74f847c 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java @@ -52,8 +52,8 @@ public void exchangeTopicNameValidate() { try { new PersistentExchange( exchangeName, exchangeType, exchangeTopic1, true, false, false, null, - Executors.newSingleThreadExecutor(), 200); - } catch (IllegalArgumentException e) { + Executors.newSingleThreadExecutor(), 200, true); + } catch (Exception e) { fail("Failed to new PersistentExchange. errorMsg: " + e.getMessage()); } @@ -63,8 +63,8 @@ public void exchangeTopicNameValidate() { try { new PersistentExchange( exchangeName, exchangeType, exchangeTopic2, true, false, false, null, - Executors.newSingleThreadExecutor(), 200); - } catch (IllegalArgumentException e) { + Executors.newSingleThreadExecutor(), 200, false); + } catch (Exception e) { assertNotNull(e); log.info("This is expected behavior."); } diff --git a/pom.xml b/pom.xml index 625c2c57..d4fe011a 100644 --- a/pom.xml +++ b/pom.xml @@ -48,12 +48,13 @@ 1.4.9 3.0.rc1 - 3.1.1 + 3.1.2 3.10.1 3.0.0-M1 1.4.1.Final - 8.29 + 8.37 4.2.2 + 4.2.2 8.0.0 5.8.0 3.15.0 @@ -300,11 +301,24 @@ com.github.spotbugs spotbugs-maven-plugin ${spotbugs-maven-plugin.version} - - resources/findbugsExclude.xml - + + + com.github.spotbugs + spotbugs + ${spotbugs.version} + + + + + + + + + + + maven-compiler-plugin ${maven-compiler-plugin.version} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java index 56c162e0..5616cdd6 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandlerTestBase.java @@ -124,6 +124,7 @@ protected void resetConfig() { amqpConfig.setBrokerEntryMetadataInterceptors( Sets.newHashSet("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor")); amqpConfig.setBrokerShutdownTimeoutMs(0L); + amqpConfig.setDefaultNumPartitions(1); // set protocol related config URL testHandlerUrl = this.getClass().getClassLoader().getResource("test-protocol-handler.nar"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java index 48e66ebf..673c7b02 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/AmqpTestBase.java @@ -37,6 +37,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; + /** * Base test class for RabbitMQ Client. */ @@ -114,7 +115,7 @@ protected void basicDirectConsume(String vhost, boolean exclusiveConsume) throws String routingKey = "test.key"; String queueName = randQuName(); - Connection conn = getConnection(vhost, false); + Connection conn = getConnection(vhost, true); Channel channel = conn.createChannel(); channel.exchangeDeclare(exchangeName, "direct", true); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/ProxyV2Test.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/ProxyV2Test.java new file mode 100644 index 00000000..10bb5481 --- /dev/null +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/ProxyV2Test.java @@ -0,0 +1,92 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp; + +import io.streamnative.pulsar.handlers.amqp.rabbitmq.RabbitMQTestCase; +import java.util.concurrent.CountDownLatch; +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * AMQP proxy related test. + */ +@Slf4j +public class ProxyV2Test extends AmqpTestBase { + + @BeforeClass + @Override + public void setup() throws Exception { + setBrokerCount(3); + ((AmqpServiceConfiguration) this.conf).setAmqpProxyV2Enable(true); + super.setup(); + } + + @Test + public void rabbitMQProxyTest() throws Exception { + int proxyPort = getProxyPort(); + RabbitMQTestCase rabbitMQTestCase = new RabbitMQTestCase(admin); + + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test1", + "vhost1", false, 2); + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test2", + "vhost2", false, 3); + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test3", + "vhost3", false, 2); + + CountDownLatch countDownLatch = new CountDownLatch(3); + new Thread(() -> { + try { + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test4", + "vhost1", false, 2); + countDownLatch.countDown(); + } catch (Exception e) { + log.error("Test4 error for vhost1.", e); + } + }).start(); + new Thread(() -> { + try { + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test5", + "vhost2", false, 3); + countDownLatch.countDown(); + } catch (Exception e) { + log.error("Test5 error for vhost2.", e); + } + }).start(); + new Thread(() -> { + try { + rabbitMQTestCase.basicFanoutTest(proxyPort, "rabbitmq-proxy-test6", + "vhost3", false, 3); + countDownLatch.countDown(); + } catch (Exception e) { + log.error("Test6 error for vhost3.", e); + } + }).start(); + countDownLatch.await(); + } + + @Test + public void unloadBundleTest() throws Exception { + int proxyPort = getProxyPort(); + + RabbitMQTestCase rabbitMQTestCase = new RabbitMQTestCase(admin); + rabbitMQTestCase.basicFanoutTest(proxyPort, "unload-bundle-test1", + "vhost1", true, 3); + rabbitMQTestCase.basicFanoutTest(proxyPort, "unload-bundle-test2", + "vhost2", true, 2); + rabbitMQTestCase.basicFanoutTest(proxyPort, "unload-bundle-test3", + "vhost3", true, 2); + } + +} diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestCase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestCase.java index db768082..16b81e53 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestCase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestCase.java @@ -154,7 +154,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp countDownLatch.await(); System.out.println("[" + testName + "] Test finish. Receive total msg cnt: " + totalReceiveMsgCnt); - Assert.assertEquals(expectedMsgCntPerQueue * queueList.size(), totalReceiveMsgCnt.get()); + Assert.assertEquals(totalReceiveMsgCnt.get(), expectedMsgCntPerQueue * queueList.size()); } public void defaultEmptyExchangeTest(int aopPort, String vhost) throws Exception {