Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.openmessaging.Client;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.annotation.Optional;
import io.openmessaging.exception.OMSDestinationException;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSSecurityException;
Expand Down Expand Up @@ -97,6 +98,20 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
void bindQueue(String queueName);

/**
* Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get
* messages from a collection of queue.
* <p>
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
* coming.
*
* @param queueNames a collection of queues.
* @throws OMSSecurityException when have no authority to bind to this queue.
* @throws OMSDestinationException when have no given destination in the server.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
*/
void bindQueue(List<String> queueNames);

/**
* Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}.
* <p>
Expand All @@ -111,6 +126,20 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
void bindQueue(String queueName, MessageListener listener);

/**
* Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}.
* <p>
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is
* coming.
*
* @param queueNames a collection of queues.
* @param listener a specified listener to receive new message.
* @throws OMSSecurityException when have no authority to bind to this queue.
* @throws OMSDestinationException when have no given destination in the server.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
*/
void bindQueues(List<String> queueNames, MessageListener listener);

/**
* Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}.
* <p>
Expand All @@ -125,6 +154,20 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
void bindQueue(String queueName, BatchMessageListener listener);

/**
* Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}.
* <p>
* {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered
* messages is coming.
*
* @param queueNames a collection of queues.
* @param listener a specified listener to receive new messages.
* @throws OMSSecurityException when have no authority to bind to this queue.
* @throws OMSDestinationException when have no given destination in the server.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
*/
void bindQueues(List<String> queueNames, BatchMessageListener listener);

/**
* Unbind the {@code Consumer} from a specified queue.
* <p>
Expand All @@ -134,6 +177,15 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
void unbindQueue(String queueName);

/**
* Unbind the {@code Consumer} from a collection of queues.
* <p>
* After the success call, this consumer won't receive new message from the specified queue any more.
*
* @param queueNames a collection of queues.
*/
void unbindQueues(List<String> queueNames);

/**
* This method is used to find out whether the {@code Consumer} in bind queue.
*
Expand All @@ -142,11 +194,11 @@ public interface Consumer extends ServiceLifecycle, Client {
boolean isBindQueue();

/**
* This method is used to find out the queue bind to {@code Consumer}.
* This method is used to find out the collection of queues bind to {@code Consumer}.
*
* @return the queue this consumer is bind, or null if the consumer is not bind queue.
* @return the queues this consumer is bind, or null if the consumer is not bind queue.
*/
String getBindQueue();
List<String> getBindQueues();

/**
* Adds a {@code ConsumerInterceptor} instance to this consumer.
Expand Down Expand Up @@ -176,6 +228,25 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
Message receive(long timeout);

/**
* Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model.
* <p>
* This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer}
* is shut down.
*
* @param queueName receive message from which queueName in Message Queue.
* @param partitionId receive message from which partition in Message Queue.
* @param receiptId receive message from which receipt position in Message Queue.
* @param timeout receive message will blocked at most <code>timeout</code> milliseconds.
* @return the next message received from the bind queues, or null if the consumer is concurrently shut down.
* @throws OMSSecurityException when have no authority to receive messages from this queue.
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
*/
@Optional
Message receive(String queueName, int partitionId, long receiptId, long timeout);


/**
* Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
* should implement in the {@link MessageListener}.
Expand All @@ -189,6 +260,23 @@ public interface Consumer extends ServiceLifecycle, Client {
*/
List<Message> batchReceive(long timeout);

/**
* Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
* should implement in the {@link MessageListener}.
* <p>
*
* @param queueName receive message from which queueName in Message Queue.
* @param partitionId receive message from which partition in Message Queue.
* @param receiptId receive message from which receipt position in Message Queue.
* @param timeout receive messages will blocked at most <code>timeout</code> milliseconds.
* @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down.
* @throws OMSSecurityException when have no authority to receive messages from this queue.
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
*/
@Optional
List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout);

/**
* Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using
* manual commit.
Expand Down