From 674d6864aa39d47963c009db6a70d7737898bf3a Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 31 Jan 2019 17:33:31 +0800 Subject: [PATCH 01/15] Add extension fields --- .../samples/consumer/PullConsumerApp.java | 9 ++-- .../samples/producer/ProducerApp.java | 6 +-- .../main/java/io/openmessaging/KeyValue.java | 42 +------------------ .../main/java/io/openmessaging/Message.java | 4 +- .../io/openmessaging/ServiceLifeState.java | 4 +- .../io/openmessaging/consumer/Consumer.java | 10 +++++ .../extension/ExtensionHeader.java | 32 ++++++++++++++ .../internal/DefaultKeyValue.java | 22 +--------- .../io/openmessaging/producer/Producer.java | 4 +- 9 files changed, 57 insertions(+), 76 deletions(-) create mode 100644 openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index b60f89e0..7d74f441 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -21,7 +21,6 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; -import io.openmessaging.manager.ResourceManager; public class PullConsumerApp { public static void main(String[] args) { @@ -29,13 +28,9 @@ public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); - //Fetch a ResourceManager to create Queue resource. - ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createQueue("NS://HELLO_QUEUE"); //Start a PullConsumer to receive messages from the specific queue. final Consumer consumer = messagingAccessPoint.createConsumer(); - consumer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -44,10 +39,14 @@ public void run() { consumer.stop(); } })); + consumer.bindQueue("NS://HELLO_QUEUE"); + consumer.start(); + Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message consumer.ack(message.headers().getMessageId()); + consumer.stop(); } } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index 2da775fc..d38a8146 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -35,7 +35,6 @@ public static void main(String[] args) { OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); final Producer producer = messagingAccessPoint.createProducer(); - producer.start(); ProducerInterceptor interceptor = new ProducerInterceptor() { @Override public void preSend(Message message, Context attributes) { @@ -46,6 +45,7 @@ public void postSend(Message message, Context attributes) { } }; producer.addInterceptor(interceptor); + producer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -55,9 +55,9 @@ public void run() { } })); - //Sends a message to the specified destination synchronously. + //Send a message to the specified destination synchronously. Message message = producer.createMessage( - "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java index d521bb6e..61c70793 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java @@ -82,18 +82,8 @@ public interface KeyValue { * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, short) */ - int getShort(String key); + short getShort(String key); - /** - * Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, zero is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, short) - */ - int getShort(String key, short defaultValue); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not @@ -105,16 +95,6 @@ public interface KeyValue { */ int getInt(String key); - /** - * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, int) - */ - int getInt(String key, int defaultValue); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not @@ -147,16 +127,6 @@ public interface KeyValue { */ double getDouble(String key); - /** - * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, double) - */ - double getDouble(String key, double defaultValue); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is @@ -168,16 +138,6 @@ public interface KeyValue { */ String getString(String key); - /** - * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, String) - */ - String getString(String key, String defaultValue); /** * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object. diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java index 9d06f889..ecbb5ec4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/Message.java @@ -68,8 +68,8 @@ interface Headers { Headers setDestination(String destination); /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identifies each message sent by a {@code - * Producer}. + * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code + * Producer}. this identifier is generated by producer. */ Headers setMessageId(String messageId); diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java index 66e90965..eafdb18f 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java +++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java @@ -43,10 +43,10 @@ public enum ServiceLifeState { /** * Service is stopping. */ - STOPING, + STOPPING, /** * Service has been stopped. */ - STOPED, + STOPPED, } diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 948ec068..d894eb62 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -146,6 +146,16 @@ public interface Consumer extends ServiceLifecycle { */ Message receive(long timeout); + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + * + * @param messageListener {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when + * new delivered message is coming. + * @return + */ + void receiveAsync(MessageListener messageListener); + /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using * manual commit. diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java new file mode 100644 index 00000000..c35b597b --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.extension; + +/** + * This header is extension header + */ +public interface ExtensionHeader { + /** + * Before send message, + * + * @param partition + */ + void setPartition(int partition); + + int getPartiton(); + +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java index e2f6477b..ef4695ba 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java @@ -32,12 +32,7 @@ public class DefaultKeyValue implements KeyValue { private Map properties; @Override - public int getShort(String key) { - return 0; - } - - @Override - public int getShort(String key, short defaultValue) { + public short getShort(String key) { return 0; } @@ -83,11 +78,6 @@ public int getInt(String key) { return Integer.valueOf(properties.get(key)); } - @Override - public int getInt(final String key, final int defaultValue) { - return properties.containsKey(key) ? getInt(key) : defaultValue; - } - @Override public long getLong(String key) { if (!properties.containsKey(key)) { @@ -109,21 +99,11 @@ public double getDouble(String key) { return Double.valueOf(properties.get(key)); } - @Override - public double getDouble(final String key, final double defaultValue) { - return properties.containsKey(key) ? getDouble(key) : defaultValue; - } - @Override public String getString(String key) { return properties.get(key); } - @Override - public String getString(final String key, final String defaultValue) { - return properties.containsKey(key) ? getString(key) : defaultValue; - } - @Override public Set keySet() { return properties.keySet(); diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 8ebd35d5..0daaed7d 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -97,14 +97,14 @@ public interface Producer extends MessageFactory, ServiceLifecycle { void send(List messages); /** - * Adds a {@code ProducerInterceptor} to intercept send operations of producer. + * Add a {@code ProducerInterceptor} to intercept send operations of producer. * * @param interceptor a producer interceptor. */ void addInterceptor(ProducerInterceptor interceptor); /** - * Removes a {@code ProducerInterceptor}. + * Remove a {@code ProducerInterceptor}. * * @param interceptor a producer interceptor will be removed. */ From b8c1329f8949c0f24eab3d51733ad5c2b0d378d2 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 1 Feb 2019 19:08:46 +0800 Subject: [PATCH 02/15] (1) Polish messsage header attributes based on specification (2) Add extension attributes and interface --- .../samples/consumer/PullConsumerApp.java | 5 +- .../samples/consumer/PushConsumerApp.java | 2 +- .../samples/producer/ProducerApp.java | 6 +- .../producer/TransactionProducerApp.java | 2 +- .../samples/routing/RoutingApp.java | 2 +- .../main/java/io/openmessaging/Message.java | 372 ------------------ .../openmessaging/MessagingAccessPoint.java | 1 + .../consumer/BatchMessageListener.java | 2 +- .../io/openmessaging/consumer/Consumer.java | 2 +- .../consumer/MessageListener.java | 2 +- .../extension/ExtensionHeader.java | 146 ++++++- .../extension/QueueMetaData.java | 9 +- .../interceptor/ConsumerInterceptor.java | 2 +- .../interceptor/ProducerInterceptor.java | 2 +- .../java/io/openmessaging/message/Header.java | 156 ++++++++ .../io/openmessaging/message/Message.java | 97 +++++ .../{ => message}/MessageFactory.java | 3 +- .../io/openmessaging/producer/Producer.java | 4 +- .../TransactionStateCheckListener.java | 2 +- 19 files changed, 417 insertions(+), 400 deletions(-) delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/Message.java create mode 100644 openmessaging-api/src/main/java/io/openmessaging/message/Header.java create mode 100644 openmessaging-api/src/main/java/io/openmessaging/message/Message.java rename openmessaging-api/src/main/java/io/openmessaging/{ => message}/MessageFactory.java (95%) diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index 7d74f441..04bc6ad7 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -17,10 +17,10 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; +import io.openmessaging.message.Message; public class PullConsumerApp { public static void main(String[] args) { @@ -28,7 +28,6 @@ public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); - //Start a PullConsumer to receive messages from the specific queue. final Consumer consumer = messagingAccessPoint.createConsumer(); @@ -46,7 +45,7 @@ public void run() { Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message - consumer.ack(message.headers().getMessageId()); + consumer.ack(message.header().getMessageId()); consumer.stop(); } } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java index 0fed2032..98a9a2e5 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java @@ -17,12 +17,12 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; public class PushConsumerApp { public static void main(String[] args) { diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index d38a8146..e04082a9 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -18,11 +18,11 @@ package io.openmessaging.samples.producer; import io.openmessaging.Future; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.interceptor.Context; import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; @@ -38,10 +38,12 @@ public static void main(String[] args) { ProducerInterceptor interceptor = new ProducerInterceptor() { @Override public void preSend(Message message, Context attributes) { + System.out.println("PreSend message: " + message); } @Override public void postSend(Message message, Context attributes) { + System.out.println("PostSend message: " + message); } }; producer.addInterceptor(interceptor); @@ -58,6 +60,8 @@ public void run() { //Send a message to the specified destination synchronously. Message message = producer.createMessage( "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + message.header().setBornHost("127.0.0.1").setDurability((short) 0); + message.extentionHeader().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java index f6db2730..bed57642 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.producer.Producer; diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java index 3f3a9a66..ba267444 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.routing; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java deleted file mode 100644 index 981a1754..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ /dev/null @@ -1,372 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging; - -import io.openmessaging.exception.OMSMessageFormatException; -import io.openmessaging.extension.ExtensionHeader; - -/** - * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is - * {@link Message}. - *

- * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and - * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. - *

- * The header contains fields used by the messaging system that describes the message's meta information, while the body - * contains the application data being transmitted. - *

- * As for the message header, OMS defines two kinds types: headers {@link Headers} and properties {@link KeyValue}, with - * respect to flexibility in vendor implementation and user usage. - *

    - *
  • - * System Headers, OMS defines some standard attributes that represent the characteristics of the message. - *
  • - *
  • - * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to - * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios. - *
  • - *
- * The body contains the application data being transmitted, which is generally ignored by the messaging system and - * simply transmitted to its destination. - *

- * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the - * messaging system. The application is responsible for explaining the concrete content and format of the message body, - * OMS is never aware of that. - * - * The body part is placed in the implementation classes of {@code Message}. - * - * @version OMS 1.0.0 - * @since OMS 1.0.0 - */ -public interface Message { - - interface Headers { - - /** - * The {@code DESTINATION} header field contains the destination to which the message is being sent. - *

- * When a message is set to the {@code Queue}, then the message will be sent to the specified destination. - *

- * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. - */ - Headers setDestination(String destination); - - /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code - * Producer}. this identifier is generated by producer. - */ - Headers setMessageId(String messageId); - - /** - * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to - * be sent. - *

- * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message - * in client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this - * value. - *

- * When a message is received its, BORN_TIMESTAMP header field contains this same value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setBornTimestamp(long bornTimestamp); - - /** - * The {@code BORN_HOST} header field contains the born host info of a message in client side. - *

- * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the - * message's BORN_HOST header field contains this value. - *

- * When a message is received, its BORN_HOST header field contains this same value. - */ - Headers setBornHost(String bornHost); - - /** - * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. - *

- * When a message is sent, STORE_TIMESTAMP is ignored. - *

- * When the send method returns it contains a server-assigned value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setStoreTimestamp(long storeTimestamp); - - /** - * The {@code STORE_HOST} header field contains the store host info of a message in server side. - *

- * When a message is sent, STORE_HOST is ignored. - *

- * When the send method returns it contains a server-assigned value. - */ - Headers setStoreHost(String storeHost); - - /** - * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. - *

- * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this - * filed isn't set explicitly, this means this message should be delivered immediately. - */ - Headers setDelayTime(long delayTime); - - /** - * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. - *

- * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the - * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. - *

- *

- * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not - * define a notification of message expiration. - *

- */ - Headers setExpireTime(long expireTime); - - /** - * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority - * value should be delivered preferentially. - *

- * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default - * priority is 5. The priority beyond this region will be ignored. - *

- * OMS does not require or provide any guarantee that the message should be delivered in priority order - * strictly, but the vendor should provide a best effort to deliver expedited messages ahead of normal - * messages. - *

- * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. - */ - Headers setPriority(short priority); - - /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee - * the reliability level for a message. - *

- * OMS defines two modes of message delivery: - *

    - *
  • - * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message - * won't be lost. - *
  • - *
  • - * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory - * storage is enough for better performance and lower cost. - *
  • - *
- */ - Headers setDurability(short durability); - - /** - * The {@code messagekey} header field contains the custom key of a message. - *

- * This key is a customer identifier for a class of messages, and this key may be used for server to hash or - * dispatch messages, or even can use this key to implement order message. - *

- */ - Headers setMessageKey(String messageKey); - - /** - * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique - * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at - * where, and received by who. - *

- * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can - * be used to trace the whole call link with other parts in the whole system. - */ - Headers setTraceId(String traceId); - - /** - * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message - * delivery. - */ - Headers setDeliveryCount(int deliveryCount); - - /** - * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a - * transaction. - *

- * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, - * and consumer received message also contains this field. - */ - Headers setTransactionId(String transactionId); - - /** - * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link - * a response message with its request message. - */ - Headers setCorrelationId(String correlationId); - - /** - * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to - * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. - */ - Headers setCompression(short compression); - - /** - * See {@link Headers#setDestination(String)} - * - * @return destination - */ - String getDestination(); - - /** - * See {@link Headers#setMessageId(String)} - * - * @return messageId - */ - String getMessageId(); - - /** - * See {@link Headers#setBornTimestamp(long)} - * - * @return bornTimestamp - */ - long getBornTimestamp(); - - /** - * See {@link Headers#setBornHost(String)} - * - * @return bornHost - */ - String getBornHost(); - - /** - * See {@link Headers#setStoreTimestamp(long)} - * - * @return storeTimestamp - */ - long getStoreTimestamp(); - - /** - * See {@link Headers#setStoreHost(String)} - * - * @return storeHost - */ - String getStoreHost(); - - /** - * See {@link Headers#setDelayTime(long)} - * - * @return delayTime - */ - long getDelayTime(); - - /** - * See {@link Headers#setExpireTime(long)} - * - * @return expireTime - */ - long getExpireTime(); - - /** - * See {@link Headers#setPriority(short)} - * - * @return priority - */ - short getPriority(); - - /** - * See {@link Headers#setDurability(short)} - * - * @return durability - */ - short getDurability(); - - /** - * See {@link Headers#setMessageKey(String)} - * - * @return messageKey - */ - String getMessageKey(); - - /** - * See {@link Headers#setTraceId(String)} - * - * @return traceId - */ - String getTraceId(); - - /** - * See {@link Headers#setDeliveryCount(int)} - * - * @return deliveryCount - */ - int getDeliveryCount(); - - /** - * See {@link Headers#setTransactionId(String)} - * - * @return transactionId - */ - String getTransactionId(); - - /** - * See {@link Headers#setCorrelationId(String)} - * - * @return correlationId - */ - String getCorrelationId(); - - /** - * See {@link Headers#setCompression(short)} - * - * @return compression - */ - short getCompression(); - - } - - /** - * This interface is optional, Therefore, users need to check whether the interface is implemented and the - * correctness of its implementation. - *

- * - * @return The implementation of {@link ExtensionHeader} - */ - ExtensionHeader extentionHeader(); - - /** - * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the system headers of a {@code Message} - */ - Headers headers(); - - /** - * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the user properties of a {@code Message} - */ - KeyValue properties(); - - /** - * Get data from message body - * - * @return message body - * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type - */ - byte[] getData(); - - /** - * Set data to message body - * - * @param data set message body in binary stream - */ - void setData(byte[] data); - -} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java index a12feb74..e99ba2e4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java @@ -22,6 +22,7 @@ import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java index 1e316fd7..d4d4bbcf 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.exception.OMSRuntimeException; import java.util.List; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index b795b76b..a956e80f 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java index 97a6f414..81a3fa8e 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.exception.OMSRuntimeException; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java index b686c2d0..d9ef1bd2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -16,7 +16,7 @@ */ package io.openmessaging.extension; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** *

