Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

跟进rocketmq 4.3.0 版本 #13

Merged
merged 3 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [x] 消息tag和key支持
* [x] 自动序列化和反序列化消息体
* [x] 消息的实际消费方IP追溯
* [x] 发送事务消息(NEW)
* [ ] ...
* [x] ~~发送即忘消息~~(可能由于直接抛弃所有异常导致消息静默丢失,弃用)
* [x] ~~拉取方式消费~~(配置方式复杂,位点可能发生偏移,弃用)
Expand All @@ -33,7 +34,7 @@
<dependency>
<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>
</dependency>
```

Expand Down Expand Up @@ -87,10 +88,10 @@ public class DemoProducer extends AbstractMQProducer{
##### 6. 创建消费方

详见[wiki](https://github.com/maihaoche/rocketmq-spring-boot-starter/wiki/%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5-Consumer):
**支持配置项解析**,如存在`suclogger-test-cluster`配置项,会优先将topic解析为配置项对应的值。
**支持springEL风格配置项解析**,如存在`suclogger-test-cluster`配置项,会优先将topic解析为配置项对应的值。

```java
@MQConsumer(topic = "suclogger-test-cluster", consumerGroup = "local_sucloger_dev")
@MQConsumer(topic = "${suclogger-test-cluster}", consumerGroup = "local_sucloger_dev")
public class DemoConsumer extends AbstractMQPushConsumer {

@Override
Expand Down Expand Up @@ -118,3 +119,32 @@ demoProducer.syncSend(msg)
```



------

### 发送事务消息###

> Since 0.1.0



##### 5.1 事务消息发送方#####

```java
@MQTransactionProducer(producerGroup = "${camaro.mq.transactionProducerGroup}")
public class DemoTransactionProducer extends AbstractMQTransactionProducer {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// executeLocalTransaction
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// LocalTransactionState.ROLLBACK_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
}
}
```

4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<properties>
<java.version>1.8</java.version>
<jacoco.version>0.7.9</jacoco.version>
<rocketmq.version>4.2.0</rocketmq.version>
<rocketmq.version>4.3.0</rocketmq.version>
<file_encoding>UTF-8</file_encoding>
</properties>

Expand All @@ -33,7 +33,7 @@

<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>

<scm>
<url>https://github.com/maihaoche/rocketmq-spring-boot-starter</url>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.maihaoche.starter.mq.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
* Created by pufang on 2018/7/26.
* RocketMQ事务消息生产者
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MQTransactionProducer {

/**
* *重要* 事务的反查是基于同一个producerGroup为维度
*/
String producerGroup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.maihaoche.starter.mq.base;

import com.maihaoche.starter.mq.MQException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
* Created by pufang on 20180726.
* RocketMQ的事务生产者的抽象基类
*/
@Slf4j
public abstract class AbstractMQTransactionProducer implements TransactionListener {

private TransactionMQProducer transactionProducer;

public void setProducer(TransactionMQProducer transactionProducer) {
this.transactionProducer = transactionProducer;
}

public SendResult sendMessageInTransaction(Message msg, Object arg) throws MQException {
try {
SendResult sendResult = transactionProducer.sendMessageInTransaction(msg, arg);
if(sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("事务消息发送失败,topic : {}, msgObj {}", msg.getTopic(), msg);
throw new MQException("事务消息发送失败,topic :" + msg.getTopic() + ", status :" + sendResult.getSendStatus());
}
log.info("发送事务消息成功,事务id: {}", msg.getTransactionId());
return sendResult;
} catch (Exception e) {
log.error("事务消息发送失败,topic : {}, msgObj {}", msg.getTopic(), msg);
throw new MQException("事务消息发送失败,topic :" + msg.getTopic() + ",e:" + e.getMessage());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.maihaoche.starter.mq.base.AbstractMQPushConsumer;
import com.maihaoche.starter.mq.base.MessageExtConst;
import com.maihaoche.starter.mq.trace.common.OnsTraceConstants;
import com.maihaoche.starter.mq.trace.dispatch.AsyncAppender;
import com.maihaoche.starter.mq.trace.dispatch.impl.AsyncTraceAppender;
import com.maihaoche.starter.mq.trace.dispatch.impl.AsyncTraceDispatcher;
import com.maihaoche.starter.mq.trace.tracehook.OnsConsumeMessageHookImpl;
Expand All @@ -18,14 +17,12 @@
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.*;

/**
* Created by suclogger on 2017/6/28.
Expand All @@ -37,16 +34,21 @@
public class MQConsumerAutoConfiguration extends MQBaseAutoConfiguration {

private AsyncTraceDispatcher asyncTraceDispatcher;
// 维护一份map用于检测是否用同样的consumerGroup订阅了不同的topic+tag
private Map<String, String> validConsumerMap;

@PostConstruct
public void init() throws Exception {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(MQConsumer.class);
if(!CollectionUtils.isEmpty(beans) && mqProperties.getTraceEnabled()) {
initAsyncAppender();
}
validConsumerMap = new HashMap<>();
for (Map.Entry<String, Object> entry : beans.entrySet()) {
publishConsumer(entry.getKey(), entry.getValue());
}
// 清空map,等待回收
validConsumerMap = null;
}

private AsyncTraceDispatcher initAsyncAppender() {
Expand Down Expand Up @@ -80,22 +82,31 @@ private void publishConsumer(String beanName, Object bean) throws Exception {
if (!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
throw new RuntimeException(bean.getClass().getName() + " - consumer未实现Consumer抽象类");
}
Environment environment = applicationContext.getEnvironment();

String consumerGroup = applicationContext.getEnvironment().getProperty(mqConsumer.consumerGroup());
if (StringUtils.isEmpty(consumerGroup)) {
consumerGroup = mqConsumer.consumerGroup();
String consumerGroup = environment.resolvePlaceholders(mqConsumer.consumerGroup());
String topic = environment.resolvePlaceholders(mqConsumer.topic());
String tags = "*";
if(mqConsumer.tag().length == 1) {
tags = environment.resolvePlaceholders(mqConsumer.tag()[0]);
} else if(mqConsumer.tag().length > 1) {
tags = StringUtils.join(mqConsumer.tag(), "||");
}
String topic = applicationContext.getEnvironment().getProperty(mqConsumer.topic());
if (StringUtils.isEmpty(topic)) {
topic = mqConsumer.topic();

// 检查consumerGroup
if(!StringUtils.isEmpty(validConsumerMap.get(consumerGroup))) {
String exist = validConsumerMap.get(consumerGroup);
throw new RuntimeException("消费组重复订阅,请新增消费组用于新的topic和tag组合: " + consumerGroup + "已经订阅了" + exist);
} else {
validConsumerMap.put(consumerGroup, topic + "-" + tags);
}

// 配置push consumer
if (AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(mqProperties.getNameServerAddress());
consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode()));
consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(), "||"));
consumer.subscribe(topic, tags);
consumer.setInstanceName(UUID.randomUUID().toString());
consumer.setVipChannelEnabled(mqProperties.getVipChannelEnabled());
AbstractMQPushConsumer abstractMQPushConsumer = (AbstractMQPushConsumer) bean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.maihaoche.starter.mq.config;

import com.maihaoche.starter.mq.annotation.MQProducer;
import com.maihaoche.starter.mq.annotation.MQTransactionProducer;
import com.maihaoche.starter.mq.base.AbstractMQTransactionProducer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.*;

/**
* Created by yipin on 2017/6/29.
Expand Down Expand Up @@ -43,4 +49,35 @@ public DefaultMQProducer exposeProducer() throws Exception {
return producer;
}

@PostConstruct
public void configTransactionProducer() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(MQTransactionProducer.class);
if(CollectionUtils.isEmpty(beans)){
return;
}
ExecutorService executorService = new ThreadPoolExecutor(beans.size(), beans.size()*2, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
Environment environment = applicationContext.getEnvironment();
beans.entrySet().forEach( transactionProducer -> {
try {
AbstractMQTransactionProducer beanObj = AbstractMQTransactionProducer.class.cast(transactionProducer.getValue());
MQTransactionProducer anno = beanObj.getClass().getAnnotation(MQTransactionProducer.class);

TransactionMQProducer producer = new TransactionMQProducer(environment.resolvePlaceholders(anno.producerGroup()));
producer.setNamesrvAddr(mqProperties.getNameServerAddress());
producer.setExecutorService(executorService);
producer.setTransactionListener(beanObj);
producer.start();
beanObj.setProducer(producer);
} catch (Exception e) {
log.error("build transaction producer error : {}", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.*;

Expand All @@ -21,7 +21,7 @@
* Created by alvin on 16-3-7.
*/
public class AsyncTraceAppender extends AsyncAppender {
private final static Logger clientlog = ClientLogger.getLog();
private final static InternalLogger clientlog = ClientLogger.getLog();
/**
* batch大小
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.maihaoche.starter.mq.trace.dispatch.AsyncAppender;
import com.maihaoche.starter.mq.trace.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.log.ClientLogger;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;

import java.io.IOException;
import java.util.Properties;
Expand All @@ -19,7 +19,7 @@
* 异步提交消息轨迹等数据
*/
public class AsyncTraceDispatcher extends AsyncDispatcher {
private final static Logger clientlog = ClientLogger.getLog();
private final static InternalLogger clientlog = ClientLogger.getLog();
// RingBuffer 实现,size 必须为 2 的 n 次方
private final Object[] entries;
private final int queueSize;
Expand Down