Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,19 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;

public class PullConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

//Fetch a ResourceManager to create Queue resource.
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createQueue("NS://HELLO_QUEUE");

//Start a PullConsumer to receive messages from the specific queue.
final Consumer consumer = messagingAccessPoint.createConsumer();
consumer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand All @@ -44,10 +38,15 @@ public void run() {
consumer.stop();
}
}));

consumer.bindQueue("NS://HELLO_QUEUE");
consumer.start();

Message message = consumer.receive(1000);
System.out.println("Received message: " + message);
//Acknowledge the consumed message
consumer.ack(message.getMessageReceipt());
consumer.stop();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;

public class PushConsumerApp {
public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package io.openmessaging.samples.producer;

import io.openmessaging.Future;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.interceptor.Context;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
Expand All @@ -35,17 +35,19 @@ public static void main(String[] args) {
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

final Producer producer = messagingAccessPoint.createProducer();
producer.start();
ProducerInterceptor interceptor = new ProducerInterceptor() {
@Override
public void preSend(Message message, Context attributes) {
System.out.println("PreSend message: " + message);
}

@Override
public void postSend(Message message, Context attributes) {
System.out.println("PostSend message: " + message);
}
};
producer.addInterceptor(interceptor);
producer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand All @@ -55,9 +57,11 @@ public void run() {
}
}));

//Sends a message to the specified destination synchronously.
//Send a message to the specified destination synchronously.
Message message = producer.createMessage(
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
message.extensionHeader().get().setPartition(1);
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);

Expand All @@ -75,6 +79,7 @@ public void run() {
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
messages.add(msg);
}

producer.send(messages);
producer.removeInterceptor(interceptor);
producer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.openmessaging.samples.producer;

import io.openmessaging.Message;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.openmessaging.samples.routing;

import io.openmessaging.Message;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
Expand Down
28 changes: 0 additions & 28 deletions openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java

This file was deleted.

41 changes: 41 additions & 0 deletions openmessaging-api/src/main/java/io/openmessaging/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging;

import io.openmessaging.extension.Extension;
import java.util.Optional;

/**
* <p>
* A {@code Client} interface contains all the common behaviors of producer and consumer. which can be used to achieve
* some basic interaction with the server.
* </p>
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface Client {
/**
* Get the extension method, and this interface is optional, Therefore, users need to check whether this interface
* has been implemented by vendors.
* <p>
*
* @return the implementation of {@link Extension}
*/
@io.openmessaging.annotation.Optional
Optional<Extension> getExtension();
}
41 changes: 4 additions & 37 deletions openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public interface KeyValue {
KeyValue put(String key, String value);

/**
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, false is returned.
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, false is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
Expand All @@ -94,8 +94,8 @@ public interface KeyValue {
boolean getBoolean(String key);

/**
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, false is returned.
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, false is returned.
*
* @param key the property key
* @param defaultValue a default value
Expand Down Expand Up @@ -135,17 +135,6 @@ public interface KeyValue {
*/
int getInt(String key);

/**
* Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, int)
*/
int getInt(String key, int defaultValue);

/**
* Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, zero is returned.
Expand Down Expand Up @@ -177,17 +166,6 @@ public interface KeyValue {
*/
double getDouble(String key);

/**
* Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, double)
*/
double getDouble(String key, double defaultValue);

/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, {@code null} is returned.
Expand All @@ -198,17 +176,6 @@ public interface KeyValue {
*/
String getString(String key);

/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, String)
*/
String getString(String key, String defaultValue);

/**
* Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
* <p>
Expand Down
Loading