Skip to content

RocketMQ 客户端简单封装

javahongxi edited this page Jul 25, 2019 · 1 revision
/**
 * Base configuration shared by the producer and consumer factory beans in this file.
 *
 * <p>Holds the name server address and the client instance name. Both fields are
 * {@code protected} so subclasses can read them directly when configuring a client.
 */
public class ClientConfig {

    /**
     * Name server address. Defaults to the system property named by
     * {@code MixAll.NAMESRV_ADDR_PROPERTY}, falling back to the environment
     * variable named by {@code MixAll.NAMESRV_ADDR_ENV}.
     */
    protected String namesrvAddr = defaultNamesrvAddr();

    /**
     * Client instance name. Defaults to the {@code rocketmq.client.name} system
     * property, or {@code "DEFAULT"} when that property is not set.
     */
    protected String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

    /** Resolves the default name server address: system property first, environment variable second. */
    private static String defaultNamesrvAddr() {
        String fromEnvironment = System.getenv(MixAll.NAMESRV_ADDR_ENV);
        return System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, fromEnvironment);
    }

    /** @return the configured name server address (may be {@code null} if nothing was configured) */
    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    /** @param namesrvAddr the name server address to use instead of the default */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** @return the configured client instance name */
    public String getInstanceName() {
        return this.instanceName;
    }

    /** @param instanceName the client instance name to use instead of the default */
    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }
}
/**
 * Spring factory bean that creates, configures and starts a {@link DefaultMQProducer}.
 *
 * <p>The producer is built and started in {@link #afterPropertiesSet()} from the
 * properties set on this bean (plus the name server address and instance name
 * inherited from {@link ClientConfig}) and is shut down in {@link #destroy()}.
 * {@link #getObject()} exposes the producer itself, so other beans that reference
 * this factory bean receive the {@code DefaultMQProducer}.
 */
@Slf4j
public class Producer extends ClientConfig implements FactoryBean<DefaultMQProducer>,InitializingBean,DisposableBean {

    /** The managed producer; {@code null} until {@link #afterPropertiesSet()} has run. */
    private DefaultMQProducer producer;

    /** Producer group passed to the {@code DefaultMQProducer} constructor. */
    private String producerGroup;

    /** Forwarded to {@code DefaultMQProducer#setDefaultTopicQueueNums}; defaults to 4. */
    private volatile int defaultTopicQueueNums = 4;

    /** Forwarded to {@code DefaultMQProducer#setSendMsgTimeout}; defaults to 3000. */
    private int sendMsgTimeout = 3000;

    /** Forwarded to {@code DefaultMQProducer#setRetryTimesWhenSendFailed}; defaults to 2. */
    private int retryTimesWhenSendFailed = 2;

    /** @param producerGroup the producer group name */
    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    /** @param defaultTopicQueueNums value forwarded to the producer's default topic queue count */
    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
        this.defaultTopicQueueNums = defaultTopicQueueNums;
    }

    /** @param sendMsgTimeout value forwarded to the producer's send timeout */
    public void setSendMsgTimeout(int sendMsgTimeout) {
        this.sendMsgTimeout = sendMsgTimeout;
    }

    /** @param retryTimesWhenSendFailed value forwarded to the producer's retry count on send failure */
    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }

    /** @return the producer created in {@link #afterPropertiesSet()} */
    @Override
    public DefaultMQProducer getObject() throws Exception {
        return this.producer;
    }

    /** @return {@code DefaultMQProducer.class} */
    @Override
    public Class<?> getObjectType() {
        return DefaultMQProducer.class;
    }

    /** @return always {@code true}: one producer per factory bean */
    @Override
    public boolean isSingleton() {
        return true;
    }

    /**
     * Creates the producer, applies every configured property and starts it.
     *
     * @throws Exception if the producer fails to start
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        DefaultMQProducer mqProducer = new DefaultMQProducer(producerGroup);
        // Publish to the field right away, exactly as before, then configure via the local.
        this.producer = mqProducer;

        mqProducer.setInstanceName(instanceName);
        mqProducer.setNamesrvAddr(namesrvAddr);
        mqProducer.setDefaultTopicQueueNums(defaultTopicQueueNums);
        mqProducer.setSendMsgTimeout(sendMsgTimeout);
        mqProducer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);

        mqProducer.start();
        log.info("Producer Group {} started!", producerGroup);
    }

    /** Shuts the producer down if it was ever created. */
    @Override
    public void destroy() throws Exception {
        if (producer == null) {
            return;
        }
        producer.shutdown();
    }
}
/**
 * Unchecked exception thrown by the messaging helpers in this file.
 *
 * <p>Carries a numeric response code and the raw error message. The message handed
 * to {@link RuntimeException} is the raw text passed through
 * {@code FAQUrl.attachDefaultURL}.
 */
