From e0b116845cb667c0f20d54ea4c666861b3ae3aed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=9E=E4=B8=80?= Date: Mon, 8 Jun 2020 16:59:24 +0800 Subject: [PATCH 1/2] add class info of pollGenericMessages --- .../openmessaging/samples/consumer/GenericPullConsumerApp.java | 2 +- .../src/main/java/io/openmessaging/api/PullConsumer.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java index 7d0659b5..9ca2509a 100644 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java +++ b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java @@ -57,7 +57,7 @@ public void run() { consumer.assign(topicPartitions); consumer.start(); - List> messages = consumer.pollGenericMessages(1000); + List> messages = consumer.pollGenericMessages(1000, MessageSample.class); for (GenericMessage message : messages) { System.out.println("Received message: " + message.getValue()); } diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java index 0c612e35..d716d22e 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java @@ -74,9 +74,10 @@ interface TopicPartitionChangeListener { * any topics or partitions before polling for data. * * @param timeout in millisecond + * @param tClass class of message body * @return */ - List> pollGenericMessages(long timeout); + List> pollGenericMessages(long timeout, Class tClass); /** * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)} }. If this API is invoked From e5f2ecbd2f38cdf59d58f493e347d8e17a3c96af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=9E=E4=B8=80?= Date: Wed, 10 Jun 2020 17:55:00 +0800 Subject: [PATCH 2/2] optimize MessaageBuilder API --- .../consumer/GenericPullConsumerApp.java | 69 ------------------- .../io/openmessaging/api/MessageBuilder.java | 34 ++++----- .../io/openmessaging/api/PullConsumer.java | 10 --- 3 files changed, 17 insertions(+), 96 deletions(-) delete mode 100644 openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java deleted file mode 100644 index 9ca2509a..00000000 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging.samples.consumer; - -import io.openmessaging.api.GenericMessage; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.OMS; -import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.PullConsumer; -import io.openmessaging.api.TopicPartition; -import io.openmessaging.samples.MessageSample; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -public class GenericPullConsumerApp { - public static void main(String[] args) { - //Load and start the vendor implementation from a specific OMS driver URL. - final MessagingAccessPoint messagingAccessPoint = - OMS.builder() - .endpoint("http://mq-instance-xxx-1234567890-test:8080") - .region("Shanghai") - .driver("rocketmq") - .build(); - - Properties properties = new Properties(); - properties.setProperty(OMSBuiltinKeys.DESERIALIZER, "io.openmessaging.openmeta.impl.Deserializer"); - properties.setProperty(OMSBuiltinKeys.OPEN_META_URL, "http://localhost:1234"); - - //Start a PullConsumer to receive messages from the specific queue. - final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties); - - //Register a shutdown hook to close the opened endpoints. - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - consumer.shutdown(); - } - })); - - Set topicPartitions = consumer.topicPartitions("NS://TOPIC"); - consumer.assign(topicPartitions); - consumer.start(); - - List> messages = consumer.pollGenericMessages(1000, MessageSample.class); - for (GenericMessage message : messages) { - System.out.println("Received message: " + message.getValue()); - } - //Acknowledge the consumed message - consumer.commitSync(); - consumer.shutdown(); - - } -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java index f1c6a13e..bdb5ea48 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java @@ -28,55 +28,55 @@ public interface MessageBuilder { /** * Used for set topic. * @param topic message topic - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withTopic(String topic); + MessageBuilder withTopic(String topic); /** * Used for message key. * * @param key message key - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withKey(String key); + MessageBuilder withKey(String key); /** * Used for set message tags. * - * @param tags - * @return + * @param tags message tags + * @return {@link MessageBuilder} */ - MessageBuilder withTags(String tags); + MessageBuilder withTags(String tags); /** * Used for set message sharding key. * - * @param shardingKey - * @return + * @param shardingKey message shardingKey + * @return {@link MessageBuilder} */ - MessageBuilder withShardingKey(String shardingKey); + MessageBuilder withShardingKey(String shardingKey); /** * Used for set user properties. * - * @param key - * @param value - * @return + * @param key the key of user property + * @param value the value of user property + * @return {@link MessageBuilder} */ - MessageBuilder withProperty(final String key, final String value); + MessageBuilder withProperty(final String key, final String value); /** * Used for set message body. * * @param t object need to be serialized. - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withValue(T t); + MessageBuilder withValue(T t); /** * Get the topic which this {@code MessageBuilder} belongs to. * - * @return + * @return message topic */ String getTopic(); diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java index d716d22e..e01cc8cd 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java @@ -69,16 +69,6 @@ interface TopicPartitionChangeListener { */ List poll(long timeout); - /** - * Fetch Objects for the topics or partitions specified using assign API. It is an error to not have subscribed to - * any topics or partitions before polling for data. - * - * @param timeout in millisecond - * @param tClass class of message body - * @return - */ - List> pollGenericMessages(long timeout, Class tClass); - /** * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)} }. If this API is invoked * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may