Skip to content

Commit

Permalink
修复并行消费问题
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Aug 31, 2020
1 parent 4af790a commit ee17a79
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

/**
Expand Down Expand Up @@ -104,6 +105,9 @@ protected void doStop() {
* @return 未归档的发送日志条数
*/
public long getSendBacklogNum() {
if (sendArchiveService == null) {
return 0;
}
return sendArchiveService.remainMessagesSum();
}

Expand All @@ -113,6 +117,9 @@ public long getSendBacklogNum() {
* @return 剩余未归档消费日志的大小(文件数量 * 文件大小)
*/
public long getConsumeBacklogNum() {
if (consumeArchiveService == null) {
return 0;
}
return consumeArchiveService.getRemainConsumeLogFileNum();
}

Expand All @@ -122,6 +129,9 @@ public long getConsumeBacklogNum() {
* @return 未归档的发送日志条数
*/
public Map<String, Long> getSendBacklogNumByTopic() {
if (sendArchiveService == null) {
return Collections.emptyMap();
}
return sendArchiveService.getArchivePosition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -109,7 +110,7 @@ public class SlideWindowConcurrentConsumer extends Service implements Concurrent
private ArchiveManager archiveManager;
private ConsumeConfig consumeConfig;

private static final long CLEAN_INTERVAL_SEC = 600L;
private static final long CLEAN_INTERVAL_SEC = 60L;

private final Map<ConsumePartition, SlideWindow> slideWindowMap = new ConcurrentHashMap<>();

Expand All @@ -133,21 +134,26 @@ public class SlideWindowConcurrentConsumer extends Service implements Concurrent
protected void doStart() throws Exception {
super.doStart();
// 定时清理已关闭并行消费或者已经失效的主题
scheduledExecutorService.scheduleAtFixedRate(
() -> slideWindowMap.entrySet().removeIf(entry -> {
ConsumePartition consumePartition = entry.getKey();
try {
org.joyqueue.domain.Consumer.ConsumerPolicy policy = clusterManager.getConsumerPolicy(TopicName.parse(consumePartition.getTopic()), consumePartition.getApp());
return !policy.isConcurrent();
} catch (Exception e) {
logger.warn("Clean expire error: {}", e.getMessage());
}
return false;
}),
CLEAN_INTERVAL_SEC + 32, CLEAN_INTERVAL_SEC, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(this::clearSlideWindow, CLEAN_INTERVAL_SEC + 32, CLEAN_INTERVAL_SEC, TimeUnit.SECONDS);
logger.info("SlideWindowConcurrentConsumer is started.");
}

protected void clearSlideWindow() {
Iterator<Map.Entry<ConsumePartition, SlideWindow>> iterator = slideWindowMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ConsumePartition, SlideWindow> entry = iterator.next();
ConsumePartition consumePartition = entry.getKey();

boolean isLeader = clusterManager.isLeader(consumePartition.getTopic(), consumePartition.getPartition());
org.joyqueue.domain.Consumer.ConsumerPolicy policy = clusterManager.tryGetConsumerPolicy(TopicName.parse(consumePartition.getTopic()), consumePartition.getApp());
if (policy != null && policy.isConcurrent() && isLeader) {
continue;
}

iterator.remove();
}
}

@Override
protected void doStop() {
super.doStop();
Expand Down Expand Up @@ -480,7 +486,9 @@ private boolean extendSlideWindowAndUpdatePullIndex(
}
// 更新最新拉取位置,即下次开始拉取的序号
long newPullIndex = pullIndex + msgCount;
logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition);
if (logger.isDebugEnabled()) {
logger.debug("set new pull index:{}, topic:{}, app:{}, partition:{}", newPullIndex, consumer.getTopic(), consumer.getApp(), partition);
}
positionManager.updateLastMsgPullIndex(TopicName.parse(consumer.getTopic()), consumer.getApp(), partition, newPullIndex);
return true;
} else {
Expand Down Expand Up @@ -536,7 +544,7 @@ private long getPullIndex(Consumer consumer, short partition) throws JoyQueueExc
pullIndex = lastAckIndex;
}

logger.info("init concurrent pull index [{}]", pullIndex);
logger.info("init concurrent pull topic {}, app {}, partition {}, index [{}]", consumer.getTopic(), consumer.getApp(), partition, pullIndex);
}

return pullIndex;
Expand Down Expand Up @@ -759,12 +767,14 @@ ConsumedMessages appendUnsafe(String topic, short partition, long nextPullIndex,
this.nextPullIndex += count;
consumedMessagesMap.put(nextPullIndex, consumedMessages);
return consumedMessages;
}
}