public class MessagingException extends RuntimeException {

    private static final long serialVersionUID = -5758410930844185841L;

    /** Response code; {@code -1} when the exception was created from a cause. */
    private int responseCode;

    /** The raw error message as supplied by the caller (without the FAQ URL decoration). */
    private String errorMessage;

    /**
     * Creates an exception that wraps another failure. The response code is set to {@code -1}.
     *
     * @param errorMessage description of the failure
     * @param cause        the underlying failure
     */
    public MessagingException(String errorMessage, Throwable cause) {
        super(FAQUrl.attachDefaultURL(errorMessage), cause);
        this.errorMessage = errorMessage;
        this.responseCode = -1;
    }

    /**
     * Creates an exception from a response code and a description.
     *
     * @param responseCode the response code
     * @param errorMessage description of the failure
     */
    public MessagingException(int responseCode, String errorMessage) {
        super(FAQUrl.attachDefaultURL(describe(responseCode, errorMessage)));
        this.errorMessage = errorMessage;
        this.responseCode = responseCode;
    }

    /** Builds the "CODE: ... DESC: ..." text used as the exception message. */
    private static String describe(int responseCode, String errorMessage) {
        String codeText = UtilAll.responseCode2String(responseCode);
        return "CODE: " + codeText + "  DESC: " + errorMessage;
    }

    /** @return the response code */
    public int getResponseCode() {
        return this.responseCode;
    }

    /**
     * Replaces the response code.
     *
     * @param responseCode the new response code
     * @return this exception, to allow chaining
     */
    public MessagingException setResponseCode(final int responseCode) {
        this.responseCode = responseCode;
        return this;
    }

    /** @return the raw error message */
    public String getErrorMessage() {
        return this.errorMessage;
    }

    /** @param errorMessage the new raw error message */
    public void setErrorMessage(final String errorMessage) {
        this.errorMessage = errorMessage;
    }
}
/**
 * Thin convenience wrapper around a {@link DefaultMQProducer} for sending string messages.
 *
 * <p>Every public method builds a {@link Message} from its arguments, sends it
 * synchronously through the wrapped producer and returns the {@link SendResult}.
 * Any failure (including a {@code null} body) is logged and rethrown as an
 * unchecked {@link MessagingException} with the original exception as its cause.
 */
@Slf4j
public class RocketMQTemplate {

    private final DefaultMQProducer defaultMQProducer;

    /**
     * @param producer the producer used for every send; this class does not start it
     */
    public RocketMQTemplate(DefaultMQProducer producer) {
        this.defaultMQProducer = producer;
    }

