Skip to content

Commit

Permalink
[ISSUE apache#6858] passing through ProxyContext for future expansion
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk committed Jun 6, 2023
1 parent bee5077 commit 40314b6
Show file tree
Hide file tree
Showing 43 changed files with 193 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ protected GrpcClientChannel registerProducer(ProxyContext ctx, String topicName)
// use topic name as producer group
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, languageCode, parseClientVersion(ctx.getClientVersion()));
this.messagingProcessor.registerProducer(ctx, topicName, clientChannelInfo);
TopicMessageType topicMessageType = this.messagingProcessor.getMetadataService().getTopicMessageType(topicName);
TopicMessageType topicMessageType = this.messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName);
if (TopicMessageType.TRANSACTION.equals(topicMessageType)) {
this.messagingProcessor.addTransactionSubscription(ctx, topicName, topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ protected CompletableFuture<AckMessageResultEntry> processAckMessage(ProxyContex
String handleString = ackMessageEntry.getReceiptHandle();

String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CompletableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuratio
ReceiptHandle receiptHandle = ReceiptHandle.decode(request.getReceiptHandle());
String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());

MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle());
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), receiptHandle.getReceiptHandle());
if (messageReceiptHandle != null) {
receiptHandle = ReceiptHandle.decode(messageReceiptHandle.getReceiptHandleStr());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
receiptHandleProcessor.addReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
receiptHandleProcessor.addReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, messageExt.getMsgId(), receiptHandle, messageReceiptHandle);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public CompletableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessage

String group = GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
String handleString = request.getReceiptHandle();
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle());
MessageReceiptHandle messageReceiptHandle = receiptHandleProcessor.removeReceiptHandle(ctx, grpcChannelManager.getChannel(ctx.getClientID()), group, request.getMessageId(), request.getReceiptHandle());
if (messageReceiptHandle != null) {
handleString = messageReceiptHandle.getReceiptHandleStr();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public CompletableFuture<QueryRouteResponse> queryRoute(ProxyContext ctx, QueryR
List<MessageQueue> messageQueueList = new ArrayList<>();
Map<String, Map<Long, Broker>> brokerMap = buildBrokerMap(proxyTopicRouteData.getBrokerDatas());

TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topicName);
TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(ctx, topicName);
for (QueueData queueData : proxyTopicRouteData.getQueueDatas()) {
String brokerName = queueData.getBrokerName();
Map<Long, Broker> brokerIdMap = brokerMap.get(brokerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public CompletableFuture<PopResult> popMessage(
) {
CompletableFuture<PopResult> future = new CompletableFuture<>();
try {
AddressableMessageQueue messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
AddressableMessageQueue messageQueue = queueSelector.select(ctx, this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue");
}
Expand Down Expand Up @@ -287,7 +287,7 @@ public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, MessageQueue
CompletableFuture<PullResult> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(messageQueue);
.buildAddressableMessageQueue(ctx, messageQueue);
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
Expand All @@ -311,7 +311,7 @@ public CompletableFuture<Void> updateConsumerOffset(ProxyContext ctx, MessageQue
CompletableFuture<Void> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(messageQueue);
.buildAddressableMessageQueue(ctx, messageQueue);
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
Expand All @@ -329,7 +329,7 @@ public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueu
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(messageQueue);
.buildAddressableMessageQueue(ctx, messageQueue);
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
requestHeader.setTopic(addressableMessageQueue.getTopic());
Expand All @@ -345,7 +345,7 @@ public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Set<Me
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Set<MessageQueue>> future = new CompletableFuture<>();
Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(mqSet);
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
Expand All @@ -370,7 +370,7 @@ public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Set<Me
public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(mqSet);
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
Expand All @@ -394,7 +394,7 @@ public CompletableFuture<Long> getMaxOffset(ProxyContext ctx, MessageQueue messa
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(messageQueue);
.buildAddressableMessageQueue(ctx, messageQueue);
GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader();
requestHeader.setTopic(addressableMessageQueue.getTopic());
requestHeader.setQueueId(addressableMessageQueue.getQueueId());
Expand All @@ -409,7 +409,7 @@ public CompletableFuture<Long> getMinOffset(ProxyContext ctx, MessageQueue messa
CompletableFuture<Long> future = new CompletableFuture<>();
try {
AddressableMessageQueue addressableMessageQueue = serviceManager.getTopicRouteService()
.buildAddressableMessageQueue(messageQueue);
.buildAddressableMessageQueue(ctx, messageQueue);
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
requestHeader.setTopic(addressableMessageQueue.getTopic());
requestHeader.setQueueId(addressableMessageQueue.getQueueId());
Expand All @@ -420,10 +420,10 @@ public CompletableFuture<Long> getMinOffset(ProxyContext ctx, MessageQueue messa
return FutureUtils.addExecutor(future, this.executor);
}

protected Set<AddressableMessageQueue> buildAddressableSet(Set<MessageQueue> mqSet) {
protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set<MessageQueue> mqSet) {
return mqSet.stream().map(mq -> {
try {
return serviceManager.getTopicRouteService().buildAddressableMessageQueue(mq);
return serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx, mq);
} catch (Exception e) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ protected void init() {

@Override
public SubscriptionGroupConfig getSubscriptionGroupConfig(ProxyContext ctx, String consumerGroupName) {
return this.serviceManager.getMetadataService().getSubscriptionGroupConfig(consumerGroupName);
return this.serviceManager.getMetadataService().getSubscriptionGroupConfig(ctx, consumerGroupName);
}

@Override
public ProxyTopicRouteData getTopicRouteDataForProxy(ProxyContext ctx, List<Address> requestHostAndPortList,
String topicName) throws Exception {
return this.serviceManager.getTopicRouteService().getTopicRouteForProxy(requestHostAndPortList, topicName);
return this.serviceManager.getTopicRouteService().getTopicRouteForProxy(ctx, requestHostAndPortList, topicName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(topic);
TopicMessageType topicMessageType = serviceManager.getMetadataService().getTopicMessageType(ctx, topic);
TopicMessageType messageType = TopicMessageType.parseFromMessageProperty(message.getProperties());
topicMessageTypeValidator.validate(topicMessageType, messageType);
}
}
}
AddressableMessageQueue messageQueue = queueSelector.select(ctx,
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(topic));
this.serviceManager.getTopicRouteService().getCurrentMessageQueueView(ctx, topic));
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no writable queue");
}
Expand All @@ -102,7 +102,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
StringUtils.isNotBlank(sendResult.getTransactionId())) {
fillTransactionData(producerGroup, messageQueue, sendResult, messageList);
fillTransactionData(ctx, producerGroup, messageQueue, sendResult, messageList);
}
}
return sendResultList;
Expand All @@ -113,7 +113,7 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
return FutureUtils.addExecutor(future, this.executor);
}

protected void fillTransactionData(String producerGroup, AddressableMessageQueue messageQueue, SendResult sendResult, List<Message> messageList) {
protected void fillTransactionData(ProxyContext ctx, String producerGroup, AddressableMessageQueue messageQueue, SendResult sendResult, List<Message> messageList) {
try {
MessageId id;
if (sendResult.getOffsetMsgId() != null) {
Expand All @@ -122,6 +122,7 @@ protected void fillTransactionData(String producerGroup, AddressableMessageQueue
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
this.serviceManager.getTransactionService().addTransactionDataByBrokerName(
ctx,
messageQueue.getBrokerName(),
producerGroup,
sendResult.getQueueOffset(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageRecei
});
} else {
SubscriptionGroupConfig subscriptionGroupConfig =
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(messageReceiptHandle.getGroup());
messagingProcessor.getMetadataService().getSubscriptionGroupConfig(context, messageReceiptHandle.getGroup());
if (subscriptionGroupConfig == null) {
log.error("group's subscriptionGroupConfig is null when renew. handle: {}", messageReceiptHandle);
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -240,12 +240,12 @@ protected boolean clientIsOffline(ReceiptHandleGroupKey groupKey) {
return this.messagingProcessor.findConsumerChannel(createContext("JudgeClientOnline"), groupKey.group, groupKey.channel) == null;
}

public void addReceiptHandle(Channel channel, String group, String msgID, String receiptHandle,
public void addReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
this.addReceiptHandle(new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle, messageReceiptHandle);
this.addReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle, messageReceiptHandle);
}

protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle,
protected void addReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle,
MessageReceiptHandle messageReceiptHandle) {
if (key == null) {
return;
Expand All @@ -254,11 +254,11 @@ protected void addReceiptHandle(ReceiptHandleGroupKey key, String msgID, String
k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, messageReceiptHandle);
}

public MessageReceiptHandle removeReceiptHandle(Channel channel, String group, String msgID, String receiptHandle) {
return this.removeReceiptHandle(new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
public MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel channel, String group, String msgID, String receiptHandle) {
return this.removeReceiptHandle(ctx, new ReceiptHandleGroupKey(channel, group), msgID, receiptHandle);
}

protected MessageReceiptHandle removeReceiptHandle(ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
protected MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, ReceiptHandleGroupKey key, String msgID, String receiptHandle) {
if (key == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public CompletableFuture<Void> endTransaction(ProxyContext ctx, String transacti
CompletableFuture<Void> future = new CompletableFuture<>();
try {
EndTransactionRequestData headerData = serviceManager.getTransactionService().genEndTransactionRequestHeader(
ctx,
producerGroup,
buildCommitOrRollback(transactionStatus),
fromTransactionCheck,
Expand Down Expand Up @@ -70,6 +71,6 @@ protected int buildCommitOrRollback(TransactionStatus transactionStatus) {
}

public void addTransactionSubscription(ProxyContext ctx, String producerGroup, String topic) {
this.serviceManager.getTransactionService().addTransactionSubscription(producerGroup, topic);
this.serviceManager.getTransactionService().addTransactionSubscription(ctx, producerGroup, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand
if (topicMessageTypeValidator != null) {
// Do not check retry or dlq topic
if (!NamespaceUtil.isRetryTopic(topic) && !NamespaceUtil.isDLQTopic(topic)) {
TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(topic);
TopicMessageType topicMessageType = messagingProcessor.getMetadataService().getTopicMessageType(context, topic);
topicMessageTypeValidator.validate(topicMessageType, messageType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.utils.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.admin.AdminService;
Expand Down Expand Up @@ -191,7 +192,7 @@ protected class ProducerChangeListenerImpl implements ProducerChangeListener {
@Override
public void handle(ProducerGroupEvent event, String group, ClientChannelInfo clientChannelInfo) {
if (event == ProducerGroupEvent.GROUP_UNREGISTER) {
getTransactionService().unSubscribeAllTransactionTopic(group);
getTransactionService().unSubscribeAllTransactionTopic(ProxyContext.createForInner(this.getClass()), group);
}
}
}
Expand Down

0 comments on commit 40314b6

Please sign in to comment.