Skip to content

Commit

Permalink
[ISSUE apache#1120] Add new feature: support namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
lollipopjin committed Mar 28, 2019
1 parent c508e1c commit f3d8d38
Show file tree
Hide file tree
Showing 28 changed files with 1,054 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConsumeMessageContext {
private BrokerStatsManager.StatsType commercialRcvStats;
private int commercialRcvTimes;
private int commercialRcvSize;
private String namespace;

public String getConsumerGroup() {
return consumerGroup;
Expand Down Expand Up @@ -147,4 +148,12 @@ public int getCommercialRcvSize() {
public void setCommercialRcvSize(final int commercialRcvSize) {
this.commercialRcvSize = commercialRcvSize;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ public class SendMessageContext {
private long bornTimeStamp;
private MessageType msgType = MessageType.Trans_msg_Commit;
private boolean isSuccess = false;
//For Commercial

private String commercialOwner;
private BrokerStatsManager.StatsType commercialSendStats;
private int commercialSendSize;
private int commercialSendTimes;
private String namespace;

public boolean isSuccess() {
return isSuccess;
Expand Down Expand Up @@ -229,4 +230,12 @@ public int getCommercialSendTimes() {
public void setCommercialSendTimes(final int commercialSendTimes) {
this.commercialSendTimes = commercialSendTimes;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.broker.processor;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
Expand All @@ -27,11 +33,10 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
Expand All @@ -40,18 +45,14 @@
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Random;

public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

Expand All @@ -73,9 +74,11 @@ protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
if (!this.hasSendMessageHook()) {
return null;
}
String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
SendMessageContext mqtraceContext;
mqtraceContext = new SendMessageContext();
mqtraceContext.setProducerGroup(requestHeader.getProducerGroup());
mqtraceContext.setNamespace(namespace);
mqtraceContext.setTopic(requestHeader.getTopic());
mqtraceContext.setMsgProps(requestHeader.getProperties());
mqtraceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
Expand Down Expand Up @@ -253,7 +256,9 @@ public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final
try {
final SendMessageRequestHeader requestHeader = parseRequestHeader(request);

String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic());
if (null != requestHeader) {
context.setNamespace(namespace);
context.setProducerGroup(requestHeader.getProducerGroup());
context.setTopic(requestHeader.getTopic());
context.setBodyLength(request.getBody().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.rocketmq.broker.processor;

import java.net.SocketAddress;
import java.util.List;
import java.util.Map;

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
Expand All @@ -33,6 +37,7 @@
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
Expand All @@ -49,10 +54,6 @@
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

import java.net.SocketAddress;
import java.util.List;
import java.util.Map;

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

private List<ConsumeMessageHook> consumeMessageHookList;
Expand Down Expand Up @@ -101,9 +102,11 @@ private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, fin
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {

ConsumeMessageContext context = new ConsumeMessageContext();
context.setNamespace(namespace);
context.setConsumerGroup(requestHeader.getGroup());
context.setTopic(requestHeader.getOriginTopic());
context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@
*/
package org.apache.rocketmq.client;

import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
Expand All @@ -31,6 +37,7 @@ public class ClientConfig {
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
protected String namespace;
/**
* Pulling topic information interval from the named server
*/
Expand Down Expand Up @@ -87,6 +94,38 @@ public void changeInstanceNameToPID() {
}
}


public String withNamespace(String resource) {
return NamespaceUtil.wrapNamespace(this.getNamespace(), resource);
}

public Set<String> withNamespace(Set<String> resourceSet) {
Set<String> resourceWithNamespace = new HashSet<String>();
for (String resource : resourceSet) {
resourceWithNamespace.add(withNamespace(resource));
}
return resourceWithNamespace;
}

public String withoutNamespace(String resource) {
return NamespaceUtil.withoutNamespace(resource, this.getNamespace());
}

public Set<String> withoutNamespace(Set<String> resourceSet) {
Set<String> resourceWithoutNamespace = new HashSet<String>();
for (String resource : resourceSet) {
resourceWithoutNamespace.add(withoutNamespace(resource));
}
return resourceWithoutNamespace;
}

public MessageQueue queueWithNamespace(MessageQueue queue) {
if (StringUtils.isEmpty(this.getNamespace())) {
return queue;
}

return new MessageQueue(withNamespace(queue.getTopic()), queue.getBrokerName(), queue.getQueueId());
}
public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;
this.clientIP = cc.clientIP;
Expand All @@ -99,6 +138,7 @@ public void resetClientConfig(final ClientConfig cc) {
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
this.useTLS = cc.useTLS;
this.namespace = cc.namespace;
this.language = cc.language;
}

Expand All @@ -115,6 +155,7 @@ public ClientConfig cloneClientConfig() {
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
cc.useTLS = useTLS;
cc.namespace = namespace;
cc.language = language;
return cc;
}
Expand Down Expand Up @@ -199,12 +240,20 @@ public void setLanguage(LanguageCode language) {
this.language = language;
}

public String getNamespace() {
return namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
}
}
Loading

0 comments on commit f3d8d38

Please sign in to comment.