Skip to content

Commit

Permalink
Issue apache#12 消息轨迹优化,支持异步发送和pull方式消费
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzlalvin committed Jan 26, 2016
1 parent 1e9f0d7 commit 9f5ae8a
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 46 deletions.
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;

import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
Expand All @@ -34,7 +35,15 @@ public class SendMessageContext {
private Exception exception;
private Object mqTraceContext;
private Map<String, String> props;
private DefaultMQProducerImpl producer;

public DefaultMQProducerImpl getProducer() {
return producer;
}

public void setProducer(final DefaultMQProducerImpl producer) {
this.producer = producer;
}

public String getProducerGroup() {
return producerGroup;
Expand Down
Expand Up @@ -15,19 +15,13 @@
*/
package com.alibaba.rocketmq.client.impl;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;

import com.alibaba.rocketmq.client.ClientConfig;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.SendMessageContext;
import com.alibaba.rocketmq.client.impl.consumer.PullResultExt;
import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
Expand Down Expand Up @@ -65,6 +59,12 @@
import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand Down Expand Up @@ -220,9 +220,10 @@ public SendResult sendMessage(//
final Message msg, // 3
final SendMessageRequestHeader requestHeader, // 4
final long timeoutMillis, // 5
final CommunicationMode communicationMode// 6
final CommunicationMode communicationMode,// 6
final SendMessageContext context//7
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0);
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0,context);
}


Expand All @@ -236,7 +237,8 @@ public SendResult sendMessage(//
final SendCallback sendCallback, // 7
final TopicPublishInfo topicPublishInfo, // 8
final MQClientInstance instance, // 9
final int retryTimesWhenSendFailed // 10
final int retryTimesWhenSendFailed, // 10
final SendMessageContext context //11
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg) {
Expand All @@ -256,7 +258,7 @@ public SendResult sendMessage(//
case ASYNC:
final AtomicInteger times = new AtomicInteger();
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times);
retryTimesWhenSendFailed, times,context);
return null;
case SYNC:
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
Expand All @@ -283,50 +285,63 @@ private SendResult sendMessageSync(//


private void sendMessageAsync(//
final String addr, //
final String brokerName, //
final Message msg, //
final long timeoutMillis, //
final RemotingCommand request, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
final MQClientInstance instance, //
final int retryTimesWhenSendFailed, //
final AtomicInteger times) throws InterruptedException, RemotingException {
final String addr, //
final String brokerName, //
final Message msg, //
final long timeoutMillis, //
final RemotingCommand request, //
final SendCallback sendCallback, //
final TopicPublishInfo topicPublishInfo, //
final MQClientInstance instance, //
final int retryTimesWhenSendFailed, //
final AtomicInteger times,
final SendMessageContext context) throws InterruptedException, RemotingException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
if (null == sendCallback)
return;

RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback){
//如果没有回调,则只尝试执行hook,不抛异常
try {
if(response!=null){
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
if(context!=null&&sendResult!=null){
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
}
} catch (Exception e) {
}
}
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
assert sendResult != null;
assert sendResult != null&&context!=null;
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
sendCallback.onSuccess(sendResult);
}
catch (Exception e) {
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e);
retryTimesWhenSendFailed, times, e,context);
}
}
else {
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex);
retryTimesWhenSendFailed, times, ex,context);
}
else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex);
retryTimesWhenSendFailed, times, ex,context);
}
else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex);
retryTimesWhenSendFailed, times, ex,context);
}
}
}
Expand All @@ -343,7 +358,8 @@ private void onExceptionImpl(final String brokerName, //
final MQClientInstance instance, //
final int timesTotal, //
final AtomicInteger curTimes, //
final Exception e) {
final Exception e,//
final SendMessageContext context) {
int tmp = curTimes.incrementAndGet();
if (tmp <= timesTotal) {
MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(brokerName);
Expand All @@ -353,18 +369,23 @@ private void onExceptionImpl(final String brokerName, //
tmpmq.getBrokerName());
try {
sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
timesTotal, curTimes);
timesTotal, curTimes,context);
}
catch (InterruptedException e1) {
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes,
e1);
e1,context);
}
catch (RemotingException e1) {
onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes,
e1);
e1,context);
}
}
else {
//最终发送失败的地方,记录轨迹埋点
if(context!=null){
context.setException(e);
context.getProducer().executeSendMessageHookAfter(context);
}
sendCallback.onException(e);
}
}
Expand Down
Expand Up @@ -20,12 +20,15 @@
import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullCallback;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
import com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.alibaba.rocketmq.client.impl.MQClientManager;
Expand All @@ -47,7 +50,6 @@
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -64,17 +66,46 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private PullAPIWrapper pullAPIWrapper;
private OffsetStore offsetStore;
private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

private final long consumerStartTimestamp = System.currentTimeMillis();

private final RPCHook rpcHook;

private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();

public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
this.defaultMQPullConsumer = defaultMQPullConsumer;
this.rpcHook = rpcHook;
}

public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
this.consumeMessageHookList.add(hook);
log.info("register consumeMessageHook Hook, {}", hook.hookName());
}


public void executeHookBefore(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageBefore(context);
}
catch (Throwable e) {
}
}
}
}

public void executeHookAfter(final ConsumeMessageContext context) {
if (!this.consumeMessageHookList.isEmpty()) {
for (ConsumeMessageHook hook : this.consumeMessageHookList) {
try {
hook.consumeMessageAfter(context);
}
catch (Throwable e) {
}
}
}
}

public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, newTopic, queueNum, 0);
Expand Down Expand Up @@ -314,8 +345,21 @@ private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offs
CommunicationMode.SYNC, // 10
null// 11
);

return this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
ConsumeMessageContext consumeMessageContext = null;
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(this.groupName());
consumeMessageContext.setMq(mq);
consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
consumeMessageContext.setSuccess(false);
this.executeHookBefore(consumeMessageContext);
//拉模式消费消息,拉到消息即认为消费成功。
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
return pullResult;
}


Expand Down
Expand Up @@ -15,13 +15,6 @@
*/
package com.alibaba.rocketmq.client.impl.producer;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;

import org.slf4j.Logger;

import com.alibaba.rocketmq.client.QueryResult;
import com.alibaba.rocketmq.client.Validators;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
Expand All @@ -48,6 +41,12 @@
import com.alibaba.rocketmq.remoting.RPCHook;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.*;


/**
Expand Down Expand Up @@ -640,6 +639,7 @@ private SendResult sendKernelImpl(final Message msg, //

if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
Expand Down Expand Up @@ -681,7 +681,9 @@ private SendResult sendKernelImpl(final Message msg, //
communicationMode, // 6
sendCallback, // 7
topicPublishInfo, // 8
this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendFailed());
this.mQClientFactory,//9
this.defaultMQProducer.getRetryTimesWhenSendFailed(),//10
context );
break;
case ONEWAY:
case SYNC:
Expand All @@ -691,7 +693,8 @@ private SendResult sendKernelImpl(final Message msg, //
msg, // 3
requestHeader, // 4
timeout, // 5
communicationMode// 6
communicationMode,// 6
context
);
break;
default:
Expand Down

0 comments on commit 9f5ae8a

Please sign in to comment.