Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
674d686
Add extension fields
duhenglucky Jan 31, 2019
68ff278
Add extension module for OpenMessaging Runtime interface
duhenglucky Jan 31, 2019
b8c1329
(1) Polish messsage header attributes based on specification (2) Add …
duhenglucky Feb 1, 2019
6eec44a
Fix some spelling mistakes
duhenglucky Feb 20, 2019
dfa2054
A new MessageReceipt interface is introduced to increase the compatib…
Feb 22, 2019
661a77f
Move method io.openmessaging.ConsumeMessage.getMessageReceipt() to io…
Feb 25, 2019
81eb092
Add boolean type to KeyValue
Feb 28, 2019
c62528a
Merge pull request #44 from llIlll/extend_keyvalue
duhenglucky Feb 28, 2019
c184340
Merge pull request #39 from liyue2008/receipt
duhenglucky Feb 28, 2019
b86ca21
Resolve the conflict and polished the extension related interface
duhenglucky Feb 28, 2019
8bbfb36
Modify spelling errors of Durability
duhenglucky Feb 28, 2019
400f5ac
Merge pull request #34 from duhenglucky/master
duhenglucky Feb 28, 2019
4563284
Add optional annotation to extension header
duhenglucky Feb 28, 2019
42bf81f
Add comments for Optional annotation
duhenglucky Feb 28, 2019
497597a
Change @Code to @code for generate java doc
duhenglucky Feb 28, 2019
fde62ab
Change CODE to code in java doc
duhenglucky Feb 28, 2019
49576ac
[maven-release-plugin] prepare release 1.0.0-beta-SNAPSHOT
duhenglucky Feb 28, 2019
321e07f
[maven-release-plugin] prepare for next development iteration
duhenglucky Feb 28, 2019
69bd3ab
[maven-release-plugin] prepare release openmessaging-1.0.0-alpha
duhenglucky Feb 28, 2019
44b95ce
[maven-release-plugin] prepare for next development iteration
duhenglucky Feb 28, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openmessaging-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<parent>
<groupId>io.openmessaging</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-preview</version>
<version>1.0.0-beta-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
5 changes: 3 additions & 2 deletions openmessaging-api-samples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
<parent>
<groupId>io.openmessaging</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-preview</version>
<version>1.0.0-beta-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>openmessaging-api-samples</artifactId>
<version>1.0.0-beta-SNAPSHOT</version>
<name>openmessaging-api-samples ${project.version}</name>

<dependencies>
Expand All @@ -20,7 +21,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>openmessaging-api</artifactId>
<version>${project.version}</version>
<version>1.0.0-beta-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,19 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;

public class PullConsumerApp {
public static void main(String[] args) {
//Load and start the vendor implementation from a specific OMS driver URL.
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

//Fetch a ResourceManager to create Queue resource.
ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createQueue("NS://HELLO_QUEUE");

//Start a PullConsumer to receive messages from the specific queue.
final Consumer consumer = messagingAccessPoint.createConsumer();
consumer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand All @@ -44,10 +38,15 @@ public void run() {
consumer.stop();
}
}));

consumer.bindQueue("NS://HELLO_QUEUE");
consumer.start();

Message message = consumer.receive(1000);
System.out.println("Received message: " + message);
//Acknowledge the consumed message
consumer.ack(message.headers().getMessageId());
consumer.ack(message.getMessageReceipt());
consumer.stop();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package io.openmessaging.samples.consumer;

import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;

public class PushConsumerApp {
public static void main(String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package io.openmessaging.samples.producer;

import io.openmessaging.Future;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.interceptor.Context;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
Expand All @@ -35,17 +35,19 @@ public static void main(String[] args) {
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");

final Producer producer = messagingAccessPoint.createProducer();
producer.start();
ProducerInterceptor interceptor = new ProducerInterceptor() {
@Override
public void preSend(Message message, Context attributes) {
System.out.println("PreSend message: " + message);
}

@Override
public void postSend(Message message, Context attributes) {
System.out.println("PostSend message: " + message);
}
};
producer.addInterceptor(interceptor);
producer.start();

//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
Expand All @@ -55,9 +57,11 @@ public void run() {
}
}));

//Sends a message to the specified destination synchronously.
//Send a message to the specified destination synchronously.
Message message = producer.createMessage(
"NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
"NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
message.header().setBornHost("127.0.0.1").setDurability((short) 0);
message.extensionHeader().get().setPartition(1);
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);

Expand All @@ -75,6 +79,7 @@ public void run() {
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
messages.add(msg);
}

producer.send(messages);
producer.removeInterceptor(interceptor);
producer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.openmessaging.samples.producer;

import io.openmessaging.Message;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package io.openmessaging.samples.routing;

import io.openmessaging.Message;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
Expand Down
2 changes: 1 addition & 1 deletion openmessaging-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<parent>
<groupId>io.openmessaging</groupId>
<artifactId>parent</artifactId>
<version>1.0.0-preview</version>
<version>1.0.0-beta-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
41 changes: 41 additions & 0 deletions openmessaging-api/src/main/java/io/openmessaging/Client.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging;

import io.openmessaging.extension.Extension;
import java.util.Optional;

/**
* <p>
* A {@code Client} interface contains all the common behaviors of producer and consumer. which can be used to achieve
* some basic interaction with the server.
* </p>
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface Client {
/**
* Get the extension method, and this interface is optional, Therefore, users need to check whether this interface
* has been implemented by vendors.
* <p>
*
* @return the implementation of {@link Extension}
*/
@io.openmessaging.annotation.Optional
Optional<Extension> getExtension();
}
62 changes: 29 additions & 33 deletions openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
*/
public interface KeyValue {

/**
* Inserts or replaces {@code boolean} value for the specified key.
*
* @param key the key to be placed into this {@code KeyValue} object
* @param value the value corresponding to <tt>key</tt>
*/
KeyValue put(String key, boolean value);

/**
* Inserts or replaces {@code short} value for the specified key.
*
Expand Down Expand Up @@ -75,6 +83,27 @@ public interface KeyValue {
*/
KeyValue put(String key, String value);

/**
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, false is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, boolean)
*/
boolean getBoolean(String key);

/**
* Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, false is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, boolean)
*/
boolean getBoolean(String key, boolean defaultValue);

/**
* Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, zero is returned.
Expand Down Expand Up @@ -106,17 +135,6 @@ public interface KeyValue {
*/
int getInt(String key);

/**
* Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, int)
*/
int getInt(String key, int defaultValue);

/**
* Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, zero is returned.
Expand Down Expand Up @@ -148,17 +166,6 @@ public interface KeyValue {
*/
double getDouble(String key);

/**
* Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, double)
*/
double getDouble(String key, double defaultValue);

/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, {@code null} is returned.
Expand All @@ -169,17 +176,6 @@ public interface KeyValue {
*/
String getString(String key);

/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, String)
*/
String getString(String key, String defaultValue);

/**
* Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
* <p>
Expand Down
Loading