diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 4391dea9..b871c9d8 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -20,6 +20,7 @@ import io.openmessaging.Client; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; +import io.openmessaging.annotation.Optional; import io.openmessaging.exception.OMSDestinationException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; @@ -97,6 +98,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName); + /** + * Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get + * messages from a collection of queue. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is + * coming. + * + * @param queueNames a collection of queues. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueue(List queueNames); + /** * Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}. *

@@ -111,6 +126,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName, MessageListener listener); + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is + * coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new message. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueues(List queueNames, MessageListener listener); + /** * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. *

@@ -125,6 +154,20 @@ public interface Consumer extends ServiceLifecycle, Client { */ void bindQueue(String queueName, BatchMessageListener listener); + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}. + *

+ * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered + * messages is coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new messages. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueues(List queueNames, BatchMessageListener listener); + /** * Unbind the {@code Consumer} from a specified queue. *

@@ -134,6 +177,15 @@ public interface Consumer extends ServiceLifecycle, Client { */ void unbindQueue(String queueName); + /** + * Unbind the {@code Consumer} from a collection of queues. + *

+ * After the success call, this consumer won't receive new message from the specified queue any more. + * + * @param queueNames a collection of queues. + */ + void unbindQueues(List queueNames); + /** * This method is used to find out whether the {@code Consumer} in bind queue. * @@ -142,11 +194,11 @@ public interface Consumer extends ServiceLifecycle, Client { boolean isBindQueue(); /** - * This method is used to find out the queue bind to {@code Consumer}. + * This method is used to find out the collection of queues bind to {@code Consumer}. * - * @return the queue this consumer is bind, or null if the consumer is not bind queue. + * @return the queues this consumer is bind, or null if the consumer is not bind queue. */ - String getBindQueue(); + List getBindQueues(); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. @@ -176,6 +228,25 @@ public interface Consumer extends ServiceLifecycle, Client { */ Message receive(long timeout); + /** + * Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @param queueName receive message from which queueName in Message Queue. + * @param partitionId receive message from which partition in Message Queue. + * @param receiptId receive message from which receipt position in Message Queue. + * @param timeout receive message will blocked at most timeout milliseconds. + * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + @Optional + Message receive(String queueName, int partitionId, long receiptId, long timeout); + + /** * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic * should implement in the {@link MessageListener}. @@ -189,6 +260,23 @@ public interface Consumer extends ServiceLifecycle, Client { */ List batchReceive(long timeout); + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + *

+ * + * @param queueName receive message from which queueName in Message Queue. + * @param partitionId receive message from which partition in Message Queue. + * @param receiptId receive message from which receipt position in Message Queue. + * @param timeout receive messages will blocked at most timeout milliseconds. + * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + @Optional + List batchReceive(String queueName, int partitionId, long receiptId, long timeout); + /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using * manual commit.