    /**
     * Sends {@code body} to {@code topic} with empty tags and keys.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult send(String topic, String body) {
        return send(topic, "", body);
    }

    /**
     * Sends {@code body} to {@code topic} with the given tags and empty keys.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult send(String topic, String tags, String body) {
        return send(topic, tags, "", body);
    }

    /**
     * Sends {@code body} to {@code topic} with the given tags and keys.
     *
     * @param topic destination topic
     * @param tags  message tags
     * @param keys  message keys
     * @param body  message payload, encoded with {@code RemotingHelper.DEFAULT_CHARSET}
     * @return the producer's send result
     * @throws MessagingException if encoding or sending fails
     */
    public SendResult send(String topic, String tags, String keys, String body) {
        try {
            return send(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
        } catch (Exception e) {
            throw sendFailure(e, topic, tags, keys, body);
        }
    }

    /** Sends a prepared message through the producer and logs the result at debug level. */
    private SendResult send(Message message) throws Exception {
        SendResult sendResult = this.defaultMQProducer.send(message);
        log.debug("send result: {}", sendResult);
        return sendResult;
    }

    /**
     * Ordered send with empty tags; see {@link #sendOrderly(String, String, String, String)}.
     *
     * @throws MessagingException if the send fails
     */
    public SendResult sendOrderly(String topic, String keys, String body) {
        return sendOrderly(topic, "", keys, body);
    }

    /**
     * Sends {@code body} to a queue chosen from {@code keys}, so messages with the
     * same key go to the same queue as long as the queue list is unchanged.
     *
     * <p>The key is parsed as a {@code long} and reduced modulo the number of queues.
     * NOTE(review): {@code NumberUtils} is presumably Apache Commons Lang, whose
     * {@code toLong} yields 0 for text that is not a valid long; if so, all
     * non-numeric keys share the first queue. Confirm against the import.
     *
     * @param topic destination topic
     * @param tags  message tags
     * @param keys  message keys; also used as the queue-selection argument
     * @param body  message payload, encoded with {@code RemotingHelper.DEFAULT_CHARSET}
     * @return the producer's send result
     * @throws MessagingException if encoding or sending fails
     */
    public SendResult sendOrderly(String topic, String tags, String keys, String body) {
        try {
            return sendOrderly(new Message(topic, tags, keys, body.getBytes(RemotingHelper.DEFAULT_CHARSET)));
        } catch (Exception e) {
            throw sendFailure(e, topic, tags, keys, body);
        }
    }

    /** Sends a prepared message using a selector that maps the message keys to a queue index. */
    private SendResult sendOrderly(Message message) throws Exception {
        SendResult sendResult = this.defaultMQProducer.send(message,
                new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        long id = NumberUtils.toLong(String.valueOf(arg));
                        return mqs.get(queueIndex(id, mqs.size()));
                    }
                }, message.getKeys());
        log.debug("send result: {}", sendResult);
        return sendResult;
    }

    /**
     * Maps an id onto {@code [0, queueCount)}.
     *
     * <p>Java's {@code %} keeps the sign of the dividend, so a negative id would
     * otherwise give a negative index and make {@code List.get} throw.
     */
    private static int queueIndex(long id, int queueCount) {
        int index = (int) (id % queueCount);
        if (index < 0) {
            index += queueCount;
        }
        return index;
    }

    /**
     * Logs a failed send and converts it into the exception the public methods throw.
     *
     * <p>If the failure was an {@link InterruptedException}, the interrupt flag is
     * restored first so callers further up the stack can still observe it.
     */
    private MessagingException sendFailure(Exception e, String topic, String tags, String keys, String body) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        log.error("send error, topic:{}, tags:{}, keys:{}, body:{}",
                topic, tags, keys, body, e);
        return new MessagingException(e.getMessage(), e);
    }

}
/**
 * Spring factory bean that creates, configures and starts a {@link DefaultMQPushConsumer}.
 *
 * <p>In {@link #afterPropertiesSet()} the consumer is built from the properties set
 * on this bean (plus the name server address and instance name inherited from
 * {@link ClientConfig}), subscribed to {@code topic}/{@code tags}, given the
 * configured listener and started. It is shut down in {@link #destroy()}.
 */
@Slf4j
public class Consumer extends ClientConfig implements FactoryBean<DefaultMQPushConsumer>,InitializingBean,DisposableBean {

    /** The managed consumer; {@code null} until {@link #afterPropertiesSet()} has run. */
    private DefaultMQPushConsumer consumer;

    /** Consumer group passed to the {@code DefaultMQPushConsumer} constructor. */
    private String consumerGroup;

    /** Message model forwarded to the consumer; defaults to {@code CLUSTERING}. */
    private MessageModel messageModel = MessageModel.CLUSTERING;

    // The numeric settings below are forwarded unchanged to the same-named
    // DefaultMQPushConsumer setters; the values shown are this bean's defaults.
    private int consumeThreadMin = 20;

    private int consumeThreadMax = 64;

    private int pullThresholdForQueue = 1000;

    private int pullThresholdSizeForQueue = 100;

    private int consumeMessageBatchMaxSize = 1;

    private int pullBatchSize = 32;

    private int maxReconsumeTimes = -1;

