RocketMQ客户端简单封装

javahongxi edited this page Jan 9, 2019 · 2 revisions
/**
 * Connection settings shared by the {@code Producer} and {@code Consumer} factory beans,
 * both of which extend this class.
 */
public class ClientConfig {

    /**
     * Name server address. Defaults to the {@code MixAll.NAMESRV_ADDR_PROPERTY} system property,
     * falling back to the {@code MixAll.NAMESRV_ADDR_ENV} environment variable.
     */
    protected String namesrvAddr = resolveDefaultNamesrvAddr();

    /**
     * Client instance name. Defaults to the {@code rocketmq.client.name} system property,
     * or {@code "DEFAULT"} when that property is not set.
     */
    protected String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    /** Looks up the default name server address: system property first, environment variable second. */
    private static String resolveDefaultNamesrvAddr() {
        String fromEnvironment = System.getenv(MixAll.NAMESRV_ADDR_ENV);
        return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, fromEnvironment);
    }

    /** @return the configured name server address (may be {@code null} if nothing was configured) */
    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    /** @param namesrvAddr name server address that replaces the default */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** @return the configured client instance name */
    public String getInstanceName() {
        return this.instanceName;
    }

    /** @param instanceName client instance name that replaces the default */
    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }
}
/**
 * Spring factory bean that creates, configures and starts a {@link DefaultMQProducer}
 * once its properties are set, and shuts it down when the bean is destroyed.
 */
@Slf4j
public class Producer extends ClientConfig
        implements FactoryBean<DefaultMQProducer>, InitializingBean, DisposableBean {

    /** The managed producer; created in {@link #afterPropertiesSet()}. */
    private DefaultMQProducer producer;

    private String producerGroup;
    private volatile int defaultTopicQueueNums = 4;
    private int sendMsgTimeout = 3000;
    private int retryTimesWhenSendFailed = 2;

    // ---- configuration properties (setter injection) ----

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
        this.defaultTopicQueueNums = defaultTopicQueueNums;
    }

    public void setSendMsgTimeout(int sendMsgTimeout) {
        this.sendMsgTimeout = sendMsgTimeout;
    }

    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }

    // ---- FactoryBean ----

    /** @return the producer built by {@link #afterPropertiesSet()} */
    @Override
    public DefaultMQProducer getObject() throws Exception {
        return this.producer;
    }

    @Override
    public Class<?> getObjectType() {
        return DefaultMQProducer.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    // ---- lifecycle ----

    /** Builds the producer from the injected properties, starts it and logs the group name. */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.producer = new DefaultMQProducer(this.producerGroup);
        applySettings(this.producer);
        this.producer.start();
        log.info("Producer Group {} started!", this.producerGroup);
    }

    /** Shuts the producer down if it was ever created. */
    @Override
    public void destroy() throws Exception {
        if (this.producer == null) {
            return;
        }
        this.producer.shutdown();
    }

    /** Copies the inherited client settings and the producer-specific settings onto {@code target}. */
    private void applySettings(DefaultMQProducer target) {
        target.setInstanceName(this.instanceName);
        target.setNamesrvAddr(this.namesrvAddr);
        target.setDefaultTopicQueueNums(this.defaultTopicQueueNums);
        target.setSendMsgTimeout(this.sendMsgTimeout);
        target.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
    }
}
/**
 * Unchecked exception raised when sending a message fails. Carries the raw error
 * message and a response code alongside the formatted exception message.
 */
public class MessagingException extends RuntimeException {

    private static final long serialVersionUID = -5758410930844185841L;

    /** Response code used when the failure was caused by another exception. */
    private static final int NO_RESPONSE_CODE = -1;

    private int responseCode;
    private String errorMessage;

    /**
     * Wraps an underlying failure; the response code is set to {@code -1}.
     *
     * @param errorMessage raw error description
     * @param cause        the exception that triggered this one
     */
    public MessagingException(String errorMessage, Throwable cause) {
        super(FAQUrl.attachDefaultURL(errorMessage), cause);
        this.errorMessage = errorMessage;
        this.responseCode = NO_RESPONSE_CODE;
    }

