Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openmessaging-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<parent>
<groupId>io.openmessaging</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-beta-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
6 changes: 3 additions & 3 deletions openmessaging-api-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
<parent>
<groupId>io.openmessaging</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-beta-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>openmessaging-api-samples</artifactId>
<version>1.0.0-beta-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<name>openmessaging-api-samples ${project.version}</name>

<dependencies>
Expand All @@ -21,7 +21,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>openmessaging-api</artifactId>
<version>1.0.0-beta-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,42 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.message.Message;
import java.util.Arrays;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.PullConsumer;
import io.openmessaging.api.TopicPartition;
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.Set;

public class PullConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

Properties properties = new Properties();
//Start a PullConsumer to receive messages from the specific queue.
final PullConsumer consumer = messagingAccessPoint.createPullConsumer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties);

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.stop();
consumer.shutdown();
}
}));

consumer.bindQueue(Arrays.asList("NS://HELLO_QUEUE"));
Set<TopicPartition> topicPartitions = consumer.topicPartitions("NS://TOPIC");
consumer.assign(topicPartitions);
consumer.start();

Message message = consumer.receive(1000);
List<Message> message = consumer.poll(Duration.ofMillis(1000));
System.out.println("Received message: " + message);
//Acknowledge the consumed message
consumer.ack(message.getMessageReceipt());
consumer.stop();
consumer.commitSync();
consumer.shutdown();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,45 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import java.util.Arrays;
import io.openmessaging.api.Action;
import io.openmessaging.api.ConsumeContext;
import io.openmessaging.api.Consumer;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessageListener;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import java.util.Properties;

public class PushConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:10911/us-east");
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876");

//Fetch a ResourceManager to create Queue resource.
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("NS://XXXX");
final PushConsumer consumer = messagingAccessPoint.createPushConsumer();
Properties properties = new Properties();
final Consumer consumer = messagingAccessPoint.createConsumer(properties);
consumer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.stop();
consumer.shutdown();
}
}));

//Consume messages from a simple queue.
String simpleQueue = "NS://HELLO_QUEUE";
resourceManager.createQueue(simpleQueue);
//This queue doesn't has a source queue, so only the message delivered to the queue directly can
//be consumed by this consumer.
consumer.bindQueue(Arrays.asList(simpleQueue), new MessageListener() {
String topic = "NS://HELLO_TOPIC";

consumer.subscribe(topic, "*", new MessageListener(){
@Override
public void onReceived(Message message, Context context) {
System.out.println("Received one message: " + message);
context.ack();
}
public Action consume(Message message, ConsumeContext context) {

return Action.CommitMessage;
}
});

consumer.unbindQueue(Arrays.asList(simpleQueue));

consumer.stop();
consumer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,52 @@

package io.openmessaging.samples.producer;

import io.openmessaging.Future;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.interceptor.Context;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OnExceptionContext;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendCallback;
import io.openmessaging.api.SendResult;
import java.util.Properties;

public class ProducerApp {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org");

final Producer producer = messagingAccessPoint.createProducer();
ProducerInterceptor interceptor = new ProducerInterceptor() {
@Override
public void preSend(Message message, Context attributes) {
System.out.println("PreSend message: " + message);
}

@Override
public void postSend(Message message, Context attributes) {
System.out.println("PostSend message: " + message);
}
};
producer.addInterceptor(interceptor);
final Producer producer = messagingAccessPoint.createProducer(new Properties());
producer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.stop();
producer.shutdown();
}
}));

//Send a message to the specified destination synchronously.
Message message = producer.createMessage(
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
message.extensionHeader().setPartition(1);
Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());

SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);

//Sends a message to the specified destination async.
Future<SendResult> sendResultFuture = producer.sendAsync(message);
sendResult = sendResultFuture.get(1000);
System.out.println("SendResult: " + sendResult);
producer.sendAsync(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("SendResult: " + sendResult);
}

@Override
public void onException(OnExceptionContext context) {

}
});

//Sends a message to the specified destination in one way mode.
producer.sendOneway(message);

//Sends messages to the specified destination in batch mode.
List<Message> messages = new ArrayList<Message>(10);
for (int i = 0; i < 10; i++) {
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
messages.add(msg);
}

producer.send(messages);
producer.removeInterceptor(interceptor);
producer.stop();
producer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@

package io.openmessaging.samples.producer;

import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.producer.TransactionalResult;
import java.nio.charset.Charset;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.SendResult;
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.LocalTransactionExecuter;
import io.openmessaging.api.transaction.TransactionProducer;
import io.openmessaging.api.transaction.TransactionStatus;
import java.util.Properties;

public class TransactionProducerApp {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

final Producer producer = messagingAccessPoint.createProducer(new TransactionStateCheckListener() {
@Override public void check(Message message, TransactionalContext context) {

final TransactionProducer producer = messagingAccessPoint.createTransactionProducer(new Properties(), new LocalTransactionChecker() {
@Override
public TransactionStatus check(Message msg) {
return TransactionStatus.CommitTransaction;
}
});
producer.start();
Expand All @@ -41,23 +44,19 @@ public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.stop();
producer.shutdown();
}
}));

Message message = producer.createMessage(
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
Message message = new Message("NS://Topic", "TagA", "Hello MQ".getBytes());

//Sends a transaction message to the specified destination synchronously.
TransactionalResult result = producer.prepare(message);
executeLocalTransaction(result);
result.commit();
producer.stop();
System.out.println("Send transaction message OK, message id is: " + result.messageId());
SendResult result = producer.send(message, new LocalTransactionExecuter() {
@Override public TransactionStatus execute(Message message, Object arg) {
return TransactionStatus.CommitTransaction;
}
}, null);
System.out.println("Send transaction message OK, message id is: " + result.getMessageId());
}

private static void executeLocalTransaction(TransactionalResult result) {
System.out.println("transactionId: " + result.transactionId());
System.out.println("execute local transaction");
}
}
Loading