diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index f0a41026..7f9a2e85 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -19,8 +19,9 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; -import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.message.Message; +import java.util.Arrays; public class PullConsumerApp { public static void main(String[] args) { @@ -29,7 +30,7 @@ public static void main(String[] args) { OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); //Start a PullConsumer to receive messages from the specific queue. - final Consumer consumer = messagingAccessPoint.createConsumer(); + final PullConsumer consumer = messagingAccessPoint.createPullConsumer(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -39,7 +40,7 @@ public void run() { } })); - consumer.bindQueue("NS://HELLO_QUEUE"); + consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE")); consumer.start(); Message message = consumer.receive(1000); diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java index 98a9a2e5..47ea14eb 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java @@ -21,8 +21,10 @@ import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.Message; +import java.util.Arrays; public class PushConsumerApp { public static void main(String[] args) { @@ -33,7 +35,7 @@ public static void main(String[] args) { //Fetch a ResourceManager to create Queue resource. ResourceManager resourceManager = messagingAccessPoint.resourceManager(); resourceManager.createNamespace("NS://XXXX"); - final Consumer consumer = messagingAccessPoint.createConsumer(); + final PushConsumer consumer = messagingAccessPoint.createPushConsumer(); consumer.start(); //Register a shutdown hook to close the opened endpoints. @@ -49,7 +51,7 @@ public void run() { resourceManager.createQueue(simpleQueue); //This queue doesn't has a source queue, so only the message delivered to the queue directly can //be consumed by this consumer. - consumer.bindQueue(simpleQueue, new MessageListener() { + consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() { @Override public void onReceived(Message message, Context context) { System.out.println("Received one message: " + message); @@ -58,7 +60,7 @@ public void onReceived(Message message, Context context) { }); - consumer.unbindQueue(simpleQueue); + consumer.unbindQueue(Arrays.asList(simpleQueue)); consumer.stop(); } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index c733443c..9084b8a8 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -61,7 +61,7 @@ public void run() { Message message = producer.createMessage( "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); message.header().setBornHost("127.0.0.1").setDurability((short) 0); - message.extensionHeader().get().setPartition(1); + message.extensionHeader().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java index ba267444..298655cc 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java @@ -17,6 +17,7 @@ package io.openmessaging.samples.routing; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; @@ -24,6 +25,7 @@ import io.openmessaging.consumer.MessageListener; import io.openmessaging.manager.ResourceManager; import io.openmessaging.producer.Producer; +import java.util.Arrays; public class RoutingApp { public static void main(String[] args) { @@ -54,10 +56,10 @@ public static void main(String[] args) { producer.send(message); //Consume messages from the queue behind the routing. - final Consumer consumer = messagingAccessPoint.createConsumer(); + final PushConsumer consumer = messagingAccessPoint.createPushConsumer(); consumer.start(); - consumer.bindQueue(destinationQueue, new MessageListener() { + consumer.bindQueue(Arrays.asList(destinationQueue), new MessageListener() { @Override public void onReceived(Message message, Context context) { //The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java index e99ba2e4..80bee1dd 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java @@ -19,12 +19,15 @@ import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; +import java.util.Collection; /** * An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code @@ -91,15 +94,43 @@ public interface MessagingAccessPoint { Producer createProducer(TransactionStateCheckListener transactionStateCheckListener); /** - * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer} - * isn't bind to any queue, uses {@link Consumer#bindQueue(String, MessageListener)} to bind queues. + * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. + * The returned {@code PushConsumer} isn't attached to any queue, + * uses {@link PushConsumer#bindQueue(Collection, MessageListener)} to attach queues. * * @return the created {@code PushConsumer} - * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal - * error - * @throws OMSSecurityException if have no authority to create a consumer. + * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request + * due to some internal error + */ + PushConsumer createPushConsumer(); + + /** + * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}. + * + * @return the created {@code PullConsumer} + * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request + * due to some internal error + */ + PullConsumer createPullConsumer(); + + /** + * Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes. + * + * @param attributes the preset attributes + * @return the created {@code PushConsumer} + * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request + * due to some internal error + */ + PushConsumer createPushConsumer(KeyValue attributes); + + /** + * Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}. + * + * @return the created {@code PullConsumer} + * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request + * due to some internal error */ - Consumer createConsumer(); + PullConsumer createPullConsumer(KeyValue attributes); /** * Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}. diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index b871c9d8..4ea4ea61 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -20,185 +20,30 @@ import io.openmessaging.Client; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; -import io.openmessaging.annotation.Optional; import io.openmessaging.exception.OMSDestinationException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; -import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.interceptor.ConsumerInterceptor; import io.openmessaging.message.Message; +import java.util.Collection; import java.util.List; +import java.util.Set; /** * A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from MOM server to {@code - * PushConsumer} client. + * Consumer} client. * * @version OMS 1.0.0 - * @see MessagingAccessPoint#createConsumer() * @since OMS 1.0.0 */ public interface Consumer extends ServiceLifecycle, Client { - /** - * Resumes the {@code Consumer} in push model after a suspend. - *

- * This method resumes the {@code Consumer} instance after it was suspended. The instance will not receive new - * messages between the suspend and resume calls. - * - * @throws OMSRuntimeException if the instance has not been suspended. - * @see Consumer#suspend() - */ - void resume(); - - /** - * Suspends the {@code Consumer} in push model for later resumption. - *

- * This method suspends the consumer until it is resumed. The consumer will not receive new messages between the - * suspend and resume calls. - *

- * This method behaves exactly as if it simply performs the call {@code suspend(0)}. - * - * @throws OMSRuntimeException if the instance is not currently running. - * @see Consumer#resume() - */ - void suspend(); - - /** - * Suspends the {@code Consumer} in push model for later resumption. - *

- * This method suspends the consumer until it is resumed or a specified amount of time has elapsed. The consumer - * will not receive new messages during the suspended state. - *

- * This method is similar to the {@link #suspend()} method, but it allows finer control over the amount of time to - * suspend, and the consumer will be suspended until it is resumed if the timeout is zero. - * - * @param timeout the maximum time to suspend in milliseconds. - * @throws OMSRuntimeException if the instance is not currently running. - */ - void suspend(long timeout); - - /** - * This method is used to find out whether the {@code Consumer} in push model is suspended. - * - * @return true if this {@code Consumer} is suspended, false otherwise. - */ - boolean isSuspended(); - - /** - * Bind the {@code Consumer} to a specified queue in pull model, user can use {@link Consumer#receive(long)} to get - * message from bind queue. - *

- * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is - * coming. - * - * @param queueName a specified queue. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueue(String queueName); - - /** - * Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link Consumer#receive(long)} to get - * messages from a collection of queue. - *

- * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is - * coming. - * - * @param queueNames a collection of queues. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueue(List queueNames); - - /** - * Bind the {@code Consumer} to a specified queue, with a {@code MessageListener}. - *

- * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is - * coming. - * - * @param queueName a specified queue. - * @param listener a specified listener to receive new message. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueue(String queueName, MessageListener listener); - - /** - * Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}. - *

- * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is - * coming. - * - * @param queueNames a collection of queues. - * @param listener a specified listener to receive new message. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueues(List queueNames, MessageListener listener); - - /** - * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. - *

- * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered - * messages is coming. - * - * @param queueName a specified queue. - * @param listener a specified listener to receive new messages. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueue(String queueName, BatchMessageListener listener); - - /** - * Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}. - *

- * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered - * messages is coming. - * - * @param queueNames a collection of queues. - * @param listener a specified listener to receive new messages. - * @throws OMSSecurityException when have no authority to bind to this queue. - * @throws OMSDestinationException when have no given destination in the server. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - void bindQueues(List queueNames, BatchMessageListener listener); - - /** - * Unbind the {@code Consumer} from a specified queue. - *

- * After the success call, this consumer won't receive new message from the specified queue any more. - * - * @param queueName a specified queue. - */ - void unbindQueue(String queueName); - - /** - * Unbind the {@code Consumer} from a collection of queues. - *

- * After the success call, this consumer won't receive new message from the specified queue any more. - * - * @param queueNames a collection of queues. - */ - void unbindQueues(List queueNames); - - /** - * This method is used to find out whether the {@code Consumer} in bind queue. - * - * @return true if this {@code Consumer} is bind, false otherwise. - */ - boolean isBindQueue(); - /** * This method is used to find out the collection of queues bind to {@code Consumer}. * * @return the queues this consumer is bind, or null if the consumer is not bind queue. */ - List getBindQueues(); + Set getBindQueues(); /** * Adds a {@code ConsumerInterceptor} instance to this consumer. @@ -214,69 +59,6 @@ public interface Consumer extends ServiceLifecycle, Client { */ void removeInterceptor(ConsumerInterceptor interceptor); - /** - * Receives the next message from the bind queues of this consumer in pull model. - *

- * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} - * is shut down. - * - * @param timeout receive message will blocked at most timeout milliseconds. - * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. - * @throws OMSSecurityException when have no authority to receive messages from this queue. - * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - Message receive(long timeout); - - /** - * Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model. - *

- * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} - * is shut down. - * - * @param queueName receive message from which queueName in Message Queue. - * @param partitionId receive message from which partition in Message Queue. - * @param receiptId receive message from which receipt position in Message Queue. - * @param timeout receive message will blocked at most timeout milliseconds. - * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. - * @throws OMSSecurityException when have no authority to receive messages from this queue. - * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - @Optional - Message receive(String queueName, int partitionId, long receiptId, long timeout); - - - /** - * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic - * should implement in the {@link MessageListener}. - *

- * - * @param timeout receive messages will blocked at most timeout milliseconds. - * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. - * @throws OMSSecurityException when have no authority to receive messages from this queue. - * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - List batchReceive(long timeout); - - /** - * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic - * should implement in the {@link MessageListener}. - *

- * - * @param queueName receive message from which queueName in Message Queue. - * @param partitionId receive message from which partition in Message Queue. - * @param receiptId receive message from which receipt position in Message Queue. - * @param timeout receive messages will blocked at most timeout milliseconds. - * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. - * @throws OMSSecurityException when have no authority to receive messages from this queue. - * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. - * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - */ - @Optional - List batchReceive(String queueName, int partitionId, long receiptId, long timeout); - /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using * manual commit. diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java new file mode 100755 index 00000000..82f7814a --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.consumer; + +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.annotation.Optional; +import io.openmessaging.exception.OMSDestinationException; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.exception.OMSSecurityException; +import io.openmessaging.exception.OMSTimeOutException; +import io.openmessaging.extension.QueueMetaData; +import io.openmessaging.message.Message; +import java.util.Collection; +import java.util.List; + +/** + * A {@code PullConsumer} pulls messages from the specified queue, and supports submit the consume result by + * acknowledgement. + * + * @version OMS 1.0.0 + * @see MessagingAccessPoint#createPullConsumer() + * @since OMS 1.0.0 + */ +public interface PullConsumer extends Consumer { + + /** + * Bind the {@code Consumer} to a collection of queue in pull model, user can use {@link PullConsumer#receive(long)} + * to get messages from a collection of queue. + *

+ * + * @param queueNames a collection of queues. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueue(Collection queueNames); + + /** + * Unbind the {@code Consumer} from a collection of queues. + *

+ * After the success call, this consumer won't receive new message from the specified queue any more. + * + * @param queueNames a collection of queues. + */ + void unbindQueue(Collection queueNames); + + /** + * Receives the next message from the attached queues of this consumer. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @return the next message received from the attached queues, or null if the consumer is concurrently shut down or + * the timeout expires + * @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error. + */ + Message receive(); + + /** + * Receives the next message from the bind queues of this consumer in pull model. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @param timeout receive message will blocked at most timeout milliseconds. + * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + Message receive(long timeout); + + /** + * Receives the next message from the which bind queue,partition and receiptId of this consumer in pull model. + *

+ * This call blocks indefinitely until a message is arrives, the timeout expires, or until this {@code PullConsumer} + * is shut down. + * + * @param queueName receive message from which queueName in Message Queue. + * @param queueMetaData receive message from which partition in Message Queue. + * @param messageReceipt receive message from which receipt position in Message Queue. + * @param timeout receive message will blocked at most timeout milliseconds. + * @return the next message received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + @Optional + Message receive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, long timeout); + + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + *

+ * + * @param timeout receive messages will blocked at most timeout milliseconds. + * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + List batchReceive(long timeout); + + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + *

+ * + * @param queueName receive message from which queueName in Message Queue. + * @param queueMetaData receive message from which partition in Message Queue. + * @param messageReceipt receive message from which receipt position in Message Queue. + * @param timeout receive messages will blocked at most timeout milliseconds. + * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. + * @throws OMSSecurityException when have no authority to receive messages from this queue. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + @Optional + List batchReceive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, + long timeout); + + /** + * Acknowledges the specified and consumed message with the unique message receipt handle. + *

+ * Messages that have been received but not acknowledged may be redelivered. + * + * @param receiptHandle the receipt handle associated with the consumed message + * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. + */ + void ack(MessageReceipt receiptHandle); + +} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java new file mode 100644 index 00000000..3eaf5ae6 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.exception.OMSDestinationException; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.exception.OMSSecurityException; +import io.openmessaging.message.Message; +import java.util.Collection; +import java.util.List; + +/** + * A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from + * MOM server to {@code PushConsumer} client. + * + * @version OMS 1.0.0 + * @see MessagingAccessPoint#createPushConsumer() + * @since OMS 1.0.0 + */ +public interface PushConsumer extends Consumer{ + /** + * Resumes the {@code Consumer} in push model after a suspend. + *

+ * This method resumes the {@code Consumer} instance after it was suspended. The instance will not receive new + * messages between the suspend and resume calls. + * + * @throws OMSRuntimeException if the instance has not been suspended. + * @see PushConsumer#suspend() + */ + void resume(); + + /** + * Suspends the {@code Consumer} in push model for later resumption. + *

+ * This method suspends the consumer until it is resumed. The consumer will not receive new messages between the + * suspend and resume calls. + *

+ * This method behaves exactly as if it simply performs the call {@code suspend(0)}. + * + * @throws OMSRuntimeException if the instance is not currently running. + * @see PushConsumer#resume() + */ + void suspend(); + + /** + * Suspends the {@code Consumer} in push model for later resumption. + *

+ * This method suspends the consumer until it is resumed or a specified amount of time has elapsed. The consumer + * will not receive new messages during the suspended state. + *

+ * This method is similar to the {@link #suspend()} method, but it allows finer control over the amount of time to + * suspend, and the consumer will be suspended until it is resumed if the timeout is zero. + * + * @param timeout the maximum time to suspend in milliseconds. + * @throws OMSRuntimeException if the instance is not currently running. + */ + void suspend(long timeout); + + /** + * This method is used to find out whether the {@code Consumer} in push model is suspended. + * + * @return true if this {@code Consumer} is suspended, false otherwise. + */ + boolean isSuspended(); + + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code MessageListener}. + *

+ * {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new delivered message is + * coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new message. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueue(Collection queueNames, MessageListener listener); + + + /** + * Bind the {@code Consumer} to a collection of queue, with a {@code BatchMessageListener}. + *

+ * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered + * messages is coming. + * + * @param queueNames a collection of queues. + * @param listener a specified listener to receive new messages. + * @throws OMSSecurityException when have no authority to bind to this queue. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + void bindQueue(Collection queueNames, BatchMessageListener listener); + + /** + * Unbind the {@code Consumer} from a collection of queues. + *

+ * After the success call, this consumer won't receive new message from the specified queue any more. + * + * @param queueNames a collection of queues. + */ + void unbindQueue(Collection queueNames); + +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java index f6ea1254..56edceb9 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java @@ -21,6 +21,7 @@ import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.exception.OMSTimeOutException; +import java.util.Set; /** *

@@ -46,5 +47,5 @@ public interface Extension { * @throws OMSDestinationException when have no given destination in the server. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - QueueMetaData getQueueMetaData(String queueName); + Set getQueueMetaData(String queueName); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java index 22f9146a..0a0409b7 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java @@ -18,12 +18,19 @@ import io.openmessaging.annotation.Optional; import java.util.List; +import java.util.Set; /** * This interface {@code QueueMetaData} contains methods are used for getting configurations related some certain * implementation. but this interface are not mandatory. *

* + * In order to improve performance, in some scenarios where message persistence is required, some message middleware + * will store messages on multiple partitions in multi servers. + *

+ * + * In some scenarios, it is very useful to get the relevant partitions meta data for a queue. + * * @version OMS 1.0.0 * @since OMS 1.0.0 */ @@ -31,40 +38,29 @@ public interface QueueMetaData { /** - * In order to improve performance, in some scenarios where message persistence is required, some message middleware - * will store messages on multiple partitions in multi servers. - *

- * In some scenarios, it is very useful to get the relevant partitions meta data for a queue. + * Set queueName to this Message Queue. + * @param queueName */ - interface Partition { - /** - * Partition identifier - * - * @return Partition identifier - */ - int partitionId(); + void setQueueName(String queueName); - /** - * The host of the server where the partition is located - *

- * - * @return The host of the server where the partition is located - */ - String partitonHost(); - } + /** + * Set the specified partition. + * @param partitionId + */ + void setPartitionId(int partitionId); /** - * Queue name - *

+ * Get partition identifier of target queue. * - * @return Queue name. + * @return Partition identifier */ - String queueName(); + int partitionId(); /** - * Get partition list belongs to the {@code queueName} + * Queue name + *

* - * @return List of {@link Partition} belongs to the specified queue. + * @return Queue name. */ - List partitions(); + String queueName(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java index e4705c62..a43d23ed 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -18,13 +18,13 @@ package io.openmessaging.message; import io.openmessaging.KeyValue; +import io.openmessaging.annotation.Optional; import io.openmessaging.consumer.BatchMessageListener; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageReceipt; import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.extension.ExtensionHeader; -import java.util.Optional; /** * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is @@ -75,8 +75,8 @@ public interface Message { * * @return The implementation of {@link ExtensionHeader} */ - @io.openmessaging.annotation.Optional - Optional extensionHeader(); + @Optional + ExtensionHeader extensionHeader(); /** * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. diff --git a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java index efee3560..b348996e 100644 --- a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java +++ b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java @@ -22,6 +22,8 @@ import io.openmessaging.OMS; import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; @@ -47,31 +49,48 @@ class TestVendor implements MessagingAccessPoint { public TestVendor(KeyValue keyValue) { } - @Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) { + @Override + public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) { return null; } - @Override public MessageFactory messageFactory() { + @Override + public PushConsumer createPushConsumer() { return null; } @Override - public String version() { - return OMS.specVersion; + public PullConsumer createPullConsumer() { + return null; } @Override - public KeyValue attributes() { + public PushConsumer createPushConsumer(KeyValue attributes) { return null; } @Override - public Producer createProducer() { + public PullConsumer createPullConsumer(KeyValue attributes) { + return null; + } + + @Override + public MessageFactory messageFactory() { return null; } @Override - public Consumer createConsumer() { + public String version() { + return OMS.specVersion; + } + + @Override + public KeyValue attributes() { + return null; + } + + @Override + public Producer createProducer() { return null; }