@@ -41,23 +41,97 @@ public interface ExtensionHeader { * * @param partition The specified partition will be sent to. */ - void setPartition(int partition); + ExtensionHeader setPartition(int partition); /** - * This method will return the partition of this message belongs. + * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. *

* - * @return The {@code PARTITION} to which the message belongs + * @param offset The offset in the current partition, used to quickly get this message in the queue */ - int getPartiton(); + ExtensionHeader setOffset(long offset); /** - * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. + * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link a + * response message with its request message. + */ + ExtensionHeader setCorrelationId(String correlationId); + + /** + * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a transaction. + *

+ * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, and + * consumer received message also contains this field. + */ + ExtensionHeader setTransactionId(String transactionId); + + /** + * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. + *

+ * When a message is sent, STORE_TIMESTAMP is ignored. + *

+ * When the send method returns it contains a server-assigned value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + ExtensionHeader setStoreTimestamp(long storeTimestamp); + + /** + * The {@code STORE_HOST} header field contains the store host info of a message in server side. + *

+ * When a message is sent, STORE_HOST is ignored. + *

+ * When the send method returns it contains a server-assigned value. + */ + ExtensionHeader setStoreHost(String storeHost); + + /** + * The {@code messagekey} header field contains the custom key of a message. + *

+ * This key is a customer identifier for a class of messages, and this key may be used for server to hash or + * dispatch messages, or even can use this key to implement order message. + *

+ */ + ExtensionHeader setMessageKey(String messageKey); + + /** + * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique + * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at where, + * and received by who. + *

+ * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can be + * used to trace the whole call link with other parts in the whole system. + */ + ExtensionHeader setTraceId(String traceId); + + /** + * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. + *

+ * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this filed + * isn't set explicitly, this means this message should be delivered immediately. + */ + ExtensionHeader setDelayTime(long delayTime); + + /** + * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. + *

+ * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the + * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. + *

+ *

+ * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not define a + * notification of message expiration. + *

+ */ + ExtensionHeader setExpireTime(long expireTime); + + /** + * This method will return the partition of this message belongs. *

* - * @param offset The offset in the current partition, used to quickly get this message in the queue + * @return The {@code PARTITION} to which the message belongs */ - void setOffset(long offset); + int getPartiton(); /** * This method will return the {@Code OFFSET} in the partition to which the message belongs to, but the premise is @@ -66,4 +140,60 @@ public interface ExtensionHeader { * @return The offset of the partition to which the message belongs. */ long getOffset(); + + /** + * See {@link ExtensionHeader#setCorrelationId(String)} + * + * @return correlationId + */ + String getCorrelationId(); + + /** + * See {@link ExtensionHeader#setTransactionId(String)} + * + * @return transactionId + */ + String getTransactionId(); + + /** + * See {@link ExtensionHeader#setStoreTimestamp(long)} + * + * @return storeTimestamp + */ + long getStoreTimestamp(); + + /** + * See {@link ExtensionHeader#setStoreHost(String)} + * + * @return storeHost + */ + String getStoreHost(); + + /** + * See {@link ExtensionHeader#setDelayTime(long)} + * + * @return delayTime + */ + long getDelayTime(); + + /** + * See {@link ExtensionHeader#setExpireTime(long)} + * + * @return expireTime + */ + long getExpireTime(); + + /** + * See {@link ExtensionHeader#setMessageKey(String)} + * + * @return messageKey + */ + String getMessageKey(); + + /** + * See {@link ExtensionHeader#setTraceId(String)} + * + * @return traceId + */ + String getTraceId(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java index 3bd8173d..5c71b6b2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java @@ -32,7 +32,7 @@ public interface QueueMetaData { * In order to improve performance, in some scenarios where message persistence is required, some message middleware * will store messages on multiple partitions in multi servers. *

- * In some scenarios, it is very useful to get the relevant metadata for these partitions. + * In some scenarios, it is very useful to get the relevant partitions meta data for a queue. */ interface Partition { /** @@ -44,8 +44,9 @@ interface Partition { /** * The host of the server where the partition is located + *

* - * @return + * @return The host of the server where the partition is located */ String partitonHost(); } @@ -54,14 +55,14 @@ interface Partition { * Queue name *

* - * @return Queue name + * @return Queue name. */ String queueName(); /** * Get partition list belongs to the {@code queueName} * - * @return List of {@link Partition} + * @return List of {@link Partition} belongs to the specified queue. */ List partitions(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java index 440974d2..98aba203 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java @@ -17,7 +17,7 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.consumer.MessageListener; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java index cea68149..e41f60a0 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java @@ -17,7 +17,7 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * A {@code ProducerInterceptor} is used to intercept send operations of producer. diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java new file mode 100644 index 00000000..325c101d --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.message; + +public interface Header { + /** + * The {@code DESTINATION} header field contains the destination to which the message is being sent. + *

+ * When a message is set to the {@code Queue}, then the message will be sent to the specified destination. + *

+ * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. + */ + Header setDestination(String destination); + + /** + * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code + * Producer}. this identifier is generated by producer. + */ + Header setMessageId(String messageId); + + /** + * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to be + * sent. + *

+ * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message in + * client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this value. + *

+ * When a message is received its, BORN_TIMESTAMP header field contains this same value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + Header setBornTimestamp(long bornTimestamp); + + /** + * The {@code BORN_HOST} header field contains the born host info of a message in client side. + *

+ * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the + * message's BORN_HOST header field contains this value. + *

+ * When a message is received, its BORN_HOST header field contains this same value. + */ + Header setBornHost(String bornHost); + + /** + * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority + * value should be delivered preferentially. + *

+ * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default + * priority is 5. The priority beyond this region will be ignored. + *

+ * OMS does not require or provide any guarantee that the message should be delivered in priority order strictly, + * but the vendor should provide a best effort to deliver expedited messages ahead of normal messages. + *

+ * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. + */ + Header setPriority(short priority); + + /** + * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee the + * reliability level for a message. + *

+ * OMS defines two modes of message delivery: + *

    + *
  • + * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message won't be + * lost. + *
  • + *
  • + * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory + * storage is enough for better performance and lower cost. + *
  • + *
+ */ + Header setDurability(short durability); + + /** + * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message delivery. + */ + Header setDeliveryCount(int deliveryCount); + + /** + * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to + * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. + */ + Header setCompression(short compression); + + /** + * See {@link Header#setDestination(String)} + * + * @return destination + */ + String getDestination(); + + /** + * See {@link Header#setMessageId(String)} + * + * @return messageId + */ + String getMessageId(); + + /** + * See {@link Header#setBornTimestamp(long)} + * + * @return bornTimestamp + */ + long getBornTimestamp(); + + /** + * See {@link Header#setBornHost(String)} + * + * @return bornHost + */ + String getBornHost(); + + /** + * See {@link Header#setPriority(short)} + * + * @return priority + */ + short getPriority(); + + /** + * See {@link Header#setDurability(short)} + * + * @return durability + */ + short getDurability(); + + /** + * See {@link Header#setDeliveryCount(int)} + * + * @return deliveryCount + */ + int getDeliveryCount(); + + /** + * See {@link Header#setCompression(short)} + * + * @return compression + */ + short getCompression(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java new file mode 100644 index 00000000..293c21a3 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.message; + +import io.openmessaging.KeyValue; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.extension.ExtensionHeader; + +/** + * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is + * {@link Message}. + *

+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and + * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. + *

+ * The header contains fields used by the messaging system that describes the message's meta information, while the body + * contains the application data being transmitted. + *

+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and + * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage. + *

    + *
  • + * System Headers, OMS defines some standard attributes that represent the characteristics of the message. + *
  • + *
  • + * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to + * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios. + *
  • + *
+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and + * simply transmitted to its destination. + *

+ * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the + * messaging system. The application is responsible for explaining the concrete content and format of the message body, + * OMS is never aware of that. + * + * The body part is placed in the implementation classes of {@code Message}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface Message { + /** + * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the system headers of a {@code Message} + */ + Header header(); + + /** + * This interface is optional, Therefore, users need to check whether the interface is implemented and the + * correctness of its implementation. + *

+ * + * @return The implementation of {@link ExtensionHeader} + */ + ExtensionHeader extentionHeader(); + + /** + * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the user properties of a {@code Message} + */ + KeyValue properties(); + + /** + * Get data from message body + * + * @return message body + * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type + */ + byte[] getData(); + + /** + * Set data to message body + * + * @param data set message body in binary stream + */ + void setData(byte[] data); + +} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java similarity index 95% rename from openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java rename to openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java index 536f8ce1..4902d4b6 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package io.openmessaging; +package io.openmessaging.message; import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.message.Message; /** * A factory interface for creating {@code Message} objects. diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 3bd854ce..8d8c1524 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -19,8 +19,8 @@ import io.openmessaging.Future; import io.openmessaging.FutureListener; -import io.openmessaging.Message; -import io.openmessaging.MessageFactory; +import io.openmessaging.message.Message; +import io.openmessaging.message.MessageFactory; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java index 99665544..efa0d5f1 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java @@ -17,7 +17,7 @@ package io.openmessaging.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * Each executor will be associated with a transactional message, can be used to execute local transaction branch and From 6eec44a18341a2cfff5ac5227e94802de71c7d8b Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Wed, 20 Feb 2019 20:07:42 +0800 Subject: [PATCH 03/15] Fix some spelling mistakes --- .../io/openmessaging/samples/producer/ProducerApp.java | 2 +- .../src/main/java/io/openmessaging/message/Message.java | 2 +- .../src/main/java/io/openmessaging/producer/Producer.java | 8 ++++---- .../internal/MessagingAccessPointAdapterTest.java | 5 +++++ 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index e04082a9..2fb538c8 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -61,7 +61,7 @@ public void run() { Message message = producer.createMessage( "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); message.header().setBornHost("127.0.0.1").setDurability((short) 0); - message.extentionHeader().setPartition(1); + message.extensionHeader().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java index 293c21a3..4cf8bdb7 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -70,7 +70,7 @@ public interface Message { * * @return The implementation of {@link ExtensionHeader} */ - ExtensionHeader extentionHeader(); + ExtensionHeader extensionHeader(); /** * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 8d8c1524..84b0afc2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -53,7 +53,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a message to the specified destination synchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. * * @param message a message will be sent. * @return the successful {@code SendResult}. @@ -67,7 +67,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a message to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -98,7 +98,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Send messages to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -135,7 +135,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a transactional message to the specified destination synchronously, the destination should be preset to - * {@link Message#headers()}, other header fields as well. + * {@link Message#header()}, other header fields as well. *

* A transactional send result will be exposed to consumer if this prepare message send success, and then, you can * execute your local transaction, when local transaction execute success, users can use {@link diff --git a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java index 14cb8d0b..efee3560 100644 --- a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java +++ b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java @@ -23,6 +23,7 @@ import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.Consumer; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; import org.junit.Test; @@ -50,6 +51,10 @@ public TestVendor(KeyValue keyValue) { return null; } + @Override public MessageFactory messageFactory() { + return null; + } + @Override public String version() { return OMS.specVersion; From dfa20544f485d8114c16fe420b10252f585087a2 Mon Sep 17 00:00:00 2001 From: liyue25 Date: Fri, 22 Feb 2019 12:11:27 +0800 Subject: [PATCH 04/15] A new MessageReceipt interface is introduced to increase the compatibility of the consume message acknowledgement. --- .../samples/consumer/PullConsumerApp.java | 5 ++- ...hMessage.java => BatchConsumeMessage.java} | 4 +- .../java/io/openmessaging/ConsumeMessage.java | 41 +++++++++++++++++++ .../consumer/BatchMessageListener.java | 7 ++-- .../io/openmessaging/consumer/Consumer.java | 14 +++---- .../consumer/MessageReceipt.java | 27 ++++++++++++ 6 files changed, 82 insertions(+), 16 deletions(-) rename openmessaging-api/src/main/java/io/openmessaging/{BatchMessage.java => BatchConsumeMessage.java} (92%) create mode 100644 openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java create mode 100644 openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index b60f89e0..819304a3 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -17,6 +17,7 @@ package io.openmessaging.samples.consumer; +import io.openmessaging.ConsumeMessage; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; @@ -45,9 +46,9 @@ public void run() { } })); consumer.bindQueue("NS://HELLO_QUEUE"); - Message message = consumer.receive(1000); + ConsumeMessage message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message - consumer.ack(message.headers().getMessageId()); + consumer.ack(message.getMessageRecept()); } } diff --git a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java b/openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java similarity index 92% rename from openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java rename to openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java index 718936b7..ff0ed6a3 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java +++ b/openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java @@ -20,9 +20,9 @@ import java.util.List; -public interface BatchMessage { +public interface BatchConsumeMessage { /** * @return all messages in this {@code BatchMessage} */ - List messages(); + List messages(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java b/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java new file mode 100644 index 00000000..3e36f30e --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging; + +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.MessageReceipt; + +/** + * + * A {@code ConsumeMessage} is a {@code Message} with a {@code MessageReceipt}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface ConsumeMessage extends Message { + /** + * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message. + * + * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt) + * @see MessageListener.Context#ack() + * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...) + */ + MessageReceipt getMessageRecept(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java index 96f202d3..1a7c8d26 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java @@ -17,8 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.BatchMessage; -import io.openmessaging.Message; +import io.openmessaging.BatchConsumeMessage; import io.openmessaging.exception.OMSRuntimeException; /** @@ -36,7 +35,7 @@ public interface BatchMessageListener { * * @param batchMessage the received batchMessage. */ - void onReceived(BatchMessage batchMessage, Context context); + void onReceived(BatchConsumeMessage batchMessage, Context context); interface Context { @@ -47,7 +46,7 @@ interface Context { * * @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error. */ - void success(Message... messages); + void success(MessageReceipt... messages); /** * Acknowledges messages, which is related to this {@code MessageContext}. *

diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index fb99f97d..2d320965 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -17,9 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.ServiceLifecycle; +import io.openmessaging.*; import io.openmessaging.exception.OMSDestinationException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; @@ -114,7 +112,7 @@ public interface Consumer extends ServiceLifecycle { /** * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. *

- * {@link BatchMessageListener#onReceived(List, BatchMessageListener.Context)} will be called when new delivered messages is + * {@link BatchMessageListener#onReceived(BatchConsumeMessage batchMessage, BatchMessageListener.Context context)} will be called when new delivered messages is * coming. * * @param queueName a specified queue. @@ -174,7 +172,7 @@ public interface Consumer extends ServiceLifecycle { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - Message receive(long timeout); + ConsumeMessage receive(long timeout); /** * Receives the next batch messages from the bind queues of this consumer in pull model. @@ -188,7 +186,7 @@ public interface Consumer extends ServiceLifecycle { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - List batchReceive(long timeout); + List batchReceive(long timeout); /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using @@ -196,7 +194,7 @@ public interface Consumer extends ServiceLifecycle { *

* Messages that have been received but not acknowledged may be redelivered. * - * @param receiptHandle the receipt handle associated with the consumed message. + * @param receipt the receipt handle associated with the consumed message. */ - void ack(String receiptHandle); + void ack(MessageReceipt receipt); } \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java new file mode 100644 index 00000000..26a75395 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageReceipt.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.consumer; + +/** + * A {@code MessageReceipt} is a {@code Message} with a {@code Receipt}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface MessageReceipt { +} From 661a77ffb754e90ac34abd4f35c2daf490cea173 Mon Sep 17 00:00:00 2001 From: liyue25 Date: Mon, 25 Feb 2019 10:12:29 +0800 Subject: [PATCH 05/15] Move method io.openmessaging.ConsumeMessage.getMessageReceipt() to io.openmessaging.Message.getMessageReceipt(). Delete interface io.openmessaging.ConsumeMessage. Rename class BatchConsumeMessage to BatchMessage back. --- .../samples/consumer/PullConsumerApp.java | 5 +-- ...hConsumeMessage.java => BatchMessage.java} | 4 +- .../java/io/openmessaging/ConsumeMessage.java | 41 ------------------- .../main/java/io/openmessaging/Message.java | 13 ++++++ .../consumer/BatchMessageListener.java | 4 +- .../io/openmessaging/consumer/Consumer.java | 6 +-- 6 files changed, 22 insertions(+), 51 deletions(-) rename openmessaging-api/src/main/java/io/openmessaging/{BatchConsumeMessage.java => BatchMessage.java} (92%) delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index 819304a3..f4dba4c9 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -17,7 +17,6 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.ConsumeMessage; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; @@ -46,9 +45,9 @@ public void run() { } })); consumer.bindQueue("NS://HELLO_QUEUE"); - ConsumeMessage message = consumer.receive(1000); + Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message - consumer.ack(message.getMessageRecept()); + consumer.ack(message.getMessageReceipt()); } } diff --git a/openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java b/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java similarity index 92% rename from openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java rename to openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java index ff0ed6a3..718936b7 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/BatchConsumeMessage.java +++ b/openmessaging-api/src/main/java/io/openmessaging/BatchMessage.java @@ -20,9 +20,9 @@ import java.util.List; -public interface BatchConsumeMessage { +public interface BatchMessage { /** * @return all messages in this {@code BatchMessage} */ - List messages(); + List messages(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java b/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java deleted file mode 100644 index 3e36f30e..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/ConsumeMessage.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging; - -import io.openmessaging.consumer.BatchMessageListener; -import io.openmessaging.consumer.Consumer; -import io.openmessaging.consumer.MessageListener; -import io.openmessaging.consumer.MessageReceipt; - -/** - * - * A {@code ConsumeMessage} is a {@code Message} with a {@code MessageReceipt}. - * - * @version OMS 1.0.0 - * @since OMS 1.0.0 - */ -public interface ConsumeMessage extends Message { - /** - * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message. - * - * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt) - * @see MessageListener.Context#ack() - * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...) - */ - MessageReceipt getMessageRecept(); -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java index 9d06f889..db8cca91 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/Message.java @@ -17,6 +17,10 @@ package io.openmessaging; +import io.openmessaging.consumer.BatchMessageListener; +import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.MessageReceipt; import io.openmessaging.exception.OMSMessageFormatException; /** @@ -360,4 +364,13 @@ interface Headers { */ void setData(byte[] data); + /** + * Get the {@code MessageReceipt} of this Message, which will be used to acknowledge this message. + * + * @see Consumer#ack(io.openmessaging.consumer.MessageReceipt) + * @see MessageListener.Context#ack() + * @see BatchMessageListener.Context#success(io.openmessaging.consumer.MessageReceipt...) + */ + MessageReceipt getMessageReceipt(); + } \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java index 1a7c8d26..54b34688 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.BatchConsumeMessage; +import io.openmessaging.BatchMessage; import io.openmessaging.exception.OMSRuntimeException; /** @@ -35,7 +35,7 @@ public interface BatchMessageListener { * * @param batchMessage the received batchMessage. */ - void onReceived(BatchConsumeMessage batchMessage, Context context); + void onReceived(BatchMessage batchMessage, Context context); interface Context { diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 2d320965..85169d96 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -112,7 +112,7 @@ public interface Consumer extends ServiceLifecycle { /** * Bind the {@code Consumer} to a specified queue, with a {@code BatchMessageListener}. *

- * {@link BatchMessageListener#onReceived(BatchConsumeMessage batchMessage, BatchMessageListener.Context context)} will be called when new delivered messages is + * {@link BatchMessageListener#onReceived(BatchMessage batchMessage, BatchMessageListener.Context context)} will be called when new delivered messages is * coming. * * @param queueName a specified queue. @@ -172,7 +172,7 @@ public interface Consumer extends ServiceLifecycle { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - ConsumeMessage receive(long timeout); + Message receive(long timeout); /** * Receives the next batch messages from the bind queues of this consumer in pull model. @@ -186,7 +186,7 @@ public interface Consumer extends ServiceLifecycle { * @throws OMSTimeOutException when the given timeout elapses before the send operation completes. * @throws OMSRuntimeException when the {@code Producer} fails to send the message due to some internal error. */ - List batchReceive(long timeout); + List batchReceive(long timeout); /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using From 81eb092fcb52e4336287e5a870087644207602cd Mon Sep 17 00:00:00 2001 From: gaohaoxiang Date: Thu, 28 Feb 2019 10:43:41 +0800 Subject: [PATCH 06/15] Add boolean type to KeyValue --- .../main/java/io/openmessaging/KeyValue.java | 29 +++++++++++++++++++ .../internal/DefaultKeyValue.java | 19 ++++++++++++ 2 files changed, 48 insertions(+) diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java index 53f5b52f..2cfb650a 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java @@ -35,6 +35,14 @@ */ public interface KeyValue { + /** + * Inserts or replaces {@code boolean} value for the specified key. + * + * @param key the key to be placed into this {@code KeyValue} object + * @param value the value corresponding to key + */ + KeyValue put(String key, boolean value); + /** * Inserts or replaces {@code short} value for the specified key. * @@ -75,6 +83,27 @@ public interface KeyValue { */ KeyValue put(String key, String value); + /** + * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not + * found in this property list, false is returned. + * + * @param key the property key + * @return the value in this {@code KeyValue} object with the specified key value + * @see #put(String, boolean) + */ + boolean getBoolean(String key); + + /** + * Searches for the {@code boolean} property with the specified key in this {@code KeyValue} object. If the key is not + * found in this property list, false is returned. + * + * @param key the property key + * @param defaultValue a default value + * @return the value in this {@code KeyValue} object with the specified key value + * @see #put(String, boolean) + */ + boolean getBoolean(String key, boolean defaultValue); + /** * Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not * found in this property list, zero is returned. diff --git a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java index be121b9b..952e17c6 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java @@ -31,6 +31,25 @@ public class DefaultKeyValue implements KeyValue { private Map properties; + @Override + public KeyValue put(String key, boolean value) { + properties.put(key, String.valueOf(value)); + return this; + } + + @Override + public boolean getBoolean(String key) { + if (!properties.containsKey(key)) { + return false; + } + return Boolean.valueOf(properties.get(key)); + } + + @Override + public boolean getBoolean(String key, boolean defaultValue) { + return properties.containsKey(key) ? getBoolean(key) : defaultValue; + } + @Override public short getShort(String key) { if (!properties.containsKey(key)) { From 8bbfb365adf5d14471a9df04f6493fcbe41fef9c Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 14:17:32 +0800 Subject: [PATCH 07/15] Modify spelling errors of Durability --- .../src/main/java/io/openmessaging/message/Header.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java index 678eeee1..1017c687 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java @@ -97,7 +97,7 @@ public interface Header { Header setPriority(short priority); /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee the + * The {@code DURABILITY} header field contains the persistent level of a message, the vendor should guarantee the * reliability level for a message. *

* OMS defines two modes of message delivery: From 4563284eb38e14d38ba70abff9e71bb33188a562 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 14:48:40 +0800 Subject: [PATCH 08/15] Add optional annotation to extension header --- .../src/main/java/io/openmessaging/message/Message.java | 1 + 1 file changed, 1 insertion(+) diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java index de514276..e4705c62 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -75,6 +75,7 @@ public interface Message { * * @return The implementation of {@link ExtensionHeader} */ + @io.openmessaging.annotation.Optional Optional extensionHeader(); /** From 42bf81f14c57cc8dce61793282214b0418dbe3ad Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 14:49:18 +0800 Subject: [PATCH 09/15] Add comments for Optional annotation --- .../java/io/openmessaging/annotation/Optional.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java index 13687903..b5b9d490 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java +++ b/openmessaging-api/src/main/java/io/openmessaging/annotation/Optional.java @@ -21,8 +21,17 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; + /** - * A {@code MessageReceipt} is a {@code Message} with a {@code Receipt}. + *

+ * A {@code Optional} is an annotation to mark some certain methods ,interfaces and etc. this annotation represented + * these methods or interfaces are not mandatory in OpenMessaging. + *

+ * + *

+ * If these methods or interfaces adopted by more and more vendors and end users, they may be become the mandatory + * interface in the future. Of course, if they are used very little, they may be removed. + *

* * @version OMS 1.0.0 * @since OMS 1.0.0 From 497597a001fc2b98d083284bbab7493d73ad9928 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 20:22:15 +0800 Subject: [PATCH 10/15] Change @Code to @code for generate java doc --- .../src/main/java/io/openmessaging/consumer/Consumer.java | 2 +- .../main/java/io/openmessaging/extension/ExtensionHeader.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index a791e002..4391dea9 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -33,7 +33,7 @@ * PushConsumer} client. * * @version OMS 1.0.0 - * @see MessagingAccessPoint#createConsumer(). + * @see MessagingAccessPoint#createConsumer() * @since OMS 1.0.0 */ public interface Consumer extends ServiceLifecycle, Client { diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java index 8da0d87f..ac7266da 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -46,7 +46,7 @@ public interface ExtensionHeader { ExtensionHeader setPartition(int partition); /** - * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. + * This method is only called by the server. and {@code OFFSET} represents this message offset in partition. *

* * @param offset The offset in the current partition, used to quickly get this message in the queue @@ -136,7 +136,7 @@ public interface ExtensionHeader { int getPartiton(); /** - * This method will return the {@Code OFFSET} in the partition to which the message belongs to, but the premise is + * This method will return the {@code OFFSET} in the partition to which the message belongs to, but the premise is * that the implementation of the server side is dependent on the partition or a queue-like storage mechanism. * * @return The offset of the partition to which the message belongs. From fde62ab02b99182d209fc2cd3327f91131502f93 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 20:36:20 +0800 Subject: [PATCH 11/15] Change CODE to code in java doc --- .../main/java/io/openmessaging/extension/ExtensionHeader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java index ac7266da..20e2b32e 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -109,7 +109,7 @@ public interface ExtensionHeader { /** * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. *

- * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this filed + * The message will be delivered after delayTime milliseconds starting from {@code BORN_TIMESTAMP} . When this filed * isn't set explicitly, this means this message should be delivered immediately. */ ExtensionHeader setDelayTime(long delayTime); From 49576ac5aea0736085a412cc94b4867f88b9d878 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 22:33:55 +0800 Subject: [PATCH 12/15] [maven-release-plugin] prepare release 1.0.0-beta-SNAPSHOT --- openmessaging-admin/pom.xml | 2 +- openmessaging-api-samples/pom.xml | 5 +++-- openmessaging-api/pom.xml | 2 +- pom.xml | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/openmessaging-admin/pom.xml b/openmessaging-admin/pom.xml index 3be431be..770f3316 100644 --- a/openmessaging-admin/pom.xml +++ b/openmessaging-admin/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-preview + 1.0.0-alpha 4.0.0 diff --git a/openmessaging-api-samples/pom.xml b/openmessaging-api-samples/pom.xml index 50c41e58..849ed849 100644 --- a/openmessaging-api-samples/pom.xml +++ b/openmessaging-api-samples/pom.xml @@ -2,12 +2,13 @@ io.openmessaging parent - 1.0.0-preview + 1.0.0-alpha 4.0.0 jar openmessaging-api-samples + openmessaging-1.0.0-alpha openmessaging-api-samples ${project.version} @@ -20,7 +21,7 @@ ${project.groupId} openmessaging-api - ${project.version} + 1.0.0-alpha org.slf4j diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml index 693fd0e3..1330c096 100644 --- a/openmessaging-api/pom.xml +++ b/openmessaging-api/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-preview + 1.0.0-alpha 4.0.0 diff --git a/pom.xml b/pom.xml index a706aa59..7041248c 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.openmessaging parent - 1.0.0-preview + 1.0.0-alpha pom openmessaging From 321e07f2d1f35db23a0e37468456dcd58912433f Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 22:34:22 +0800 Subject: [PATCH 13/15] [maven-release-plugin] prepare for next development iteration --- openmessaging-admin/pom.xml | 2 +- openmessaging-api-samples/pom.xml | 6 +++--- openmessaging-api/pom.xml | 2 +- pom.xml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openmessaging-admin/pom.xml b/openmessaging-admin/pom.xml index 770f3316..debe8f13 100644 --- a/openmessaging-admin/pom.xml +++ b/openmessaging-admin/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 diff --git a/openmessaging-api-samples/pom.xml b/openmessaging-api-samples/pom.xml index 849ed849..d35c519d 100644 --- a/openmessaging-api-samples/pom.xml +++ b/openmessaging-api-samples/pom.xml @@ -2,13 +2,13 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 jar openmessaging-api-samples - openmessaging-1.0.0-alpha + 1.0.0-beta-SNAPSHOT openmessaging-api-samples ${project.version} @@ -21,7 +21,7 @@ ${project.groupId} openmessaging-api - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT org.slf4j diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml index 1330c096..9e493ee9 100644 --- a/openmessaging-api/pom.xml +++ b/openmessaging-api/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index 7041248c..0ef1d346 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT pom openmessaging From 69bd3ab876423def961c6f9fc001ea69d579a5a9 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 22:38:48 +0800 Subject: [PATCH 14/15] [maven-release-plugin] prepare release openmessaging-1.0.0-alpha --- openmessaging-admin/pom.xml | 2 +- openmessaging-api-samples/pom.xml | 6 +++--- openmessaging-api/pom.xml | 2 +- pom.xml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openmessaging-admin/pom.xml b/openmessaging-admin/pom.xml index debe8f13..770f3316 100644 --- a/openmessaging-admin/pom.xml +++ b/openmessaging-admin/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha 4.0.0 diff --git a/openmessaging-api-samples/pom.xml b/openmessaging-api-samples/pom.xml index d35c519d..9f48659b 100644 --- a/openmessaging-api-samples/pom.xml +++ b/openmessaging-api-samples/pom.xml @@ -2,13 +2,13 @@ io.openmessaging parent - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha 4.0.0 jar openmessaging-api-samples - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha openmessaging-api-samples ${project.version} @@ -21,7 +21,7 @@ ${project.groupId} openmessaging-api - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha org.slf4j diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml index 9e493ee9..1330c096 100644 --- a/openmessaging-api/pom.xml +++ b/openmessaging-api/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha 4.0.0 diff --git a/pom.xml b/pom.xml index 0ef1d346..7041248c 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.openmessaging parent - 1.0.0-beta-SNAPSHOT + 1.0.0-alpha pom openmessaging From 44b95cea10101681b9ff2e257e25f519b33ed179 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 22:39:08 +0800 Subject: [PATCH 15/15] [maven-release-plugin] prepare for next development iteration --- openmessaging-admin/pom.xml | 2 +- openmessaging-api-samples/pom.xml | 6 +++--- openmessaging-api/pom.xml | 2 +- pom.xml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/openmessaging-admin/pom.xml b/openmessaging-admin/pom.xml index 770f3316..debe8f13 100644 --- a/openmessaging-admin/pom.xml +++ b/openmessaging-admin/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 diff --git a/openmessaging-api-samples/pom.xml b/openmessaging-api-samples/pom.xml index 9f48659b..d35c519d 100644 --- a/openmessaging-api-samples/pom.xml +++ b/openmessaging-api-samples/pom.xml @@ -2,13 +2,13 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 jar openmessaging-api-samples - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT openmessaging-api-samples ${project.version} @@ -21,7 +21,7 @@ ${project.groupId} openmessaging-api - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT org.slf4j diff --git a/openmessaging-api/pom.xml b/openmessaging-api/pom.xml index 1330c096..9e493ee9 100644 --- a/openmessaging-api/pom.xml +++ b/openmessaging-api/pom.xml @@ -2,7 +2,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index 7041248c..0ef1d346 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ io.openmessaging parent - 1.0.0-alpha + 1.0.0-beta-SNAPSHOT pom openmessaging