diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index f4dba4c9..f0a41026 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -17,11 +17,10 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; -import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; public class PullConsumerApp { public static void main(String[] args) { @@ -29,13 +28,8 @@ public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); - //Fetch a ResourceManager to create Queue resource. - ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createQueue("NS://HELLO_QUEUE"); - //Start a PullConsumer to receive messages from the specific queue. final Consumer consumer = messagingAccessPoint.createConsumer(); - consumer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -44,10 +38,15 @@ public void run() { consumer.stop(); } })); + consumer.bindQueue("NS://HELLO_QUEUE"); + consumer.start(); + Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message consumer.ack(message.getMessageReceipt()); + consumer.stop(); + } } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java index 0fed2032..98a9a2e5 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java @@ -17,12 +17,12 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; public class PushConsumerApp { public static void main(String[] args) { diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index 2da775fc..c733443c 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -18,11 +18,11 @@ package io.openmessaging.samples.producer; import io.openmessaging.Future; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.interceptor.Context; import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; @@ -35,17 +35,19 @@ public static void main(String[] args) { OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); final Producer producer = messagingAccessPoint.createProducer(); - producer.start(); ProducerInterceptor interceptor = new ProducerInterceptor() { @Override public void preSend(Message message, Context attributes) { + System.out.println("PreSend message: " + message); } @Override public void postSend(Message message, Context attributes) { + System.out.println("PostSend message: " + message); } }; producer.addInterceptor(interceptor); + producer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -55,9 +57,11 @@ public void run() { } })); - //Sends a message to the specified destination synchronously. + //Send a message to the specified destination synchronously. Message message = producer.createMessage( - "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + message.header().setBornHost("127.0.0.1").setDurability((short) 0); + message.extensionHeader().get().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); @@ -75,6 +79,7 @@ public void run() { Message msg = producer.createMessage("NS://HELLO_QUEUE", ("Hello" + i).getBytes()); messages.add(msg); } + producer.send(messages); producer.removeInterceptor(interceptor); producer.stop(); diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java index f6db2730..bed57642 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.producer.Producer; diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java index 3f3a9a66..ba267444 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.routing; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; diff --git a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java b/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java deleted file mode 100644 index 718936b7..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package io.openmessaging; - -import java.util.List; - -public interface BatchMessage { - /** - * @return all messages in this {@code BatchMessage} - */ - List messages(); -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/Client.java b/openmessaging-api/src/main/java/io/openmessaging/Client.java new file mode 100644 index 00000000..ed20ebad --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/Client.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging; + +import io.openmessaging.extension.Extension; +import java.util.Optional; + +/** + *

+ * A {@code Client} interface contains all the common behaviors of producer and consumer. which can be used to achieve + * some basic interaction with the server. + *

+ * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface Client { + /** + * Get the extension method, and this interface is optional, Therefore, users need to check whether this interface + * has been implemented by vendors. + *

+ * + * @return the implementation of {@link Extension} + */ + @io.openmessaging.annotation.Optional + Optional getExtension(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java index 2cfb650a..54f47061 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java @@ -84,8 +84,8 @@ public interface KeyValue { KeyValue put(String key, String value); /** - * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, false is returned. + * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is + * not found in this property list, false is returned. * * @param key the property key * @return the value in this {@code KeyValue} object with the specified key value @@ -94,8 +94,8 @@ public interface KeyValue { boolean getBoolean(String key); /** - * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, false is returned. + * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is + * not found in this property list, false is returned. * * @param key the property key * @param defaultValue a default value @@ -135,17 +135,6 @@ public interface KeyValue { */ int getInt(String key); - /** - * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, int) - */ - int getInt(String key, int defaultValue); - /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not * found in this property list, zero is returned. @@ -177,17 +166,6 @@ public interface KeyValue { */ double getDouble(String key); - /** - * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, double) - */ - double getDouble(String key, double defaultValue); - /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is * not found in this property list, {@code null} is returned. @@ -198,17 +176,6 @@ public interface KeyValue { */ String getString(String key); - /** - * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, String) - */ - String getString(String key, String defaultValue); - /** * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object. *

diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java deleted file mode 100644 index db8cca91..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging; - -import io.openmessaging.consumer.BatchMessageListener; -import io.openmessaging.consumer.Consumer; -import io.openmessaging.consumer.MessageListener; -import io.openmessaging.consumer.MessageReceipt; -import io.openmessaging.exception.OMSMessageFormatException; - -/** - * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is - * {@link Message}. - *

- * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and - * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. - *

- * The header contains fields used by the messaging system that describes the message's meta information, while the body - * contains the application data being transmitted. - *

- * As for the message header, OMS defines two kinds types: headers {@link Headers} and properties {@link KeyValue}, with - * respect to flexibility in vendor implementation and user usage. - *

- * The body contains the application data being transmitted, which is generally ignored by the messaging system and - * simply transmitted to its destination. - *

- * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the - * messaging system. The application is responsible for explaining the concrete content and format of the message body, - * OMS is never aware of that. - * - * The body part is placed in the implementation classes of {@code Message}. - * - * @version OMS 1.0.0 - * @since OMS 1.0.0 - */ -public interface Message { - - interface Headers { - - /** - * The {@code DESTINATION} header field contains the destination to which the message is being sent. - *

- * When a message is sent this value is set to the right {@code Queue}, then the message will be sent to the - * specified destination. - *

- * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. - */ - Headers setDestination(String destination); - - /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identifies each message sent by a {@code - * Producer}. - */ - Headers setMessageId(String messageId); - - /** - * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to - * be sent. - *

- * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message - * in client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this - * value. - *

- * When a message is received its, BORN_TIMESTAMP header field contains this same value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setBornTimestamp(long bornTimestamp); - - /** - * The {@code BORN_HOST} header field contains the born host info of a message in client side. - *

- * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the - * message's BORN_HOST header field contains this value. - *

- * When a message is received, its BORN_HOST header field contains this same value. - */ - Headers setBornHost(String bornHost); - - /** - * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. - *

- * When a message is sent, STORE_TIMESTAMP is ignored. - *

- * When the send method returns it contains a server-assigned value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setStoreTimestamp(long storeTimestamp); - - /** - * The {@code STORE_HOST} header field contains the store host info of a message in server side. - *

- * When a message is sent, STORE_HOST is ignored. - *

- * When the send method returns it contains a server-assigned value. - */ - Headers setStoreHost(String storeHost); - - /** - * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. - *

- * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this - * filed isn't set explicitly, this means this message should be delivered immediately. - */ - Headers setDelayTime(long delayTime); - - /** - * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. - *

- * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the - * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. - *

- *

- * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not - * define a notification of message expiration. - *

- */ - Headers setExpireTime(long expireTime); - - /** - * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority - * value should be delivered preferentially. - *

- * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default - * priority is 5. The priority beyond this region will be ignored. - *

- * OMS does not require or provide any guarantee that the message should be delivered in priority order - * strictly, but the vendor should provide a best effort to deliver expedited messages ahead of normal - * messages. - *

- * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. - */ - Headers setPriority(short priority); - - /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee - * the reliability level for a message. - *

- * OMS defines two modes of message delivery: - *

- */ - Headers setDurability(short durability); - - /** - * The {@code messagekey} header field contains the custom key of a message. - *

- * This key is a customer identifier for a class of messages, and this key may be used for server to hash or - * dispatch messages, or even can use this key to implement order message. - *

- */ - Headers setMessageKey(String messageKey); - - /** - * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique - * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at - * where, and received by who. - *

- * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can - * be used to trace the whole call link with other parts in the whole system. - */ - Headers setTraceId(String traceId); - - /** - * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message - * delivery. - */ - Headers setDeliveryCount(int deliveryCount); - - /** - * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a - * transaction. - *

- * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, - * and consumer received message also contains this field. - */ - Headers setTransactionId(String transactionId); - - /** - * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link - * a response message with its request message. - */ - Headers setCorrelationId(String correlationId); - - /** - * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to - * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. - */ - Headers setCompression(short compression); - - /** - * See {@link Headers#setDestination(String)} - * - * @return destination - */ - String getDestination(); - - /** - * See {@link Headers#setMessageId(String)} - * - * @return messageId - */ - String getMessageId(); - - /** - * See {@link Headers#setBornTimestamp(long)} - * - * @return bornTimestamp - */ - long getBornTimestamp(); - - /** - * See {@link Headers#setBornHost(String)} - * - * @return bornHost - */ - String getBornHost(); - - /** - * See {@link Headers#setStoreTimestamp(long)} - * - * @return storeTimestamp - */ - long getStoreTimestamp(); - - /** - * See {@link Headers#setStoreHost(String)} - * - * @return storeHost - */ - String getStoreHost(); - - /** - * See {@link Headers#setDelayTime(long)} - * - * @return delayTime - */ - long getDelayTime(); - - /** - * See {@link Headers#setExpireTime(long)} - * - * @return expireTime - */ - long getExpireTime(); - - /** - * See {@link Headers#setPriority(short)} - * - * @return priority - */ - short getPriority(); - - /** - * See {@link Headers#setDurability(short)} - * - * @return durability - */ - short getDurability(); - - /** - * See {@link Headers#setMessageKey(String)} - * - * @return messageKey - */ - String getMessageKey(); - - /** - * See {@link Headers#setTraceId(String)} - * - * @return traceId - */ - String getTraceId(); - - /** - * See {@link Headers#setDeliveryCount(int)} - * - * @return deliveryCount - */ - int getDeliveryCount(); - - /** - * See {@link Headers#setTransactionId(String)} - * - * @return transactionId - */ - String getTransactionId(); - - /** - * See {@link Headers#setCorrelationId(String)} - * - * @return correlationId - */ - String getCorrelationId(); - - /** - * See {@link Headers#setCompression(short)} - * - * @return compression - */ - short getCompression(); - - } - - /** - * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the system headers of a {@code Message} - */ - Headers headers(); - - /** - * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the user properties of a {@code Message} - */ - KeyValue properties(); - - /** - * Get data from message body - * - * @return message body - * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type - */ - byte[] getData(); - - /** - * Set data to message body - * - * @param data set message body in binary stream - */ - void setData(byte[] data); - - /** - * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message. - * - * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt) - * @see MessageListener.Context#ack() - * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...) - */ - MessageReceipt getMessageReceipt(); - -} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java index 96cf2a20..e99ba2e4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java @@ -22,6 +22,7 @@ import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; @@ -109,4 +110,13 @@ public interface MessagingAccessPoint { * @throws OMSSecurityException if have no authority to obtain a resource manager. */ ResourceManager resourceManager(); + + /** + * Gets a {@link MessageFactory} instance from the specified {@code MessagingAccessPoint}. + * + * @return the resource manger + * @throws OMSRuntimeException if the {@code MessagingAccessPoint} fails to handle this request due to some internal + * error + */ + MessageFactory messageFactory(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java index 66e90965..eafdb18f 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java +++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java @@ -43,10 +43,10 @@ public enum ServiceLifeState { /** * Service is stopping. */ - STOPING, + STOPPING, /** * Service has been stopped. */ - STOPED, + STOPPED, } diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java index 78a17418..3066cd56 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java +++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifecycle.java @@ -18,6 +18,7 @@ package io.openmessaging; import io.openmessaging.consumer.Consumer; +import io.openmessaging.extension.Extension; import io.openmessaging.producer.Producer; /** @@ -32,7 +33,7 @@ * @version OMS 1.0.0 * @since OMS 1.0.0 */ -public interface ServiceLifecycle { +public interface ServiceLifecycle extends Extension { /** * Used for startup or initialization of a service endpoint. A service endpoint instance will be in a ready state * after this method has been completed. diff --git a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java new file mode 100644 index 00000000..13687903 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +/** + * A {@code MessageReceipt} is a {@code Message} with a {@code Receipt}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.PACKAGE, ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE}) +public @interface Optional { +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java index 54b34688..bcf0c838 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java @@ -17,12 +17,13 @@ package io.openmessaging.consumer; -import io.openmessaging.BatchMessage; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.message.Message; +import java.util.List; /** - * A message listener can implement this {@code BathMessageListener} interface and register itself to a consumer instance - * to asynchronously receive messages. + * A message listener can implement this {@code BathMessageListener} interface and register itself to a consumer + * instance to asynchronously receive messages. * * @version OMS 1.0.0 * @since OMS 1.0.0 @@ -35,8 +36,7 @@ public interface BatchMessageListener { * * @param batchMessage the received batchMessage. */ - void onReceived(BatchMessage batchMessage, Context context); - + void onReceived(List batchMessage, Context context); interface Context { /** @@ -47,8 +47,9 @@ interface Context { * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ void success(MessageReceipt... messages); + /** - * Acknowledges messages, which is related to this {@code MessageContext}. + * Acknowledges all messages in this batch, which is related to this {@code MessageContext}. *

* * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 85169d96..a791e002 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -17,13 +17,15 @@ package io.openmessaging.consumer; -import io.openmessaging.*; +import io.openmessaging.Client; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.interceptor.ConsumerInterceptor; - +import io.openmessaging.message.Message; import java.util.List; /** @@ -34,7 +36,7 @@ * @see MessagingAccessPoint#createConsumer(). * @since OMS 1.0.0 */ -public interface Consumer extends ServiceLifecycle { +public interface Consumer extends ServiceLifecycle, Client { /** * Resumes the {@code Consumer} in push model after a suspend. @@ -112,8 +114,8 @@ public interface Consumer extends ServiceLifecycle { /** * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. *

- * {@link BatchMessageListener#onReceived(BatchMessage batchMessage, BatchMessageListener.Context context)} will be called when new delivered messages is - * coming. + * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered + * messages is coming. * * @param queueName a specified queue. * @param listener a specified listener to receive new messages. @@ -175,10 +177,9 @@ public interface Consumer extends ServiceLifecycle { Message receive(long timeout); /** - * Receives the next batch messages from the bind queues of this consumer in pull model. + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. *

- * This call blocks indefinitely until the messages is arrives, the timeout expires, or until this {@code PullConsumer} - * is shut down. * * @param timeout receive messages will blocked at most timeout milliseconds. * @return the next batch messages received from the bind queues, or null if the consumer is concurrently shut down. @@ -197,4 +198,5 @@ public interface Consumer extends ServiceLifecycle { * @param receipt the receipt handle associated with the consumed message. */ void ack(MessageReceipt receipt); + } \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java index 97a6f414..81a3fa8e 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.exception.OMSRuntimeException; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java b/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java new file mode 100644 index 00000000..5720d452 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/exception/OMSUnsupportException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.exception; + +import io.openmessaging.annotation.Optional; + +/** + * The {@code OMSUnsupportException} must be thrown when the specified methods, headers or properties have not been + * provided by vendors, these methods or headers are usually marked by {@link Optional}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public class OMSUnsupportException extends OMSRuntimeException { + /** + * @see OMSUnsupportException#OMSUnsupportException(int, String) + */ + public OMSUnsupportException(int errorCode, String message) { + super(errorCode, message); + } + + /** + * @see OMSUnsupportException#OMSUnsupportException(int, Throwable) + */ + public OMSUnsupportException(int errorCode, Throwable cause) { + super(errorCode, cause); + } + + /** + * @see OMSUnsupportException#OMSUnsupportException(int, String, Throwable) + */ + public OMSUnsupportException(int errorCode, String message, Throwable cause) { + super(errorCode, message, cause); + } +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java new file mode 100644 index 00000000..f6ea1254 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/Extension.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.extension; + +import io.openmessaging.annotation.Optional; +import io.openmessaging.exception.OMSDestinationException; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.exception.OMSSecurityException; +import io.openmessaging.exception.OMSTimeOutException; + +/** + *

+ * This interface contains some methods are used for getting configurations related implementation. but this interface + * are not mandatory. + *

+ * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Optional +public interface Extension { + + /** + * This method used for getting the related queue's meta data, and this method is optional, vendors may not provide + * this method based on their implementation. + *

+ * + * @param queueName Queue name, message destination. + * @return {@link QueueMetaData} Queue config in the server + * @throws OMSSecurityException when have no authority to send messages to a given destination. + * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. + * @throws OMSDestinationException when have no given destination in the server. + * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. + */ + QueueMetaData getQueueMetaData(String queueName); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java new file mode 100644 index 00000000..8da0d87f --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.extension; + +import io.openmessaging.annotation.Optional; +import io.openmessaging.message.Message; + +/** + *

+ * The {@code ExtensionHeader} interface contains extended properties for common implementations in current messaging + * and streaming field, such as the queue-based partitioning implementation, but the related properties in this + * interface are not mandatory. + *

+ * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Optional +public interface ExtensionHeader { + /** + * The {@code PARTITION} in extension header field contains the partition of target destination which the message + * is being sent. + *

+ * + * When a {@link Message} is set with this value, this message will be delivered to specified partition, but the + * premise is that the implementation of the server side is dependent on the partition or a queue-like storage + * mechanism. + *

+ * + * @param partition The specified partition will be sent to. + */ + ExtensionHeader setPartition(int partition); + + /** + * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. + *

+ * + * @param offset The offset in the current partition, used to quickly get this message in the queue + */ + ExtensionHeader setOffset(long offset); + + /** + * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link a + * response message with its request message. + */ + ExtensionHeader setCorrelationId(String correlationId); + + /** + * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a transaction. + *

+ * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, and + * consumer received message also contains this field. + */ + ExtensionHeader setTransactionId(String transactionId); + + /** + * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. + *

+ * When a message is sent, STORE_TIMESTAMP is ignored. + *

+ * When the send method returns it contains a server-assigned value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + ExtensionHeader setStoreTimestamp(long storeTimestamp); + + /** + * The {@code STORE_HOST} header field contains the store host info of a message in server side. + *

+ * When a message is sent, STORE_HOST is ignored. + *

+ * When the send method returns it contains a server-assigned value. + */ + ExtensionHeader setStoreHost(String storeHost); + + /** + * The {@code messagekey} header field contains the custom key of a message. + *

+ * This key is a customer identifier for a class of messages, and this key may be used for server to hash or + * dispatch messages, or even can use this key to implement order message. + *

+ */ + ExtensionHeader setMessageKey(String messageKey); + + /** + * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique + * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at where, + * and received by who. + *

+ * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can be + * used to trace the whole call link with other parts in the whole system. + */ + ExtensionHeader setTraceId(String traceId); + + /** + * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. + *

+ * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this filed + * isn't set explicitly, this means this message should be delivered immediately. + */ + ExtensionHeader setDelayTime(long delayTime); + + /** + * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. + *

+ * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the + * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. + *

+ *

+ * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not define a + * notification of message expiration. + *

+ */ + ExtensionHeader setExpireTime(long expireTime); + + /** + * This method will return the partition of this message belongs. + *

+ * + * @return The {@code PARTITION} to which the message belongs + */ + int getPartiton(); + + /** + * This method will return the {@Code OFFSET} in the partition to which the message belongs to, but the premise is + * that the implementation of the server side is dependent on the partition or a queue-like storage mechanism. + * + * @return The offset of the partition to which the message belongs. + */ + long getOffset(); + + /** + * See {@link ExtensionHeader#setCorrelationId(String)} + * + * @return correlationId + */ + String getCorrelationId(); + + /** + * See {@link ExtensionHeader#setTransactionId(String)} + * + * @return transactionId + */ + String getTransactionId(); + + /** + * See {@link ExtensionHeader#setStoreTimestamp(long)} + * + * @return storeTimestamp + */ + long getStoreTimestamp(); + + /** + * See {@link ExtensionHeader#setStoreHost(String)} + * + * @return storeHost + */ + String getStoreHost(); + + /** + * See {@link ExtensionHeader#setDelayTime(long)} + * + * @return delayTime + */ + long getDelayTime(); + + /** + * See {@link ExtensionHeader#setExpireTime(long)} + * + * @return expireTime + */ + long getExpireTime(); + + /** + * See {@link ExtensionHeader#setMessageKey(String)} + * + * @return messageKey + */ + String getMessageKey(); + + /** + * See {@link ExtensionHeader#setTraceId(String)} + * + * @return traceId + */ + String getTraceId(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java new file mode 100644 index 00000000..22f9146a --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.extension; + +import io.openmessaging.annotation.Optional; +import java.util.List; + +/** + * This interface {@code QueueMetaData} contains methods are used for getting configurations related some certain + * implementation. but this interface are not mandatory. + *

+ * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +@Optional +public interface QueueMetaData { + + /** + * In order to improve performance, in some scenarios where message persistence is required, some message middleware + * will store messages on multiple partitions in multi servers. + *

+ * In some scenarios, it is very useful to get the relevant partitions meta data for a queue. + */ + interface Partition { + /** + * Partition identifier + * + * @return Partition identifier + */ + int partitionId(); + + /** + * The host of the server where the partition is located + *

+ * + * @return The host of the server where the partition is located + */ + String partitonHost(); + } + + /** + * Queue name + *

+ * + * @return Queue name. + */ + String queueName(); + + /** + * Get partition list belongs to the {@code queueName} + * + * @return List of {@link Partition} belongs to the specified queue. + */ + List partitions(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java index 440974d2..98aba203 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java @@ -17,7 +17,7 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.consumer.MessageListener; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java index eaefde35..e41f60a0 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java @@ -17,13 +17,12 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * A {@code ProducerInterceptor} is used to intercept send operations of producer. *

- * The interceptor is able to view or modify the message being transmitted and collect - * the send record. + * The interceptor is able to view or modify the message being transmitted and collect the send record. * * @version OMS 1.0.0 * @since OMS 1.0.0 @@ -46,5 +45,5 @@ public interface ProducerInterceptor { * @param attributes the extensible attributes delivered to the intercept thread. */ void postSend(Message message, Context attributes); - + } diff --git a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java index 952e17c6..67eb0b96 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java @@ -105,11 +105,6 @@ public int getInt(String key) { return Integer.valueOf(properties.get(key)); } - @Override - public int getInt(final String key, final int defaultValue) { - return properties.containsKey(key) ? getInt(key) : defaultValue; - } - @Override public long getLong(String key) { if (!properties.containsKey(key)) { @@ -131,21 +126,11 @@ public double getDouble(String key) { return Double.valueOf(properties.get(key)); } - @Override - public double getDouble(final String key, final double defaultValue) { - return properties.containsKey(key) ? getDouble(key) : defaultValue; - } - @Override public String getString(String key) { return properties.get(key); } - @Override - public String getString(final String key, final String defaultValue) { - return properties.containsKey(key) ? getString(key) : defaultValue; - } - @Override public Set keySet() { return properties.keySet(); diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java new file mode 100644 index 00000000..1017c687 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.message; + +import io.openmessaging.KeyValue; +import io.openmessaging.extension.ExtensionHeader; + +/** + * The {@code Header} interface is the root interface of all OMS messages, and the most commonly used by OMS message + * {@link Message}. + *

+ * The header contains fields used by the messaging system that describes the message's meta information, while the body + * contains the application data being transmitted. + *

+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and + * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage. + *

+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and + * simply transmitted to its destination. + *

+ * + * The header part is placed in the implementation classes of {@code Message}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface Header { + /** + * The {@code DESTINATION} header field contains the destination to which the message is being sent. + *

+ * When a message is set to the {@code Queue}, then the message will be sent to the specified destination. + *

+ * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. + */ + Header setDestination(String destination); + + /** + * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code + * Producer}. this identifier is generated by producer. + */ + Header setMessageId(String messageId); + + /** + * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to be + * sent. + *

+ * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message in + * client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this value. + *

+ * When a message is received its, BORN_TIMESTAMP header field contains this same value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + Header setBornTimestamp(long bornTimestamp); + + /** + * The {@code BORN_HOST} header field contains the born host info of a message in client side. + *

+ * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the + * message's BORN_HOST header field contains this value. + *

+ * When a message is received, its BORN_HOST header field contains this same value. + */ + Header setBornHost(String bornHost); + + /** + * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority + * value should be delivered preferentially. + *

+ * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default + * priority is 5. The priority beyond this region will be ignored. + *

+ * OMS does not require or provide any guarantee that the message should be delivered in priority order strictly, + * but the vendor should provide a best effort to deliver expedited messages ahead of normal messages. + *

+ * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. + */ + Header setPriority(short priority); + + /** + * The {@code DURABILITY} header field contains the persistent level of a message, the vendor should guarantee the + * reliability level for a message. + *

+ * OMS defines two modes of message delivery: + *

+ */ + Header setDurability(short durability); + + /** + * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message delivery. + */ + Header setDeliveryCount(int deliveryCount); + + /** + * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to + * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. + */ + Header setCompression(short compression); + + /** + * See {@link Header#setDestination(String)} + * + * @return destination + */ + String getDestination(); + + /** + * See {@link Header#setMessageId(String)} + * + * @return messageId + */ + String getMessageId(); + + /** + * See {@link Header#setBornTimestamp(long)} + * + * @return bornTimestamp + */ + long getBornTimestamp(); + + /** + * See {@link Header#setBornHost(String)} + * + * @return bornHost + */ + String getBornHost(); + + /** + * See {@link Header#setPriority(short)} + * + * @return priority + */ + short getPriority(); + + /** + * See {@link Header#setDurability(short)} + * + * @return durability + */ + short getDurability(); + + /** + * See {@link Header#setDeliveryCount(int)} + * + * @return deliveryCount + */ + int getDeliveryCount(); + + /** + * See {@link Header#setCompression(short)} + * + * @return compression + */ + short getCompression(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java new file mode 100644 index 00000000..de514276 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.message; + +import io.openmessaging.KeyValue; +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.MessageReceipt; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.extension.ExtensionHeader; +import java.util.Optional; + +/** + * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is + * {@link Message}. + *

+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and + * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. + *

+ * The header contains fields used by the messaging system that describes the message's meta information, while the body + * contains the application data being transmitted. + *

+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and + * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage. + *

+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and + * simply transmitted to its destination. + *

+ * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the + * messaging system. The application is responsible for explaining the concrete content and format of the message body, + * OMS is never aware of that. + * + * The body part is placed in the implementation classes of {@code Message}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface Message { + /** + * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the system headers of a {@code Message} + */ + Header header(); + + /** + * This interface is optional, Therefore, users need to check whether the interface is implemented and the + * correctness of its implementation. + *

+ * + * @return The implementation of {@link ExtensionHeader} + */ + Optional extensionHeader(); + + /** + * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the user properties of a {@code Message} + */ + KeyValue properties(); + + /** + * Get data from message body + * + * @return message body + * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type + */ + byte[] getData(); + + /** + * Set data to message body + * + * @param data set message body in binary stream + */ + void setData(byte[] data); + + /** + * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message. + * + * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt) + * @see MessageListener.Context#ack() + * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...) + */ + MessageReceipt getMessageReceipt(); + +} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java similarity index 95% rename from openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java rename to openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java index 536f8ce1..4902d4b6 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package io.openmessaging; +package io.openmessaging.message; import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.message.Message; /** * A factory interface for creating {@code Message} objects. diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 089e4efb..3dbc10d4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -17,10 +17,9 @@ package io.openmessaging.producer; +import io.openmessaging.Client; import io.openmessaging.Future; import io.openmessaging.FutureListener; -import io.openmessaging.Message; -import io.openmessaging.MessageFactory; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; @@ -30,6 +29,8 @@ import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.exception.OMSTransactionException; import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; +import io.openmessaging.message.MessageFactory; import java.util.List; /** @@ -49,11 +50,11 @@ * @version OMS 1.0.0 * @since OMS 1.0.0 */ -public interface Producer extends MessageFactory, ServiceLifecycle { +public interface Producer extends MessageFactory, ServiceLifecycle, Client { /** * Sends a message to the specified destination synchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. * * @param message a message will be sent. * @return the successful {@code SendResult}. @@ -67,7 +68,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a message to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -98,7 +99,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Send messages to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -108,14 +109,14 @@ public interface Producer extends MessageFactory, ServiceLifecycle { * @see Future * @see FutureListener */ - Future> sendAsync(List messages); + Future sendAsync(List messages); /** *

* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't care about the * send result and also have no context to get the result. * - * @param messages a batch message will be sent. + * @param messages a batch message will be sent. */ void sendOneway(List messages); @@ -127,7 +128,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { void addInterceptor(ProducerInterceptor interceptor); /** - * Removes a {@code ProducerInterceptor}. + * Remove a {@code ProducerInterceptor}. * * @param interceptor a producer interceptor will be removed. */ @@ -135,7 +136,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a transactional message to the specified destination synchronously, the destination should be preset to - * {@link Message#headers()}, other header fields as well. + * {@link Message#header()}, other header fields as well. *

* A transactional send result will be exposed to consumer if this prepare message send success, and then, you can * execute your local transaction, when local transaction execute success, users can use {@link @@ -150,7 +151,9 @@ public interface Producer extends MessageFactory, ServiceLifecycle { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSDestinationException when have no given destination in the server. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. - * @throws OMSTransactionException when used normal producer which haven't register {@link TransactionStateCheckListener}. + * @throws OMSTransactionException when used normal producer which haven't register {@link + * TransactionStateCheckListener}. */ TransactionalResult prepare(Message message); + } \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java index 99665544..efa0d5f1 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java @@ -17,7 +17,7 @@ package io.openmessaging.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * Each executor will be associated with a transactional message, can be used to execute local transaction branch and diff --git a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java index 14cb8d0b..efee3560 100644 --- a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java +++ b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java @@ -23,6 +23,7 @@ import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.Consumer; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; import org.junit.Test; @@ -50,6 +51,10 @@ public TestVendor(KeyValue keyValue) { return null; } + @Override public MessageFactory messageFactory() { + return null; + } + @Override public String version() { return OMS.specVersion; diff --git a/pom.xml b/pom.xml index d7aaf413..a706aa59 100644 --- a/pom.xml +++ b/pom.xml @@ -51,8 +51,8 @@ UTF-8 - 1.6 - 1.6 + 1.8 + 1.8