Skip to content

Commit

Permalink
Merge be5f246 into 7579b15
Browse files Browse the repository at this point in the history
  • Loading branch information
suclogger committed Aug 10, 2018
2 parents 7579b15 + be5f246 commit 018a9b0
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 21 deletions.
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [x] 消息tag和key支持
* [x] 自动序列化和反序列化消息体
* [x] 消息的实际消费方IP追溯
* [x] 发送事务消息(NEW)
* [ ] ...
* [x] ~~发送即忘消息~~(可能由于直接抛弃所有异常导致消息静默丢失,弃用)
* [x] ~~拉取方式消费~~(配置方式复杂,位点可能发生偏移,弃用)
Expand All @@ -33,7 +34,7 @@
<dependency>
<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>
</dependency>
```

Expand Down Expand Up @@ -87,10 +88,10 @@ public class DemoProducer extends AbstractMQProducer{
##### 6. 创建消费方

详见[wiki](https://github.com/maihaoche/rocketmq-spring-boot-starter/wiki/%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5-Consumer)
**支持配置项解析**,如存在`suclogger-test-cluster`配置项,会优先将topic解析为配置项对应的值。
**支持springEL风格配置项解析**,如存在`suclogger-test-cluster`配置项,会优先将topic解析为配置项对应的值。

```java
@MQConsumer(topic = "suclogger-test-cluster", consumerGroup = "local_sucloger_dev")
@MQConsumer(topic = "${suclogger-test-cluster}", consumerGroup = "local_sucloger_dev")
public class DemoConsumer extends AbstractMQPushConsumer {

@Override
Expand Down Expand Up @@ -118,3 +119,32 @@ demoProducer.syncSend(msg)
```



------

### 发送事务消息###

> Since 0.1.0


##### 5.1 事务消息发送方#####

```java
@MQTransactionProducer(producerGroup = "${camaro.mq.transactionProducerGroup}")
public class DemoTransactionProducer extends AbstractMQTransactionProducer {

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// executeLocalTransaction
return LocalTransactionState.UNKNOW;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// LocalTransactionState.ROLLBACK_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
}
}
```

4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<properties>
<java.version>1.8</java.version>
<jacoco.version>0.7.9</jacoco.version>
<rocketmq.version>4.2.0</rocketmq.version>
<rocketmq.version>4.3.0</rocketmq.version>
<file_encoding>UTF-8</file_encoding>
</properties>

Expand All @@ -33,7 +33,7 @@

<groupId>com.maihaoche</groupId>
<artifactId>spring-boot-starter-rocketmq</artifactId>
<version>0.0.7</version>
<version>0.1.0</version>

<scm>
<url>https://github.com/maihaoche/rocketmq-spring-boot-starter</url>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.maihaoche.starter.mq.annotation;

import org.springframework.stereotype.Component;

import java.lang.annotation.*;

/**
* Created by pufang on 2018/7/26.
* RocketMQ事务消息生产者
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface MQTransactionProducer {

/**
* *重要* 事务的反查是基于同一个producerGroup为维度
*/
String producerGroup();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.maihaoche.starter.mq.base;

import com.maihaoche.starter.mq.MQException;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
* Created by pufang on 20180726.
* RocketMQ的事务生产者的抽象基类
*/
@Slf4j
public abstract class AbstractMQTransactionProducer implements TransactionListener {

private TransactionMQProducer transactionProducer;

public void setProducer(TransactionMQProducer transactionProducer) {
this.transactionProducer = transactionProducer;
}

public SendResult sendMessageInTransaction(Message msg, Object arg) throws MQException {
try {
SendResult sendResult = transactionProducer.sendMessageInTransaction(msg, arg);
if(sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("事务消息发送失败,topic : {}, msgObj {}", msg.getTopic(), msg);
throw new MQException("事务消息发送失败,topic :" + msg.getTopic() + ", status :" + sendResult.getSendStatus());
}
log.info("发送事务消息成功,事务id: {}", msg.getTransactionId());
return sendResult;
} catch (Exception e) {
log.error("事务消息发送失败,topic : {}, msgObj {}", msg.getTopic(), msg);
throw new MQException("事务消息发送失败,topic :" + msg.getTopic() + ",e:" + e.getMessage());
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.maihaoche.starter.mq.base.AbstractMQPushConsumer;
import com.maihaoche.starter.mq.base.MessageExtConst;
import com.maihaoche.starter.mq.trace.common.OnsTraceConstants;
import com.maihaoche.starter.mq.trace.dispatch.AsyncAppender;
import com.maihaoche.starter.mq.trace.dispatch.impl.AsyncTraceAppender;
import com.maihaoche.starter.mq.trace.dispatch.impl.AsyncTraceDispatcher;
import com.maihaoche.starter.mq.trace.tracehook.OnsConsumeMessageHookImpl;
Expand All @@ -18,14 +17,12 @@
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.*;

/**
* Created by suclogger on 2017/6/28.
Expand All @@ -37,16 +34,21 @@
public class MQConsumerAutoConfiguration extends MQBaseAutoConfiguration {

private AsyncTraceDispatcher asyncTraceDispatcher;
// 维护一份map用于检测是否用同样的consumerGroup订阅了不同的topic+tag
private Map<String, String> validConsumerMap;

@PostConstruct
public void init() throws Exception {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(MQConsumer.class);
if(!CollectionUtils.isEmpty(beans) && mqProperties.getTraceEnabled()) {
initAsyncAppender();
}
validConsumerMap = new HashMap<>();
for (Map.Entry<String, Object> entry : beans.entrySet()) {
publishConsumer(entry.getKey(), entry.getValue());
}
// 清空map,等待回收
validConsumerMap = null;
}

private AsyncTraceDispatcher initAsyncAppender() {
Expand Down Expand Up @@ -80,22 +82,31 @@ private void publishConsumer(String beanName, Object bean) throws Exception {
if (!AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
throw new RuntimeException(bean.getClass().getName() + " - consumer未实现Consumer抽象类");
}
Environment environment = applicationContext.getEnvironment();

String consumerGroup = applicationContext.getEnvironment().getProperty(mqConsumer.consumerGroup());
if (StringUtils.isEmpty(consumerGroup)) {
consumerGroup = mqConsumer.consumerGroup();
String consumerGroup = environment.resolvePlaceholders(mqConsumer.consumerGroup());
String topic = environment.resolvePlaceholders(mqConsumer.topic());
String tags = "*";
if(mqConsumer.tag().length == 1) {
tags = environment.resolvePlaceholders(mqConsumer.tag()[0]);
} else if(mqConsumer.tag().length > 1) {
tags = StringUtils.join(mqConsumer.tag(), "||");
}
String topic = applicationContext.getEnvironment().getProperty(mqConsumer.topic());
if (StringUtils.isEmpty(topic)) {
topic = mqConsumer.topic();

// 检查consumerGroup
if(!StringUtils.isEmpty(validConsumerMap.get(consumerGroup))) {
String exist = validConsumerMap.get(consumerGroup);
throw new RuntimeException("消费组重复订阅,请新增消费组用于新的topic和tag组合: " + consumerGroup + "已经订阅了" + exist);
} else {
validConsumerMap.put(consumerGroup, topic + "-" + tags);
}

// 配置push consumer
if (AbstractMQPushConsumer.class.isAssignableFrom(bean.getClass())) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(mqProperties.getNameServerAddress());
consumer.setMessageModel(MessageModel.valueOf(mqConsumer.messageMode()));
consumer.subscribe(topic, StringUtils.join(mqConsumer.tag(), "||"));
consumer.subscribe(topic, tags);
consumer.setInstanceName(UUID.randomUUID().toString());
consumer.setVipChannelEnabled(mqProperties.getVipChannelEnabled());
AbstractMQPushConsumer abstractMQPushConsumer = (AbstractMQPushConsumer) bean;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.maihaoche.starter.mq.config;

import com.maihaoche.starter.mq.annotation.MQProducer;
import com.maihaoche.starter.mq.annotation.MQTransactionProducer;
import com.maihaoche.starter.mq.base.AbstractMQTransactionProducer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.*;

/**
* Created by yipin on 2017/6/29.
Expand Down Expand Up @@ -43,4 +49,35 @@ public DefaultMQProducer exposeProducer() throws Exception {
return producer;
}

@PostConstruct
public void configTransactionProducer() {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(MQTransactionProducer.class);
if(CollectionUtils.isEmpty(beans)){
return;
}
ExecutorService executorService = new ThreadPoolExecutor(beans.size(), beans.size()*2, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
Environment environment = applicationContext.getEnvironment();
beans.entrySet().forEach( transactionProducer -> {
try {
AbstractMQTransactionProducer beanObj = AbstractMQTransactionProducer.class.cast(transactionProducer.getValue());
MQTransactionProducer anno = beanObj.getClass().getAnnotation(MQTransactionProducer.class);

TransactionMQProducer producer = new TransactionMQProducer(environment.resolvePlaceholders(anno.producerGroup()));
producer.setNamesrvAddr(mqProperties.getNameServerAddress());
producer.setExecutorService(executorService);
producer.setTransactionListener(beanObj);
producer.start();
beanObj.setProducer(producer);
} catch (Exception e) {
log.error("build transaction producer error : {}", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;

import java.util.*;

Expand All @@ -21,7 +21,7 @@
* Created by alvin on 16-3-7.
*/
public class AsyncTraceAppender extends AsyncAppender {
private final static Logger clientlog = ClientLogger.getLog();
private final static InternalLogger clientlog = ClientLogger.getLog();
/**
* batch大小
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.maihaoche.starter.mq.trace.dispatch.AsyncAppender;
import com.maihaoche.starter.mq.trace.dispatch.AsyncDispatcher;
import org.apache.rocketmq.client.log.ClientLogger;
import org.slf4j.Logger;
import org.apache.rocketmq.logging.InternalLogger;

import java.io.IOException;
import java.util.Properties;
Expand All @@ -19,7 +19,7 @@
* 异步提交消息轨迹等数据
*/
public class AsyncTraceDispatcher extends AsyncDispatcher {
private final static Logger clientlog = ClientLogger.getLog();
private final static InternalLogger clientlog = ClientLogger.getLog();
// RingBuffer 实现,size 必须为 2 的 n 次方
private final Object[] entries;
private final int queueSize;
Expand Down

0 comments on commit 018a9b0

Please sign in to comment.