Skip to content

Commit

Permalink
feat(rocketmq): AUT-686 optimize consume stats api (apache#15)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Jun 29, 2023
1 parent 93250ef commit 04aa146
Show file tree
Hide file tree
Showing 23 changed files with 91 additions and 21 deletions.
2 changes: 1 addition & 1 deletion acl/pom.xml
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
Expand Down
2 changes: 1 addition & 1 deletion broker/pom.xml
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Expand Up @@ -138,6 +138,7 @@ public class BrokerMetricsManager {
public static final List<String> SYSTEM_GROUP_PREFIX_LIST = new ArrayList<String>() {
{
add(MixAll.CID_RMQ_SYS_PREFIX.toLowerCase());
add("automq_housekeeping".toLowerCase());
}
};

Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
Expand All @@ -54,6 +55,7 @@
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UnlockCallback;
Expand Down Expand Up @@ -161,6 +163,7 @@
import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.statictopic.LogicQueueMappingItem;
import org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
Expand Down Expand Up @@ -192,9 +195,11 @@
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final BrokerController brokerController;
private final ConsumerLagCalculator consumerLagCalculator;

public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
consumerLagCalculator = new ConsumerLagCalculator(brokerController);
}

@Override
Expand Down Expand Up @@ -1500,6 +1505,14 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
topics.add(requestHeader.getTopic());
}

String group = requestHeader.getConsumerGroup();
ConsumerGroupInfo consumerGroupInfo = brokerController.getConsumerManager()
.getConsumerGroupInfo(group, true);
boolean isPop = false;
if (consumerGroupInfo != null) {
isPop = consumerGroupInfo.getConsumeType() == ConsumeType.CONSUME_POP;
}

for (String topic : topics) {
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
if (null == topicConfig) {
Expand Down Expand Up @@ -1549,11 +1562,28 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,

long pullOffset = this.brokerController.getConsumerOffsetManager().queryPullOffset(
requestHeader.getConsumerGroup(), topic, i);
if (isPop) {
pullOffset = brokerController.getPopMessageProcessor().getPopBufferMergeService().getLatestOffset(topic, group, i);
if (pullOffset < 0) {
pullOffset = brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
}
if (pullOffset < 0) {
pullOffset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
}
}

offsetWrapper.setBrokerOffset(brokerOffset);
offsetWrapper.setConsumerOffset(consumerOffset);
offsetWrapper.setPullOffset(Math.max(consumerOffset, pullOffset));

Pair<Long, Long> lagStats = consumerLagCalculator.getConsumerLagStats(requestHeader.getConsumerGroup(), topic, i, isPop);
offsetWrapper.setLagEstimatedAccumulation(lagStats.getObject1());
offsetWrapper.setEarliestUnconsumedTimestamp(lagStats.getObject2());

Pair<Long, Long> inflightStats = consumerLagCalculator.getInFlightMsgStats(requestHeader.getConsumerGroup(), topic, i, isPop);
offsetWrapper.setInFlightMsgCountEstimatedAccumulation(inflightStats.getObject1());
offsetWrapper.setEarliestUnPulledTimestamp(inflightStats.getObject2());

long timeOffset = consumerOffset - 1;
if (timeOffset >= 0) {
long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion container/pom.xml
Expand Up @@ -18,7 +18,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion controller/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion distribution/Dockerfile
Expand Up @@ -3,7 +3,7 @@ FROM openjdk:17-jdk-slim
RUN apt-get update \
&& apt-get install -y --no-install-recommends curl unzip procps vim inetutils-ping && apt-get clean

ENV ROCKETMQ_VERSION 5.1.1-SNAPSHOT
ENV ROCKETMQ_VERSION 5.1.1-automq-SNAPSHOT
ENV ROCKETMQ_HOME /root/rocketmq/rocketmq-${ROCKETMQ_VERSION}

WORKDIR ${ROCKETMQ_HOME}
Expand Down
2 changes: 1 addition & 1 deletion distribution/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion filter/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion namesrv/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion openmessaging/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Expand Up @@ -28,7 +28,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
Expand All @@ -37,7 +37,7 @@
<url>git@github.com:apache/rocketmq.git</url>
<connection>scm:git:git@github.com:apache/rocketmq.git</connection>
<developerConnection>scm:git:git@github.com:apache/rocketmq.git</developerConnection>
<tag>rocketmq-all-5.1.1</tag>
<tag>HEAD</tag>
</scm>

<mailingLists>
Expand Down
2 changes: 1 addition & 1 deletion proxy/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion remoting/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Expand Up @@ -20,7 +20,14 @@ public class OffsetWrapper {
private long brokerOffset;
private long consumerOffset;
private long pullOffset;


private long lagEstimatedAccumulation;
private long inFlightMsgCountEstimatedAccumulation;

private long lastTimestamp;
private long earliestUnconsumedTimestamp;
private long earliestUnPulledTimestamp;

public long getBrokerOffset() {
return brokerOffset;
Expand All @@ -46,11 +53,43 @@ public void setPullOffset(long pullOffset) {
this.pullOffset = pullOffset;
}

public long getLagEstimatedAccumulation() {
return lagEstimatedAccumulation;
}

public void setLagEstimatedAccumulation(long lagEstimatedAccumulation) {
this.lagEstimatedAccumulation = lagEstimatedAccumulation;
}

public long getInFlightMsgCountEstimatedAccumulation() {
return inFlightMsgCountEstimatedAccumulation;
}

public void setInFlightMsgCountEstimatedAccumulation(long inFlightMsgCountEstimatedAccumulation) {
this.inFlightMsgCountEstimatedAccumulation = inFlightMsgCountEstimatedAccumulation;
}

public long getLastTimestamp() {
return lastTimestamp;
}

public void setLastTimestamp(long lastTimestamp) {
this.lastTimestamp = lastTimestamp;
}

public long getEarliestUnconsumedTimestamp() {
return earliestUnconsumedTimestamp;
}

public void setEarliestUnconsumedTimestamp(long earliestUnconsumedTimestamp) {
this.earliestUnconsumedTimestamp = earliestUnconsumedTimestamp;
}

public long getEarliestUnPulledTimestamp() {
return earliestUnPulledTimestamp;
}

public void setEarliestUnPulledTimestamp(long earliestUnPulledTimestamp) {
this.earliestUnPulledTimestamp = earliestUnPulledTimestamp;
}
}
2 changes: 1 addition & 1 deletion srvutil/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion store/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion test/pom.xml
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion tieredstore/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion tools/pom.xml
Expand Up @@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>5.1.1</version>
<version>5.1.1-automq-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down

0 comments on commit 04aa146

Please sign in to comment.