diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java new file mode 100644 index 000000000000..9a15ca9e63ed --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionContext.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +import java.util.List; + +public class EndTransactionContext { + private String producerGroup; + private Message message; + private String brokerAddr; + private String msgId; + private String transactionId; + private LocalTransactionState transactionState; + private boolean fromTransactionCheck; + + public String getProducerGroup() { + return producerGroup; + } + + public void setProducerGroup(String producerGroup) { + this.producerGroup = producerGroup; + } + + public Message getMessage() { + return message; + } + + public void setMessage(Message message) { + this.message = message; + } + + public String getBrokerAddr() { + return brokerAddr; + } + + public void setBrokerAddr(String brokerAddr) { + this.brokerAddr = brokerAddr; + } + + public String getMsgId() { + return msgId; + } + + public void setMsgId(String msgId) { + this.msgId = msgId; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public LocalTransactionState getTransactionState() { + return transactionState; + } + + public void setTransactionState(LocalTransactionState transactionState) { + this.transactionState = transactionState; + } + + public boolean isFromTransactionCheck() { + return fromTransactionCheck; + } + + public void setFromTransactionCheck(boolean fromTransactionCheck) { + this.fromTransactionCheck = fromTransactionCheck; + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java new file mode 100644 index 000000000000..834cb2731264 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/hook/EndTransactionHook.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.hook; + +public interface EndTransactionHook { + String hookName(); + + void endTransaction(final EndTransactionContext context); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 2f9146d910fb..fac3ed3561f2 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -44,6 +44,8 @@ import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.hook.CheckForbiddenContext; import org.apache.rocketmq.client.hook.CheckForbiddenHook; +import org.apache.rocketmq.client.hook.EndTransactionContext; +import org.apache.rocketmq.client.hook.EndTransactionHook; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.impl.CommunicationMode; @@ -101,6 +103,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final ConcurrentMap topicPublishInfoTable = new ConcurrentHashMap(); private final ArrayList sendMessageHookList = new ArrayList(); + private final ArrayList endTransactionHookList = new ArrayList(); private final RPCHook rpcHook; private final BlockingQueue asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; @@ -171,6 +174,11 @@ public void registerSendMessageHook(final SendMessageHook hook) { log.info("register sendMessage Hook, {}", hook.hookName()); } + public void registerEndTransactionHook(final EndTransactionHook hook) { + this.endTransactionHookList.add(hook); + log.info("register endTransaction Hook, {}", hook.hookName()); + } + public void start() throws MQClientException { this.start(true); } @@ -386,6 +394,7 @@ private void processTransactionState( if (exception != null) { remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); } + doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true); try { DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, @@ -967,6 +976,36 @@ public void executeSendMessageHookAfter(final SendMessageContext context) { } } + public boolean hasEndTransactionHook() { + return !this.endTransactionHookList.isEmpty(); + } + + public void executeEndTransactionHook(final EndTransactionContext context) { + if (!this.endTransactionHookList.isEmpty()) { + for (EndTransactionHook hook : this.endTransactionHookList) { + try { + hook.endTransaction(context); + } catch (Throwable e) { + log.warn("failed to executeEndTransactionHook", e); + } + } + } + } + + public void doExecuteEndTransactionHook(Message msg, String msgId, String brokerAddr, LocalTransactionState state, + boolean fromTransactionCheck) { + if (hasEndTransactionHook()) { + EndTransactionContext context = new EndTransactionContext(); + context.setProducerGroup(defaultMQProducer.getProducerGroup()); + context.setBrokerAddr(brokerAddr); + context.setMessage(msg); + context.setMsgId(msgId); + context.setTransactionId(msg.getTransactionId()); + context.setTransactionState(state); + context.setFromTransactionCheck(fromTransactionCheck); + executeEndTransactionHook(context); + } + } /** * DEFAULT ONEWAY ------------------------------------------------------- */ @@ -1266,7 +1305,7 @@ public TransactionSendResult sendMessageInTransaction(final Message msg, } try { - this.endTransaction(sendResult, localTransactionState, localException); + this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } @@ -1290,6 +1329,7 @@ public SendResult send( } public void endTransaction( + final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { @@ -1318,6 +1358,7 @@ public void endTransaction( break; } + doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 24caf140544e..1c4a9315a8cd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -29,6 +29,7 @@ import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl; import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; @@ -167,6 +168,8 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean en traceDispatcher = dispatcher; this.defaultMQProducerImpl.registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.registerEndTransactionHook( + new EndTransactionTraceHookImpl(traceDispatcher)); } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } @@ -252,6 +255,8 @@ public DefaultMQProducer(final String namespace, final String producerGroup, RPC traceDispatcher = dispatcher; this.getDefaultMQProducerImpl().registerSendMessageHook( new SendMessageTraceHookImpl(traceDispatcher)); + this.defaultMQProducerImpl.registerEndTransactionHook( + new EndTransactionTraceHookImpl(traceDispatcher)); } catch (Throwable e) { log.error("system mqtrace hook init failed ,maybe can't send msg trace data"); } @@ -916,24 +921,24 @@ public SendResult send(Collection msgs, MessageQueue messageQueue, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.defaultMQProducerImpl.send(batch(msgs), messageQueue, timeout); } - + @Override public void send(Collection msgs, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.defaultMQProducerImpl.send(batch(msgs), sendCallback); } - + @Override public void send(Collection msgs, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.defaultMQProducerImpl.send(batch(msgs), sendCallback, timeout); } - + @Override public void send(Collection msgs, MessageQueue mq, SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.defaultMQProducerImpl.send(batch(msgs), queueWithNamespace(mq), sendCallback); } - + @Override public void send(Collection msgs, MessageQueue mq, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index 63b512df7d53..4eb758df401f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -51,6 +51,10 @@ public TransactionMQProducer(final String namespace, final String producerGroup, super(namespace, producerGroup, rpcHook); } + public TransactionMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace, final String customizedTraceTopic) { + super(namespace, producerGroup, rpcHook, enableMsgTrace, customizedTraceTopic); + } + @Override public void start() throws MQClientException { this.defaultMQProducerImpl.initTransactionEnv(); diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java index f93aa38b8293..70c147e1eb34 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceBean.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageType; @@ -32,7 +33,9 @@ public class TraceBean { private int retryTimes; private int bodyLength; private MessageType msgType; - + private LocalTransactionState transactionState; + private String transactionId; + private boolean fromTransactionCheck; public MessageType getMsgType() { return msgType; @@ -141,4 +144,28 @@ public int getBodyLength() { public void setBodyLength(int bodyLength) { this.bodyLength = bodyLength; } + + public LocalTransactionState getTransactionState() { + return transactionState; + } + + public void setTransactionState(LocalTransactionState transactionState) { + this.transactionState = transactionState; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public boolean isFromTransactionCheck() { + return fromTransactionCheck; + } + + public void setFromTransactionCheck(boolean fromTransactionCheck) { + this.fromTransactionCheck = fromTransactionCheck; + } } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index acf0dea8b8a2..b2b06452f62e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.message.MessageType; import java.util.ArrayList; @@ -109,6 +110,27 @@ public static List decoderFromTraceDataString(String traceData) { subAfterContext.setGroupName(line[8]); } resList.add(subAfterContext); + } else if (line[0].equals(TraceType.EndTransaction.name())) { + TraceContext endTransactionContext = new TraceContext(); + endTransactionContext.setTraceType(TraceType.EndTransaction); + endTransactionContext.setTimeStamp(Long.parseLong(line[1])); + endTransactionContext.setRegionId(line[2]); + endTransactionContext.setGroupName(line[3]); + TraceBean bean = new TraceBean(); + bean.setTopic(line[4]); + bean.setMsgId(line[5]); + bean.setTags(line[6]); + bean.setKeys(line[7]); + bean.setStoreHost(line[8]); + bean.setMsgType(MessageType.values()[Integer.parseInt(line[9])]); + bean.setClientHost(line[10]); + bean.setTransactionId(line[11]); + bean.setTransactionState(LocalTransactionState.valueOf(line[12])); + bean.setFromTransactionCheck(Boolean.parseBoolean(line[13])); + + endTransactionContext.setTraceBeans(new ArrayList(1)); + endTransactionContext.getTraceBeans().add(bean); + resList.add(endTransactionContext); } } return resList; @@ -173,9 +195,26 @@ public static TraceTransferBean encoderFromContextBean(TraceContext ctx) { .append(ctx.getContextCode()).append(TraceConstants.CONTENT_SPLITOR) .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR) .append(ctx.getGroupName()).append(TraceConstants.FIELD_SPLITOR); - + } } + case EndTransaction: { + TraceBean bean = ctx.getTraceBeans().get(0); + sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getTimeStamp()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getRegionId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(ctx.getGroupName()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTopic()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTags()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getKeys()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getStoreHost()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getMsgType().ordinal()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getClientHost()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTransactionId()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.getTransactionState().name()).append(TraceConstants.CONTENT_SPLITOR)// + .append(bean.isFromTransactionCheck()).append(TraceConstants.FIELD_SPLITOR); + } break; default: } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java index 79b19c17e4e5..8870ddcbdb3e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceType.java @@ -20,4 +20,5 @@ public enum TraceType { Pub, SubBefore, SubAfter, + EndTransaction, } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java new file mode 100644 index 000000000000..cbd755ba39e1 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionTraceHookImpl.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.hook; + +import org.apache.rocketmq.client.hook.EndTransactionContext; +import org.apache.rocketmq.client.hook.EndTransactionHook; +import org.apache.rocketmq.client.trace.AsyncTraceDispatcher; +import org.apache.rocketmq.client.trace.TraceBean; +import org.apache.rocketmq.client.trace.TraceContext; +import org.apache.rocketmq.client.trace.TraceDispatcher; +import org.apache.rocketmq.client.trace.TraceType; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.protocol.NamespaceUtil; + +import java.util.ArrayList; + +public class EndTransactionTraceHookImpl implements EndTransactionHook { + + private TraceDispatcher localDispatcher; + + public EndTransactionTraceHookImpl(TraceDispatcher localDispatcher) { + this.localDispatcher = localDispatcher; + } + + @Override + public String hookName() { + return "EndTransactionTraceHook"; + } + + @Override + public void endTransaction(EndTransactionContext context) { + //if it is message trace data,then it doesn't recorded + if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) { + return; + } + Message msg = context.getMessage(); + //build the context content of TuxeTraceContext + TraceContext tuxeContext = new TraceContext(); + tuxeContext.setTraceBeans(new ArrayList(1)); + tuxeContext.setTraceType(TraceType.EndTransaction); + tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup())); + //build the data bean object of message trace + TraceBean traceBean = new TraceBean(); + traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic())); + traceBean.setTags(context.getMessage().getTags()); + traceBean.setKeys(context.getMessage().getKeys()); + traceBean.setStoreHost(context.getBrokerAddr()); + traceBean.setMsgType(MessageType.Trans_msg_Commit); + traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId()); + traceBean.setMsgId(context.getMsgId()); + traceBean.setTransactionState(context.getTransactionState()); + traceBean.setTransactionId(context.getTransactionId()); + traceBean.setFromTransactionCheck(context.isFromTransactionCheck()); + String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION); + if (regionId == null || regionId.isEmpty()) { + regionId = MixAll.DEFAULT_TRACE_REGION_ID; + } + tuxeContext.setRegionId(regionId); + tuxeContext.getTraceBeans().add(traceBean); + tuxeContext.setTimeStamp(System.currentTimeMillis()); + localDispatcher.append(tuxeContext); + } + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java index 249a0d1f14a2..bac12ea0ced4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.trace; +import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageType; import org.junit.Assert; @@ -90,4 +91,46 @@ public void testEncoderFromContextBean() { Assert.assertEquals(traceTransferBean.getTransKey().size(), 2); } -} \ No newline at end of file + @Test + public void testEncoderFromContextBean_EndTransaction() { + TraceContext context = new TraceContext(); + context.setTraceType(TraceType.EndTransaction); + context.setGroupName("PID-test"); + context.setRegionId("DefaultRegion"); + context.setTimeStamp(time); + TraceBean traceBean = new TraceBean(); + traceBean.setTopic("topic-test"); + traceBean.setKeys("Keys"); + traceBean.setTags("Tags"); + traceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000"); + traceBean.setStoreHost("127.0.0.1:10911"); + traceBean.setClientHost("127.0.0.1@41700"); + traceBean.setMsgType(MessageType.Trans_msg_Commit); + traceBean.setTransactionId("transactionId"); + traceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE); + traceBean.setFromTransactionCheck(false); + List traceBeans = new ArrayList(); + traceBeans.add(traceBean); + context.setTraceBeans(traceBeans); + TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(context); + + Assert.assertEquals(traceTransferBean.getTransKey().size(), 2); + String traceData = traceTransferBean.getTransData(); + TraceContext contextAfter = TraceDataEncoder.decoderFromTraceDataString(traceData).get(0); + Assert.assertEquals(context.getTraceType(), contextAfter.getTraceType()); + Assert.assertEquals(context.getTimeStamp(), contextAfter.getTimeStamp()); + Assert.assertEquals(context.getGroupName(), contextAfter.getGroupName()); + TraceBean before = context.getTraceBeans().get(0); + TraceBean after = contextAfter.getTraceBeans().get(0); + Assert.assertEquals(before.getTopic(), after.getTopic()); + Assert.assertEquals(before.getMsgId(), after.getMsgId()); + Assert.assertEquals(before.getTags(), after.getTags()); + Assert.assertEquals(before.getKeys(), after.getKeys()); + Assert.assertEquals(before.getStoreHost(), after.getStoreHost()); + Assert.assertEquals(before.getMsgType(), after.getMsgType()); + Assert.assertEquals(before.getClientHost(), after.getClientHost()); + Assert.assertEquals(before.getTransactionId(), after.getTransactionId()); + Assert.assertEquals(before.getTransactionState(), after.getTransactionState()); + Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck()); + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java new file mode 100644 index 000000000000..40bb01c48568 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.EndTransactionContext; +import org.apache.rocketmq.client.hook.EndTransactionHook; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TransactionMQProducerWithTraceTest { + + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + @Mock + private MQClientAPIImpl mQClientAPIImpl; + @Mock + private EndTransactionHook endTransactionHook; + + private AsyncTraceDispatcher asyncTraceDispatcher; + + private TransactionMQProducer producer; + private DefaultMQProducer traceProducer; + + private Message message; + private String topic = "FooBar"; + private String producerGroupPrefix = "FooBar_PID"; + private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private String customerTraceTopic = "rmq_trace_topic_12345"; + + @Before + public void init() throws Exception { + TransactionListener transactionListener = new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }; + producer = new TransactionMQProducer(null, producerGroupTemp, null, true, null); + producer.setTransactionListener(transactionListener); + + producer.setNamesrvAddr("127.0.0.1:9876"); + message = new Message(topic, new byte[] {'a', 'b', 'c'}); + asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); + traceProducer = asyncTraceDispatcher.getTraceProducer(); + + producer.start(); + + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); + + Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + fieldTrace.setAccessible(true); + fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); + + Field fieldHooks = DefaultMQProducerImpl.class.getDeclaredField("endTransactionHookList"); + fieldHooks.setAccessible(true); + Listhooks = new ArrayList<>(); + hooks.add(endTransactionHook); + fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks); + + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); + + } + + @Test + public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + AtomicReference context = new AtomicReference<>(); + doAnswer(mock -> { + context.set(mock.getArgument(0)); + return null; + }).when(endTransactionHook).endTransaction(any()); + producer.sendMessageInTransaction(message, null); + + EndTransactionContext ctx = context.get(); + assertThat(ctx.getProducerGroup()).isEqualTo(producerGroupTemp); + assertThat(ctx.getMsgId()).isEqualTo("123"); + assertThat(ctx.isFromTransactionCheck()).isFalse(); + assertThat(new String(ctx.getMessage().getBody())).isEqualTo(new String(message.getBody())); + assertThat(ctx.getMessage().getTopic()).isEqualTo(topic); + } + + @After + public void terminate() { + producer.shutdown(); + } + + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSynFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1)); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0)); + return sendResult; + } + +}