diff --git a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java b/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java deleted file mode 100644 index 7d0659b5..00000000 --- a/openmessaging-api-samples/src/main/java/io/openmessaging/samples/consumer/GenericPullConsumerApp.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.openmessaging.samples.consumer; - -import io.openmessaging.api.GenericMessage; -import io.openmessaging.api.MessagingAccessPoint; -import io.openmessaging.api.OMS; -import io.openmessaging.api.OMSBuiltinKeys; -import io.openmessaging.api.PullConsumer; -import io.openmessaging.api.TopicPartition; -import io.openmessaging.samples.MessageSample; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -public class GenericPullConsumerApp { - public static void main(String[] args) { - //Load and start the vendor implementation from a specific OMS driver URL. - final MessagingAccessPoint messagingAccessPoint = - OMS.builder() - .endpoint("http://mq-instance-xxx-1234567890-test:8080") - .region("Shanghai") - .driver("rocketmq") - .build(); - - Properties properties = new Properties(); - properties.setProperty(OMSBuiltinKeys.DESERIALIZER, "io.openmessaging.openmeta.impl.Deserializer"); - properties.setProperty(OMSBuiltinKeys.OPEN_META_URL, "http://localhost:1234"); - - //Start a PullConsumer to receive messages from the specific queue. - final PullConsumer consumer = messagingAccessPoint.createPullConsumer(properties); - - //Register a shutdown hook to close the opened endpoints. - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - consumer.shutdown(); - } - })); - - Set topicPartitions = consumer.topicPartitions("NS://TOPIC"); - consumer.assign(topicPartitions); - consumer.start(); - - List> messages = consumer.pollGenericMessages(1000); - for (GenericMessage message : messages) { - System.out.println("Received message: " + message.getValue()); - } - //Acknowledge the consumed message - consumer.commitSync(); - consumer.shutdown(); - - } -} diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java b/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java index f1c6a13e..bdb5ea48 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/MessageBuilder.java @@ -28,55 +28,55 @@ public interface MessageBuilder { /** * Used for set topic. * @param topic message topic - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withTopic(String topic); + MessageBuilder withTopic(String topic); /** * Used for message key. * * @param key message key - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withKey(String key); + MessageBuilder withKey(String key); /** * Used for set message tags. * - * @param tags - * @return + * @param tags message tags + * @return {@link MessageBuilder} */ - MessageBuilder withTags(String tags); + MessageBuilder withTags(String tags); /** * Used for set message sharding key. * - * @param shardingKey - * @return + * @param shardingKey message shardingKey + * @return {@link MessageBuilder} */ - MessageBuilder withShardingKey(String shardingKey); + MessageBuilder withShardingKey(String shardingKey); /** * Used for set user properties. * - * @param key - * @param value - * @return + * @param key the key of user property + * @param value the value of user property + * @return {@link MessageBuilder} */ - MessageBuilder withProperty(final String key, final String value); + MessageBuilder withProperty(final String key, final String value); /** * Used for set message body. * * @param t object need to be serialized. - * @return + * @return {@link MessageBuilder} */ - MessageBuilder withValue(T t); + MessageBuilder withValue(T t); /** * Get the topic which this {@code MessageBuilder} belongs to. * - * @return + * @return message topic */ String getTopic(); diff --git a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java index 0c612e35..e01cc8cd 100644 --- a/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java +++ b/openmessaging-api/src/main/java/io/openmessaging/api/PullConsumer.java @@ -69,15 +69,6 @@ interface TopicPartitionChangeListener { */ List poll(long timeout); - /** - * Fetch Objects for the topics or partitions specified using assign API. It is an error to not have subscribed to - * any topics or partitions before polling for data. - * - * @param timeout in millisecond - * @return - */ - List> pollGenericMessages(long timeout); - /** * Overrides the fetch offsets that the consumer will use on the next {@link #poll(long)} }. If this API is invoked * for the same message queue more than once, the latest offset will be used on the next poll(). Note that you may