From 674d6864aa39d47963c009db6a70d7737898bf3a Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 31 Jan 2019 17:33:31 +0800 Subject: [PATCH 1/4] Add extension fields --- .../samples/consumer/PullConsumerApp.java | 9 ++-- .../samples/producer/ProducerApp.java | 6 +-- .../main/java/io/openmessaging/KeyValue.java | 42 +------------------ .../main/java/io/openmessaging/Message.java | 4 +- .../io/openmessaging/ServiceLifeState.java | 4 +- .../io/openmessaging/consumer/Consumer.java | 10 +++++ .../extension/ExtensionHeader.java | 32 ++++++++++++++ .../internal/DefaultKeyValue.java | 22 +--------- .../io/openmessaging/producer/Producer.java | 4 +- 9 files changed, 57 insertions(+), 76 deletions(-) create mode 100644 openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index b60f89e0..7d74f441 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -21,7 +21,6 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; -import io.openmessaging.manager.ResourceManager; public class PullConsumerApp { public static void main(String[] args) { @@ -29,13 +28,9 @@ public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); - //Fetch a ResourceManager to create Queue resource. - ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createQueue("NS://HELLO_QUEUE"); //Start a PullConsumer to receive messages from the specific queue. final Consumer consumer = messagingAccessPoint.createConsumer(); - consumer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -44,10 +39,14 @@ public void run() { consumer.stop(); } })); + consumer.bindQueue("NS://HELLO_QUEUE"); + consumer.start(); + Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message consumer.ack(message.headers().getMessageId()); + consumer.stop(); } } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index 2da775fc..d38a8146 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -35,7 +35,6 @@ public static void main(String[] args) { OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); final Producer producer = messagingAccessPoint.createProducer(); - producer.start(); ProducerInterceptor interceptor = new ProducerInterceptor() { @Override public void preSend(Message message, Context attributes) { @@ -46,6 +45,7 @@ public void postSend(Message message, Context attributes) { } }; producer.addInterceptor(interceptor); + producer.start(); //Register a shutdown hook to close the opened endpoints. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @@ -55,9 +55,9 @@ public void run() { } })); - //Sends a message to the specified destination synchronously. + //Send a message to the specified destination synchronously. Message message = producer.createMessage( - "NS://HELLO_QUEUE", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java index d521bb6e..61c70793 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java @@ -82,18 +82,8 @@ public interface KeyValue { * @return the value in this {@code KeyValue} object with the specified key value * @see #put(String, short) */ - int getShort(String key); + short getShort(String key); - /** - * Searches for the {@code short} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, zero is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, short) - */ - int getShort(String key, short defaultValue); /** * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not @@ -105,16 +95,6 @@ public interface KeyValue { */ int getInt(String key); - /** - * Searches for the {@code int} property with the specified key in this {@code KeyValue} object. If the key is not - * found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, int) - */ - int getInt(String key, int defaultValue); /** * Searches for the {@code long} property with the specified key in this {@code KeyValue} object. If the key is not @@ -147,16 +127,6 @@ public interface KeyValue { */ double getDouble(String key); - /** - * Searches for the {@code double} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, double) - */ - double getDouble(String key, double defaultValue); /** * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is @@ -168,16 +138,6 @@ public interface KeyValue { */ String getString(String key); - /** - * Searches for the {@code String} property with the specified key in this {@code KeyValue} object. If the key is - * not found in this property list, the default value argument is returned. - * - * @param key the property key - * @param defaultValue a default value - * @return the value in this {@code KeyValue} object with the specified key value - * @see #put(String, String) - */ - String getString(String key, String defaultValue); /** * Returns a {@link Set} view of the keys contained in this {@code KeyValue} object. diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java index 9d06f889..ecbb5ec4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/Message.java @@ -68,8 +68,8 @@ interface Headers { Headers setDestination(String destination); /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identifies each message sent by a {@code - * Producer}. + * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code + * Producer}. this identifier is generated by producer. */ Headers setMessageId(String messageId); diff --git a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java index 66e90965..eafdb18f 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java +++ b/openmessaging-api/src/main/java/io/openmessaging/ServiceLifeState.java @@ -43,10 +43,10 @@ public enum ServiceLifeState { /** * Service is stopping. */ - STOPING, + STOPPING, /** * Service has been stopped. */ - STOPED, + STOPPED, } diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index 948ec068..d894eb62 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -146,6 +146,16 @@ public interface Consumer extends ServiceLifecycle { */ Message receive(long timeout); + /** + * Receive message in asynchronous way. This call doesn't block user's thread, and user's message resolve logic + * should implement in the {@link MessageListener}. + * + * @param messageListener {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when + * new delivered message is coming. + * @return + */ + void receiveAsync(MessageListener messageListener); + /** * Acknowledges the specified and consumed message with the unique message receipt handle, in the scenario of using * manual commit. diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java new file mode 100644 index 00000000..c35b597b --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.extension; + +/** + * This header is extension header + */ +public interface ExtensionHeader { + /** + * Before send message, + * + * @param partition + */ + void setPartition(int partition); + + int getPartiton(); + +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java index e2f6477b..ef4695ba 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java +++ b/openmessaging-api/src/main/java/io/openmessaging/internal/DefaultKeyValue.java @@ -32,12 +32,7 @@ public class DefaultKeyValue implements KeyValue { private Map properties; @Override - public int getShort(String key) { - return 0; - } - - @Override - public int getShort(String key, short defaultValue) { + public short getShort(String key) { return 0; } @@ -83,11 +78,6 @@ public int getInt(String key) { return Integer.valueOf(properties.get(key)); } - @Override - public int getInt(final String key, final int defaultValue) { - return properties.containsKey(key) ? getInt(key) : defaultValue; - } - @Override public long getLong(String key) { if (!properties.containsKey(key)) { @@ -109,21 +99,11 @@ public double getDouble(String key) { return Double.valueOf(properties.get(key)); } - @Override - public double getDouble(final String key, final double defaultValue) { - return properties.containsKey(key) ? getDouble(key) : defaultValue; - } - @Override public String getString(String key) { return properties.get(key); } - @Override - public String getString(final String key, final String defaultValue) { - return properties.containsKey(key) ? getString(key) : defaultValue; - } - @Override public Set keySet() { return properties.keySet(); diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 8ebd35d5..0daaed7d 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -97,14 +97,14 @@ public interface Producer extends MessageFactory, ServiceLifecycle { void send(List messages); /** - * Adds a {@code ProducerInterceptor} to intercept send operations of producer. + * Add a {@code ProducerInterceptor} to intercept send operations of producer. * * @param interceptor a producer interceptor. */ void addInterceptor(ProducerInterceptor interceptor); /** - * Removes a {@code ProducerInterceptor}. + * Remove a {@code ProducerInterceptor}. * * @param interceptor a producer interceptor will be removed. */ From b8c1329f8949c0f24eab3d51733ad5c2b0d378d2 Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Fri, 1 Feb 2019 19:08:46 +0800 Subject: [PATCH 2/4] (1) Polish messsage header attributes based on specification (2) Add extension attributes and interface --- .../samples/consumer/PullConsumerApp.java | 5 +- .../samples/consumer/PushConsumerApp.java | 2 +- .../samples/producer/ProducerApp.java | 6 +- .../producer/TransactionProducerApp.java | 2 +- .../samples/routing/RoutingApp.java | 2 +- .../main/java/io/openmessaging/Message.java | 372 ------------------ .../openmessaging/MessagingAccessPoint.java | 1 + .../consumer/BatchMessageListener.java | 2 +- .../io/openmessaging/consumer/Consumer.java | 2 +- .../consumer/MessageListener.java | 2 +- .../extension/ExtensionHeader.java | 146 ++++++- .../extension/QueueMetaData.java | 9 +- .../interceptor/ConsumerInterceptor.java | 2 +- .../interceptor/ProducerInterceptor.java | 2 +- .../java/io/openmessaging/message/Header.java | 156 ++++++++ .../io/openmessaging/message/Message.java | 97 +++++ .../{ => message}/MessageFactory.java | 3 +- .../io/openmessaging/producer/Producer.java | 4 +- .../TransactionStateCheckListener.java | 2 +- 19 files changed, 417 insertions(+), 400 deletions(-) delete mode 100644 openmessaging-api/src/main/java/io/openmessaging/Message.java create mode 100644 openmessaging-api/src/main/java/io/openmessaging/message/Header.java create mode 100644 openmessaging-api/src/main/java/io/openmessaging/message/Message.java rename openmessaging-api/src/main/java/io/openmessaging/{ => message}/MessageFactory.java (95%) diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java index 7d74f441..04bc6ad7 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PullConsumerApp.java @@ -17,10 +17,10 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; +import io.openmessaging.message.Message; public class PullConsumerApp { public static void main(String[] args) { @@ -28,7 +28,6 @@ public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://alice@rocketmq.apache.org/us-east"); - //Start a PullConsumer to receive messages from the specific queue. final Consumer consumer = messagingAccessPoint.createConsumer(); @@ -46,7 +45,7 @@ public void run() { Message message = consumer.receive(1000); System.out.println("Received message: " + message); //Acknowledge the consumed message - consumer.ack(message.headers().getMessageId()); + consumer.ack(message.header().getMessageId()); consumer.stop(); } } diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java index 0fed2032..98a9a2e5 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/PushConsumerApp.java @@ -17,12 +17,12 @@ package io.openmessaging.samples.consumer; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; public class PushConsumerApp { public static void main(String[] args) { diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index d38a8146..e04082a9 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -18,11 +18,11 @@ package io.openmessaging.samples.producer; import io.openmessaging.Future; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.interceptor.Context; import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; @@ -38,10 +38,12 @@ public static void main(String[] args) { ProducerInterceptor interceptor = new ProducerInterceptor() { @Override public void preSend(Message message, Context attributes) { + System.out.println("PreSend message: " + message); } @Override public void postSend(Message message, Context attributes) { + System.out.println("PostSend message: " + message); } }; producer.addInterceptor(interceptor); @@ -58,6 +60,8 @@ public void run() { //Send a message to the specified destination synchronously. Message message = producer.createMessage( "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + message.header().setBornHost("127.0.0.1").setDurability((short) 0); + message.extentionHeader().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java index f6db2730..bed57642 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/TransactionProducerApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.producer.Producer; diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java index 3f3a9a66..ba267444 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/routing/RoutingApp.java @@ -17,7 +17,7 @@ package io.openmessaging.samples.routing; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; diff --git a/openmessaging-api/src/main/java/io/openmessaging/Message.java b/openmessaging-api/src/main/java/io/openmessaging/Message.java deleted file mode 100644 index 981a1754..00000000 --- a/openmessaging-api/src/main/java/io/openmessaging/Message.java +++ /dev/null @@ -1,372 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging; - -import io.openmessaging.exception.OMSMessageFormatException; -import io.openmessaging.extension.ExtensionHeader; - -/** - * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is - * {@link Message}. - *

- * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and - * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. - *

- * The header contains fields used by the messaging system that describes the message's meta information, while the body - * contains the application data being transmitted. - *

- * As for the message header, OMS defines two kinds types: headers {@link Headers} and properties {@link KeyValue}, with - * respect to flexibility in vendor implementation and user usage. - *

    - *
  • - * System Headers, OMS defines some standard attributes that represent the characteristics of the message. - *
  • - *
  • - * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to - * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios. - *
  • - *
- * The body contains the application data being transmitted, which is generally ignored by the messaging system and - * simply transmitted to its destination. - *

- * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the - * messaging system. The application is responsible for explaining the concrete content and format of the message body, - * OMS is never aware of that. - * - * The body part is placed in the implementation classes of {@code Message}. - * - * @version OMS 1.0.0 - * @since OMS 1.0.0 - */ -public interface Message { - - interface Headers { - - /** - * The {@code DESTINATION} header field contains the destination to which the message is being sent. - *

- * When a message is set to the {@code Queue}, then the message will be sent to the specified destination. - *

- * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. - */ - Headers setDestination(String destination); - - /** - * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code - * Producer}. this identifier is generated by producer. - */ - Headers setMessageId(String messageId); - - /** - * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to - * be sent. - *

- * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message - * in client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this - * value. - *

- * When a message is received its, BORN_TIMESTAMP header field contains this same value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setBornTimestamp(long bornTimestamp); - - /** - * The {@code BORN_HOST} header field contains the born host info of a message in client side. - *

- * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the - * message's BORN_HOST header field contains this value. - *

- * When a message is received, its BORN_HOST header field contains this same value. - */ - Headers setBornHost(String bornHost); - - /** - * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. - *

- * When a message is sent, STORE_TIMESTAMP is ignored. - *

- * When the send method returns it contains a server-assigned value. - *

- * This filed is a {@code long} value, measured in milliseconds. - */ - Headers setStoreTimestamp(long storeTimestamp); - - /** - * The {@code STORE_HOST} header field contains the store host info of a message in server side. - *

- * When a message is sent, STORE_HOST is ignored. - *

- * When the send method returns it contains a server-assigned value. - */ - Headers setStoreHost(String storeHost); - - /** - * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. - *

- * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this - * filed isn't set explicitly, this means this message should be delivered immediately. - */ - Headers setDelayTime(long delayTime); - - /** - * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. - *

- * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the - * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. - *

- *

- * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not - * define a notification of message expiration. - *

- */ - Headers setExpireTime(long expireTime); - - /** - * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority - * value should be delivered preferentially. - *

- * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default - * priority is 5. The priority beyond this region will be ignored. - *

- * OMS does not require or provide any guarantee that the message should be delivered in priority order - * strictly, but the vendor should provide a best effort to deliver expedited messages ahead of normal - * messages. - *

- * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. - */ - Headers setPriority(short priority); - - /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee - * the reliability level for a message. - *

- * OMS defines two modes of message delivery: - *

    - *
  • - * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message - * won't be lost. - *
  • - *
  • - * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory - * storage is enough for better performance and lower cost. - *
  • - *
- */ - Headers setDurability(short durability); - - /** - * The {@code messagekey} header field contains the custom key of a message. - *

- * This key is a customer identifier for a class of messages, and this key may be used for server to hash or - * dispatch messages, or even can use this key to implement order message. - *

- */ - Headers setMessageKey(String messageKey); - - /** - * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique - * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at - * where, and received by who. - *

- * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can - * be used to trace the whole call link with other parts in the whole system. - */ - Headers setTraceId(String traceId); - - /** - * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message - * delivery. - */ - Headers setDeliveryCount(int deliveryCount); - - /** - * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a - * transaction. - *

- * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, - * and consumer received message also contains this field. - */ - Headers setTransactionId(String transactionId); - - /** - * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link - * a response message with its request message. - */ - Headers setCorrelationId(String correlationId); - - /** - * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to - * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. - */ - Headers setCompression(short compression); - - /** - * See {@link Headers#setDestination(String)} - * - * @return destination - */ - String getDestination(); - - /** - * See {@link Headers#setMessageId(String)} - * - * @return messageId - */ - String getMessageId(); - - /** - * See {@link Headers#setBornTimestamp(long)} - * - * @return bornTimestamp - */ - long getBornTimestamp(); - - /** - * See {@link Headers#setBornHost(String)} - * - * @return bornHost - */ - String getBornHost(); - - /** - * See {@link Headers#setStoreTimestamp(long)} - * - * @return storeTimestamp - */ - long getStoreTimestamp(); - - /** - * See {@link Headers#setStoreHost(String)} - * - * @return storeHost - */ - String getStoreHost(); - - /** - * See {@link Headers#setDelayTime(long)} - * - * @return delayTime - */ - long getDelayTime(); - - /** - * See {@link Headers#setExpireTime(long)} - * - * @return expireTime - */ - long getExpireTime(); - - /** - * See {@link Headers#setPriority(short)} - * - * @return priority - */ - short getPriority(); - - /** - * See {@link Headers#setDurability(short)} - * - * @return durability - */ - short getDurability(); - - /** - * See {@link Headers#setMessageKey(String)} - * - * @return messageKey - */ - String getMessageKey(); - - /** - * See {@link Headers#setTraceId(String)} - * - * @return traceId - */ - String getTraceId(); - - /** - * See {@link Headers#setDeliveryCount(int)} - * - * @return deliveryCount - */ - int getDeliveryCount(); - - /** - * See {@link Headers#setTransactionId(String)} - * - * @return transactionId - */ - String getTransactionId(); - - /** - * See {@link Headers#setCorrelationId(String)} - * - * @return correlationId - */ - String getCorrelationId(); - - /** - * See {@link Headers#setCompression(short)} - * - * @return compression - */ - short getCompression(); - - } - - /** - * This interface is optional, Therefore, users need to check whether the interface is implemented and the - * correctness of its implementation. - *

- * - * @return The implementation of {@link ExtensionHeader} - */ - ExtensionHeader extentionHeader(); - - /** - * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the system headers of a {@code Message} - */ - Headers headers(); - - /** - * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. - * - * @return the user properties of a {@code Message} - */ - KeyValue properties(); - - /** - * Get data from message body - * - * @return message body - * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type - */ - byte[] getData(); - - /** - * Set data to message body - * - * @param data set message body in binary stream - */ - void setData(byte[] data); - -} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java index a12feb74..e99ba2e4 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java +++ b/openmessaging-api/src/main/java/io/openmessaging/MessagingAccessPoint.java @@ -22,6 +22,7 @@ import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSSecurityException; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java index 1e316fd7..d4d4bbcf 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/BatchMessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.exception.OMSRuntimeException; import java.util.List; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java index b795b76b..a956e80f 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/Consumer.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; diff --git a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java index 97a6f414..81a3fa8e 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java @@ -17,7 +17,7 @@ package io.openmessaging.consumer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.exception.OMSRuntimeException; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java index b686c2d0..d9ef1bd2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/ExtensionHeader.java @@ -16,7 +16,7 @@ */ package io.openmessaging.extension; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** *

@@ -41,23 +41,97 @@ public interface ExtensionHeader { * * @param partition The specified partition will be sent to. */ - void setPartition(int partition); + ExtensionHeader setPartition(int partition); /** - * This method will return the partition of this message belongs. + * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. *

* - * @return The {@code PARTITION} to which the message belongs + * @param offset The offset in the current partition, used to quickly get this message in the queue */ - int getPartiton(); + ExtensionHeader setOffset(long offset); /** - * This method is only called by the server. and {@Code OFFSET} represents this message offset in partition. + * A client can use the {@code CORRELATION_ID} field to link one message with another. A typical use is to link a + * response message with its request message. + */ + ExtensionHeader setCorrelationId(String correlationId); + + /** + * This field {@code TRANSACTION_ID} is used in transactional message, and it can be used to trace a transaction. + *

+ * So the same {@code TRANSACTION_ID} will be appeared not only in prepare message, but also in commit message, and + * consumer received message also contains this field. + */ + ExtensionHeader setTransactionId(String transactionId); + + /** + * The {@code STORE_TIMESTAMP} header field contains the store timestamp of a message in server side. + *

+ * When a message is sent, STORE_TIMESTAMP is ignored. + *

+ * When the send method returns it contains a server-assigned value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + ExtensionHeader setStoreTimestamp(long storeTimestamp); + + /** + * The {@code STORE_HOST} header field contains the store host info of a message in server side. + *

+ * When a message is sent, STORE_HOST is ignored. + *

+ * When the send method returns it contains a server-assigned value. + */ + ExtensionHeader setStoreHost(String storeHost); + + /** + * The {@code messagekey} header field contains the custom key of a message. + *

+ * This key is a customer identifier for a class of messages, and this key may be used for server to hash or + * dispatch messages, or even can use this key to implement order message. + *

+ */ + ExtensionHeader setMessageKey(String messageKey); + + /** + * The {@code TRACE_ID} header field contains the trace ID of a message, which represents a global and unique + * identification, to associate key events in the whole lifecycle of a message, like sent by who, stored at where, + * and received by who. + *

+ * And, the messaging system only plays exchange role in a distributed system in most cases, so the TraceID can be + * used to trace the whole call link with other parts in the whole system. + */ + ExtensionHeader setTraceId(String traceId); + + /** + * The {@code DELAY_TIME} header field contains a number that represents the delayed times in milliseconds. + *

+ * The message will be delivered after delayTime milliseconds starting from {@CODE BORN_TIMESTAMP} . When this filed + * isn't set explicitly, this means this message should be delivered immediately. + */ + ExtensionHeader setDelayTime(long delayTime); + + /** + * The {@code EXPIRE_TIME} header field contains the expiration time, it represents a time-to-live value. + *

+ * The {@code EXPIRE_TIME} represents a relative valid interval that a message can be delivered in it. If the + * EXPIRE_TIME field is specified as zero, that indicates the message does not expire. + *

+ *

+ * When an undelivered message's expiration time is reached, the message should be destroyed. OMS does not define a + * notification of message expiration. + *

+ */ + ExtensionHeader setExpireTime(long expireTime); + + /** + * This method will return the partition of this message belongs. *

* - * @param offset The offset in the current partition, used to quickly get this message in the queue + * @return The {@code PARTITION} to which the message belongs */ - void setOffset(long offset); + int getPartiton(); /** * This method will return the {@Code OFFSET} in the partition to which the message belongs to, but the premise is @@ -66,4 +140,60 @@ public interface ExtensionHeader { * @return The offset of the partition to which the message belongs. */ long getOffset(); + + /** + * See {@link ExtensionHeader#setCorrelationId(String)} + * + * @return correlationId + */ + String getCorrelationId(); + + /** + * See {@link ExtensionHeader#setTransactionId(String)} + * + * @return transactionId + */ + String getTransactionId(); + + /** + * See {@link ExtensionHeader#setStoreTimestamp(long)} + * + * @return storeTimestamp + */ + long getStoreTimestamp(); + + /** + * See {@link ExtensionHeader#setStoreHost(String)} + * + * @return storeHost + */ + String getStoreHost(); + + /** + * See {@link ExtensionHeader#setDelayTime(long)} + * + * @return delayTime + */ + long getDelayTime(); + + /** + * See {@link ExtensionHeader#setExpireTime(long)} + * + * @return expireTime + */ + long getExpireTime(); + + /** + * See {@link ExtensionHeader#setMessageKey(String)} + * + * @return messageKey + */ + String getMessageKey(); + + /** + * See {@link ExtensionHeader#setTraceId(String)} + * + * @return traceId + */ + String getTraceId(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java index 3bd8173d..5c71b6b2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java +++ b/openmessaging-api/src/main/java/io/openmessaging/extension/QueueMetaData.java @@ -32,7 +32,7 @@ public interface QueueMetaData { * In order to improve performance, in some scenarios where message persistence is required, some message middleware * will store messages on multiple partitions in multi servers. *

- * In some scenarios, it is very useful to get the relevant metadata for these partitions. + * In some scenarios, it is very useful to get the relevant partitions meta data for a queue. */ interface Partition { /** @@ -44,8 +44,9 @@ interface Partition { /** * The host of the server where the partition is located + *

* - * @return + * @return The host of the server where the partition is located */ String partitonHost(); } @@ -54,14 +55,14 @@ interface Partition { * Queue name *

* - * @return Queue name + * @return Queue name. */ String queueName(); /** * Get partition list belongs to the {@code queueName} * - * @return List of {@link Partition} + * @return List of {@link Partition} belongs to the specified queue. */ List partitions(); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java index 440974d2..98aba203 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ConsumerInterceptor.java @@ -17,7 +17,7 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.consumer.MessageListener; /** diff --git a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java index cea68149..e41f60a0 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java +++ b/openmessaging-api/src/main/java/io/openmessaging/interceptor/ProducerInterceptor.java @@ -17,7 +17,7 @@ package io.openmessaging.interceptor; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * A {@code ProducerInterceptor} is used to intercept send operations of producer. diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java new file mode 100644 index 00000000..325c101d --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.message; + +public interface Header { + /** + * The {@code DESTINATION} header field contains the destination to which the message is being sent. + *

+ * When a message is set to the {@code Queue}, then the message will be sent to the specified destination. + *

+ * When a message is received, its destination is equivalent to the {@code Queue} where the message resides in. + */ + Header setDestination(String destination); + + /** + * The {@code MESSAGE_ID} header field contains a value that uniquely identify each message sent by a {@code + * Producer}. this identifier is generated by producer. + */ + Header setMessageId(String messageId); + + /** + * The {@code BORN_TIMESTAMP} header field contains the time a message was handed off to a {@code Producer} to be + * sent. + *

+ * When a message is sent, BORN_TIMESTAMP will be set with current timestamp as the born timestamp of a message in + * client side, on return from the send method, the message's BORN_TIMESTAMP header field contains this value. + *

+ * When a message is received its, BORN_TIMESTAMP header field contains this same value. + *

+ * This filed is a {@code long} value, measured in milliseconds. + */ + Header setBornTimestamp(long bornTimestamp); + + /** + * The {@code BORN_HOST} header field contains the born host info of a message in client side. + *

+ * When a message is sent, BORN_HOST will be set with the local host info, on return from the send method, the + * message's BORN_HOST header field contains this value. + *

+ * When a message is received, its BORN_HOST header field contains this same value. + */ + Header setBornHost(String bornHost); + + /** + * The {@code PRIORITY} header field contains the priority level of a message, a message with a higher priority + * value should be delivered preferentially. + *

+ * OMS defines a ten level priority value with 1 as the lowest priority and 10 as the highest, and the default + * priority is 5. The priority beyond this region will be ignored. + *

+ * OMS does not require or provide any guarantee that the message should be delivered in priority order strictly, + * but the vendor should provide a best effort to deliver expedited messages ahead of normal messages. + *

+ * If PRIORITY field isn't set explicitly, use {@code 5} as the default priority. + */ + Header setPriority(short priority); + + /** + * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee the + * reliability level for a message. + *

+ * OMS defines two modes of message delivery: + *

    + *
  • + * PERSISTENT, the persistent mode instructs the vendor should provide stable storage to ensure the message won't be + * lost. + *
  • + *
  • + * NON_PERSISTENT, this mode does not require the message be logged to stable storage, in most cases, the memory + * storage is enough for better performance and lower cost. + *
  • + *
+ */ + Header setDurability(short durability); + + /** + * The {@code DELIVERY_COUNT} header field contains a number, which represents the count of the message delivery. + */ + Header setDeliveryCount(int deliveryCount); + + /** + * The field {@code COMPRESSION} in headers represents the message body compress algorithm. vendors are free to + * choose the compression algorithm, but must ensure that the decompressed message is delivered to the user. + */ + Header setCompression(short compression); + + /** + * See {@link Header#setDestination(String)} + * + * @return destination + */ + String getDestination(); + + /** + * See {@link Header#setMessageId(String)} + * + * @return messageId + */ + String getMessageId(); + + /** + * See {@link Header#setBornTimestamp(long)} + * + * @return bornTimestamp + */ + long getBornTimestamp(); + + /** + * See {@link Header#setBornHost(String)} + * + * @return bornHost + */ + String getBornHost(); + + /** + * See {@link Header#setPriority(short)} + * + * @return priority + */ + short getPriority(); + + /** + * See {@link Header#setDurability(short)} + * + * @return durability + */ + short getDurability(); + + /** + * See {@link Header#setDeliveryCount(int)} + * + * @return deliveryCount + */ + int getDeliveryCount(); + + /** + * See {@link Header#setCompression(short)} + * + * @return compression + */ + short getCompression(); +} diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java new file mode 100644 index 00000000..293c21a3 --- /dev/null +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.message; + +import io.openmessaging.KeyValue; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.extension.ExtensionHeader; + +/** + * The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is + * {@link Message}. + *

+ * Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of header and + * body and is used by separate applications to exchange a piece of information, like Apache RocketMQ. + *

+ * The header contains fields used by the messaging system that describes the message's meta information, while the body + * contains the application data being transmitted. + *

+ * As for the message header, OMS defines three kinds types: headers {@link Header} {@link ExtensionHeader} and + * properties {@link KeyValue}, with respect to flexibility in vendor implementation and user usage. + *

    + *
  • + * System Headers, OMS defines some standard attributes that represent the characteristics of the message. + *
  • + *
  • + * User properties, some OMS vendors may require enhanced extra attributes of the message or some users may want to + * clarify some customized attributes to draw the body. OMS provides the improved scalability for these scenarios. + *
  • + *
+ * The body contains the application data being transmitted, which is generally ignored by the messaging system and + * simply transmitted to its destination. + *

+ * In BytesMessage, the body is just a byte array, may be compressed and uncompressed in the transmitting process by the + * messaging system. The application is responsible for explaining the concrete content and format of the message body, + * OMS is never aware of that. + * + * The body part is placed in the implementation classes of {@code Message}. + * + * @version OMS 1.0.0 + * @since OMS 1.0.0 + */ +public interface Message { + /** + * Returns all the system header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the system headers of a {@code Message} + */ + Header header(); + + /** + * This interface is optional, Therefore, users need to check whether the interface is implemented and the + * correctness of its implementation. + *

+ * + * @return The implementation of {@link ExtensionHeader} + */ + ExtensionHeader extentionHeader(); + + /** + * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. + * + * @return the user properties of a {@code Message} + */ + KeyValue properties(); + + /** + * Get data from message body + * + * @return message body + * @throws OMSMessageFormatException if the message body cannot be assigned to the specified type + */ + byte[] getData(); + + /** + * Set data to message body + * + * @param data set message body in binary stream + */ + void setData(byte[] data); + +} \ No newline at end of file diff --git a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java similarity index 95% rename from openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java rename to openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java index 536f8ce1..4902d4b6 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/MessageFactory.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/MessageFactory.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package io.openmessaging; +package io.openmessaging.message; import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.message.Message; /** * A factory interface for creating {@code Message} objects. diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 3bd854ce..8d8c1524 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -19,8 +19,8 @@ import io.openmessaging.Future; import io.openmessaging.FutureListener; -import io.openmessaging.Message; -import io.openmessaging.MessageFactory; +import io.openmessaging.message.Message; +import io.openmessaging.message.MessageFactory; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSDestinationException; diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java index 99665544..efa0d5f1 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/TransactionStateCheckListener.java @@ -17,7 +17,7 @@ package io.openmessaging.producer; -import io.openmessaging.Message; +import io.openmessaging.message.Message; /** * Each executor will be associated with a transactional message, can be used to execute local transaction branch and From 6eec44a18341a2cfff5ac5227e94802de71c7d8b Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Wed, 20 Feb 2019 20:07:42 +0800 Subject: [PATCH 3/4] Fix some spelling mistakes --- .../io/openmessaging/samples/producer/ProducerApp.java | 2 +- .../src/main/java/io/openmessaging/message/Message.java | 2 +- .../src/main/java/io/openmessaging/producer/Producer.java | 8 ++++---- .../internal/MessagingAccessPointAdapterTest.java | 5 +++++ 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java index e04082a9..2fb538c8 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/producer/ProducerApp.java @@ -61,7 +61,7 @@ public void run() { Message message = producer.createMessage( "NS://HELLO_QUEUE1", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); message.header().setBornHost("127.0.0.1").setDurability((short) 0); - message.extentionHeader().setPartition(1); + message.extensionHeader().setPartition(1); SendResult sendResult = producer.send(message); System.out.println("SendResult: " + sendResult); diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java index 293c21a3..4cf8bdb7 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Message.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Message.java @@ -70,7 +70,7 @@ public interface Message { * * @return The implementation of {@link ExtensionHeader} */ - ExtensionHeader extentionHeader(); + ExtensionHeader extensionHeader(); /** * Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}. diff --git a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java index 8d8c1524..84b0afc2 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java @@ -53,7 +53,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a message to the specified destination synchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. * * @param message a message will be sent. * @return the successful {@code SendResult}. @@ -67,7 +67,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a message to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -98,7 +98,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Send messages to the specified destination asynchronously, the destination should be preset to {@link - * Message#headers()}, other header fields as well. + * Message#header()}, other header fields as well. *

* The returned {@code Promise} will have the result once the operation completes, and the registered {@code * FutureListener} will be notified, either because the operation was successful or because of an error. @@ -135,7 +135,7 @@ public interface Producer extends MessageFactory, ServiceLifecycle { /** * Sends a transactional message to the specified destination synchronously, the destination should be preset to - * {@link Message#headers()}, other header fields as well. + * {@link Message#header()}, other header fields as well. *

* A transactional send result will be exposed to consumer if this prepare message send success, and then, you can * execute your local transaction, when local transaction execute success, users can use {@link diff --git a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java index 14cb8d0b..efee3560 100644 --- a/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java +++ b/openmessaging-api/src/test/java/io/openmessaging/internal/MessagingAccessPointAdapterTest.java @@ -23,6 +23,7 @@ import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.Consumer; import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; import org.junit.Test; @@ -50,6 +51,10 @@ public TestVendor(KeyValue keyValue) { return null; } + @Override public MessageFactory messageFactory() { + return null; + } + @Override public String version() { return OMS.specVersion; From 8bbfb365adf5d14471a9df04f6493fcbe41fef9c Mon Sep 17 00:00:00 2001 From: duhenglucky Date: Thu, 28 Feb 2019 14:17:32 +0800 Subject: [PATCH 4/4] Modify spelling errors of Durability --- .../src/main/java/io/openmessaging/message/Header.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java index 678eeee1..1017c687 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/message/Header.java +++ b/openmessaging-api/src/main/java/io/openmessaging/message/Header.java @@ -97,7 +97,7 @@ public interface Header { Header setPriority(short priority); /** - * The {@code RELIABILITY} header field contains the reliability level of a message, the vendor should guarantee the + * The {@code DURABILITY} header field contains the persistent level of a message, the vendor should guarantee the * reliability level for a message. *

* OMS defines two modes of message delivery: