org.slf4j
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
index b60f89e0..f0a41026 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java
@@ -17,11 +17,10 @@
package io.openmessaging.samples.consumer;
-import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
-import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.Message;
public class PullConsumerApp {
public static void main(String[] args) {
@@ -29,13 +28,8 @@ public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
- //Fetch a ResourceManager to create Queue resource.
- ResourceManager resourceManager = messagingAccessPoint.resourceManager();
- resourceManager.createQueue("NS://HELLO_QUEUE");
-
//Start a PullConsumer to receive messages from the specific queue.
final Consumer consumer = messagingAccessPoint.createConsumer();
- consumer.start();
//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -44,10 +38,15 @@ public void run() {
consumer.stop();
}
}));
+
consumer.bindQueue("NS://HELLO_QUEUE");
+ consumer.start();
+
Message message = consumer.receive(1000);
System.out.println("Received message: " + message);
//Acknowledge the consumed message
- consumer.ack(message.headers().getMessageId());
+ consumer.ack(message.getMessageReceipt());
+ consumer.stop();
+
}
}
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
index 0fed2032..98a9a2e5 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java
@@ -17,12 +17,12 @@
package io.openmessaging.samples.consumer;
-import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.Message;
public class PushConsumerApp {
public static void main(String[] args) {
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
index 2da775fc..c733443c 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java
@@ -18,11 +18,11 @@
package io.openmessaging.samples.producer;
import io.openmessaging.Future;
-import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.interceptor.Context;
import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
@@ -35,17 +35,19 @@ public static void main(String[] args) {
OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east");
final Producer producer = messagingAccessPoint.createProducer();
- producer.start();
ProducerInterceptor interceptor = new ProducerInterceptor() {
@Override
public void preSend(Message message, Context attributes) {
+ System.out.println("PreSend message: " + message);
}
@Override
public void postSend(Message message, Context attributes) {
+ System.out.println("PostSend message: " + message);
}
};
producer.addInterceptor(interceptor);
+ producer.start();
//Register a shutdown hook to close the opened endpoints.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@@ -55,9 +57,11 @@ public void run() {
}
}));
- //Sends a message to the specified destination synchronously.
+ //Send a message to the specified destination synchronously.
Message message = producer.createMessage(
- "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ message.header().setBornHost("127.0.0.1").setDurability((short) 0);
+ message.extensionHeader().get().setPartition(1);
SendResult sendResult = producer.send(message);
System.out.println("SendResult: " + sendResult);
@@ -75,6 +79,7 @@ public void run() {
Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes());
messages.add(msg);
}
+
producer.send(messages);
producer.removeInterceptor(interceptor);
producer.stop();
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
index f6db2730..bed57642 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java
@@ -17,7 +17,7 @@
package io.openmessaging.samples.producer;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
index 3f3a9a66..ba267444 100644
--- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
+++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java
@@ -17,7 +17,7 @@
package io.openmessaging.samples.routing;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml
index 693fd0e3..9e493ee9 100644
--- a/openmessaging-api/pom.xml
+++ b/openmessaging-api/pom.xml
@@ -2,7 +2,7 @@
io.openmessaging
parent
- 1.0.0-preview
+ 1.0.0-beta-SNAPSHOT
4.0.0
diff --git a/openmessaging-api/src/main/java/io/openmessaging/Client.java b/openmessaging-api/src/main/java/io/openmessaging/Client.java
new file mode 100644
index 00000000..ed20ebad
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/Client.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging;
+
+import io.openmessaging.extension.Extension;
+import java.util.Optional;
+
+/**
+ *
+ * A {@code Client} interface contains all the common behaviors of producer and consumer. which can be used to achieve
+ * some basic interaction with the server.
+ *
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+public interface Client {
+ /**
+ * Get the extension method, and this interface is optional, Therefore, users need to check whether this interface
+ * has been implemented by vendors.
+ *
+ *
+ * @return the implementation of {@link Extension}
+ */
+ @io.openmessaging.annotation.Optional
+ Optional getExtension();
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
index 53f5b52f..54f47061 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
@@ -35,6 +35,14 @@
*/
public interface KeyValue {
+ /**
+ * Inserts or replaces {@code boolean} value for the specified key.
+ *
+ * @param key the key to be placed into this {@code KeyValue} object
+ * @param value the value corresponding to key
+ */
+ KeyValue put(String key, boolean value);
+
/**
* Inserts or replaces {@code short} value for the specified key.
*
@@ -75,6 +83,27 @@ public interface KeyValue {
*/
KeyValue put(String key, String value);
+ /**
+ * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
+ * not found in this property list, false is returned.
+ *
+ * @param key the property key
+ * @return the value in this {@code KeyValue} object with the specified key value
+ * @see #put(String, boolean)
+ */
+ boolean getBoolean(String key);
+
+ /**
+ * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is
+ * not found in this property list, false is returned.
+ *
+ * @param key the property key
+ * @param defaultValue a default value
+ * @return the value in this {@code KeyValue} object with the specified key value
+ * @see #put(String, boolean)
+ */
+ boolean getBoolean(String key, boolean defaultValue);
+
/**
* Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, zero is returned.
@@ -106,17 +135,6 @@ public interface KeyValue {
*/
int getInt(String key);
- /**
- * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not
- * found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, int)
- */
- int getInt(String key, int defaultValue);
-
/**
* Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not
* found in this property list, zero is returned.
@@ -148,17 +166,6 @@ public interface KeyValue {
*/
double getDouble(String key);
- /**
- * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, double)
- */
- double getDouble(String key, double defaultValue);
-
/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
* not found in this property list, {@code null} is returned.
@@ -169,17 +176,6 @@ public interface KeyValue {
*/
String getString(String key);
- /**
- * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is
- * not found in this property list, the default value argument is returned.
- *
- * @param key the property key
- * @param defaultValue a default value
- * @return the value in this {@code KeyValue} object with the specified key value
- * @see #put(String, String)
- */
- String getString(String key, String defaultValue);
-
/**
* Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
*
diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java
deleted file mode 100644
index 9d06f889..00000000
--- a/openmessaging-api/src/main/java/io/openmessaging/Message.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.openmessaging;
-
-import io.openmessaging.exception.OMSMessageFormatException;
-
-/**
- * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is
- * {@link Message}.
- *
- * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and
- * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ.
- *
- * The header contains fields used by the messaging system that describes the message's meta information, while the body
- * contains the application data being transmitted.
- *
- * As for the message header, OMS defines two kinds types: headers {@link Headers} and properties {@link KeyValue}, with
- * respect to flexibility in vendor implementation and user usage.
- *
- * -
- * System Headers, OMS defines some standard attributes that represent the characteristics of the message.
- *
- * -
- * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to
- * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios.
- *
- *
- * The body contains the application data being transmitted, which is generally ignored by the messaging system and
- * simply transmitted to its destination.
- *
- * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the
- * messaging system. The application is responsible for explaining the concrete content and format of the message body,
- * OMS is never aware of that.
- *
- * The body part is placed in the implementation classes of {@code Message}.
- *
- * @version OMS 1.0.0
- * @since OMS 1.0.0
- */
-public interface Message {
-
- interface Headers {
-
- /**
- * The {@code DESTINATION} header field contains the destination to which the message is being sent.
- *
- * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to the
- * specified destination.
- *
- * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in.
- */
- Headers setDestination(String destination);
-
- /**
- * The {@code MESSAGE_ID} header field contains a value that uniquely identifies each message sent by a {@code
- * Producer}.
- */
- Headers setMessageId(String messageId);
-
- /**
- * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to
- * be sent.
- *
- * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message
- * in client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this
- * value.
- *
- * When a message is received its, BORN_TIMESTAMP header field contains this same value.
- *
- * This filed is a {@code long} value, measured in milliseconds.
- */
- Headers setBornTimestamp(long bornTimestamp);
-
- /**
- * The {@code BORN_HOST} header field contains the born host info of a message in client side.
- *
- * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the
- * message's BORN_HOST header field contains this value.
- *
- * When a message is received, its BORN_HOST header field contains this same value.
- */
- Headers setBornHost(String bornHost);
-
- /**
- * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side.
- *
- * When a message is sent, STORE_TIMESTAMP is ignored.
- *
- * When the send method returns it contains a server-assigned value.
- *
- * This filed is a {@code long} value, measured in milliseconds.
- */
- Headers setStoreTimestamp(long storeTimestamp);
-
- /**
- * The {@code STORE_HOST} header field contains the store host info of a message in server side.
- *
- * When a message is sent, STORE_HOST is ignored.
- *
- * When the send method returns it contains a server-assigned value.
- */
- Headers setStoreHost(String storeHost);
-
- /**
- * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds.
- *
- * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this
- * filed isn't set explicitly, this means this message should be delivered immediately.
- */
- Headers setDelayTime(long delayTime);
-
- /**
- * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value.
- *
- * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the
- * EXPIRE_TIME field is specified as zero, that indicates the message does not expire.
- *
- *
- * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not
- * define a notification of message expiration.
- *
- */
- Headers setExpireTime(long expireTime);
-
- /**
- * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority
- * value should be delivered preferentially.
- *
- * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default
- * priority is 5. The priority beyond this region will be ignored.
- *
- * OMS does not require or provide any guarantee that the message should be delivered in priority order
- * strictly, but the vendor should provide a best effort to deliver expedited messages ahead of normal
- * messages.
- *
- * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority.
- */
- Headers setPriority(short priority);
-
- /**
- * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee
- * the reliability level for a message.
- *
- * OMS defines two modes of message delivery:
- *
- * -
- * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message
- * won't be lost.
- *
- * -
- * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory
- * storage is enough for better performance and lower cost.
- *
- *
- */
- Headers setDurability(short durability);
-
- /**
- * The {@code messagekey} header field contains the custom key of a message.
- *
- * This key is a customer identifier for a class of messages, and this key may be used for server to hash or
- * dispatch messages, or even can use this key to implement order message.
- *
- */
- Headers setMessageKey(String messageKey);
-
- /**
- * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique
- * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at
- * where, and received by who.
- *
- * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can
- * be used to trace the whole call link with other parts in the whole system.
- */
- Headers setTraceId(String traceId);
-
- /**
- * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message
- * delivery.
- */
- Headers setDeliveryCount(int deliveryCount);
-
- /**
- * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a
- * transaction.
- *
- * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message,
- * and consumer received message also contains this field.
- */
- Headers setTransactionId(String transactionId);
-
- /**
- * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link
- * a response message with its request message.
- */
- Headers setCorrelationId(String correlationId);
-
- /**
- * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to
- * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user.
- */
- Headers setCompression(short compression);
-
- /**
- * See {@link Headers#setDestination(String)}
- *
- * @return destination
- */
- String getDestination();
-
- /**
- * See {@link Headers#setMessageId(String)}
- *
- * @return messageId
- */
- String getMessageId();
-
- /**
- * See {@link Headers#setBornTimestamp(long)}
- *
- * @return bornTimestamp
- */
- long getBornTimestamp();
-
- /**
- * See {@link Headers#setBornHost(String)}
- *
- * @return bornHost
- */
- String getBornHost();
-
- /**
- * See {@link Headers#setStoreTimestamp(long)}
- *
- * @return storeTimestamp
- */
- long getStoreTimestamp();
-
- /**
- * See {@link Headers#setStoreHost(String)}
- *
- * @return storeHost
- */
- String getStoreHost();
-
- /**
- * See {@link Headers#setDelayTime(long)}
- *
- * @return delayTime
- */
- long getDelayTime();
-
- /**
- * See {@link Headers#setExpireTime(long)}
- *
- * @return expireTime
- */
- long getExpireTime();
-
- /**
- * See {@link Headers#setPriority(short)}
- *
- * @return priority
- */
- short getPriority();
-
- /**
- * See {@link Headers#setDurability(short)}
- *
- * @return durability
- */
- short getDurability();
-
- /**
- * See {@link Headers#setMessageKey(String)}
- *
- * @return messageKey
- */
- String getMessageKey();
-
- /**
- * See {@link Headers#setTraceId(String)}
- *
- * @return traceId
- */
- String getTraceId();
-
- /**
- * See {@link Headers#setDeliveryCount(int)}
- *
- * @return deliveryCount
- */
- int getDeliveryCount();
-
- /**
- * See {@link Headers#setTransactionId(String)}
- *
- * @return transactionId
- */
- String getTransactionId();
-
- /**
- * See {@link Headers#setCorrelationId(String)}
- *
- * @return correlationId
- */
- String getCorrelationId();
-
- /**
- * See {@link Headers#setCompression(short)}
- *
- * @return compression
- */
- short getCompression();
-
- }
-
- /**
- * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}.
- *
- * @return the system headers of a {@code Message}
- */
- Headers headers();
-
- /**
- * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}.
- *
- * @return the user properties of a {@code Message}
- */
- KeyValue properties();
-
- /**
- * Get data from message body
- *
- * @return message body
- * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type
- */
- byte[] getData();
-
- /**
- * Set data to message body
- *
- * @param data set message body in binary stream
- */
- void setData(byte[] data);
-
-}
\ No newline at end of file
diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java
index 96cf2a20..e99ba2e4 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java
@@ -22,6 +22,7 @@
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSSecurityException;
import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
@@ -109,4 +110,13 @@ public interface MessagingAccessPoint {
* @throws OMSSecurityException if have no authority to obtain a resource manager.
*/
ResourceManager resourceManager();
+
+ /**
+ * Gets a {@link MessageFactory} instance from the specified {@code MessagingAccessPoint}.
+ *
+ * @return the resource manger
+ * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal
+ * error
+ */
+ MessageFactory messageFactory();
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java
index 66e90965..eafdb18f 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java
@@ -43,10 +43,10 @@ public enum ServiceLifeState {
/**
* Service is stopping.
*/
- STOPING,
+ STOPPING,
/**
* Service has been stopped.
*/
- STOPED,
+ STOPPED,
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java
index 78a17418..3066cd56 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java
@@ -18,6 +18,7 @@
package io.openmessaging;
import io.openmessaging.consumer.Consumer;
+import io.openmessaging.extension.Extension;
import io.openmessaging.producer.Producer;
/**
@@ -32,7 +33,7 @@
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
-public interface ServiceLifecycle {
+public interface ServiceLifecycle extends Extension {
/**
* Used for startup or initialization of a service endpoint. A service endpoint instance will be in a ready state
* after this method has been completed.
diff --git a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java
new file mode 100644
index 00000000..b5b9d490
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ *
+ * A {@code Optional} is an annotation to mark some certain methods ,interfaces and etc. this annotation represented
+ * these methods or interfaces are not mandatory in OpenMessaging.
+ *
+ *
+ *
+ * If these methods or interfaces adopted by more and more vendors and end users, they may be become the mandatory
+ * interface in the future. Of course, if they are used very little, they may be removed.
+ *
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface Optional {
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java
index 96f202d3..bcf0c838 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java
@@ -17,13 +17,13 @@
package io.openmessaging.consumer;
-import io.openmessaging.BatchMessage;
-import io.openmessaging.Message;
import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.message.Message;
+import java.util.List;
/**
- * A message listener can implement this {@code BathMessageListener} interface and register itself to a consumer instance
- * to asynchronously receive messages.
+ * A message listener can implement this {@code BathMessageListener} interface and register itself to a consumer
+ * instance to asynchronously receive messages.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
@@ -36,8 +36,7 @@ public interface BatchMessageListener {
*
* @param batchMessage the received batchMessage.
*/
- void onReceived(BatchMessage batchMessage, Context context);
-
+ void onReceived(List batchMessage, Context context);
interface Context {
/**
@@ -47,9 +46,10 @@ interface Context {
*
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
*/
- void success(Message... messages);
+ void success(MessageReceipt... messages);
+
/**
- * Acknowledges messages, which is related to this {@code MessageContext}.
+ * Acknowledges all messages in this batch, which is related to this {@code MessageContext}.
*
*
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java
index fb99f97d..4391dea9 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java
@@ -17,7 +17,7 @@
package io.openmessaging.consumer;
-import io.openmessaging.Message;
+import io.openmessaging.Client;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSDestinationException;
@@ -25,7 +25,7 @@
import io.openmessaging.exception.OMSSecurityException;
import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.interceptor.ConsumerInterceptor;
-
+import io.openmessaging.message.Message;
import java.util.List;
/**
@@ -33,10 +33,10 @@
* PushConsumer} client.
*
* @version OMS 1.0.0
- * @see MessagingAccessPoint#createConsumer().
+ * @see MessagingAccessPoint#createConsumer()
* @since OMS 1.0.0
*/
-public interface Consumer extends ServiceLifecycle {
+public interface Consumer extends ServiceLifecycle, Client {
/**
* Resumes the {@code Consumer} in push model after a suspend.
@@ -114,8 +114,8 @@ public interface Consumer extends ServiceLifecycle {
/**
* Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}.
*
- * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered messages is
- * coming.
+ * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered
+ * messages is coming.
*
* @param queueName a specified queue.
* @param listener a specified listener to receive new messages.
@@ -177,10 +177,9 @@ public interface Consumer extends ServiceLifecycle {
Message receive(long timeout);
/**
- * Receives the next batch messages from the bind queues of this consumer in pull model.
+ * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic
+ * should implement in the {@link MessageListener}.
*
- * This call blocks indefinitely until the messages is arrives, the timeout expires, or until this {@code PullConsumer}
- * is shut down.
*
* @param timeout receive messages will blocked at most timeout milliseconds.
* @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down.
@@ -196,7 +195,8 @@ public interface Consumer extends ServiceLifecycle {
*
* Messages that have been received but not acknowledged may be redelivered.
*
- * @param receiptHandle the receipt handle associated with the consumed message.
+ * @param receipt the receipt handle associated with the consumed message.
*/
- void ack(String receiptHandle);
+ void ack(MessageReceipt receipt);
+
}
\ No newline at end of file
diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java
index 97a6f414..81a3fa8e 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java
@@ -17,7 +17,7 @@
package io.openmessaging.consumer;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
import io.openmessaging.exception.OMSRuntimeException;
/**
diff --git a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java
similarity index 80%
rename from openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java
rename to openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java
index 718936b7..26a75395 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
+package io.openmessaging.consumer;
-package io.openmessaging;
-
-import java.util.List;
-
-public interface BatchMessage {
- /**
- * @return all messages in this {@code BatchMessage}
- */
- List messages();
+/**
+ * A {@code MessageReceipt} is a {@code Message} with a {@code Receipt}.
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+public interface MessageReceipt {
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java b/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java
new file mode 100644
index 00000000..5720d452
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.exception;
+
+import io.openmessaging.annotation.Optional;
+
+/**
+ * The {@code OMSUnsupportException} must be thrown when the specified methods, headers or properties have not been
+ * provided by vendors, these methods or headers are usually marked by {@link Optional}.
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+public class OMSUnsupportException extends OMSRuntimeException {
+ /**
+ * @see OMSUnsupportException#OMSUnsupportException(int, String)
+ */
+ public OMSUnsupportException(int errorCode, String message) {
+ super(errorCode, message);
+ }
+
+ /**
+ * @see OMSUnsupportException#OMSUnsupportException(int, Throwable)
+ */
+ public OMSUnsupportException(int errorCode, Throwable cause) {
+ super(errorCode, cause);
+ }
+
+ /**
+ * @see OMSUnsupportException#OMSUnsupportException(int, String, Throwable)
+ */
+ public OMSUnsupportException(int errorCode, String message, Throwable cause) {
+ super(errorCode, message, cause);
+ }
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java
new file mode 100644
index 00000000..f6ea1254
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.extension;
+
+import io.openmessaging.annotation.Optional;
+import io.openmessaging.exception.OMSDestinationException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSSecurityException;
+import io.openmessaging.exception.OMSTimeOutException;
+
+/**
+ *
+ * This interface contains some methods are used for getting configurations related implementation. but this interface
+ * are not mandatory.
+ *
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+@Optional
+public interface Extension {
+
+ /**
+ * This method used for getting the related queue's meta data, and this method is optional, vendors may not provide
+ * this method based on their implementation.
+ *
+ *
+ * @param queueName Queue name, message destination.
+ * @return {@link QueueMetaData} Queue config in the server
+ * @throws OMSSecurityException when have no authority to send messages to a given destination.
+ * @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
+ * @throws OMSDestinationException when have no given destination in the server.
+ * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
+ */
+ QueueMetaData getQueueMetaData(String queueName);
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java
new file mode 100644
index 00000000..20e2b32e
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.extension;
+
+import io.openmessaging.annotation.Optional;
+import io.openmessaging.message.Message;
+
+/**
+ *
+ * The {@code ExtensionHeader} interface contains extended properties for common implementations in current messaging
+ * and streaming field, such as the queue-based partitioning implementation, but the related properties in this
+ * interface are not mandatory.
+ *
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+@Optional
+public interface ExtensionHeader {
+ /**
+ * The {@code PARTITION} in extension header field contains the partition of target destination which the message
+ * is being sent.
+ *
+ *
+ * When a {@link Message} is set with this value, this message will be delivered to specified partition, but the
+ * premise is that the implementation of the server side is dependent on the partition or a queue-like storage
+ * mechanism.
+ *
+ *
+ * @param partition The specified partition will be sent to.
+ */
+ ExtensionHeader setPartition(int partition);
+
+ /**
+ * This method is only called by the server. and {@code OFFSET} represents this message offset in partition.
+ *
+ *
+ * @param offset The offset in the current partition, used to quickly get this message in the queue
+ */
+ ExtensionHeader setOffset(long offset);
+
+ /**
+ * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link a
+ * response message with its request message.
+ */
+ ExtensionHeader setCorrelationId(String correlationId);
+
+ /**
+ * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a transaction.
+ *
+ * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, and
+ * consumer received message also contains this field.
+ */
+ ExtensionHeader setTransactionId(String transactionId);
+
+ /**
+ * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side.
+ *
+ * When a message is sent, STORE_TIMESTAMP is ignored.
+ *
+ * When the send method returns it contains a server-assigned value.
+ *
+ * This filed is a {@code long} value, measured in milliseconds.
+ */
+ ExtensionHeader setStoreTimestamp(long storeTimestamp);
+
+ /**
+ * The {@code STORE_HOST} header field contains the store host info of a message in server side.
+ *
+ * When a message is sent, STORE_HOST is ignored.
+ *
+ * When the send method returns it contains a server-assigned value.
+ */
+ ExtensionHeader setStoreHost(String storeHost);
+
+ /**
+ * The {@code messagekey} header field contains the custom key of a message.
+ *
+ * This key is a customer identifier for a class of messages, and this key may be used for server to hash or
+ * dispatch messages, or even can use this key to implement order message.
+ *
+ */
+ ExtensionHeader setMessageKey(String messageKey);
+
+ /**
+ * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique
+ * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at where,
+ * and received by who.
+ *
+ * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can be
+ * used to trace the whole call link with other parts in the whole system.
+ */
+ ExtensionHeader setTraceId(String traceId);
+
+ /**
+ * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds.
+ *
+ * The message will be delivered after delayTime milliseconds starting from {@code BORN_TIMESTAMP} . When this filed
+ * isn't set explicitly, this means this message should be delivered immediately.
+ */
+ ExtensionHeader setDelayTime(long delayTime);
+
+ /**
+ * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value.
+ *
+ * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the
+ * EXPIRE_TIME field is specified as zero, that indicates the message does not expire.
+ *
+ *
+ * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not define a
+ * notification of message expiration.
+ *
+ */
+ ExtensionHeader setExpireTime(long expireTime);
+
+ /**
+ * This method will return the partition of this message belongs.
+ *
+ *
+ * @return The {@code PARTITION} to which the message belongs
+ */
+ int getPartiton();
+
+ /**
+ * This method will return the {@code OFFSET} in the partition to which the message belongs to, but the premise is
+ * that the implementation of the server side is dependent on the partition or a queue-like storage mechanism.
+ *
+ * @return The offset of the partition to which the message belongs.
+ */
+ long getOffset();
+
+ /**
+ * See {@link ExtensionHeader#setCorrelationId(String)}
+ *
+ * @return correlationId
+ */
+ String getCorrelationId();
+
+ /**
+ * See {@link ExtensionHeader#setTransactionId(String)}
+ *
+ * @return transactionId
+ */
+ String getTransactionId();
+
+ /**
+ * See {@link ExtensionHeader#setStoreTimestamp(long)}
+ *
+ * @return storeTimestamp
+ */
+ long getStoreTimestamp();
+
+ /**
+ * See {@link ExtensionHeader#setStoreHost(String)}
+ *
+ * @return storeHost
+ */
+ String getStoreHost();
+
+ /**
+ * See {@link ExtensionHeader#setDelayTime(long)}
+ *
+ * @return delayTime
+ */
+ long getDelayTime();
+
+ /**
+ * See {@link ExtensionHeader#setExpireTime(long)}
+ *
+ * @return expireTime
+ */
+ long getExpireTime();
+
+ /**
+ * See {@link ExtensionHeader#setMessageKey(String)}
+ *
+ * @return messageKey
+ */
+ String getMessageKey();
+
+ /**
+ * See {@link ExtensionHeader#setTraceId(String)}
+ *
+ * @return traceId
+ */
+ String getTraceId();
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java
new file mode 100644
index 00000000..22f9146a
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.extension;
+
+import io.openmessaging.annotation.Optional;
+import java.util.List;
+
+/**
+ * This interface {@code QueueMetaData} contains methods are used for getting configurations related some certain
+ * implementation. but this interface are not mandatory.
+ *
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+@Optional
+public interface QueueMetaData {
+
+ /**
+ * In order to improve performance, in some scenarios where message persistence is required, some message middleware
+ * will store messages on multiple partitions in multi servers.
+ *
+ * In some scenarios, it is very useful to get the relevant partitions meta data for a queue.
+ */
+ interface Partition {
+ /**
+ * Partition identifier
+ *
+ * @return Partition identifier
+ */
+ int partitionId();
+
+ /**
+ * The host of the server where the partition is located
+ *
+ *
+ * @return The host of the server where the partition is located
+ */
+ String partitonHost();
+ }
+
+ /**
+ * Queue name
+ *
+ *
+ * @return Queue name.
+ */
+ String queueName();
+
+ /**
+ * Get partition list belongs to the {@code queueName}
+ *
+ * @return List of {@link Partition} belongs to the specified queue.
+ */
+ List partitions();
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java
index 440974d2..98aba203 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java
@@ -17,7 +17,7 @@
package io.openmessaging.interceptor;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
import io.openmessaging.consumer.MessageListener;
/**
diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java
index eaefde35..e41f60a0 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java
@@ -17,13 +17,12 @@
package io.openmessaging.interceptor;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
/**
* A {@code ProducerInterceptor} is used to intercept send operations of producer.
*
- * The interceptor is able to view or modify the message being transmitted and collect
- * the send record.
+ * The interceptor is able to view or modify the message being transmitted and collect the send record.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
@@ -46,5 +45,5 @@ public interface ProducerInterceptor {
* @param attributes the extensible attributes delivered to the intercept thread.
*/
void postSend(Message message, Context attributes);
-
+
}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java
index be121b9b..67eb0b96 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java
@@ -31,6 +31,25 @@
public class DefaultKeyValue implements KeyValue {
private Map properties;
+ @Override
+ public KeyValue put(String key, boolean value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public boolean getBoolean(String key) {
+ if (!properties.containsKey(key)) {
+ return false;
+ }
+ return Boolean.valueOf(properties.get(key));
+ }
+
+ @Override
+ public boolean getBoolean(String key, boolean defaultValue) {
+ return properties.containsKey(key) ? getBoolean(key) : defaultValue;
+ }
+
@Override
public short getShort(String key) {
if (!properties.containsKey(key)) {
@@ -86,11 +105,6 @@ public int getInt(String key) {
return Integer.valueOf(properties.get(key));
}
- @Override
- public int getInt(final String key, final int defaultValue) {
- return properties.containsKey(key) ? getInt(key) : defaultValue;
- }
-
@Override
public long getLong(String key) {
if (!properties.containsKey(key)) {
@@ -112,21 +126,11 @@ public double getDouble(String key) {
return Double.valueOf(properties.get(key));
}
- @Override
- public double getDouble(final String key, final double defaultValue) {
- return properties.containsKey(key) ? getDouble(key) : defaultValue;
- }
-
@Override
public String getString(String key) {
return properties.get(key);
}
- @Override
- public String getString(final String key, final String defaultValue) {
- return properties.containsKey(key) ? getString(key) : defaultValue;
- }
-
@Override
public Set keySet() {
return properties.keySet();
diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java
new file mode 100644
index 00000000..1017c687
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.message;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.extension.ExtensionHeader;
+
+/**
+ * The {@code Header} interface is the root interface of all OMS messages, and the most commonly used by OMS message
+ * {@link Message}.
+ *
+ * The header contains fields used by the messaging system that describes the message's meta information, while the body
+ * contains the application data being transmitted.
+ *
+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and
+ * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage.
+ *
+ * -
+ * System Headers, OMS defines some standard attributes that represent the characteristics of the message.
+ *
+ *
+ *
+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and
+ * simply transmitted to its destination.
+ *
+ *
+ * The header part is placed in the implementation classes of {@code Message}.
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+public interface Header {
+ /**
+ * The {@code DESTINATION} header field contains the destination to which the message is being sent.
+ *
+ * When a message is set to the {@code Queue}, then the message will be sent to the specified destination.
+ *
+ * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in.
+ */
+ Header setDestination(String destination);
+
+ /**
+ * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code
+ * Producer}. this identifier is generated by producer.
+ */
+ Header setMessageId(String messageId);
+
+ /**
+ * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to be
+ * sent.
+ *
+ * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message in
+ * client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this value.
+ *
+ * When a message is received its, BORN_TIMESTAMP header field contains this same value.
+ *
+ * This filed is a {@code long} value, measured in milliseconds.
+ */
+ Header setBornTimestamp(long bornTimestamp);
+
+ /**
+ * The {@code BORN_HOST} header field contains the born host info of a message in client side.
+ *
+ * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the
+ * message's BORN_HOST header field contains this value.
+ *
+ * When a message is received, its BORN_HOST header field contains this same value.
+ */
+ Header setBornHost(String bornHost);
+
+ /**
+ * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority
+ * value should be delivered preferentially.
+ *
+ * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default
+ * priority is 5. The priority beyond this region will be ignored.
+ *
+ * OMS does not require or provide any guarantee that the message should be delivered in priority order strictly,
+ * but the vendor should provide a best effort to deliver expedited messages ahead of normal messages.
+ *
+ * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority.
+ */
+ Header setPriority(short priority);
+
+ /**
+ * The {@code DURABILITY} header field contains the persistent level of a message, the vendor should guarantee the
+ * reliability level for a message.
+ *
+ * OMS defines two modes of message delivery:
+ *
+ * -
+ * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message won't be
+ * lost.
+ *
+ * -
+ * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory
+ * storage is enough for better performance and lower cost.
+ *
+ *
+ */
+ Header setDurability(short durability);
+
+ /**
+ * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message delivery.
+ */
+ Header setDeliveryCount(int deliveryCount);
+
+ /**
+ * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to
+ * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user.
+ */
+ Header setCompression(short compression);
+
+ /**
+ * See {@link Header#setDestination(String)}
+ *
+ * @return destination
+ */
+ String getDestination();
+
+ /**
+ * See {@link Header#setMessageId(String)}
+ *
+ * @return messageId
+ */
+ String getMessageId();
+
+ /**
+ * See {@link Header#setBornTimestamp(long)}
+ *
+ * @return bornTimestamp
+ */
+ long getBornTimestamp();
+
+ /**
+ * See {@link Header#setBornHost(String)}
+ *
+ * @return bornHost
+ */
+ String getBornHost();
+
+ /**
+ * See {@link Header#setPriority(short)}
+ *
+ * @return priority
+ */
+ short getPriority();
+
+ /**
+ * See {@link Header#setDurability(short)}
+ *
+ * @return durability
+ */
+ short getDurability();
+
+ /**
+ * See {@link Header#setDeliveryCount(int)}
+ *
+ * @return deliveryCount
+ */
+ int getDeliveryCount();
+
+ /**
+ * See {@link Header#setCompression(short)}
+ *
+ * @return compression
+ */
+ short getCompression();
+}
diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java
new file mode 100644
index 00000000..e4705c62
--- /dev/null
+++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.message;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.consumer.BatchMessageListener;
+import io.openmessaging.consumer.Consumer;
+import io.openmessaging.consumer.MessageListener;
+import io.openmessaging.consumer.MessageReceipt;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.extension.ExtensionHeader;
+import java.util.Optional;
+
+/**
+ * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is
+ * {@link Message}.
+ *
+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and
+ * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ.
+ *
+ * The header contains fields used by the messaging system that describes the message's meta information, while the body
+ * contains the application data being transmitted.
+ *
+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and
+ * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage.
+ *
+ * -
+ * System Headers, OMS defines some standard attributes that represent the characteristics of the message.
+ *
+ * -
+ * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to
+ * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios.
+ *
+ *
+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and
+ * simply transmitted to its destination.
+ *
+ * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the
+ * messaging system. The application is responsible for explaining the concrete content and format of the message body,
+ * OMS is never aware of that.
+ *
+ * The body part is placed in the implementation classes of {@code Message}.
+ *
+ * @version OMS 1.0.0
+ * @since OMS 1.0.0
+ */
+public interface Message {
+ /**
+ * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}.
+ *
+ * @return the system headers of a {@code Message}
+ */
+ Header header();
+
+ /**
+ * This interface is optional, Therefore, users need to check whether the interface is implemented and the
+ * correctness of its implementation.
+ *
+ *
+ * @return The implementation of {@link ExtensionHeader}
+ */
+ @io.openmessaging.annotation.Optional
+ Optional extensionHeader();
+
+ /**
+ * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}.
+ *
+ * @return the user properties of a {@code Message}
+ */
+ KeyValue properties();
+
+ /**
+ * Get data from message body
+ *
+ * @return message body
+ * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type
+ */
+ byte[] getData();
+
+ /**
+ * Set data to message body
+ *
+ * @param data set message body in binary stream
+ */
+ void setData(byte[] data);
+
+ /**
+ * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message.
+ *
+ * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt)
+ * @see MessageListener.Context#ack()
+ * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...)
+ */
+ MessageReceipt getMessageReceipt();
+
+}
\ No newline at end of file
diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java
similarity index 95%
rename from openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java
rename to openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java
index 536f8ce1..4902d4b6 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package io.openmessaging;
+package io.openmessaging.message;
import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.message.Message;
/**
* A factory interface for creating {@code Message} objects.
diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
index 089e4efb..3dbc10d4 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
@@ -17,10 +17,9 @@
package io.openmessaging.producer;
+import io.openmessaging.Client;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
-import io.openmessaging.Message;
-import io.openmessaging.MessageFactory;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSDestinationException;
@@ -30,6 +29,8 @@
import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.exception.OMSTransactionException;
import io.openmessaging.interceptor.ProducerInterceptor;
+import io.openmessaging.message.Message;
+import io.openmessaging.message.MessageFactory;
import java.util.List;
/**
@@ -49,11 +50,11 @@
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
-public interface Producer extends MessageFactory, ServiceLifecycle {
+public interface Producer extends MessageFactory, ServiceLifecycle, Client {
/**
* Sends a message to the specified destination synchronously, the destination should be preset to {@link
- * Message#headers()}, other header fields as well.
+ * Message#header()}, other header fields as well.
*
* @param message a message will be sent.
* @return the successful {@code SendResult}.
@@ -67,7 +68,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
/**
* Sends a message to the specified destination asynchronously, the destination should be preset to {@link
- * Message#headers()}, other header fields as well.
+ * Message#header()}, other header fields as well.
*
* The returned {@code Promise} will have the result once the operation completes, and the registered {@code
* FutureListener} will be notified, either because the operation was successful or because of an error.
@@ -98,7 +99,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
/**
* Send messages to the specified destination asynchronously, the destination should be preset to {@link
- * Message#headers()}, other header fields as well.
+ * Message#header()}, other header fields as well.
*
* The returned {@code Promise} will have the result once the operation completes, and the registered {@code
* FutureListener} will be notified, either because the operation was successful or because of an error.
@@ -108,14 +109,14 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
* @see Future
* @see FutureListener
*/
- Future> sendAsync(List messages);
+ Future sendAsync(List messages);
/**
*
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't care about the
* send result and also have no context to get the result.
*
- * @param messages a batch message will be sent.
+ * @param messages a batch message will be sent.
*/
void sendOneway(List messages);
@@ -127,7 +128,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
void addInterceptor(ProducerInterceptor interceptor);
/**
- * Removes a {@code ProducerInterceptor}.
+ * Remove a {@code ProducerInterceptor}.
*
* @param interceptor a producer interceptor will be removed.
*/
@@ -135,7 +136,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
/**
* Sends a transactional message to the specified destination synchronously, the destination should be preset to
- * {@link Message#headers()}, other header fields as well.
+ * {@link Message#header()}, other header fields as well.
*
* A transactional send result will be exposed to consumer if this prepare message send success, and then, you can
* execute your local transaction, when local transaction execute success, users can use {@link
@@ -150,7 +151,9 @@ public interface Producer extends MessageFactory, ServiceLifecycle {
* @throws OMSTimeOutException when the given timeout elapses before the send operation completes.
* @throws OMSDestinationException when have no given destination in the server.
* @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error.
- * @throws OMSTransactionException when used normal producer which haven't register {@link TransactionStateCheckListener}.
+ * @throws OMSTransactionException when used normal producer which haven't register {@link
+ * TransactionStateCheckListener}.
*/
TransactionalResult prepare(Message message);
+
}
\ No newline at end of file
diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java
index 99665544..efa0d5f1 100644
--- a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java
+++ b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java
@@ -17,7 +17,7 @@
package io.openmessaging.producer;
-import io.openmessaging.Message;
+import io.openmessaging.message.Message;
/**
* Each executor will be associated with a transactional message, can be used to execute local transaction branch and
diff --git a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java
index 14cb8d0b..efee3560 100644
--- a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java
+++ b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java
@@ -23,6 +23,7 @@
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
+import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import org.junit.Test;
@@ -50,6 +51,10 @@ public TestVendor(KeyValue keyValue) {
return null;
}
+ @Override public MessageFactory messageFactory() {
+ return null;
+ }
+
@Override
public String version() {
return OMS.specVersion;
diff --git a/pom.xml b/pom.xml
index d7aaf413..0ef1d346 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
io.openmessaging
parent
- 1.0.0-preview
+ 1.0.0-beta-SNAPSHOT
pom
openmessaging
@@ -51,8 +51,8 @@
UTF-8
- 1.6
- 1.6
+ 1.8
+ 1.8