    private long consumeTimeout = 15;

    /** Topic passed to {@code consumer.subscribe}. */
    private String topic;

    /** Subscription expression passed to {@code consumer.subscribe} together with the topic. */
    private String tags;

    /** Listener registered on the consumer before it is started. */
    private MessageListenerConcurrently messageListener;

    /** @param consumerGroup the consumer group name */
    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    /**
     * @param messageModel name of a {@link MessageModel} constant, resolved with
     *                     {@code MessageModel.valueOf}
     */
    public void setMessageModel(String messageModel) {
        this.messageModel = MessageModel.valueOf(messageModel);
    }

    /** @param consumeThreadMin value forwarded to {@code setConsumeThreadMin} */
    public void setConsumeThreadMin(int consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    /** @param consumeThreadMax value forwarded to {@code setConsumeThreadMax} */
    public void setConsumeThreadMax(int consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    /** @param pullThresholdForQueue value forwarded to {@code setPullThresholdForQueue} */
    public void setPullThresholdForQueue(int pullThresholdForQueue) {
        this.pullThresholdForQueue = pullThresholdForQueue;
    }

    /** @param pullThresholdSizeForQueue value forwarded to {@code setPullThresholdSizeForQueue} */
    public void setPullThresholdSizeForQueue(int pullThresholdSizeForQueue) {
        this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
    }

    /** @param consumeMessageBatchMaxSize value forwarded to {@code setConsumeMessageBatchMaxSize} */
    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    /** @param pullBatchSize value forwarded to {@code setPullBatchSize} */
    public void setPullBatchSize(int pullBatchSize) {
        this.pullBatchSize = pullBatchSize;
    }

    /** @param maxReconsumeTimes value forwarded to {@code setMaxReconsumeTimes} */
    public void setMaxReconsumeTimes(int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    /** @param consumeTimeout value forwarded to {@code setConsumeTimeout} */
    public void setConsumeTimeout(long consumeTimeout) {
        this.consumeTimeout = consumeTimeout;
    }

    /** @param topic the topic to subscribe to */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @param tags the subscription expression used with the topic */
    public void setTags(String tags) {
        this.tags = tags;
    }

    /** @param messageListener the listener that will receive consumed messages */
    public void setMessageListener(MessageListenerConcurrently messageListener) {
        this.messageListener = messageListener;
    }

    /** @return the consumer created in {@link #afterPropertiesSet()} */
    @Override
    public DefaultMQPushConsumer getObject() throws Exception {
        return this.consumer;
    }

    /** @return {@code DefaultMQPushConsumer.class} */
    @Override
    public Class<?> getObjectType() {
        return DefaultMQPushConsumer.class;
    }

    /** @return always {@code true}: one consumer per factory bean */
    @Override
    public boolean isSingleton() {
        return true;
    }

    /**
     * Creates the consumer, applies every configured property, subscribes,
     * registers the listener and starts consuming.
     *
     * @throws Exception if subscribing or starting the consumer fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup);
        // Publish to the field right away, exactly as before, then configure via the local.
        this.consumer = pushConsumer;

        // Client identity and consumption model.
        pushConsumer.setInstanceName(instanceName);
        pushConsumer.setNamesrvAddr(namesrvAddr);
        pushConsumer.setMessageModel(messageModel);

        // Threading, flow-control and retry tuning.
        pushConsumer.setConsumeThreadMin(consumeThreadMin);
        pushConsumer.setConsumeThreadMax(consumeThreadMax);
        pushConsumer.setPullThresholdForQueue(pullThresholdForQueue);
        pushConsumer.setPullThresholdSizeForQueue(pullThresholdSizeForQueue);
        pushConsumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        pushConsumer.setPullBatchSize(pullBatchSize);
        pushConsumer.setMaxReconsumeTimes(maxReconsumeTimes);
        pushConsumer.setConsumeTimeout(consumeTimeout);

        // Subscription and listener must be in place before start().
        pushConsumer.subscribe(topic, tags);
        pushConsumer.registerMessageListener(messageListener);
        pushConsumer.start();
        log.info("Consumer Group {} started!", consumerGroup);
    }

    /** Shuts the consumer down if it was ever created. */
    @Override
    public void destroy() throws Exception {
        if (consumer == null) {
            return;
        }
        consumer.shutdown();
    }
}

demo:

rocketmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the producer demo: one Producer factory bean and one RocketMQTemplate that sends through it. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
       default-autowire="byName">

    <!-- Producer is a FactoryBean of DefaultMQProducer: references to this id receive the producer it creates and starts. -->
    <bean id="defaultMQProducer" class="org.hongxi.whatsmars.rocketmq.config.spring.Producer">
        <property name="producerGroup" value="quick_start_producer_group" />
        <!-- namesrvAddr is not set here, so ClientConfig's default (system property, then environment variable) applies;
             the demo main class documents passing -Drocketmq.namesrv.addr. Uncomment the line below to hard-code it. -->
        <!--<property name="namesrvAddr" value="127.0.0.1:9876" />-->
    </bean>

    <!-- The template takes the DefaultMQProducer as its single constructor argument. -->
    <bean id="rocketMQTemplate" class="org.hongxi.whatsmars.rocketmq.config.spring.RocketMQTemplate">
        <constructor-arg index="0" ref="defaultMQProducer" />
    </bean>
</beans>

rocketmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring context for the consumer demo: one Consumer factory bean wired to a demo message listener. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd"
       default-autowire="byName">

    <!-- Consumer is a FactoryBean of DefaultMQPushConsumer; it subscribes and starts the consumer when the context initialises it. -->
    <bean class="org.hongxi.whatsmars.rocketmq.config.spring.Consumer">
        <property name="consumerGroup" value="quick_start_consumer_group" />
        <!-- namesrvAddr is not set here, so ClientConfig's default (system property, then environment variable) applies;
             the demo main class documents passing -Drocketmq.namesrv.addr. Uncomment the line below to hard-code it. -->
        <!--<property name="namesrvAddr" value="127.0.0.1:9876" />-->
        <!-- topic and tags are handed together to the consumer's subscribe(topic, tags) call. -->
        <property name="topic" value="sdk-test" />
        <property name="tags" value="*" />
        <property name="messageListener" ref="demoMessageListener" />
    </bean>

    <!-- Listener that prints every received batch and acknowledges it as consumed. -->
    <bean id="demoMessageListener" class="org.hongxi.whatsmars.rocketmq.spring.demo.DemoMessageListener" />
</beans>
/**
 * Demo entry point: loads {@code rocketmq-producer.xml}, sends 20 messages to the
 * {@code sdk-test} topic through the {@code rocketMQTemplate} bean and prints each
 * send result.
 *
 * <p>Run with {@code -Drocketmq.namesrv.addr=127.0.0.1:9876} (or another name
 * server address), since the XML does not set one.
 */
public class Producer {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("rocketmq-producer.xml");
        try {
            RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) context.getBean("rocketMQTemplate");
            for (int i = 0; i < 20; i++) {
                try {
                    SendResult sendResult = rocketMQTemplate.send("sdk-test", "rocketMQTemplate");
                    System.out.println(sendResult);
                } catch (Exception e) {
                    // A failed send is reported and the demo moves on to the next message.
                    e.printStackTrace();
                }
            }
        } finally {
            // Closing the context runs the beans' destroy callbacks, which is what shuts
            // the underlying producer down; without it the producer is never stopped.
            context.close();
        }
    }
}
/**
 * -Drocketmq.namesrv.addr=127.0.0.1:9876
 */
public class Consumer {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("rocketmq-consumer.xml");
        context.registerShutdownHook();
    }
}
/**
 * Demo listener that prints each received batch together with the consuming
 * thread's name and always reports the batch as successfully consumed.
 */
public class DemoMessageListener implements MessageListenerConcurrently {
    /**
     * Prints the batch and acknowledges it.
     *
     * @param messages                   the batch of messages delivered to this listener
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // Pass the thread name and messages as arguments, never as part of the format
        // string: a '%' inside message content would otherwise be parsed as a format
        // specifier and make printf throw.
        System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), messages);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

首页

Java核心技术

Netty

RocketMQ深入研究

kafka深入研究

Pulsar深入研究

Dubbo源码导读

微服务架构

Redis

Elasticsearch

其他

杂谈

关于我

Clone this wiki locally