CasLock getAppendLock() {
return appendLock;
}

private AtomicInteger counter = new AtomicInteger(0);

boolean ack(TopicName topic, String app, short partition, long startIndex, int count, PositionManager positionManager) throws JoyQueueException {
List<ConsumedMessages> toBeAcked = new LinkedList<>();
boolean ret = false;
Expand All @@ -782,9 +792,25 @@ boolean ack(TopicName topic, String app, short partition, long startIndex, int c
// 如果确认的片段是滑动窗口的第一段,需要在分区上ack,并向尾部缩小滑动窗口

while (!consumedMessagesMap.isEmpty() && (consumedMessages = consumedMessagesMap.firstEntry().getValue()).isAcked()) {
long lastMsgAckIndex = positionManager.getLastMsgAckIndex(topic, app, partition);
consumedMessagesMap.remove(consumedMessages.getStartIndex());
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
if (!consumedMessages.isExpired()) {
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);

// if (lastMsgAckIndex >= consumedMessages.getStartIndex() && lastMsgAckIndex < consumedMessages.getStartIndex() + consumedMessages.getCount()) {
// positionManager.updateLastMsgAckIndex(topic, app, partition,
// consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
// } else {
// logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}!",
// topic.getFullName(),
// partition,
// Format.formatWithComma(consumedMessages.getStartIndex()),
// Format.formatWithComma(consumedMessages.getStartIndex() + consumedMessages.getCount()),
// Format.formatWithComma(lastMsgAckIndex)
// );
// }
}
}
ret = true;
}
Expand Down Expand Up @@ -818,7 +844,7 @@ private static class ConsumedMessages {
ConsumedMessages(long startIndex, int count, long timeoutMs) {
this.startIndex = startIndex;
this.count = count;
expireTime = SystemClock.now() + timeoutMs;
this.expireTime = SystemClock.now() + timeoutMs;
}

int getCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
*/
package org.joyqueue.broker.manage.exporter.vertx;

import org.joyqueue.broker.monitor.converter.Converter;
import com.jd.laf.extension.ExtensionPoint;
import com.jd.laf.extension.ExtensionPointLazy;
import com.jd.laf.extension.SpiLoader;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.StringUtils;
import org.joyqueue.broker.monitor.converter.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,7 +57,13 @@ public Object invoke(RoutingContext context) throws Exception {
return result;
}

Converter converter = converters.get(target);
Converter converter = null;
for (Converter extension : converters.extensions()) {
if (target.startsWith(String.valueOf(extension.type()))) {
converter = extension;
break;
}
}
if (converter == null) {
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public StringResponse convert(BrokerStatExt brokerStatExt) {
response.addHeader("Date", new Date(time * 1000).toString());
response.addHeader("Transfer-Encoding", "chunked");

logger.info("Report Prometheus Convert");
logger.debug("Report Prometheus Convert");

return response;
}
Expand Down Expand Up @@ -103,6 +103,6 @@ private String buildPrometheusResult(List<MonitorRecord> records) {

@Override
public String type() {
return "prometheus";
return "Prometheus";
}
}
32 changes: 16 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -817,22 +817,22 @@
<fork>true</fork>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<doclint>none</doclint>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-javadoc-plugin</artifactId>-->
<!-- <version>${maven-javadoc-plugin.version}</version>-->
<!-- <configuration>-->
<!-- <doclint>none</doclint>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>jar</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->

<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down

0 comments on commit ee17a79

Please sign in to comment.