    /**
     * Creates an exception from a response code and its description.
     *
     * @param responseCode numeric response code
     * @param errorMessage raw error description
     */
    public MessagingException(int responseCode, String errorMessage) {
        super(describe(responseCode, errorMessage));
        this.errorMessage = errorMessage;
        this.responseCode = responseCode;
    }

    /** Builds the "CODE: ... DESC: ..." exception message and passes it through {@code FAQUrl}. */
    private static String describe(int responseCode, String errorMessage) {
        String codeText = UtilAll.responseCode2String(responseCode);
        return FAQUrl.attachDefaultURL("CODE: " + codeText + "  DESC: " + errorMessage);
    }

    public int getResponseCode() {
        return this.responseCode;
    }

    /**
     * Replaces the response code.
     *
     * @return this exception, to allow call chaining
     */
    public MessagingException setResponseCode(final int responseCode) {
        this.responseCode = responseCode;
        return this;
    }

    public String getErrorMessage() {
        return this.errorMessage;
    }

    public void setErrorMessage(final String errorMessage) {
        this.errorMessage = errorMessage;
    }
}
/**
 * Thin convenience wrapper around a {@link DefaultMQProducer} for sending string
 * payloads, either normally or ordered by message key.
 *
 * <p>Every public method logs a failure and rethrows it as an unchecked
 * {@link MessagingException} that keeps the original exception as its cause.
 */
@Slf4j
public class RocketMQTemplate {

    /**
     * Queue selector used by the ordered send methods. The selector argument (the message
     * keys) is parsed as a {@code long} and mapped onto the queue list by modulo, so equal
     * keys always land on the same queue.
     *
     * <p>{@code Math.floorMod} is used instead of {@code %} so that a negative numeric key
     * still yields an index in {@code [0, mqs.size())}; with {@code %} a negative key
     * produced a negative index and {@code mqs.get} threw {@code IndexOutOfBoundsException}.
     *
     * <p>NOTE(review): keys that do not parse as a number are turned into {@code 0} by
     * {@code NumberUtils.toLong} (per Commons Lang), so all such messages share the first
     * queue. Confirm this is acceptable for callers that use non-numeric keys.
     */
    private static final MessageQueueSelector KEY_MODULO_SELECTOR = new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            long id = NumberUtils.toLong(String.valueOf(arg));
            int index = (int) Math.floorMod(id, (long) mqs.size());
            return mqs.get(index);
        }
    };

    private final DefaultMQProducer defaultMQProducer;

    /**
     * @param producer the producer every send is delegated to
     */
    public RocketMQTemplate(DefaultMQProducer producer) {
        this.defaultMQProducer = producer;
    }

    /**
     * Sends {@code body} to {@code topic} with empty tags and keys.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult send(String topic, String body) {
        return send(topic, "", body);
    }

    /**
     * Sends {@code body} to {@code topic} with the given tags and empty keys.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult send(String topic, String tags, String body) {
        return send(topic, tags, "", body);
    }

    /**
     * Sends {@code body}, encoded with {@code RemotingHelper.DEFAULT_CHARSET}, to {@code topic}.
     *
     * @param topic destination topic
     * @param tags  message tags
     * @param keys  message keys
     * @param body  message payload
     * @return the result reported by the producer
     * @throws MessagingException if encoding or sending fails for any reason
     */
    public SendResult send(String topic, String tags, String keys, String body) {
        try {
            return send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
        } catch (Exception e) {
            log.error("send error, topic:{}, tags:{}, keys:{}, body:{}",
                    topic, tags, keys, body, e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /** Delegates to the producer and logs the result at debug level. */
    private SendResult send(Message message) throws Exception {
        SendResult sendResult = this.defaultMQProducer.send(message);
        log.debug("send result: {}", sendResult);
        return sendResult;
    }

    /**
     * Sends {@code body} with empty tags, choosing the queue from {@code keys}.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult sendOrderly(String topic, String keys, String body) {
        return sendOrderly(topic, "", keys, body);
    }

    /**
     * Sends {@code body}, encoded with {@code RemotingHelper.DEFAULT_CHARSET}, to a queue
     * of {@code topic} chosen from {@code keys}, so messages with the same numeric key go
     * to the same queue.
     *
     * @param topic destination topic
     * @param tags  message tags
     * @param keys  message keys; also used as the queue selection argument
     * @param body  message payload
     * @return the result reported by the producer
     * @throws MessagingException if encoding or sending fails for any reason
     */
    public SendResult sendOrderly(String topic, String tags, String keys, String body) {
        try {
            return sendOrderly(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
        } catch (Exception e) {
            log.error("send error, topic:{}, tags:{}, keys:{}, body:{}",
                    topic, tags, keys, body, e);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /** Sends through the key-modulo selector, passing the message keys as the selector argument. */
    private SendResult sendOrderly(Message message) throws Exception {
        SendResult sendResult = this.defaultMQProducer.send(message, KEY_MODULO_SELECTOR, message.getKeys());
        log.debug("send result: {}", sendResult);
        return sendResult;
    }

}
/**
 * Spring factory bean that creates, configures and starts a {@link DefaultMQPushConsumer}
 * subscribed to a single topic, and shuts it down when the bean is destroyed.
 */
@Slf4j
public class Consumer extends ClientConfig
        implements FactoryBean<DefaultMQPushConsumer>, InitializingBean, DisposableBean {

    /** The managed consumer; created in {@link #afterPropertiesSet()}. */
    private DefaultMQPushConsumer consumer;

    private String consumerGroup;
    private MessageModel messageModel = MessageModel.CLUSTERING;

    // Thread pool and flow-control settings, passed to the consumer unchanged.
    private int consumeThreadMin = 20;
    private int consumeThreadMax = 64;
    private int pullThresholdForQueue = 1000;
    private int pullThresholdSizeForQueue = 100;
    private int consumeMessageBatchMaxSize = 1;
    private int pullBatchSize = 32;
    private int maxReconsumeTimes = -1;
    private long consumeTimeout = 15;

    // Subscription: topic, tag expression and the listener that receives messages.
    private String topic;
    private String tags;
    private MessageListenerConcurrently messageListener;

    // ---- configuration properties (setter injection) ----

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    /** @param messageModel name of a {@code MessageModel} constant, e.g. {@code CLUSTERING} */
    public void setMessageModel(String messageModel) {
        this.messageModel = MessageModel.valueOf(messageModel);
    }

    public void setConsumeThreadMin(int consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    public void setConsumeThreadMax(int consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    public void setPullThresholdForQueue(int pullThresholdForQueue) {
        this.pullThresholdForQueue = pullThresholdForQueue;
    }

    public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
        this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    public void setPullBatchSize(int pullBatchSize) {
        this.pullBatchSize = pullBatchSize;
    }

    public void setMaxReconsumeTimes(int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    public void setConsumeTimeout(long consumeTimeout) {
        this.consumeTimeout = consumeTimeout;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public void setTags(String tags) {
        this.tags = tags;
    }

    public void setMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
    }

    // ---- FactoryBean ----

    /** @return the consumer built by {@link #afterPropertiesSet()} */
    @Override
    public DefaultMQPushConsumer getObject() throws Exception {
        return this.consumer;
    }

    @Override
    public Class<?> getObjectType() {
        return DefaultMQPushConsumer.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    // ---- lifecycle ----

    /**
     * Builds the consumer from the injected properties, subscribes it to the topic,
     * registers the listener, starts it and logs the group name.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.consumer = new DefaultMQPushConsumer(this.consumerGroup);
        applySettings(this.consumer);
        this.consumer.subscribe(this.topic, this.tags);
        this.consumer.registerMessageListener(this.messageListener);
        this.consumer.start();
        log.info("Consumer Group {} started!", this.consumerGroup);
    }

    /** Shuts the consumer down if it was ever created. */
    @Override
    public void destroy() throws Exception {
        if (this.consumer == null) {
            return;
        }
        this.consumer.shutdown();
    }

    /** Copies the inherited client settings and the consumer-specific settings onto {@code target}. */
    private void applySettings(DefaultMQPushConsumer target) {
        target.setInstanceName(this.instanceName);
        target.setNamesrvAddr(this.namesrvAddr);
        target.setMessageModel(this.messageModel);
        target.setConsumeThreadMin(this.consumeThreadMin);
        target.setConsumeThreadMax(this.consumeThreadMax);
        target.setPullThresholdForQueue(this.pullThresholdForQueue);
        target.setPullThresholdSizeForQueue(this.pullThresholdSizeForQueue);
        target.setConsumeMessageBatchMaxSize(this.consumeMessageBatchMaxSize);
        target.setPullBatchSize(this.pullBatchSize);
        target.setMaxReconsumeTimes(this.maxReconsumeTimes);
        target.setConsumeTimeout(this.consumeTimeout);
    }
}

demo:

rocketmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the producer demo: one producer factory bean and a RocketMQTemplate built on it. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
       default-autowire="byName">

    <!-- Producer is a FactoryBean, so this bean id resolves to the DefaultMQProducer it creates and starts. -->
    <bean id="defaultMQProducer" class="org.hongxi.whatsmars.rocketmq.config.spring.Producer">
        <property name="producerGroup" value="quick_start_producer_group" />
        <!-- Left commented out: ClientConfig then takes the name server address from the system property or environment variable. -->
        <!--<property name="namesrvAddr" value="127.0.0.1:9876" />-->
    </bean>

    <!-- The template receives the started producer through its single constructor argument. -->
    <bean id="rocketMQTemplate" class="org.hongxi.whatsmars.rocketmq.config.spring.RocketMQTemplate">
        <constructor-arg index="0" ref="defaultMQProducer" />
    </bean>
</beans>

rocketmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the consumer demo: one consumer factory bean and the listener it delivers messages to. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
       default-autowire="byName">

    <!-- Consumer subscribes to the topic/tags below, registers the listener and starts in afterPropertiesSet(). -->
    <bean class="org.hongxi.whatsmars.rocketmq.config.spring.Consumer">
        <property name="consumerGroup" value="quick_start_consumer_group" />
        <!-- Left commented out: ClientConfig then takes the name server address from the system property or environment variable. -->
        <!--<property name="namesrvAddr" value="127.0.0.1:9876" />-->
        <property name="topic" value="sdk-test" />
        <property name="tags" value="*" />
        <property name="messageListener" ref="demoMessageListener" />
    </bean>

    <!-- Listener implementation that prints each received batch and reports success. -->
    <bean id="demoMessageListener" class="org.hongxi.whatsmars.rocketmq.spring.demo.DemoMessageListener" />
</beans>
/**
 * Demo entry point: loads {@code rocketmq-producer.xml}, sends 20 test messages to the
 * {@code sdk-test} topic through {@code RocketMQTemplate} and prints each send result.
 *
 * <p>Run with the name server address supplied as a system property, for example:
 * -Drocketmq.namesrv.addr=127.0.0.1:9876
 */
public class Producer {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("rocketmq-producer.xml");
        try {
            RocketMQTemplate rocketMQTemplate = context.getBean("rocketMQTemplate", RocketMQTemplate.class);
            for (int i = 0; i < 20; i++) {
                try {
                    SendResult sendResult = rocketMQTemplate.send("sdk-test", "rocketMQTemplate");
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // Keep sending the remaining messages; one failed send should not abort the demo.
                    e.printStackTrace();
                }
            }
        } finally {
            // Closing the context runs the producer factory bean's destroy(), which shuts the
            // producer down; without this the producer is never released.
            context.close();
        }
    }
}
/**
 * -Drocketmq.namesrv.addr=127.0.0.1:9876
 */
public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("rocketmq-consumer.xml");
        context.registerShutdownHook();
    }
}
/**
 * Demo listener that prints every received batch together with the consuming thread's
 * name and always reports the batch as successfully consumed.
 */
public class DemoMessageListener implements MessageListenerConcurrently {

    /**
     * Prints the received messages to standard output.
     *
     * @param messages                   the batch delivered by the consumer
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // The thread name and message text are passed as arguments, never as part of the
        // format string: a '%' inside either would otherwise be parsed as a format
        // specifier and make printf throw.
        System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), messages);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
You can’t perform that action at this time.
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session.
Press h to open a hovercard with more details.