Skip to content

Commit

Permalink
[ISSUE apache#2833] Support trace for TranscationProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
yuz10 committed Apr 23, 2021
1 parent bc4ecb3 commit 6cee912
Show file tree
Hide file tree
Showing 11 changed files with 561 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;

public class EndTransactionContext {
private String producerGroup;
private Message message;
private String brokerAddr;
private String msgId;
private String transactionId;
private LocalTransactionState transactionState;
private boolean fromTransactionCheck;

public String getProducerGroup() {
return producerGroup;
}

public void setProducerGroup(String producerGroup) {
this.producerGroup = producerGroup;
}

public Message getMessage() {
return message;
}

public void setMessage(Message message) {
this.message = message;
}

public String getBrokerAddr() {
return brokerAddr;
}

public void setBrokerAddr(String brokerAddr) {
this.brokerAddr = brokerAddr;
}

public String getMsgId() {
return msgId;
}

public void setMsgId(String msgId) {
this.msgId = msgId;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

public LocalTransactionState getTransactionState() {
return transactionState;
}

public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}

public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}

public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.hook;

public interface EndTransactionHook {
String hookName();

void endTransaction(final EndTransactionContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
Expand Down Expand Up @@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
Expand Down Expand Up @@ -171,6 +174,11 @@ public void registerSendMessageHook(final SendMessageHook hook) {
log.info("register sendMessage Hook, {}", hook.hookName());
}

public void registerEndTransactionHook(final EndTransactionHook hook) {
this.endTransactionHookList.add(hook);
log.info("register endTransaction Hook, {}", hook.hookName());
}

public void start() throws MQClientException {
this.start(true);
}
Expand Down Expand Up @@ -386,6 +394,7 @@ private void processTransactionState(
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
Expand Down Expand Up @@ -967,6 +976,36 @@ public void executeSendMessageHookAfter(final SendMessageContext context) {
}
}

public boolean hasEndTransactionHook() {
return !this.endTransactionHookList.isEmpty();
}

public void executeEndTransactionHook(final EndTransactionContext context) {
if (!this.endTransactionHookList.isEmpty()) {
for (EndTransactionHook hook : this.endTransactionHookList) {
try {
hook.endTransaction(context);
} catch (Throwable e) {
log.warn("failed to executeEndTransactionHook", e);
}
}
}
}

public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state,
boolean fromTransactionCheck) {
if (hasEndTransactionHook()) {
EndTransactionContext context = new EndTransactionContext();
context.setProducerGroup(defaultMQProducer.getProducerGroup());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMsgId(msgId);
context.setTransactionId(msg.getTransactionId());
context.setTransactionState(state);
context.setFromTransactionCheck(fromTransactionCheck);
executeEndTransactionHook(context);
}
}
/**
* DEFAULT ONEWAY -------------------------------------------------------
*/
Expand Down Expand Up @@ -1266,7 +1305,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
}

try {
this.endTransaction(sendResult, localTransactionState, localException);
this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
Expand All @@ -1290,6 +1329,7 @@ public SendResult send(
}

public void endTransaction(
final Message msg,
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
Expand Down Expand Up @@ -1318,6 +1358,7 @@ public void endTransaction(
break;
}

doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
Expand Down Expand Up @@ -167,6 +168,8 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean en
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
Expand Down Expand Up @@ -252,6 +255,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
this.defaultMQProducerImpl.registerEndTransactionHook(
new EndTransactionTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
Expand Down Expand Up @@ -916,24 +921,24 @@ public SendResult send(Collection<Message> msgs, MessageQueue messageQueue,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout);
}

@Override
public void send(Collection<Message> msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback);
}

@Override
public void send(Collection<Message> msgs, SendCallback sendCallback,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout);
}

@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback);
}

@Override
public void send(Collection<Message> msgs, MessageQueue mq,
SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public TransactionMQProducer(final String namespace, final String producerGroup,
super(namespace, producerGroup, rpcHook);
}

public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) {
super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic);
}

@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageType;

Expand All @@ -32,7 +33,9 @@ public class TraceBean {
private int retryTimes;
private int bodyLength;
private MessageType msgType;

private LocalTransactionState transactionState;
private String transactionId;
private boolean fromTransactionCheck;

public MessageType getMsgType() {
return msgType;
Expand Down Expand Up @@ -141,4 +144,28 @@ public int getBodyLength() {
public void setBodyLength(int bodyLength) {
this.bodyLength = bodyLength;
}

public LocalTransactionState getTransactionState() {
return transactionState;
}

public void setTransactionState(LocalTransactionState transactionState) {
this.transactionState = transactionState;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(String transactionId) {
this.transactionId = transactionId;
}

public boolean isFromTransactionCheck() {
return fromTransactionCheck;
}

public void setFromTransactionCheck(boolean fromTransactionCheck) {
this.fromTransactionCheck = fromTransactionCheck;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.trace;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.MessageType;

import java.util.ArrayList;
Expand Down Expand Up @@ -109,6 +110,27 @@ public static List<TraceContext> decoderFromTraceDataString(String traceData) {
subAfterContext.setGroupName(line[8]);
}
resList.add(subAfterContext);
} else if (line[0].equals(TraceType.EndTransaction.name())) {
TraceContext endTransactionContext = new TraceContext();
endTransactionContext.setTraceType(TraceType.EndTransaction);
endTransactionContext.setTimeStamp(Long.parseLong(line[1]));
endTransactionContext.setRegionId(line[2]);
endTransactionContext.setGroupName(line[3]);
TraceBean bean = new TraceBean();
bean.setTopic(line[4]);
bean.setMsgId(line[5]);
bean.setTags(line[6]);
bean.setKeys(line[7]);
bean.setStoreHost(line[8]);
bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]);
bean.setClientHost(line[10]);
bean.setTransactionId(line[11]);
bean.setTransactionState(LocalTransactionState.valueOf(line[12]));
bean.setFromTransactionCheck(Boolean.parseBoolean(line[13]));

endTransactionContext.setTraceBeans(new ArrayList<TraceBean>(1));
endTransactionContext.getTraceBeans().add(bean);
resList.add(endTransactionContext);
}
}
return resList;
Expand Down Expand Up @@ -173,9 +195,26 @@ public static TraceTransferBean encoderFromContextBean(TraceContext ctx) {
.append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)
.append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR);

}
}
case EndTransaction: {
TraceBean bean = ctx.getTraceBeans().get(0);
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)//
.append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR);
}
break;
default:
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public enum TraceType {
Pub,
SubBefore,
SubAfter,
EndTransaction,
}
Loading

0 comments on commit 6cee912

Please sign in to comment.