From 91fc02d2e2ec24e09cfc4e4329543b18c32ae7b5 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Mon, 6 May 2019 10:54:14 +0800 Subject: [PATCH 1/6] [ISSUE #51]Consumer should support bulk subscription queue and poll message by offset . --- .../io/openmessaging/consumer/Consumer.java | 76 ++++++++++++++++++- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 4391dea9..b4bd3aa2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -97,6 +97,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName); + /** + * Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get + * messages from a collection of queue. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is + * coming. + * + * @param queueNames a collection of queues. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueue(List queueNames); + /** * Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}. *

@@ -111,6 +125,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName, MessageListener listener); + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is + * coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new message. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueues(List queueNames, MessageListener listener); + /** * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. *

@@ -125,6 +153,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName, BatchMessageListener listener); + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}. + *

+ * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered + * messages is coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new messages. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueues(List queueNames, BatchMessageListener listener); + /** * Unbind the {@code Consumer} from a specified queue. *

@@ -134,6 +176,15 @@ public interface Consumer extends ServiceLifecycle, Client { */ void unbindQueue(String queueName); + /** + * Unbind the {@code Consumer} from a collection of queues. + *

+ * After the success call, this consumer won't receive new message from the specified queue any more. + * + * @param queueNames a collection of queues. + */ + void unbindQueues(List queueNames); + /** * This method is used to find out whether the {@code Consumer} in bind queue. * @@ -142,11 +193,11 @@ public interface Consumer extends ServiceLifecycle, Client { boolean isBindQueue(); /** - * This method is used to find out the queue bind to {@code Consumer}. + * This method is used to find out the collection of queues bind to {@code Consumer}. * - * @return the queue this consumer is bind, or null if the consumer is not bind queue. + * @return the queues this consumer is bind, or null if the consumer is not bind queue. */ - String getBindQueue(); + String getBindQueues(); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. @@ -176,6 +227,25 @@ public interface Consumer extends ServiceLifecycle, Client { */ Message receive(long timeout); + /** + * Receives the next message from the which bind queue,partition and offset of this consumer in pull model. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @param queueName receive message from which queueName in Message Queue. + * @param partition receive message from which partition in Message Queue. + * @param offset receive message from which offset in Message Queue. + * @param timeout receive message will blocked at most timeout milliseconds. + * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + Message receive(String queueName, String partition, long offset, long timeout); + + + /** * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic * should implement in the {@link MessageListener}. From 33ee570330a9de1c8006e23ead6229c64f5b856b Mon Sep 17 00:00:00 2001 From: huzongtang Date: Mon, 6 May 2019 11:27:35 +0800 Subject: [PATCH 2/6] [ISSUE #51]fix some method parameter issue. --- .../src/main/java/io/openmessaging/consumer/Consumer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index b4bd3aa2..5e1680a4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -197,7 +197,7 @@ public interface Consumer extends ServiceLifecycle, Client { * * @return the queues this consumer is bind, or null if the consumer is not bind queue. */ - String getBindQueues(); + List getBindQueues(); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. @@ -234,7 +234,7 @@ public interface Consumer extends ServiceLifecycle, Client { * is shut down. * * @param queueName receive message from which queueName in Message Queue. - * @param partition receive message from which partition in Message Queue. + * @param partitionId receive message from which partition in Message Queue. * @param offset receive message from which offset in Message Queue. * @param timeout receive message will blocked at most timeout milliseconds. * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. @@ -242,7 +242,7 @@ public interface Consumer extends ServiceLifecycle, Client { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - Message receive(String queueName, String partition, long offset, long timeout); + Message receive(String queueName, int partitionId, long offset, long timeout); From d21632e292fa3fa807f70a53c927017a740fa156 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Mon, 6 May 2019 12:32:40 +0800 Subject: [PATCH 3/6] [ISSUE #51]add some parameters(such as,queueName,partitionId,offset) in the batchReceive method. --- .../src/main/java/io/openmessaging/consumer/Consumer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 5e1680a4..ad289845 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -251,13 +251,16 @@ public interface Consumer extends ServiceLifecycle, Client { * should implement in the {@link MessageListener}. *

* + * @param queueName receive message from which queueName in Message Queue. + * @param partitionId receive message from which partition in Message Queue. + * @param offset receive message from which offset in Message Queue. * @param timeout receive messages will blocked at most timeout milliseconds. * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. * @throws OMSSecurityException when have no authority to receive messages from this queue. * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - List batchReceive(long timeout); + List batchReceive(String queueName, int partitionId, long offset, long timeout); /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using From 54b66acf92afe965370e904670ba5431011ece59 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Mon, 6 May 2019 13:48:29 +0800 Subject: [PATCH 4/6] [ISSUE #51]retain original interface method. --- .../java/io/openmessaging/consumer/Consumer.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index ad289845..455f7852 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -245,6 +245,18 @@ public interface Consumer extends ServiceLifecycle, Client { Message receive(String queueName, int partitionId, long offset, long timeout); + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + *

+ * + * @param timeout receive messages will blocked at most timeout milliseconds. + * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + List batchReceive(long timeout); /** * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic From ccdc84b02b0e42c6dcc88dd6827958bb94baed55 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Mon, 6 May 2019 22:39:00 +0800 Subject: [PATCH 5/6] [ISSUE #51]adjust some parameters name in the receive and batch receive method. --- .../main/java/io/openmessaging/consumer/Consumer.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 455f7852..ae129e7c 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -20,6 +20,7 @@ import io.openmessaging.Client; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; +import io.openmessaging.annotation.Optional; import io.openmessaging.exception.OMSDestinationException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; @@ -228,21 +229,21 @@ public interface Consumer extends ServiceLifecycle, Client { Message receive(long timeout); /** - * Receives the next message from the which bind queue,partition and offset of this consumer in pull model. + * Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model. *

* This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} * is shut down. * * @param queueName receive message from which queueName in Message Queue. * @param partitionId receive message from which partition in Message Queue. - * @param offset receive message from which offset in Message Queue. + * @param receiptId receive message from which receipt position in Message Queue. * @param timeout receive message will blocked at most timeout milliseconds. * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. * @throws OMSSecurityException when have no authority to receive messages from this queue. * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - Message receive(String queueName, int partitionId, long offset, long timeout); + Message receive(String queueName,@Optional int partitionId, long receiptId, long timeout); /** @@ -265,14 +266,14 @@ public interface Consumer extends ServiceLifecycle, Client { * * @param queueName receive message from which queueName in Message Queue. * @param partitionId receive message from which partition in Message Queue. - * @param offset receive message from which offset in Message Queue. + * @param receiptId receive message from which receipt position in Message Queue. * @param timeout receive messages will blocked at most timeout milliseconds. * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. * @throws OMSSecurityException when have no authority to receive messages from this queue. * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - List batchReceive(String queueName, int partitionId, long offset, long timeout); + List batchReceive(String queueName,@Optional int partitionId, long receiptId, long timeout); /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using From b3e5d3f4e6517a5d7a3c624c25f13b6084445162 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Tue, 7 May 2019 09:42:59 +0800 Subject: [PATCH 6/6] [ISSUE #51]adjust the annotation @Optional position. --- .../src/main/java/io/openmessaging/consumer/Consumer.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index ae129e7c..b871c9d8 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -243,7 +243,8 @@ public interface Consumer extends ServiceLifecycle, Client { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - Message receive(String queueName,@Optional int partitionId, long receiptId, long timeout); + @Optional + Message receive(String queueName, int partitionId, long receiptId, long timeout); /** @@ -273,7 +274,8 @@ public interface Consumer extends ServiceLifecycle, Client { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - List batchReceive(String queueName,@Optional int partitionId, long receiptId, long timeout); + @Optional + List batchReceive(String queueName, int partitionId, long receiptId, long timeout); /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using