Skip to content

Commit

Permalink
finish rocketmq starter
Browse files Browse the repository at this point in the history
  • Loading branch information
rhwayfun committed Dec 16, 2017
1 parent 566ba1e commit 47d0990
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package com.rhwayfun.springboot.rocketmq.starter.common;

import com.alibaba.fastjson.JSON;
import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqContent;
import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTag;
import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;
import com.rhwayfun.springboot.rocketmq.starter.handler.MessageHandler;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Abstract message listener.
Expand All @@ -22,39 +26,68 @@
* @since 0.0.1
*/
public abstract class AbstractRocketMqConsumer
extends AbstractRocketMqSubscribe implements MessageHandler, MessageListenerConcurrently {
<Topic extends RocketMqTopic, Tag extends RocketMqTag, Content extends RocketMqContent>
implements MessageListenerConcurrently
//implements MessageListener
{

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected Class<Topic> topicClazz;

protected Class<Tag> tagClazz;

protected Class<Content> contentClazz;

public abstract Map<String, Set<String>> subscribeTopicTags();

public abstract boolean handle(String topic, String tag, Content content, MessageExt msg);

@PostConstruct
public void init() {
Class<? extends AbstractRocketMqConsumer> parentClazz = this.getClass();
Type genType = parentClazz.getGenericSuperclass();// 得到泛型父类
Type[] types = ((ParameterizedType) genType).getActualTypeArguments();//一个泛型类可能有多个泛型形参,比如ClassName<T,K> 这里有两个泛型形参T和K,Class Name<T> 这里只有1个泛型形参T
topicClazz = (Class<Topic>) types[0];
contentClazz = (Class<Content>) types[2];
logger.info("topicClazz:{}, contentClazz:{}", topicClazz, contentClazz);
}

public Class<?> getModelClass(Class modelClass, int index) {
Type genType = this.getClass().getGenericSuperclass();// 得到泛型父类
Type[] params = ((ParameterizedType) genType).getActualTypeArguments();//一个泛型类可能有多个泛型形参,比如ClassName<T,K> 这里有两个泛型形参T和K,Class Name<T> 这里只有1个泛型形参T
if (params.length - 1 < index) {
modelClass = null;
} else {
modelClass = (Class) params[index];
}
return modelClass;
}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
/*byte[] body = msg.getBody();
String bodyString = getBodyString(body);
String key = msg.getKeys();
byte[] body = msg.getBody();
String topic = msg.getTopic();
String tags = msg.getTags();
logger.warn(Thread.currentThread().getName() + " Receive New Messages: " + bodyString + " ,key:" + key
+ ",tags:" + tags + ",topic:" + topic);*/

long bornTimestamp = msg.getBornTimestamp();
long currentTimeMillis = System.currentTimeMillis();
long timeElapsedFromStoreInMqToReceiveMsg = currentTimeMillis - bornTimestamp;
ConsumeConcurrentlyStatus consumeConcurrentlyStatus = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

// 300s = 5min 作为一个消费阈值,超过这个值的消息都作为无效消息判断
if (timeElapsedFromStoreInMqToReceiveMsg >= 300000) {
logger.warn("msg:{} is invalid, it was born {}s ago", msg, timeElapsedFromStoreInMqToReceiveMsg / 1000);
return consumeConcurrentlyStatus;
}

try {
consumeConcurrentlyStatus = handle(topic, tag, content, msg) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
consumeConcurrentlyStatus = handle(topic, tags, parseMsg(body, contentClazz), msg) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (Throwable t) {
logger.warn("mq handler error, msg info:{}", msg, t);
}

return consumeConcurrentlyStatus;
}

Expand All @@ -69,4 +102,17 @@ private String getBodyString(byte[] body) {
}
return bodyString;
}

private <T> T parseMsg(byte[] body, Class<? extends RocketMqContent> clazz){
T t = null;
if (body != null) {
try {
t = JSON.parseObject(body, clazz);
} catch (Exception e) {
logger.error("can not parse to object", e);
}
}
return t;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.rhwayfun.springboot.rocketmq.starter.config;

import com.rhwayfun.springboot.rocketmq.starter.common.AbstractRocketMqConsumer;
import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTag;
import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
Expand All @@ -18,8 +17,9 @@
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* @author rhwayfun
Expand Down Expand Up @@ -50,22 +50,24 @@ public DefaultMQPushConsumer defaultMQPushConsumer(List<AbstractRocketMqConsumer
if (rocketMqProperties.getConsumeThreadMax() != null) {
consumer.setConsumeThreadMax(rocketMqProperties.getConsumeThreadMax());
}
if (rocketMqProperties.getMessageModel() != null) {
consumer.setMessageModel(MessageModel.valueOf(rocketMqProperties.getMessageModel()));
}
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(rocketMqProperties.getNameServer());

messageListeners.forEach(messageListener -> {
Map<RocketMqTopic, Set<RocketMqTag>> subscribeTopicTags = messageListener.subscribeTopicTags();
Map<String, Set<String>> subscribeTopicTags = messageListener.subscribeTopicTags();
subscribeTopicTags.entrySet().forEach(e -> {
try {
RocketMqTopic rocketMqTopic = e.getKey();
Set<RocketMqTag> rocketMqTags = e.getValue();
String rocketMqTopic = e.getKey();
Set<String> rocketMqTags = e.getValue();
if (CollectionUtils.isEmpty(rocketMqTags)) {
consumer.subscribe(rocketMqTopic.getTopic(), "*");
consumer.subscribe(rocketMqTopic, "*");
} else {
Set<String> tagSet = rocketMqTags.stream().map(RocketMqTag::getTag).collect(Collectors.toSet());
String tags = StringUtils.join(tagSet, " || ");
consumer.subscribe(rocketMqTopic.getTopic(), tags);
LOGGER.info("subscribe, topic:{}, tags:{}", rocketMqTopic.getTopic(), tags);
String tags = StringUtils.join(rocketMqTags, " || ");
consumer.subscribe(rocketMqTopic, tags);
LOGGER.info("subscribe, topic:{}, tags:{}", rocketMqTopic, tags);
}
} catch (MQClientException ex) {
LOGGER.error("consumer subscribe error", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class RocketMqProperties {
private String consumerGroupName;
private Integer consumeThreadMin;
private Integer consumeThreadMax;
//private String subscribes;
private String messageModel;//CLUSTERING、BROADCASTING,默认CLUSTER

public String getNameServer() {
return nameServer;
Expand Down Expand Up @@ -58,11 +58,12 @@ public void setConsumeThreadMax(Integer consumeThreadMax) {
this.consumeThreadMax = consumeThreadMax;
}

/*public String getSubscribes() {
return subscribes;
public String getMessageModel() {
return messageModel;
}

public void setMessageModel(String messageModel) {
this.messageModel = messageModel;
}

public void setSubscribes(String subscribes) {
this.subscribes = subscribes;
}*/
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.rhwayfun.springboot.rocketmq.starter.constants;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;

import java.io.Serializable;

/**
Expand All @@ -10,4 +13,8 @@ public class RocketMqContent implements Serializable {

private static final long serialVersionUID = 1L;

@Override
public String toString() {
return JSON.toJSONString(this, SerializerFeature.NotWriteDefaultValue);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.rhwayfun.springboot.rocketmq.mq;

import com.rhwayfun.springboot.rocketmq.starter.common.AbstractRocketMqConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* @author rhwayfun
* @since 0.0.1
*/
@Component
public class DemoRocketMqConsumerExample
extends AbstractRocketMqConsumer<DemoRocketMqTopic, DemoRocketMqTag, DemoRocketMqContent> {

@Override
public Map<String, Set<String>> subscribeTopicTags() {
Map<String, Set<String>> topicSetMap = new HashMap<>();
Set<String> tagSet = new HashSet<>();
tagSet.add("TagA");
tagSet.add("TagB");
topicSetMap.put("TopicA", tagSet);
return topicSetMap;
}

@Override
public boolean handle(String topic, String tag, DemoRocketMqContent content, MessageExt msg) {
logger.info("receive msg[{}], topic:{}, tag:{}, content:{}", msg, topic, tag, content);
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.rhwayfun.springboot.rocketmq.mq;

import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqContent;

/**
* @author rhwayfun
* @since 0.0.1
*/
public class DemoRocketMqContent extends RocketMqContent {

private int cityId;
private String desc;

public int getCityId() {
return cityId;
}

public void setCityId(int cityId) {
this.cityId = cityId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.rhwayfun.springboot.rocketmq.mq;

import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTag;

/**
* @author rhwayfun
* @since 0.0.1
*/
public class DemoRocketMqTag implements RocketMqTag {
@Override
public String getTag() {
return "TagA";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.rhwayfun.springboot.rocketmq.mq;

import com.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;

/**
* @author rhwayfun
* @since 0.0.1
*/
public class DemoRocketMqTopic implements RocketMqTopic {
@Override
public String getTopic() {
return "TopicA";
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.rhwayfun.springboot.rocketmq.mq;
package com.rhwayfun.springboot.rocketmq.other;

import com.rhwayfun.springboot.rocketmq.starter.common.AbstractRocketMqConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
Expand Down
Loading

0 comments on commit 47d0990

Please sign in to comment.