Skip to content

Commit

Permalink
修复并行消费问题
Browse files Browse the repository at this point in the history
  • Loading branch information
gaohaoxiang committed Sep 3, 2020
1 parent e504c2f commit b2f207d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ public static TransportConfig convertTransportConfig(KeyValue attributes) {
transportConfig.setHeartbeatInterval(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_INTERVAL, transportConfig.getHeartbeatInterval()));
transportConfig.setHeartbeatTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.HEARTBEAT_TIMEOUT, transportConfig.getHeartbeatTimeout()));
transportConfig.setSoLinger(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_LINGER, transportConfig.getSoLinger()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.CONNECTIONS, transportConfig.isTcpNoDelay()));
transportConfig.setTcpNoDelay(attributes.getBoolean(JoyQueueTransportBuiltinKeys.TCP_NO_DELAY, transportConfig.isTcpNoDelay()));
transportConfig.setKeepAlive(attributes.getBoolean(JoyQueueTransportBuiltinKeys.KEEPALIVE, transportConfig.isKeepAlive()));
transportConfig.setSoTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SO_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSendTimeout(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SEND_TIMEOUT, transportConfig.getSoTimeout()));
transportConfig.setSocketBufferSize(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.SOCKET_BUFFER_SIZE, transportConfig.getSocketBufferSize()));
transportConfig.setMaxOneway(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ONEWAY, transportConfig.getMaxOneway()));
transportConfig.setMaxAsync(KeyValueHelper.getInt(attributes, JoyQueueTransportBuiltinKeys.MAX_ASYNC, transportConfig.getMaxAsync()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package org.joyqueue.client.internal.consumer.support;

import org.apache.commons.collections.CollectionUtils;
import org.joyqueue.client.internal.cluster.ClusterClientManager;
import org.joyqueue.client.internal.cluster.ClusterManager;
import org.joyqueue.client.internal.consumer.BaseMessageListener;
Expand All @@ -27,7 +26,6 @@
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptor;
import org.joyqueue.client.internal.consumer.interceptor.ConsumerInterceptorManager;
import org.joyqueue.client.internal.consumer.transport.ConsumerClientManager;
import org.joyqueue.client.internal.metadata.domain.PartitionGroupMetadata;
import org.joyqueue.client.internal.metadata.domain.TopicMetadata;
import org.joyqueue.client.internal.nameserver.NameServerConfig;
import org.joyqueue.client.internal.nameserver.helper.NameServerHelper;
Expand Down Expand Up @@ -133,13 +131,7 @@ protected MessagePoller createMessagePoller(String topic) {
}

if (config.getThread() == ConsumerConfig.NONE_THREAD) {
int maxPartition = 1;
if (CollectionUtils.isNotEmpty(topicMetadata.getPartitionGroups())) {
for (PartitionGroupMetadata partitionGroup : topicMetadata.getPartitionGroups()) {
maxPartition = Math.max(partitionGroup.getPartitions().size(), maxPartition);
}
}
config.setThread(maxPartition);
config.setThread(topicMetadata.getPartitions().size());
}

if (topicMetadata.getType().equals(TopicType.BROADCAST)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ protected void validate() throws Exception {
this.concurrentConsumption =
consumeConfig.useLegacyConcurrentConsumer() ?
new ConcurrentConsumption(clusterManager, storeService, partitionManager, messageRetry, positionManager, filterMessageSupport, archiveManager, sessionManager):
new SlideWindowConcurrentConsumer(clusterManager, storeService, partitionManager, messageRetry, positionManager, filterMessageSupport, archiveManager, consumeConfig)
new SlideWindowConcurrentConsumer(clusterManager, storeService, partitionManager, messageRetry, positionManager,
filterMessageSupport, archiveManager, consumeConfig, brokerContext.getEventBus());
;
this.resetBroadcastIndexTimer = new Timer("joyqueuue-consume-reset-broadcast-index-timer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import org.joyqueue.broker.consumer.model.ConsumePartition;
import org.joyqueue.broker.consumer.model.PullResult;
import org.joyqueue.broker.consumer.position.PositionManager;
import org.joyqueue.broker.event.BrokerEventBus;
import org.joyqueue.domain.Partition;
import org.joyqueue.domain.PartitionGroup;
import org.joyqueue.domain.TopicName;
import org.joyqueue.exception.JoyQueueCode;
import org.joyqueue.exception.JoyQueueException;
Expand All @@ -39,7 +41,9 @@
import org.joyqueue.store.PositionUnderflowException;
import org.joyqueue.store.ReadResult;
import org.joyqueue.store.StoreService;
import org.joyqueue.store.event.StoreNodeChangeEvent;
import org.joyqueue.toolkit.concurrent.CasLock;
import org.joyqueue.toolkit.concurrent.EventListener;
import org.joyqueue.toolkit.concurrent.NamedThreadFactory;
import org.joyqueue.toolkit.format.Format;
import org.joyqueue.toolkit.lang.Close;
Expand All @@ -57,7 +61,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -109,6 +113,7 @@ public class SlideWindowConcurrentConsumer extends Service implements Concurrent
// 消费归档服务
private ArchiveManager archiveManager;
private ConsumeConfig consumeConfig;
private BrokerEventBus brokerEventBus;

private static final long CLEAN_INTERVAL_SEC = 60L;

Expand All @@ -117,8 +122,8 @@ public class SlideWindowConcurrentConsumer extends Service implements Concurrent
private final ScheduledExecutorService scheduledExecutorService;

SlideWindowConcurrentConsumer(ClusterManager clusterManager, StoreService storeService, PartitionManager partitionManager,
MessageRetry messageRetry, PositionManager positionManager,
FilterMessageSupport filterMessageSupport, ArchiveManager archiveManager, ConsumeConfig consumeConfig) {
MessageRetry messageRetry, PositionManager positionManager, FilterMessageSupport filterMessageSupport, ArchiveManager archiveManager,
ConsumeConfig consumeConfig, BrokerEventBus brokerEventBus) {
this.clusterManager = clusterManager;
this.storeService = storeService;
this.partitionManager = partitionManager;
Expand All @@ -128,32 +133,72 @@ public class SlideWindowConcurrentConsumer extends Service implements Concurrent
this.archiveManager = archiveManager;
this.consumeConfig = consumeConfig;
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ConcurrentConsumerClearExecutor", true));
this.brokerEventBus = brokerEventBus;
}

@Override
protected void doStart() throws Exception {
super.doStart();
// 定时清理已关闭并行消费或者已经失效的主题
scheduledExecutorService.scheduleAtFixedRate(this::clearSlideWindow, CLEAN_INTERVAL_SEC + 32, CLEAN_INTERVAL_SEC, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(this::clearSlideWindow, CLEAN_INTERVAL_SEC + 32, CLEAN_INTERVAL_SEC, TimeUnit.MILLISECONDS);
brokerEventBus.addListener(new EventListener() {
@Override
public void onEvent(Object event) {
if (event instanceof StoreNodeChangeEvent) {
onNodeChangeEvent((StoreNodeChangeEvent) event);
}
}
});
logger.info("SlideWindowConcurrentConsumer is started.");
}

protected void onNodeChangeEvent(StoreNodeChangeEvent event) {
if (event.getNodes().getRWNode() == null || event.getNodes().getRWNode().getId() != clusterManager.getBrokerId()) {
clearSlideWindow(event.getTopic(), event.getGroup());
}
}

protected void clearSlideWindow() {
Iterator<Map.Entry<ConsumePartition, SlideWindow>> iterator = slideWindowMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ConsumePartition, SlideWindow> entry = iterator.next();
ConsumePartition consumePartition = entry.getKey();

boolean isLeader = clusterManager.isLeader(consumePartition.getTopic(), consumePartition.getPartition());
org.joyqueue.domain.Consumer.ConsumerPolicy policy = clusterManager.tryGetConsumerPolicy(TopicName.parse(consumePartition.getTopic()), consumePartition.getApp());
if (policy != null && policy.isConcurrent() && isLeader) {
continue;
if (clearSlideWindow(consumePartition)) {
iterator.remove();
}
}
}

iterator.remove();
protected void clearSlideWindow(String topic, int group) {
PartitionGroup partitionGroup = clusterManager.getPartitionGroupByGroup(TopicName.parse(topic), group);
if (partitionGroup == null) {
return;
}
Set<Short> partitions = partitionGroup.getPartitions();
Iterator<Map.Entry<ConsumePartition, SlideWindow>> iterator = slideWindowMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<ConsumePartition, SlideWindow> entry = iterator.next();
ConsumePartition consumePartition = entry.getKey();

if (consumePartition.getTopic().equals(topic) && partitions.contains(consumePartition.getPartition())) {
if (clearSlideWindow(consumePartition)) {
iterator.remove();
}
}
}
}

protected boolean clearSlideWindow(ConsumePartition consumePartition) {
boolean isLeader = clusterManager.isLeader(consumePartition.getTopic(), consumePartition.getPartition());
org.joyqueue.domain.Consumer.ConsumerPolicy policy = clusterManager.tryGetConsumerPolicy(TopicName.parse(consumePartition.getTopic()), consumePartition.getApp());
if (policy != null && policy.isConcurrent() && isLeader) {
return false;
}
resetPullPositionFlag.remove(consumePartition);
return true;
}

@Override
protected void doStop() {
super.doStop();
Expand All @@ -177,18 +222,18 @@ protected void doStop() {
@Override
public PullResult getMessage(Consumer consumer, int count, long ackTimeout, long accessTimes, int concurrent) throws JoyQueueException {
// 消费普通分区消息
PullResult pullResult=null;
PullResult pullResult = null;
List<Short> priorityPartitionList = partitionManager.getPriorityPartition(TopicName.parse(consumer.getTopic()));
if (priorityPartitionList.size() > 0) {
// 高优先级分区消费
pullResult = getFromPartition(consumer, priorityPartitionList, count, ackTimeout, accessTimes, concurrent);
}
if(Objects.isNull(pullResult)||pullResult.isEmpty()){
if (pullResult == null || pullResult.isEmpty()) {
List<Short> partitionList = clusterManager.getLocalPartitions(TopicName.parse(consumer.getTopic()));
if (partitionManager.isRetry(consumer)) {
partitionList = new ArrayList<>(partitionList);
partitionList.add(Partition.RETRY_PARTITION_ID);
}
// if (partitionManager.isRetry(consumer)) {
// partitionList = new ArrayList<>(partitionList);
// partitionList.add(Partition.RETRY_PARTITION_ID);
// }
pullResult = getFromPartition(consumer, partitionList, count, ackTimeout, accessTimes, concurrent);
}
return pullResult;
Expand Down Expand Up @@ -803,7 +848,7 @@ boolean ack(TopicName topic, String app, short partition, long startIndex, int c
positionManager.updateLastMsgAckIndex(topic, app, partition,
consumedMessages.getStartIndex() + consumedMessages.getCount(), false);
} else {
logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}), currentAckIndex: {}!",
logger.warn("Ack index not match, topic: {}, partition: {}, ack: [{} - {}], currentAckIndex: {}!",
topic.getFullName(),
partition,
Format.formatWithComma(consumedMessages.getStartIndex()),
Expand Down

0 comments on commit b2f207d

Please sign in to comment.