Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.message.Message;
import java.util.Arrays;

public class PullConsumerApp {
public static void main(String[] args) {
Expand All @@ -29,7 +30,7 @@ public static void main(String[] args) {
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

//Start a PullConsumer to receive messages from the specific queue.
final Consumer consumer = messagingAccessPoint.createConsumer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand All @@ -39,7 +40,7 @@ public void run() {
}
}));

consumer.bindQueue("NS://HELLO_QUEUE");
consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE"));
consumer.start();

Message message = consumer.receive(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import java.util.Arrays;

public class PushConsumerApp {
public static void main(String[] args) {
Expand All @@ -33,7 +35,7 @@ public static void main(String[] args) {
//Fetch a ResourceManager to create Queue resource.
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("NS://XXXX");
final Consumer consumer = messagingAccessPoint.createConsumer();
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
consumer.start();

//Register a shutdown hook to close the opened endpoints.
Expand All @@ -49,7 +51,7 @@ public void run() {
resourceManager.createQueue(simpleQueue);
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
//be consumed by this consumer.
consumer.bindQueue(simpleQueue, new MessageListener() {
consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
System.out.println("Received one message: " + message);
Expand All @@ -58,7 +60,7 @@ public void onReceived(Message message, Context context) {

});

consumer.unbindQueue(simpleQueue);
consumer.unbindQueue(Arrays.asList(simpleQueue));

consumer.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run() {
Message message = producer.createMessage(
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
message.extensionHeader().get().setPartition(1);
message.extensionHeader().setPartition(1);
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package io.openmessaging.samples.routing;

import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.producer.Producer;
import java.util.Arrays;

public class RoutingApp {
public static void main(String[] args) {
Expand Down Expand Up @@ -54,10 +56,10 @@ public static void main(String[] args) {
producer.send(message);

//Consume messages from the queue behind the routing.
final Consumer consumer = messagingAccessPoint.createConsumer();
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
consumer.start();

consumer.bindQueue(destinationQueue, new MessageListener() {
consumer.bindQueue(Arrays.asList(destinationQueue), new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
//The message sent to the sourceQueue will be delivered to anotherConsumer by the routing rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSSecurityException;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import java.util.Collection;

/**
* An instance of {@code MessagingAccessPoint} may be obtained from {@link OMS}, which is capable of creating {@code
Expand Down Expand Up @@ -91,15 +94,43 @@ public interface MessagingAccessPoint {
Producer createProducer(TransactionStateCheckListener transactionStateCheckListener);

/**
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}. The returned {@code Consumer}
* isn't bind to any queue, uses {@link Consumer#bindQueue(String, MessageListener)} to bind queues.
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint}.
* The returned {@code PushConsumer} isn't attached to any queue,
* uses {@link PushConsumer#bindQueue(Collection, MessageListener)} to attach queues.
*
* @return the created {@code PushConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
* error
* @throws OMSSecurityException if have no authority to create a consumer.
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
* due to some internal error
*/
PushConsumer createPushConsumer();

/**
* Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
*
* @return the created {@code PullConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
* due to some internal error
*/
PullConsumer createPullConsumer();

/**
* Creates a new {@code PushConsumer} for the specified {@code MessagingAccessPoint} with some preset attributes.
*
* @param attributes the preset attributes
* @return the created {@code PushConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
* due to some internal error
*/
PushConsumer createPushConsumer(KeyValue attributes);

/**
* Creates a new {@code PullConsumer} for the specified {@code MessagingAccessPoint}.
*
* @return the created {@code PullConsumer}
* @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request
* due to some internal error
*/
Consumer createConsumer();
PullConsumer createPullConsumer(KeyValue attributes);

/**
* Gets a lightweight {@code ResourceManager} instance from the specified {@code MessagingAccessPoint}.
Expand Down
Loading